From 9e344f257764a0f1199fc43e878b8b4b47bb0306 Mon Sep 17 00:00:00 2001 From: VivekSainiEQ Date: Fri, 4 Jun 2021 20:09:47 +0000 Subject: [PATCH] Seems to pass multithreaded test cases, thank the lord Former-commit-id: 6cbf70cfff5735f3d4ef2e980945b4b1a1f85971 --- src/networking.cpp | 19 +++++++++---------- src/replication.cpp | 15 +++++++++------ src/server.h | 1 + 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index e8ede3338..cead76998 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -277,8 +277,9 @@ void clientInstallAsyncWriteHandler(client *c) { int prepareClientToWrite(client *c) { bool fAsync = !FCorrectThread(c); // Not async if we're on the right thread - if (c->flags & CLIENT_SLAVE) + if (c->flags & CLIENT_SLAVE){ serverLog(LL_NOTICE, "got into prepareClientToWrite"); + } if (!fAsync) { serverAssert(c->conn == nullptr || c->lock.fOwnLock()); @@ -1758,7 +1759,7 @@ int writeToClient(client *c, int handler_installed) { /* We can only directly read from the replication backlog if the client is a replica, so only attempt to do so if that's the case. */ - if (c->flags & CLIENT_SLAVE) { + if (c->flags & CLIENT_SLAVE && !(c->flags & CLIENT_MONITOR)) { /* For replicas, we don't store all the information in the client buffer * We always read from the replication backlog directly */ std::unique_lock repl_backlog_lock (g_pserver->repl_backlog_lock); @@ -1766,14 +1767,12 @@ int writeToClient(client *c, int handler_installed) { /* Right now, we're bringing in the offStart into the scope * If repl_batch_offStart is equal to -1, that means the mechanism is disabled * which implies there is no data to flush and that the global offset is accurate */ - long long offStart = g_pserver->repl_batch_offStart == -1 ? g_pserver->master_repl_offset : g_pserver->repl_batch_offStart; + // long long offStart = g_pserver->repl_batch_offStart == -1 ? g_pserver->master_repl_offset : g_pserver->repl_batch_offStart; + long long offStart = c->repl_end_off; long long idxStart = getReplIndexFromOffset(offStart); - if (g_pserver->repl_batch_offStart != -1) - serverAssert(idxStart == g_pserver->repl_batch_idxStart); - else - serverAssert(idxStart == g_pserver->repl_backlog_idx); - - if (c->repl_curr_off != -1 && c->repl_curr_off != offStart){ + + serverAssert(c->repl_curr_off != -1); + if (c->repl_curr_off != offStart){ serverLog(LL_NOTICE, "printing the stats for client %lu: c->repl_curr_off: %lld, repl_batch_offStart: %lld, nwritten: %ld, offStart: %lld", c->id, c->repl_curr_off, g_pserver->repl_batch_offStart, nwritten, offStart); @@ -1846,7 +1845,7 @@ int writeToClient(client *c, int handler_installed) { if (!clientHasPendingReplies(c) && !c->fPendingReplicaWrite) { // if(c->flags & CLIENT_SLAVE && handler_installed){ // serverLog(LL_NOTICE, "Uninstalling handler"); - // serverLog(LL_NOTICE, "handler repl_curr_idx: %lld, repl_backlog_size: %lld", c->repl_curr_idx, g_pserver->repl_backlog_size); + // serverLog(LL_NOTICE, "repl_backlog_size: %lld", g_pserver->repl_backlog_size); // serverLog(LL_NOTICE, "handler repl_curr_off: %lld, master_repl_offset: %lld", c->repl_curr_off, g_pserver->master_repl_offset); // } c->sentlen = 0; diff --git a/src/replication.cpp b/src/replication.cpp index 3a48963ab..96bf161f9 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -382,7 +382,9 @@ void feedReplicationBacklog(const void *ptr, size_t len) { lower_bound = g_pserver->repl_batch_offStart; long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1; if (minimumsize > g_pserver->repl_backlog_size) { + g_pserver->repl_backlog_lock.unlock(); flushReplBacklogToClients(); + g_pserver->repl_backlog_lock.lock(); minimumsize = g_pserver->master_repl_offset + len - lower_bound +1; if (minimumsize > g_pserver->repl_backlog_size) { @@ -809,6 +811,7 @@ long long addReplyReplicationBacklog(client *c, long long offset) { serverLog(LL_DEBUG, "[PSYNC] Reply total length: %lld", len); #ifdef BYPASS_PSYNC c->repl_curr_off = offset - 1; + c->repl_end_off = g_pserver->master_repl_offset; serverLog(LL_NOTICE, "This client %lu at addr %s synchronized to %lld", c->id, getClientPeerId(c), c->repl_curr_off); /* Force the partial sync to be queued */ @@ -861,6 +864,7 @@ int replicationSetupSlaveForFullResync(client *replica, long long offset) { replica->replstate = SLAVE_STATE_WAIT_BGSAVE_END; replica->repl_curr_off = offset; + replica->repl_end_off = g_pserver->master_repl_offset; serverLog(LL_NOTICE, "This client %lu at addr %s synchronized to %lld", replica->id, getClientPeerId(replica), replica->repl_curr_off); @@ -4634,19 +4638,18 @@ void flushReplBacklogToClients() fAsyncWrite = true; + /* If we are online and the RDB has been sent, there is no need to feed the client buffer + * We will send our replies directly from the replication backlog instead */ #ifdef BYPASS_BUFFER { std::unique_lock asyncUl(replica->lock, std::defer_lock); if (!FCorrectThread(replica)) asyncUl.lock(); - /* If we are online and the RDB has been sent, there is no need to feed the client buffer - * We will send our replies directly from the replication backlog instead */ - if (replica->repl_curr_off == -1){ - replica->repl_curr_off = g_pserver->repl_batch_offStart; - serverLog(LL_NOTICE, "This client %lu at addr %s synchronized to %lld", replica->id, getClientPeerId(replica), replica->repl_curr_off); + /* We should have set the repl_curr_off when synchronizing, so it shouldn't be -1 here */ + serverAssert(replica->repl_curr_off != -1); - } + replica->repl_end_off = g_pserver->master_repl_offset; /* Only if the there isn't already a pending write do we prepare the client to write */ if (!replica->fPendingReplicaWrite){ diff --git a/src/server.h b/src/server.h index 2aba985ed..64a2ca515 100644 --- a/src/server.h +++ b/src/server.h @@ -1518,6 +1518,7 @@ struct client { should use. */ long long repl_curr_off = -1; /* Replication offset of the client, only if it's a replica*/ + long long repl_end_off = -1; /* Replication offset to write to */ int fPendingReplicaWrite; char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */