diff --git a/src/networking.cpp b/src/networking.cpp
index 1da41e566..e814bf530 100644
--- a/src/networking.cpp
+++ b/src/networking.cpp
@@ -1777,6 +1777,13 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
     int processed = 0;
     serverAssert(iel == (serverTL - g_pserver->rgthreadvar));
 
+    if (listLength(serverTL->clients_pending_asyncwrite))
+    {
+        AeLocker locker;
+        locker.arm(nullptr);
+        ProcessPendingAsyncWrites();
+    }
+
     int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE;
     /* For the fsync=always policy, we want that a given FD is never
      * served for reading and writing in the same event loop iteration,
@@ -1825,13 +1832,6 @@ int handleClientsWithPendingWrites(int iel, int aof_state) {
         }
     }
 
-    if (listLength(serverTL->clients_pending_asyncwrite))
-    {
-        AeLocker locker;
-        locker.arm(nullptr);
-        ProcessPendingAsyncWrites();
-    }
-
     return processed;
 }
 
@@ -3337,6 +3337,13 @@ void processEventsWhileBlocked(int iel) {
         }
     }
 
+    /* Since we're about to release our lock we need to flush the repl backlog queue */
+    bool fReplBacklog = g_pserver->repl_batch_offStart >= 0;
+    if (fReplBacklog) {
+        flushReplBacklogToClients();
+        g_pserver->repl_batch_idxStart = -1;
+        g_pserver->repl_batch_offStart = -1;
+    }
     aeReleaseLock();
     serverAssert(!GlobalLocksAcquired());
 
@@ -3369,6 +3376,12 @@ void processEventsWhileBlocked(int iel) {
     locker.arm(nullptr);
     locker.release();
 
+    // Restore it so the calling code is not confused
+    if (fReplBacklog) {
+        g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
+        g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
+    }
+
     for (client *c : vecclients)
         c->lock.lock();
 
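The two processEventsWhileBlocked() hunks above form a pair: flush and disarm the batch before aeReleaseLock(), then re-arm it from the current backlog positions once the lock is retaken, so no other thread ever observes a half-accounted batch. A stand-alone sketch of that pattern, using placeholder globals and a plain std::mutex rather than KeyDB's AeLocker machinery:

// Sketch only: illustrative names, not KeyDB's real types or API.
#include <mutex>

long long repl_batch_offStart = -1;   // stand-ins for the g_pserver fields
long long repl_batch_idxStart = -1;
long long master_repl_offset = 0;
long long repl_backlog_idx = 0;
std::mutex globalLock;

void flushReplBacklog() { /* send backlog[idxStart..idx) to replicas */ }

void eventsWhileBlocked() {
    // Flush and disarm before dropping the lock so other threads never see
    // (or append to) an open batch we are no longer tracking.
    bool fReplBacklog = repl_batch_offStart >= 0;
    if (fReplBacklog) {
        flushReplBacklog();
        repl_batch_idxStart = -1;
        repl_batch_offStart = -1;
    }
    globalLock.unlock();

    // ... nested event processing runs here without the global lock ...

    globalLock.lock();
    // Re-arm from the current positions so the caller's accounting holds.
    if (fReplBacklog) {
        repl_batch_idxStart = repl_backlog_idx;
        repl_batch_offStart = master_repl_offset;
    }
}

int main() {
    globalLock.lock();
    repl_batch_offStart = master_repl_offset;  // a batch is open
    repl_batch_idxStart = repl_backlog_idx;
    eventsWhileBlocked();
    globalLock.unlock();
    return 0;
}
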
diff --git a/src/replication.cpp b/src/replication.cpp
index 45625d7ba..d30d3c62a 100644
--- a/src/replication.cpp
+++ b/src/replication.cpp
@@ -198,8 +198,8 @@ void createReplicationBacklog(void) {
     g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1;
 
     /* Allow transmission to clients */
-    serverTL->repl_batch_idxStart = 0;
-    serverTL->repl_batch_offStart = g_pserver->master_repl_offset;
+    g_pserver->repl_batch_idxStart = 0;
+    g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
 }
 
 /* This function is called when the user modifies the replication backlog
@@ -220,19 +220,19 @@ void resizeReplicationBacklog(long long newsize) {
      * worse often we need to alloc additional space before freeing the
      * old buffer. */
-    if (serverTL->repl_batch_idxStart >= 0) {
+    if (g_pserver->repl_batch_idxStart >= 0) {
         // We need to keep critical data so we can't shrink less than the hot data in the buffer
-        newsize = std::max(newsize, g_pserver->master_repl_offset - serverTL->repl_batch_offStart);
+        newsize = std::max(newsize, g_pserver->master_repl_offset - g_pserver->repl_batch_offStart);
 
         char *backlog = (char*)zmalloc(newsize);
-        g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - serverTL->repl_batch_offStart;
+        g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - g_pserver->repl_batch_offStart;
 
-        if (g_pserver->repl_backlog_idx >= serverTL->repl_batch_idxStart) {
-            auto cbActiveBacklog = g_pserver->repl_backlog_idx - serverTL->repl_batch_idxStart;
-            memcpy(backlog, g_pserver->repl_backlog + serverTL->repl_batch_idxStart, cbActiveBacklog);
+        if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {
+            auto cbActiveBacklog = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart;
+            memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbActiveBacklog);
             serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
         } else {
-            auto cbPhase1 = g_pserver->repl_backlog_size - serverTL->repl_batch_idxStart;
-            memcpy(backlog, g_pserver->repl_backlog + serverTL->repl_batch_idxStart, cbPhase1);
+            auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart;
+            memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1);
             memcpy(backlog + cbPhase1, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
             auto cbActiveBacklog = cbPhase1 + g_pserver->repl_backlog_idx;
             serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
@@ -240,7 +240,7 @@ void resizeReplicationBacklog(long long newsize) {
         zfree(g_pserver->repl_backlog);
         g_pserver->repl_backlog = backlog;
         g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen;
-        serverTL->repl_batch_idxStart = 0;
+        g_pserver->repl_batch_idxStart = 0;
     } else {
         zfree(g_pserver->repl_backlog);
         g_pserver->repl_backlog = (char*)zmalloc(newsize);
@@ -275,11 +275,11 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
     serverAssert(GlobalLocksAcquired());
     const unsigned char *p = (const unsigned char*)ptr;
 
-    if (serverTL->repl_batch_idxStart >= 0) {
-        long long minimumsize = g_pserver->master_repl_offset + len - serverTL->repl_batch_offStart+1;
+    if (g_pserver->repl_batch_idxStart >= 0) {
+        long long minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1;
         if (minimumsize > g_pserver->repl_backlog_size) {
             flushReplBacklogToClients();
-            minimumsize = g_pserver->master_repl_offset + len - serverTL->repl_batch_offStart+1;
+            minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1;
 
             if (minimumsize > g_pserver->repl_backlog_size) {
                 // This is an emergency overflow, we better resize to fit
@@ -416,7 +416,7 @@ static int writeProtoNum(char *dst, const size_t cchdst, long long num)
 void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc) {
     int j;
     serverAssert(GlobalLocksAcquired());
-    serverAssert(serverTL->repl_batch_offStart >= 0);
+    serverAssert(g_pserver->repl_batch_offStart >= 0);
 
     if (dictid < 0)
         dictid = 0; // this can happen if we send a PING before any real operation
@@ -2257,8 +2257,8 @@ void readSyncBulkPayload(connection *conn) {
          * we are starting a new history. */
         memcpy(g_pserver->replid,mi->master->replid,sizeof(g_pserver->replid));
         g_pserver->master_repl_offset = mi->master->reploff;
-        if (serverTL->repl_batch_offStart >= 0)
-            serverTL->repl_batch_offStart = g_pserver->master_repl_offset;
+        if (g_pserver->repl_batch_offStart >= 0)
+            g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
     }
     clearReplicationId2();
 
@@ -2266,7 +2266,7 @@ void readSyncBulkPayload(connection *conn) {
      * accumulate the backlog regardless of the fact they have sub-slaves
      * or not, in order to behave correctly if they are promoted to
      * masters after a failover. */
-    if (g_pserver->repl_backlog == NULL) createReplicationBacklog();
+    if (g_pserver->repl_backlog == NULL) runAndPropogateToReplicas(createReplicationBacklog);
     serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success");
 
     if (cserver.supervised_mode == SUPERVISED_SYSTEMD) {
@@ -2526,7 +2526,7 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read
         /* If this instance was restarted and we read the metadata to
          * PSYNC from the persistence file, our replication backlog could
          * be still not initialized. Create it. */
-        if (g_pserver->repl_backlog == NULL) createReplicationBacklog();
+        if (g_pserver->repl_backlog == NULL) runAndPropogateToReplicas(createReplicationBacklog);
         return PSYNC_CONTINUE;
     }
 
@@ -4360,13 +4360,15 @@ static void propagateMasterStaleKeys()
 void flushReplBacklogToClients()
 {
     serverAssert(GlobalLocksAcquired());
+    if (g_pserver->repl_batch_offStart < 0)
+        return;
 
-    if (serverTL->repl_batch_offStart != g_pserver->master_repl_offset) {
+    if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) {
        bool fAsyncWrite = false;
        // Ensure no overflow
-        serverAssert(serverTL->repl_batch_offStart < g_pserver->master_repl_offset);
-        serverAssert(g_pserver->master_repl_offset - serverTL->repl_batch_offStart <= g_pserver->repl_backlog_size);
-        serverAssert(serverTL->repl_batch_idxStart != g_pserver->repl_backlog_idx);
+        serverAssert(g_pserver->repl_batch_offStart < g_pserver->master_repl_offset);
+        serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size);
+        serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx);
 
        listIter li;
        listNode *ln;
@@ -4383,25 +4385,25 @@ void flushReplBacklogToClients()
            else
                fAsyncWrite = true;
 
-            if (g_pserver->repl_backlog_idx >= serverTL->repl_batch_idxStart) {
-                long long cbCopy = g_pserver->repl_backlog_idx - serverTL->repl_batch_idxStart;
-                serverAssert((g_pserver->master_repl_offset - serverTL->repl_batch_offStart) == cbCopy);
-                serverAssert((g_pserver->repl_backlog_size - serverTL->repl_batch_idxStart) >= (cbCopy));
-                serverAssert((serverTL->repl_batch_idxStart + cbCopy) <= g_pserver->repl_backlog_size);
+            if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {
+                long long cbCopy = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart;
+                serverAssert((g_pserver->master_repl_offset - g_pserver->repl_batch_offStart) == cbCopy);
+                serverAssert((g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart) >= (cbCopy));
+                serverAssert((g_pserver->repl_batch_idxStart + cbCopy) <= g_pserver->repl_backlog_size);
 
-                addReplyProto(replica, g_pserver->repl_backlog + serverTL->repl_batch_idxStart, cbCopy);
+                addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy);
            } else {
-                auto cbPhase1 = g_pserver->repl_backlog_size - serverTL->repl_batch_idxStart;
-                addReplyProto(replica, g_pserver->repl_backlog + serverTL->repl_batch_idxStart, cbPhase1);
+                auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart;
+                addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1);
                addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
-                serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - serverTL->repl_batch_offStart));
+                serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart));
            }
        }
        if (fAsyncWrite)
            ProcessPendingAsyncWrites();
 
        // This may be called multiple times per "frame" so update with our progress flushing to clients
-        serverTL->repl_batch_idxStart = g_pserver->repl_backlog_idx;
-        serverTL->repl_batch_offStart = g_pserver->master_repl_offset;
+        g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
+        g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
    }
}
\ No newline at end of file
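Both resizeReplicationBacklog() and flushReplBacklogToClients() read the unflushed batch out of the circular backlog in at most two phases: the span from repl_batch_idxStart to repl_backlog_idx is contiguous unless it wraps past the end of the buffer, in which case the tail of the buffer is copied first and the head second. A minimal sketch of that two-phase read (illustrative names, not KeyDB's API):

// Sketch only: demonstrates the contiguous vs. wrapped copy cases.
#include <cassert>
#include <cstring>
#include <vector>

// Copy the pending bytes between idxStart and idx out of a circular buffer.
std::vector<char> readBatch(const char *backlog, long long size,
                            long long idxStart, long long idx) {
    std::vector<char> out;
    if (idx == idxStart) return out;          // nothing pending
    if (idx > idxStart) {
        // Contiguous case: one copy covers [idxStart, idx).
        out.resize(idx - idxStart);
        memcpy(out.data(), backlog + idxStart, out.size());
    } else {
        // Wrapped case: copy the tail of the buffer, then the head.
        long long cbPhase1 = size - idxStart;
        out.resize(cbPhase1 + idx);
        memcpy(out.data(), backlog + idxStart, cbPhase1);
        memcpy(out.data() + cbPhase1, backlog, idx);
    }
    return out;
}

int main() {
    char buf[8] = {'0','1','2','3','4','5','6','7'};
    // Batch started at index 6 and wrapped around to idx == 1: bytes "670".
    auto batch = readBatch(buf, sizeof(buf), 6, 1);
    assert(batch.size() == 3 && batch[0] == '6' && batch[2] == '0');
    return 0;
}
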
diff --git a/src/server.cpp b/src/server.cpp
index 9380943d3..8114cc993 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -2235,7 +2235,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
     int iel = ielFromEventLoop(eventLoop);
 
     locker.arm();
-    processClients();
+    serverAssert(g_pserver->repl_batch_offStart < 0);
+    runAndPropogateToReplicas(processClients);
 
     /* Handle precise timeouts of blocked clients. */
     handleBlockedClientsTimeout();
@@ -2321,6 +2322,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
     /* Before we are going to sleep, let the threads access the dataset by
      * releasing the GIL. Redis main thread will not touch anything at this
      * time. */
+    serverAssert(g_pserver->repl_batch_offStart < 0);
     locker.disarm();
     if (!fSentReplies)
         handleClientsWithPendingWrites(iel, aof_state);
@@ -3553,6 +3555,7 @@ void call(client *c, int flags) {
     /* We need to transfer async writes before a client's repl state gets changed.
        Otherwise we won't be able to propagate them correctly. */
     if (c->cmd->flags & CMD_CATEGORY_REPLICATION) {
+        flushReplBacklogToClients();
         ProcessPendingAsyncWrites();
     }
 
@@ -5316,8 +5319,8 @@ void loadDataFromDisk(void) {
         {
             memcpy(g_pserver->replid,rsi.repl_id,sizeof(g_pserver->replid));
             g_pserver->master_repl_offset = rsi.repl_offset;
-            if (serverTL->repl_batch_offStart >= 0)
-                serverTL->repl_batch_offStart = g_pserver->master_repl_offset;
+            if (g_pserver->repl_batch_offStart >= 0)
+                g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
 
             listIter li;
             listNode *ln;
@@ -5568,6 +5571,7 @@ int main(int argc, char **argv) {
     srand(time(NULL)^getpid());
     gettimeofday(&tv,NULL);
     crc64_init();
+    serverAssert(g_pserver->repl_batch_offStart < 0);
 
     uint8_t hashseed[16];
     getRandomHexChars((char*)hashseed,sizeof(hashseed));
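The loadDataFromDisk() hunk mirrors the readSyncBulkPayload() change in replication.cpp: whenever master_repl_offset is overwritten wholesale, an open batch's repl_batch_offStart must be rebased to the new offset, because the pending-batch length is computed as master_repl_offset - repl_batch_offStart. A toy illustration of why the rebase matters, with plain globals standing in for the g_pserver fields:

// Sketch only: shows the arithmetic the rebase protects.
#include <cassert>

long long master_repl_offset = 100;
long long repl_batch_offStart = 100;  // batch armed, nothing written yet

int main() {
    // An RDB load (or full sync) replaces the replication offset wholesale.
    long long rsi_repl_offset = 5000;
    master_repl_offset = rsi_repl_offset;

    // Without this rebase the "pending bytes" would read as 4900 even
    // though nothing was written into the backlog by this batch.
    if (repl_batch_offStart >= 0)
        repl_batch_offStart = master_repl_offset;

    assert(master_repl_offset - repl_batch_offStart == 0);
    return 0;
}
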
diff --git a/src/server.h b/src/server.h
index 02400ef7f..dc1fc3aca 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1371,8 +1371,6 @@ struct redisServerThreadVars {
     char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
     long unsigned commandsExecuted = 0;
     bool fRetrySetAofEvent = false;
-    long long repl_batch_offStart = -1;
-    long long repl_batch_idxStart = -1;
     std::vector<client*> vecclientsProcess;
 };
 
@@ -1828,6 +1826,10 @@ struct redisServer {
     char *bio_cpulist; /* cpu affinity list of bio thread. */
     char *aof_rewrite_cpulist; /* cpu affinity list of aof rewrite process. */
     char *bgsave_cpulist; /* cpu affinity list of bgsave process. */
+
+
+    long long repl_batch_offStart = -1;
+    long long repl_batch_idxStart = -1;
 };
 
 typedef struct pubsubPattern {
@@ -2887,18 +2889,18 @@ template <typename FN_PTR, typename... TARGS>
 void runAndPropogateToReplicas(FN_PTR *pfn, TARGS... args)
 {
     // Store the replication backlog starting params, we use this to know how much data was written.
-    // these are TLS in case we need to expand the buffer and therefore need to update them
-    bool fNestedProcess = (serverTL->repl_batch_idxStart >= 0);
+    // these are globals in case we need to expand the buffer and therefore need to update them
+    bool fNestedProcess = (g_pserver->repl_batch_idxStart >= 0);
     if (!fNestedProcess) {
-        serverTL->repl_batch_offStart = g_pserver->master_repl_offset;
-        serverTL->repl_batch_idxStart = g_pserver->repl_backlog_idx;
+        g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
+        g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
     }
 
     pfn(args...);
 
     if (!fNestedProcess) {
         flushReplBacklogToClients();
-        serverTL->repl_batch_offStart = -1;
-        serverTL->repl_batch_idxStart = -1;
+        g_pserver->repl_batch_offStart = -1;
+        g_pserver->repl_batch_idxStart = -1;
     }
 }
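The nesting logic above is the crux of the template: only the outermost runAndPropogateToReplicas() call arms repl_batch_offStart and flushes; nested invocations run the wrapped function directly and leave flushing to the outer scope. This is presumably what lets beforeSleep() wrap all of processClients() even though code inside it may itself call the wrapper. A toy model of that rule (illustrative names, not the real API):

// Sketch only: demonstrates that nested batch scopes flush exactly once.
#include <cstdio>

long long g_offStart = -1;   // stands in for g_pserver->repl_batch_offStart
int g_flushes = 0;

void flushBatch() { ++g_flushes; }

template <typename FN>
void runBatched(FN fn) {
    bool fNested = (g_offStart >= 0);
    if (!fNested) g_offStart = 0;   // arm: record where the batch starts
    fn();
    if (!fNested) {                 // only the outermost scope flushes
        flushBatch();
        g_offStart = -1;            // disarm
    }
}

int main() {
    runBatched([]{
        runBatched([]{ /* nested work: no flush here */ });
    });
    printf("flushes: %d\n", g_flushes);  // prints 1, not 2
    return 0;
}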