Optimized use of repl_lowest_off to reduce lock contention
Former-commit-id: 30a957e5399fe94675f0b6d2d34c24112d5a9734
This commit is contained in:
parent
303763e042
commit
b2d2abbc09
@ -267,7 +267,6 @@ void execCommand(client *c) {
|
||||
* backlog with the final EXEC. */
|
||||
if (g_pserver->repl_backlog && was_master && !is_master) {
|
||||
const char *execcmd = "*1\r\n$4\r\nEXEC\r\n";
|
||||
updateLowestOffsetAmongReplicas();
|
||||
feedReplicationBacklog(execcmd,strlen(execcmd));
|
||||
}
|
||||
afterPropagateExec();
|
||||
|
@ -56,29 +56,6 @@ void putSlaveOnline(client *replica);
|
||||
int cancelReplicationHandshake(redisMaster *mi, int reconnect);
|
||||
static void propagateMasterStaleKeys();
|
||||
|
||||
/* gets the lowest offset amongst all of the replicas and stores it globally*/
|
||||
void updateLowestOffsetAmongReplicas(){
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
serverAssert(!g_pserver->repl_backlog_lock.fOwnLock());
|
||||
long long min_offset = LONG_LONG_MAX;
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(g_pserver->slaves, &li);
|
||||
// check for potential overflow first
|
||||
while ((ln = listNext(&li))) {
|
||||
client *replica = (client*)listNodeValue(ln);
|
||||
|
||||
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
|
||||
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
|
||||
|
||||
std::unique_lock<fastlock> ul(replica->lock);
|
||||
|
||||
min_offset = std::min(min_offset, replica->repl_curr_off);
|
||||
}
|
||||
/* return -1 if no other minimum was found */
|
||||
g_pserver->repl_lowest_off.store(min_offset == LONG_LONG_MAX ? -1 : min_offset, std::memory_order_seq_cst);
|
||||
}
|
||||
|
||||
/* We take a global flag to remember if this instance generated an RDB
|
||||
* because of replication, so that we can remove the RDB file in case
|
||||
* the instance is configured to have no persistence. */
|
||||
@ -323,6 +300,10 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
|
||||
long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
|
||||
if (minimumsize > g_pserver->repl_backlog_size) {
|
||||
flushReplBacklogToClients();
|
||||
lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst);
|
||||
if (lower_bound == -1)
|
||||
lower_bound = g_pserver->repl_batch_offStart;
|
||||
|
||||
minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
|
||||
|
||||
serverLog(LL_NOTICE, "minimumsize: %lld, g_pserver->master_repl_offset: %lld, len: %lu, lower_bound: %lld",
|
||||
@ -494,7 +475,6 @@ void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc)
|
||||
serverAssert(!(listLength(slaves) != 0 && g_pserver->repl_backlog == NULL));
|
||||
|
||||
bool fSendRaw = !g_pserver->fActiveReplica;
|
||||
updateLowestOffsetAmongReplicas();
|
||||
|
||||
/* Send SELECT command to every replica if needed. */
|
||||
if (g_pserver->replicaseldb != dictid) {
|
||||
@ -656,7 +636,6 @@ void replicationFeedSlavesFromMasterStream(char *buf, size_t buflen) {
|
||||
}
|
||||
|
||||
if (g_pserver->repl_backlog){
|
||||
updateLowestOffsetAmongReplicas();
|
||||
feedReplicationBacklog(buf,buflen);
|
||||
}
|
||||
}
|
||||
@ -4975,6 +4954,7 @@ void flushReplBacklogToClients()
|
||||
|
||||
if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) {
|
||||
bool fAsyncWrite = false;
|
||||
long long min_offset = LONG_LONG_MAX;
|
||||
// Ensure no overflow
|
||||
serverAssert(g_pserver->repl_batch_offStart < g_pserver->master_repl_offset);
|
||||
serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size);
|
||||
@ -4998,6 +4978,8 @@ void flushReplBacklogToClients()
|
||||
/* We should have set the repl_curr_off when synchronizing, so it shouldn't be -1 here */
|
||||
serverAssert(replica->repl_curr_off != -1);
|
||||
|
||||
min_offset = std::min(min_offset, replica->repl_curr_off);
|
||||
|
||||
replica->repl_end_off = g_pserver->master_repl_offset;
|
||||
|
||||
/* Only if the there isn't already a pending write do we prepare the client to write */
|
||||
@ -5014,7 +4996,7 @@ void flushReplBacklogToClients()
|
||||
// This may be called multiple times per "frame" so update with our progress flushing to clients
|
||||
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
|
||||
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
|
||||
updateLowestOffsetAmongReplicas();
|
||||
g_pserver->repl_lowest_off.store(min_offset == LONG_LONG_MAX ? -1 : min_offset, std::memory_order_seq_cst);
|
||||
}
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user