From 80dddab0c4e587332b497cbe8157f39dcc417eb4 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 14 Jun 2021 19:30:49 +0000 Subject: [PATCH] Relaxed locking, should run faster now Former-commit-id: 5cec4d026dc1766b9ecbade6ec4b9d0e75a94e0f --- src/multi.cpp | 1 - src/networking.cpp | 6 ++++++ src/replication.cpp | 18 ++++++++++-------- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/multi.cpp b/src/multi.cpp index f74748e90..589dba589 100644 --- a/src/multi.cpp +++ b/src/multi.cpp @@ -268,7 +268,6 @@ void execCommand(client *c) { if (g_pserver->repl_backlog && was_master && !is_master) { const char *execcmd = "*1\r\n$4\r\nEXEC\r\n"; updateLowestOffsetAmongReplicas(); - std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); feedReplicationBacklog(execcmd,strlen(execcmd)); } afterPropagateExec(); diff --git a/src/networking.cpp b/src/networking.cpp index d8d91751d..07312a9ee 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1856,6 +1856,8 @@ int writeToClient(client *c, int handler_installed) { * We always read from the replication backlog directly */ std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); + // serverLog(LL_NOTICE, "written to handler"); + long long repl_end_idx = getReplIndexFromOffset(c->repl_end_off); serverAssert(c->repl_curr_off != -1); @@ -1884,8 +1886,12 @@ int writeToClient(client *c, int handler_installed) { serverAssert(c->repl_curr_off <= c->repl_end_off); /* If the client offset matches the global offset, we wrote all we needed to, * in which case, there is no pending write */ + if (c->repl_curr_off == c->repl_end_off){ + // serverLog(LL_NOTICE, "Successfully wrote up until %lld", c->repl_end_off); c->fPendingReplicaWrite = false; + } else { + // serverLog(LL_NOTICE, "Wrote to %lld out of %lld", c->repl_curr_off, c->repl_end_off); } } diff --git a/src/replication.cpp b/src/replication.cpp index d10bac99a..a5f9c3acf 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -241,6 +241,8 @@ void resizeReplicationBacklog(long long newsize) { newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; if (g_pserver->repl_backlog_size == newsize) return; + std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); + if (g_pserver->repl_backlog != NULL) { /* What we actually do is to flush the old buffer and realloc a new * empty one. It will refill with new data incrementally. @@ -310,9 +312,9 @@ void freeReplicationBacklog(void) { * the backlog without incrementing the offset. */ void feedReplicationBacklog(const void *ptr, size_t len) { serverAssert(GlobalLocksAcquired()); - serverAssert(g_pserver->repl_backlog_lock.fOwnLock()); const unsigned char *p = (const unsigned char*)ptr; + if (g_pserver->repl_batch_idxStart >= 0) { /* we are lower bounded by the lower client offset or the offStart if all the clients are up to date */ long long lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst); @@ -320,10 +322,11 @@ void feedReplicationBacklog(const void *ptr, size_t len) { lower_bound = g_pserver->repl_batch_offStart; long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1; if (minimumsize > g_pserver->repl_backlog_size) { - g_pserver->repl_backlog_lock.unlock(); flushReplBacklogToClients(); - g_pserver->repl_backlog_lock.lock(); - minimumsize = g_pserver->master_repl_offset + len - lower_bound +1; + minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1; + + serverLog(LL_NOTICE, "minimumsize: %lld, g_pserver->master_repl_offset: %lld, len: %lu, lower_bound: %lld", + minimumsize, g_pserver->master_repl_offset, len, lower_bound); if (minimumsize > g_pserver->repl_backlog_size) { // This is an emergency overflow, we better resize to fit @@ -492,7 +495,6 @@ void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc) bool fSendRaw = !g_pserver->fActiveReplica; updateLowestOffsetAmongReplicas(); - std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); /* Send SELECT command to every replica if needed. */ if (g_pserver->replicaseldb != dictid) { @@ -655,7 +657,6 @@ void replicationFeedSlavesFromMasterStream(char *buf, size_t buflen) { if (g_pserver->repl_backlog){ updateLowestOffsetAmongReplicas(); - std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); feedReplicationBacklog(buf,buflen); } } @@ -750,7 +751,7 @@ long long addReplyReplicationBacklog(client *c, long long offset) { serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); c->repl_curr_off = offset - 1; - serverLog(LL_NOTICE, "Client %s, replica offset %lld in psync", replicationGetSlaveName(c), c->repl_curr_off); + // serverLog(LL_NOTICE, "Client %s, replica offset %lld in psync", replicationGetSlaveName(c), c->repl_curr_off); c->repl_end_off = g_pserver->master_repl_offset; /* Force the partial sync to be queued */ @@ -4988,7 +4989,7 @@ void flushReplBacklogToClients() if (!canFeedReplicaReplBuffer(replica)) continue; if (replica->flags & CLIENT_CLOSE_ASAP) continue; - serverLog(LL_NOTICE, "Client %s, replica offset %lld", replicationGetSlaveName(replica), replica->repl_curr_off); + // serverLog(LL_NOTICE, "Client %s, replica offset %lld", replicationGetSlaveName(replica), replica->repl_curr_off); std::unique_lock ul(replica->lock); if (!FCorrectThread(replica)) @@ -5013,6 +5014,7 @@ void flushReplBacklogToClients() // This may be called multiple times per "frame" so update with our progress flushing to clients g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; + updateLowestOffsetAmongReplicas(); } }