diff --git a/src/multi.cpp b/src/multi.cpp index 589dba589..1b91a05a0 100644 --- a/src/multi.cpp +++ b/src/multi.cpp @@ -267,7 +267,6 @@ void execCommand(client *c) { * backlog with the final EXEC. */ if (g_pserver->repl_backlog && was_master && !is_master) { const char *execcmd = "*1\r\n$4\r\nEXEC\r\n"; - updateLowestOffsetAmongReplicas(); feedReplicationBacklog(execcmd,strlen(execcmd)); } afterPropagateExec(); diff --git a/src/replication.cpp b/src/replication.cpp index a5f9c3acf..cb0b562b1 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -56,29 +56,6 @@ void putSlaveOnline(client *replica); int cancelReplicationHandshake(redisMaster *mi, int reconnect); static void propagateMasterStaleKeys(); -/* gets the lowest offset amongst all of the replicas and stores it globally*/ -void updateLowestOffsetAmongReplicas(){ - serverAssert(GlobalLocksAcquired()); - serverAssert(!g_pserver->repl_backlog_lock.fOwnLock()); - long long min_offset = LONG_LONG_MAX; - listIter li; - listNode *ln; - listRewind(g_pserver->slaves, &li); - // check for potential overflow first - while ((ln = listNext(&li))) { - client *replica = (client*)listNodeValue(ln); - - if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; - if (replica->flags & CLIENT_CLOSE_ASAP) continue; - - std::unique_lock ul(replica->lock); - - min_offset = std::min(min_offset, replica->repl_curr_off); - } - /* return -1 if no other minimum was found */ - g_pserver->repl_lowest_off.store(min_offset == LONG_LONG_MAX ? -1 : min_offset, std::memory_order_seq_cst); -} - /* We take a global flag to remember if this instance generated an RDB * because of replication, so that we can remove the RDB file in case * the instance is configured to have no persistence. */ @@ -323,6 +300,10 @@ void feedReplicationBacklog(const void *ptr, size_t len) { long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1; if (minimumsize > g_pserver->repl_backlog_size) { flushReplBacklogToClients(); + lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst); + if (lower_bound == -1) + lower_bound = g_pserver->repl_batch_offStart; + minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1; serverLog(LL_NOTICE, "minimumsize: %lld, g_pserver->master_repl_offset: %lld, len: %lu, lower_bound: %lld", @@ -494,7 +475,6 @@ void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc) serverAssert(!(listLength(slaves) != 0 && g_pserver->repl_backlog == NULL)); bool fSendRaw = !g_pserver->fActiveReplica; - updateLowestOffsetAmongReplicas(); /* Send SELECT command to every replica if needed. */ if (g_pserver->replicaseldb != dictid) { @@ -656,7 +636,6 @@ void replicationFeedSlavesFromMasterStream(char *buf, size_t buflen) { } if (g_pserver->repl_backlog){ - updateLowestOffsetAmongReplicas(); feedReplicationBacklog(buf,buflen); } } @@ -4975,6 +4954,7 @@ void flushReplBacklogToClients() if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) { bool fAsyncWrite = false; + long long min_offset = LONG_LONG_MAX; // Ensure no overflow serverAssert(g_pserver->repl_batch_offStart < g_pserver->master_repl_offset); serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size); @@ -4998,6 +4978,8 @@ void flushReplBacklogToClients() /* We should have set the repl_curr_off when synchronizing, so it shouldn't be -1 here */ serverAssert(replica->repl_curr_off != -1); + min_offset = std::min(min_offset, replica->repl_curr_off); + replica->repl_end_off = g_pserver->master_repl_offset; /* Only if the there isn't already a pending write do we prepare the client to write */ @@ -5014,7 +4996,7 @@ void flushReplBacklogToClients() // This may be called multiple times per "frame" so update with our progress flushing to clients g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; - updateLowestOffsetAmongReplicas(); + g_pserver->repl_lowest_off.store(min_offset == LONG_LONG_MAX ? -1 : min_offset, std::memory_order_seq_cst); } }