From 8a7cf8a6084e335fd58971a53e0b5f0ef7df26b2 Mon Sep 17 00:00:00 2001 From: VivekSainiEQ Date: Wed, 26 May 2021 20:10:33 +0000 Subject: [PATCH] Added transmitted RDB lock Former-commit-id: 4b32167afc85742d85ff9b47b2c2e0b6b02e140a --- src/networking.cpp | 13 +++++++++++-- src/replication.cpp | 15 ++++++++++----- src/server.h | 2 ++ 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index c39d8ce42..176693501 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -319,6 +319,7 @@ void _clientAsyncReplyBufferReserve(client *c, size_t len) { clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize); replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock); replyNew->used = 0; + std::unique_lock tRDBLock (c->transmittedRDBLock); c->replyAsync = replyNew; } @@ -332,6 +333,7 @@ int _addReplyToBuffer(client *c, const char *s, size_t len) { if (fAsync) { serverAssert(GlobalLocksAcquired()); + std::unique_lock tRDBLock (c->transmittedRDBLock); if (c->replyAsync == nullptr || (c->replyAsync->size - c->replyAsync->used) < len) { if (c->replyAsync == nullptr) { @@ -1737,9 +1739,14 @@ int writeToClient(client *c, int handler_installed) { /* If there are no more pending replies, then we have transmitted the RDB. * This means further replication commands will be taken straight from the * replication backlog from now on. */ + + std::unique_lock tRDBLock (c->transmittedRDBLock); + if (c->flags & CLIENT_SLAVE && c->replstate == SLAVE_STATE_ONLINE && !clientHasPendingReplies(c) && c->replyAsync == nullptr){ c->transmittedRDB = true; } + bool transmittedRDB = c->transmittedRDB; + tRDBLock.unlock(); /* if this is a write to a replica, it's coming straight from the replication backlog */ long long repl_backlog_idx = g_pserver->repl_backlog_idx; @@ -1747,7 +1754,7 @@ int writeToClient(client *c, int handler_installed) { /* For replicas, we don't store all the information in the client buffer * Most of the time (aside from immediately after synchronizing), we read * from the replication backlog directly */ - if (c->flags & CLIENT_SLAVE && c->repl_curr_idx != -1 && c->transmittedRDB){ + if (c->flags & CLIENT_SLAVE && c->repl_curr_idx != -1 && transmittedRDB){ /* copy global variables into local scope so if they change in between we don't care */ long long repl_backlog_size = g_pserver->repl_backlog_size; long long nwrittenPart2 = 0; @@ -1874,6 +1881,7 @@ void ProcessPendingAsyncWrites() serverAssert(c->fPendingAsyncWrite); if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY)) { + std::unique_lock tRDBLock (c->transmittedRDBLock); if (c->replyAsync != nullptr){ zfree(c->replyAsync); c->replyAsync = nullptr; @@ -1885,6 +1893,7 @@ void ProcessPendingAsyncWrites() /* since writes from master to replica can come directly from the replication backlog, * writes may have been signalled without having been copied to the replyAsync buffer, * thus causing the buffer to be NULL */ + std::unique_lock tRDBLock (c->transmittedRDBLock); if (c->replyAsync != nullptr){ int size = c->replyAsync->used; @@ -1905,7 +1914,7 @@ void ProcessPendingAsyncWrites() } c->fPendingAsyncWrite = FALSE; - + tRDBLock.unlock(); // Now install the write event handler int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE; /* For the fsync=always policy, we want that a given FD is never diff --git a/src/replication.cpp b/src/replication.cpp index 1d4e01289..ad79f4887 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -441,6 +441,8 @@ void feedReplicationBacklog(const void *ptr, size_t len) { g_pserver->master_repl_offset += len; + + /* This is a circular buffer, so write as much data we can at every * iteration and rewind the "idx" index if we reach the limit. */ while(len) { @@ -4659,11 +4661,14 @@ void flushReplBacklogToClients() #ifdef BYPASS_BUFFER - /* If we are online and the RDB has been sent, there is no need to feed the client buffer - * We will send our replies directly from the replication backlog instead */ - if (replica->replstate == SLAVE_STATE_ONLINE && replica->transmittedRDB){ - setReplIdx(replica, g_pserver->repl_batch_idxStart, g_pserver->repl_batch_offStart); - continue; + { + /* If we are online and the RDB has been sent, there is no need to feed the client buffer + * We will send our replies directly from the replication backlog instead */ + std::unique_lock tRDBLock (replica->transmittedRDBLock); + if (replica->replstate == SLAVE_STATE_ONLINE && replica->transmittedRDB){ + setReplIdx(replica, g_pserver->repl_batch_idxStart, g_pserver->repl_batch_offStart); + continue; + } } #endif if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) { diff --git a/src/server.h b/src/server.h index 6c5265fbd..14005e7d5 100644 --- a/src/server.h +++ b/src/server.h @@ -1582,6 +1582,7 @@ struct client { // post a function from a non-client thread to run on its client thread bool postFunction(std::function fn, bool fLock = true); + fastlock transmittedRDBLock {"transmittedRDB"}; size_t argv_len_sum() const; }; @@ -2228,6 +2229,7 @@ struct redisServer { that is the next byte will'll write to.*/ long long repl_backlog_off; /* Replication "master offset" of first byte in the replication backlog buffer.*/ + fastlock repl_backlog_lock {"replication backlog"}; time_t repl_backlog_time_limit; /* Time without slaves after the backlog gets released. */ time_t repl_no_slaves_since; /* We have no slaves since that time.