From 4fd76c47911f506909e54a138bf8f72b0fea8687 Mon Sep 17 00:00:00 2001 From: VivekSainiEQ Date: Fri, 21 May 2021 17:05:55 +0000 Subject: [PATCH] Fixed single threaded for real this time, need to add synchronization for multi threaded Former-commit-id: 4d858dac1a503f4d518477212ba585069af22574 --- src/networking.cpp | 8 +++++--- src/replication.cpp | 5 +++++ 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 6f4aa6268..c39d8ce42 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1676,8 +1676,7 @@ int writeToClient(client *c, int handler_installed) { std::unique_locklock)> lock(c->lock); - /* if this is a write to a replica, it's coming straight from the replication backlog */ - long long repl_backlog_idx = g_pserver->repl_backlog_idx; + while(clientHasPendingReplies(c)) { if (c->bufpos > 0) { @@ -1742,6 +1741,9 @@ int writeToClient(client *c, int handler_installed) { c->transmittedRDB = true; } + /* if this is a write to a replica, it's coming straight from the replication backlog */ + long long repl_backlog_idx = g_pserver->repl_backlog_idx; + /* For replicas, we don't store all the information in the client buffer * Most of the time (aside from immediately after synchronizing), we read * from the replication backlog directly */ @@ -1782,7 +1784,7 @@ int writeToClient(client *c, int handler_installed) { // } - if (nwritten == nrequested && g_pserver->repl_backlog_idx == c->repl_curr_idx){ + if (nwritten == nrequested && g_pserver->repl_backlog_idx == repl_backlog_idx){ c->repl_curr_idx = -1; /* -1 denotes no more replica writes */ } else if (nwritten > 0) diff --git a/src/replication.cpp b/src/replication.cpp index d3df6d12a..1d4e01289 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3059,6 +3059,11 @@ void syncWithMaster(connection *conn) { if (psync_result == PSYNC_CONTINUE) { serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization."); + /* Reset the bulklen information in case it is lingering from the last connection + * The partial sync will start from the beginning of a command so these should be reset */ + mi->master->reqtype = 0; + mi->master->multibulklen = 0; + mi->master->bulklen = -1; if (cserver.supervised_mode == SUPERVISED_SYSTEMD) { redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections.\n"); redisCommunicateSystemd("READY=1\n");