Ensure multithreaded tests pass

Former-commit-id: 0bcce502859647529b8835a4a101b64ddf0d625c
This commit is contained in:
John Sully 2020-10-28 04:55:56 +00:00
parent 9120d36bfc
commit 4694161550
4 changed files with 72 additions and 51 deletions

View File

@ -1777,6 +1777,13 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
int processed = 0;
serverAssert(iel == (serverTL - g_pserver->rgthreadvar));
if (listLength(serverTL->clients_pending_asyncwrite))
{
AeLocker locker;
locker.arm(nullptr);
ProcessPendingAsyncWrites();
}
int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE;
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
@ -1825,13 +1832,6 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
}
}
if (listLength(serverTL->clients_pending_asyncwrite))
{
AeLocker locker;
locker.arm(nullptr);
ProcessPendingAsyncWrites();
}
return processed;
}
@ -3337,6 +3337,13 @@ void processEventsWhileBlocked(int iel) {
}
}
/* Since we're about to release our lock we need to flush the repl backlog queue */
bool fReplBacklog = g_pserver->repl_batch_offStart >= 0;
if (fReplBacklog) {
flushReplBacklogToClients();
g_pserver->repl_batch_idxStart = -1;
g_pserver->repl_batch_offStart = -1;
}
aeReleaseLock();
serverAssert(!GlobalLocksAcquired());
@ -3369,6 +3376,12 @@ void processEventsWhileBlocked(int iel) {
locker.arm(nullptr);
locker.release();
// Restore it so the calling code is not confused
if (fReplBacklog) {
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
}
for (client *c : vecclients)
c->lock.lock();

View File

@ -198,8 +198,8 @@ void createReplicationBacklog(void) {
g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1;
/* Allow transmission to clients */
serverTL->repl_batch_idxStart = 0;
serverTL->repl_batch_offStart = g_pserver->master_repl_offset;
g_pserver->repl_batch_idxStart = 0;
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
}
/* This function is called when the user modifies the replication backlog
@ -220,19 +220,19 @@ void resizeReplicationBacklog(long long newsize) {
* worse often we need to alloc additional space before freeing the
* old buffer. */
if (serverTL->repl_batch_idxStart >= 0) {
if (g_pserver->repl_batch_idxStart >= 0) {
// We need to keep the critical (not yet flushed) data, so we can't shrink below the size of the hot data in the buffer
newsize = std::max(newsize, g_pserver->master_repl_offset - serverTL->repl_batch_offStart);
newsize = std::max(newsize, g_pserver->master_repl_offset - g_pserver->repl_batch_offStart);
char *backlog = (char*)zmalloc(newsize);
g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - serverTL->repl_batch_offStart;
g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - g_pserver->repl_batch_offStart;
if (g_pserver->repl_backlog_idx >= serverTL->repl_batch_idxStart) {
auto cbActiveBacklog = g_pserver->repl_backlog_idx - serverTL->repl_batch_idxStart;
memcpy(backlog, g_pserver->repl_backlog + serverTL->repl_batch_idxStart, cbActiveBacklog);
if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {
auto cbActiveBacklog = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart;
memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbActiveBacklog);
serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
} else {
auto cbPhase1 = g_pserver->repl_backlog_size - serverTL->repl_batch_idxStart;
memcpy(backlog, g_pserver->repl_backlog + serverTL->repl_batch_idxStart, cbPhase1);
auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart;
memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1);
memcpy(backlog + cbPhase1, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
auto cbActiveBacklog = cbPhase1 + g_pserver->repl_backlog_idx;
serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
@ -240,7 +240,7 @@ void resizeReplicationBacklog(long long newsize) {
zfree(g_pserver->repl_backlog);
g_pserver->repl_backlog = backlog;
g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen;
serverTL->repl_batch_idxStart = 0;
g_pserver->repl_batch_idxStart = 0;
} else {
zfree(g_pserver->repl_backlog);
g_pserver->repl_backlog = (char*)zmalloc(newsize);
@ -275,11 +275,11 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
serverAssert(GlobalLocksAcquired());
const unsigned char *p = (const unsigned char*)ptr;
if (serverTL->repl_batch_idxStart >= 0) {
long long minimumsize = g_pserver->master_repl_offset + len - serverTL->repl_batch_offStart+1;
if (g_pserver->repl_batch_idxStart >= 0) {
long long minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1;
if (minimumsize > g_pserver->repl_backlog_size) {
flushReplBacklogToClients();
minimumsize = g_pserver->master_repl_offset + len - serverTL->repl_batch_offStart+1;
minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1;
if (minimumsize > g_pserver->repl_backlog_size) {
// This is an emergency overflow, we better resize to fit
@ -416,7 +416,7 @@ static int writeProtoNum(char *dst, const size_t cchdst, long long num)
void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc) {
int j;
serverAssert(GlobalLocksAcquired());
serverAssert(serverTL->repl_batch_offStart >= 0);
serverAssert(g_pserver->repl_batch_offStart >= 0);
if (dictid < 0)
dictid = 0; // this can happen if we send a PING before any real operation
@ -2257,8 +2257,8 @@ void readSyncBulkPayload(connection *conn) {
* we are starting a new history. */
memcpy(g_pserver->replid,mi->master->replid,sizeof(g_pserver->replid));
g_pserver->master_repl_offset = mi->master->reploff;
if (serverTL->repl_batch_offStart >= 0)
serverTL->repl_batch_offStart = g_pserver->master_repl_offset;
if (g_pserver->repl_batch_offStart >= 0)
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
}
clearReplicationId2();
@ -2266,7 +2266,7 @@ void readSyncBulkPayload(connection *conn) {
* accumulate the backlog regardless of the fact they have sub-slaves
* or not, in order to behave correctly if they are promoted to
* masters after a failover. */
if (g_pserver->repl_backlog == NULL) createReplicationBacklog();
if (g_pserver->repl_backlog == NULL) runAndPropogateToReplicas(createReplicationBacklog);
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
if (cserver.supervised_mode == SUPERVISED_SYSTEMD) {
@ -2526,7 +2526,7 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read
/* If this instance was restarted and we read the metadata to
* PSYNC from the persistence file, our replication backlog could
* be still not initialized. Create it. */
if (g_pserver->repl_backlog == NULL) createReplicationBacklog();
if (g_pserver->repl_backlog == NULL) runAndPropogateToReplicas(createReplicationBacklog);
return PSYNC_CONTINUE;
}
@ -4360,13 +4360,15 @@ static void propagateMasterStaleKeys()
void flushReplBacklogToClients()
{
serverAssert(GlobalLocksAcquired());
if (g_pserver->repl_batch_offStart < 0)
return;
if (serverTL->repl_batch_offStart != g_pserver->master_repl_offset) {
if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) {
bool fAsyncWrite = false;
// Ensure no overflow
serverAssert(serverTL->repl_batch_offStart < g_pserver->master_repl_offset);
serverAssert(g_pserver->master_repl_offset - serverTL->repl_batch_offStart <= g_pserver->repl_backlog_size);
serverAssert(serverTL->repl_batch_idxStart != g_pserver->repl_backlog_idx);
serverAssert(g_pserver->repl_batch_offStart < g_pserver->master_repl_offset);
serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size);
serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx);
listIter li;
listNode *ln;
@ -4383,25 +4385,25 @@ void flushReplBacklogToClients()
else
fAsyncWrite = true;
if (g_pserver->repl_backlog_idx >= serverTL->repl_batch_idxStart) {
long long cbCopy = g_pserver->repl_backlog_idx - serverTL->repl_batch_idxStart;
serverAssert((g_pserver->master_repl_offset - serverTL->repl_batch_offStart) == cbCopy);
serverAssert((g_pserver->repl_backlog_size - serverTL->repl_batch_idxStart) >= (cbCopy));
serverAssert((serverTL->repl_batch_idxStart + cbCopy) <= g_pserver->repl_backlog_size);
if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {
long long cbCopy = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart;
serverAssert((g_pserver->master_repl_offset - g_pserver->repl_batch_offStart) == cbCopy);
serverAssert((g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart) >= (cbCopy));
serverAssert((g_pserver->repl_batch_idxStart + cbCopy) <= g_pserver->repl_backlog_size);
addReplyProto(replica, g_pserver->repl_backlog + serverTL->repl_batch_idxStart, cbCopy);
addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy);
} else {
auto cbPhase1 = g_pserver->repl_backlog_size - serverTL->repl_batch_idxStart;
addReplyProto(replica, g_pserver->repl_backlog + serverTL->repl_batch_idxStart, cbPhase1);
auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart;
addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1);
addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - serverTL->repl_batch_offStart));
serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart));
}
}
if (fAsyncWrite)
ProcessPendingAsyncWrites();
// This may be called multiple times per "frame" so update with our progress flushing to clients
serverTL->repl_batch_idxStart = g_pserver->repl_backlog_idx;
serverTL->repl_batch_offStart = g_pserver->master_repl_offset;
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
}
}

View File

@ -2235,7 +2235,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
int iel = ielFromEventLoop(eventLoop);
locker.arm();
processClients();
serverAssert(g_pserver->repl_batch_offStart < 0);
runAndPropogateToReplicas(processClients);
/* Handle precise timeouts of blocked clients. */
handleBlockedClientsTimeout();
@ -2321,6 +2322,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this
* time. */
serverAssert(g_pserver->repl_batch_offStart < 0);
locker.disarm();
if (!fSentReplies)
handleClientsWithPendingWrites(iel, aof_state);
@ -3553,6 +3555,7 @@ void call(client *c, int flags) {
/* We need to transfer async writes before a client's repl state gets changed. Otherwise
we won't be able to propagate them correctly. */
if (c->cmd->flags & CMD_CATEGORY_REPLICATION) {
flushReplBacklogToClients();
ProcessPendingAsyncWrites();
}
@ -5316,8 +5319,8 @@ void loadDataFromDisk(void) {
{
memcpy(g_pserver->replid,rsi.repl_id,sizeof(g_pserver->replid));
g_pserver->master_repl_offset = rsi.repl_offset;
if (serverTL->repl_batch_offStart >= 0)
serverTL->repl_batch_offStart = g_pserver->master_repl_offset;
if (g_pserver->repl_batch_offStart >= 0)
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
listIter li;
listNode *ln;
@ -5568,6 +5571,7 @@ int main(int argc, char **argv) {
srand(time(NULL)^getpid());
gettimeofday(&tv,NULL);
crc64_init();
serverAssert(g_pserver->repl_batch_offStart < 0);
uint8_t hashseed[16];
getRandomHexChars((char*)hashseed,sizeof(hashseed));

View File

@ -1371,8 +1371,6 @@ struct redisServerThreadVars {
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
long unsigned commandsExecuted = 0;
bool fRetrySetAofEvent = false;
long long repl_batch_offStart = -1;
long long repl_batch_idxStart = -1;
std::vector<client*> vecclientsProcess;
};
@ -1828,6 +1826,10 @@ struct redisServer {
char *bio_cpulist; /* cpu affinity list of bio thread. */
char *aof_rewrite_cpulist; /* cpu affinity list of aof rewrite process. */
char *bgsave_cpulist; /* cpu affinity list of bgsave process. */
long long repl_batch_offStart = -1;
long long repl_batch_idxStart = -1;
};
typedef struct pubsubPattern {
@ -2887,18 +2889,18 @@ template<typename FN_PTR, class ...TARGS>
void runAndPropogateToReplicas(FN_PTR *pfn, TARGS... args) {
// Store the replication backlog starting params; we use these to know how much data was written.
// They are kept on the shared server struct (g_pserver) rather than in TLS, and may be updated if the buffer needs to be expanded
bool fNestedProcess = (serverTL->repl_batch_idxStart >= 0);
bool fNestedProcess = (g_pserver->repl_batch_idxStart >= 0);
if (!fNestedProcess) {
serverTL->repl_batch_offStart = g_pserver->master_repl_offset;
serverTL->repl_batch_idxStart = g_pserver->repl_backlog_idx;
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
}
pfn(args...);
if (!fNestedProcess) {
flushReplBacklogToClients();
serverTL->repl_batch_offStart = -1;
serverTL->repl_batch_idxStart = -1;
g_pserver->repl_batch_offStart = -1;
g_pserver->repl_batch_idxStart = -1;
}
}