Added transmitted RDB lock

Former-commit-id: 4b32167afc85742d85ff9b47b2c2e0b6b02e140a
This commit is contained in:
VivekSainiEQ 2021-05-26 20:10:33 +00:00
parent 61fca72389
commit 8a7cf8a608
3 changed files with 23 additions and 7 deletions

View File

@ -319,6 +319,7 @@ void _clientAsyncReplyBufferReserve(client *c, size_t len) {
clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize); clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize);
replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock); replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock);
replyNew->used = 0; replyNew->used = 0;
std::unique_lock<fastlock> tRDBLock (c->transmittedRDBLock);
c->replyAsync = replyNew; c->replyAsync = replyNew;
} }
@ -332,6 +333,7 @@ int _addReplyToBuffer(client *c, const char *s, size_t len) {
if (fAsync) if (fAsync)
{ {
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
std::unique_lock<fastlock> tRDBLock (c->transmittedRDBLock);
if (c->replyAsync == nullptr || (c->replyAsync->size - c->replyAsync->used) < len) if (c->replyAsync == nullptr || (c->replyAsync->size - c->replyAsync->used) < len)
{ {
if (c->replyAsync == nullptr) { if (c->replyAsync == nullptr) {
@ -1737,9 +1739,14 @@ int writeToClient(client *c, int handler_installed) {
/* If there are no more pending replies, then we have transmitted the RDB. /* If there are no more pending replies, then we have transmitted the RDB.
* This means further replication commands will be taken straight from the * This means further replication commands will be taken straight from the
* replication backlog from now on. */ * replication backlog from now on. */
std::unique_lock<fastlock> tRDBLock (c->transmittedRDBLock);
if (c->flags & CLIENT_SLAVE && c->replstate == SLAVE_STATE_ONLINE && !clientHasPendingReplies(c) && c->replyAsync == nullptr){ if (c->flags & CLIENT_SLAVE && c->replstate == SLAVE_STATE_ONLINE && !clientHasPendingReplies(c) && c->replyAsync == nullptr){
c->transmittedRDB = true; c->transmittedRDB = true;
} }
bool transmittedRDB = c->transmittedRDB;
tRDBLock.unlock();
/* if this is a write to a replica, it's coming straight from the replication backlog */ /* if this is a write to a replica, it's coming straight from the replication backlog */
long long repl_backlog_idx = g_pserver->repl_backlog_idx; long long repl_backlog_idx = g_pserver->repl_backlog_idx;
@ -1747,7 +1754,7 @@ int writeToClient(client *c, int handler_installed) {
/* For replicas, we don't store all the information in the client buffer /* For replicas, we don't store all the information in the client buffer
* Most of the time (aside from immediately after synchronizing), we read * Most of the time (aside from immediately after synchronizing), we read
* from the replication backlog directly */ * from the replication backlog directly */
if (c->flags & CLIENT_SLAVE && c->repl_curr_idx != -1 && c->transmittedRDB){ if (c->flags & CLIENT_SLAVE && c->repl_curr_idx != -1 && transmittedRDB){
/* copy global variables into local scope so if they change in between we don't care */ /* copy global variables into local scope so if they change in between we don't care */
long long repl_backlog_size = g_pserver->repl_backlog_size; long long repl_backlog_size = g_pserver->repl_backlog_size;
long long nwrittenPart2 = 0; long long nwrittenPart2 = 0;
@ -1874,6 +1881,7 @@ void ProcessPendingAsyncWrites()
serverAssert(c->fPendingAsyncWrite); serverAssert(c->fPendingAsyncWrite);
if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY)) if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY))
{ {
std::unique_lock<fastlock> tRDBLock (c->transmittedRDBLock);
if (c->replyAsync != nullptr){ if (c->replyAsync != nullptr){
zfree(c->replyAsync); zfree(c->replyAsync);
c->replyAsync = nullptr; c->replyAsync = nullptr;
@ -1885,6 +1893,7 @@ void ProcessPendingAsyncWrites()
/* since writes from master to replica can come directly from the replication backlog, /* since writes from master to replica can come directly from the replication backlog,
* writes may have been signalled without having been copied to the replyAsync buffer, * writes may have been signalled without having been copied to the replyAsync buffer,
* thus causing the buffer to be NULL */ * thus causing the buffer to be NULL */
std::unique_lock<fastlock> tRDBLock (c->transmittedRDBLock);
if (c->replyAsync != nullptr){ if (c->replyAsync != nullptr){
int size = c->replyAsync->used; int size = c->replyAsync->used;
@ -1905,7 +1914,7 @@ void ProcessPendingAsyncWrites()
} }
c->fPendingAsyncWrite = FALSE; c->fPendingAsyncWrite = FALSE;
tRDBLock.unlock();
// Now install the write event handler // Now install the write event handler
int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE; int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE;
/* For the fsync=always policy, we want that a given FD is never /* For the fsync=always policy, we want that a given FD is never

View File

@ -441,6 +441,8 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
g_pserver->master_repl_offset += len; g_pserver->master_repl_offset += len;
/* This is a circular buffer, so write as much data we can at every /* This is a circular buffer, so write as much data we can at every
* iteration and rewind the "idx" index if we reach the limit. */ * iteration and rewind the "idx" index if we reach the limit. */
while(len) { while(len) {
@ -4659,11 +4661,14 @@ void flushReplBacklogToClients()
#ifdef BYPASS_BUFFER #ifdef BYPASS_BUFFER
/* If we are online and the RDB has been sent, there is no need to feed the client buffer {
* We will send our replies directly from the replication backlog instead */ /* If we are online and the RDB has been sent, there is no need to feed the client buffer
if (replica->replstate == SLAVE_STATE_ONLINE && replica->transmittedRDB){ * We will send our replies directly from the replication backlog instead */
setReplIdx(replica, g_pserver->repl_batch_idxStart, g_pserver->repl_batch_offStart); std::unique_lock<fastlock> tRDBLock (replica->transmittedRDBLock);
continue; if (replica->replstate == SLAVE_STATE_ONLINE && replica->transmittedRDB){
setReplIdx(replica, g_pserver->repl_batch_idxStart, g_pserver->repl_batch_offStart);
continue;
}
} }
#endif #endif
if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) { if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) {

View File

@ -1582,6 +1582,7 @@ struct client {
// post a function from a non-client thread to run on its client thread // post a function from a non-client thread to run on its client thread
bool postFunction(std::function<void(client *)> fn, bool fLock = true); bool postFunction(std::function<void(client *)> fn, bool fLock = true);
fastlock transmittedRDBLock {"transmittedRDB"};
size_t argv_len_sum() const; size_t argv_len_sum() const;
}; };
@ -2228,6 +2229,7 @@ struct redisServer {
that is the next byte will'll write to.*/ that is the next byte will'll write to.*/
long long repl_backlog_off; /* Replication "master offset" of first long long repl_backlog_off; /* Replication "master offset" of first
byte in the replication backlog buffer.*/ byte in the replication backlog buffer.*/
fastlock repl_backlog_lock {"replication backlog"};
time_t repl_backlog_time_limit; /* Time without slaves after the backlog time_t repl_backlog_time_limit; /* Time without slaves after the backlog
gets released. */ gets released. */
time_t repl_no_slaves_since; /* We have no slaves since that time. time_t repl_no_slaves_since; /* We have no slaves since that time.