From 973b769b21290b90c4fc85ddc6be40f3b622e70d Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 23 Oct 2020 20:24:01 +0000 Subject: [PATCH] Improve replication performance in multithreaded scenarios Former-commit-id: 96e0b2a2b19df220975e61131cbc535b0c34a828 --- src/expire.cpp | 7 +- src/networking.cpp | 3 +- src/replication.cpp | 315 +++++++++++++++++++++++++------------------- src/server.cpp | 12 ++ src/server.h | 26 +++- 5 files changed, 221 insertions(+), 142 deletions(-) diff --git a/src/expire.cpp b/src/expire.cpp index 28b42c6fa..e1d4cb29f 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -304,7 +304,7 @@ void pexpireMemberAtCommand(client *c) * executed, where the time limit is a percentage of the REDIS_HZ period * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */ -void activeExpireCycle(int type) { +void activeExpireCycleCore(int type) { /* This function has some global state in order to continue the work * incrementally across calls. */ static unsigned int current_db = 0; /* Last DB tested. */ @@ -420,6 +420,11 @@ void activeExpireCycle(int type) { (g_pserver->stat_expired_stale_perc*0.95); } +void activeExpireCycle(int type) +{ + runAndPropogateToReplicas(activeExpireCycleCore, type); +} + /*----------------------------------------------------------------------------- * Expires of keys created in writable slaves * diff --git a/src/networking.cpp b/src/networking.cpp index 465376eeb..1da41e566 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2187,8 +2187,7 @@ void commandProcessed(client *c, int flags) { if (applied) { if (!g_pserver->fActiveReplica && (flags & CMD_CALL_PROPAGATE)) { - replicationFeedSlavesFromMasterStream(g_pserver->slaves, - c->pending_querybuf, applied); + replicationFeedSlavesFromMasterStream(c->pending_querybuf, applied); } sdsrange(c->pending_querybuf,applied,-1); } diff --git a/src/replication.cpp b/src/replication.cpp index b9de0bce5..45625d7ba 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -196,6 +196,10 @@ void createReplicationBacklog(void) { * byte we have is the next byte that will be generated for the * replication stream. */ g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1; + + /* Allow transmission to clients */ + serverTL->repl_batch_idxStart = 0; + serverTL->repl_batch_offStart = g_pserver->master_repl_offset; } /* This function is called when the user modifies the replication backlog @@ -209,20 +213,44 @@ void resizeReplicationBacklog(long long newsize) { newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; if (g_pserver->repl_backlog_size == newsize) return; - g_pserver->repl_backlog_size = newsize; if (g_pserver->repl_backlog != NULL) { /* What we actually do is to flush the old buffer and realloc a new * empty one. It will refill with new data incrementally. * The reason is that copying a few gigabytes adds latency and even * worse often we need to alloc additional space before freeing the * old buffer. */ - zfree(g_pserver->repl_backlog); - g_pserver->repl_backlog = (char*)zmalloc(g_pserver->repl_backlog_size, MALLOC_LOCAL); - g_pserver->repl_backlog_histlen = 0; - g_pserver->repl_backlog_idx = 0; - /* Next byte we have is... the next since the buffer is empty. */ - g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1; + + if (serverTL->repl_batch_idxStart >= 0) { + // We need to keep critical data so we can't shrink less than the hot data in the buffer + newsize = std::max(newsize, g_pserver->master_repl_offset - serverTL->repl_batch_offStart); + char *backlog = (char*)zmalloc(newsize); + g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - serverTL->repl_batch_offStart; + + if (g_pserver->repl_backlog_idx >= serverTL->repl_batch_idxStart) { + auto cbActiveBacklog = g_pserver->repl_backlog_idx - serverTL->repl_batch_idxStart; + memcpy(backlog, g_pserver->repl_backlog + serverTL->repl_batch_idxStart, cbActiveBacklog); + serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog); + } else { + auto cbPhase1 = g_pserver->repl_backlog_size - serverTL->repl_batch_idxStart; + memcpy(backlog, g_pserver->repl_backlog + serverTL->repl_batch_idxStart, cbPhase1); + memcpy(backlog + cbPhase1, g_pserver->repl_backlog, g_pserver->repl_backlog_idx); + auto cbActiveBacklog = cbPhase1 + g_pserver->repl_backlog_idx; + serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog); + } + zfree(g_pserver->repl_backlog); + g_pserver->repl_backlog = backlog; + g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen; + serverTL->repl_batch_idxStart = 0; + } else { + zfree(g_pserver->repl_backlog); + g_pserver->repl_backlog = (char*)zmalloc(newsize); + g_pserver->repl_backlog_histlen = 0; + g_pserver->repl_backlog_idx = 0; + /* Next byte we have is... the next since the buffer is empty. */ + g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1; + } } + g_pserver->repl_backlog_size = newsize; } void freeReplicationBacklog(void) { @@ -247,6 +275,21 @@ void feedReplicationBacklog(const void *ptr, size_t len) { serverAssert(GlobalLocksAcquired()); const unsigned char *p = (const unsigned char*)ptr; + if (serverTL->repl_batch_idxStart >= 0) { + long long minimumsize = g_pserver->master_repl_offset + len - serverTL->repl_batch_offStart+1; + if (minimumsize > g_pserver->repl_backlog_size) { + flushReplBacklogToClients(); + minimumsize = g_pserver->master_repl_offset + len - serverTL->repl_batch_offStart+1; + + if (minimumsize > g_pserver->repl_backlog_size) { + // This is an emergency overflow, we better resize to fit + long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize); + serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize); + resizeReplicationBacklog(newsize); + } + } + } + g_pserver->master_repl_offset += len; /* This is a circular buffer, so write as much data we can at every @@ -370,12 +413,10 @@ static int writeProtoNum(char *dst, const size_t cchdst, long long num) * the commands received by our clients in order to create the replication * stream. Instead if the instance is a replica and has sub-slaves attached, * we use replicationFeedSlavesFromMaster() */ -void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { - listNode *ln, *lnReply; - listIter li, liReply; - int j, len; +void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc) { + int j; serverAssert(GlobalLocksAcquired()); - static client *fake = nullptr; + serverAssert(serverTL->repl_batch_offStart >= 0); if (dictid < 0) dictid = 0; // this can happen if we send a PING before any real operation @@ -394,58 +435,34 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* We can't have slaves attached and no backlog. */ serverAssert(!(listLength(slaves) != 0 && g_pserver->repl_backlog == NULL)); - if (fake == nullptr) - { - fake = createClient(nullptr, serverTL - g_pserver->rgthreadvar); - fake->flags |= CLIENT_FORCE_REPLY; - } - bool fSendRaw = !g_pserver->fActiveReplica; - replicationFeedSlave(fake, dictid, argv, argc, fSendRaw); // Note: updates the repl log, keep above the repl update code below + /* Send SELECT command to every replica if needed. */ + if (g_pserver->replicaseldb != dictid) { + char llstr[LONG_STR_SIZE]; + robj *selectcmd; - long long cchbuf = fake->bufpos; - listRewind(fake->reply, &liReply); - while ((lnReply = listNext(&liReply))) - { - clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); - cchbuf += reply->used; + /* For a few DBs we have pre-computed SELECT command. */ + if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) { + selectcmd = shared.select[dictid]; + } else { + int dictid_len; + + dictid_len = ll2string(llstr,sizeof(llstr),dictid); + selectcmd = createObject(OBJ_STRING, + sdscatprintf(sdsempty(), + "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", + dictid_len, llstr)); + } + + /* Add the SELECT command into the backlog. */ + /* We don't do this for advanced replication because this will be done later when it adds the whole RREPLAY command */ + if (g_pserver->repl_backlog && fSendRaw) feedReplicationBacklogWithObject(selectcmd); + + if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) + decrRefCount(selectcmd); } - - serverAssert(argc > 0); - serverAssert(cchbuf > 0); - - // The code below used to be: snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); - // but that was much too slow - static const char *protoRREPLAY = "*5\r\n$7\r\nRREPLAY\r\n$36\r\n00000000-0000-0000-0000-000000000000\r\n$"; - char proto[1024]; - int cchProto = 0; - if (!fSendRaw) - { - char uuid[37]; - uuid_unparse(cserver.uuid, uuid); - - cchProto = strlen(protoRREPLAY); - memcpy(proto, protoRREPLAY, strlen(protoRREPLAY)); - memcpy(proto + 22, uuid, 36); // Note UUID_STR_LEN includes the \0 trailing byte which we don't want - cchProto += ll2string(proto + cchProto, sizeof(proto)-cchProto, cchbuf); - memcpy(proto + cchProto, "\r\n", 3); - cchProto += 2; - } - - long long master_repl_offset_start = g_pserver->master_repl_offset; - - char szDbNum[128]; - int cchDbNum = 0; - if (!fSendRaw) - cchDbNum = writeProtoNum(szDbNum, sizeof(szDbNum), dictid); - - - char szMvcc[128]; - int cchMvcc = 0; - incrementMvccTstamp(); // Always increment MVCC tstamp so we're consistent with active and normal replication - if (!fSendRaw) - cchMvcc = writeProtoNum(szMvcc, sizeof(szMvcc), getMvccTstamp()); + g_pserver->replicaseldb = dictid; /* Write the command to the replication backlog if any. */ if (g_pserver->repl_backlog) @@ -456,10 +473,11 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* Add the multi bulk reply length. */ aux[0] = '*'; - len = ll2string(aux+1,sizeof(aux)-1,argc); - aux[len+1] = '\r'; - aux[len+2] = '\n'; - feedReplicationBacklog(aux,len+3); + int multilen = ll2string(aux+1,sizeof(aux)-1,argc); + aux[multilen+1] = '\r'; + aux[multilen+2] = '\n'; + + feedReplicationBacklog(aux,multilen+3); for (j = 0; j < argc; j++) { long objlen = stringObjectLen(argv[j]); @@ -468,7 +486,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { * not just as a plain string, so create the $..CRLF payload len * and add the final CRLF */ aux[0] = '$'; - len = ll2string(aux+1,sizeof(aux)-1,objlen); + int len = ll2string(aux+1,sizeof(aux)-1,objlen); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); @@ -478,67 +496,57 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { } else { - feedReplicationBacklog(proto, cchProto); - feedReplicationBacklog(fake->buf, fake->bufpos); - listRewind(fake->reply, &liReply); - while ((lnReply = listNext(&liReply))) + char szDbNum[128]; + int cchDbNum = 0; + if (!fSendRaw) + cchDbNum = writeProtoNum(szDbNum, sizeof(szDbNum), dictid); + + + char szMvcc[128]; + int cchMvcc = 0; + incrementMvccTstamp(); // Always increment MVCC tstamp so we're consistent with active and normal replication + if (!fSendRaw) + cchMvcc = writeProtoNum(szMvcc, sizeof(szMvcc), getMvccTstamp()); + + //size_t cchlen = multilen+3; + struct redisCommand *cmd = lookupCommand(szFromObj(argv[0])); + sds buf = catCommandForAofAndActiveReplication(sdsempty(), cmd, argv, argc); + size_t cchlen = sdslen(buf); + + // The code below used to be: snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); + // but that was much too slow + static const char *protoRREPLAY = "*5\r\n$7\r\nRREPLAY\r\n$36\r\n00000000-0000-0000-0000-000000000000\r\n$"; + char proto[1024]; + int cchProto = 0; + if (!fSendRaw) { - clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); - feedReplicationBacklog(reply->buf(), reply->used); + char uuid[37]; + uuid_unparse(cserver.uuid, uuid); + + cchProto = strlen(protoRREPLAY); + memcpy(proto, protoRREPLAY, strlen(protoRREPLAY)); + memcpy(proto + 22, uuid, 36); // Note UUID_STR_LEN includes the \0 trailing byte which we don't want + cchProto += ll2string(proto + cchProto, sizeof(proto)-cchProto, cchlen); + memcpy(proto + cchProto, "\r\n", 3); + cchProto += 2; } + + + feedReplicationBacklog(proto, cchProto); + feedReplicationBacklog(buf, sdslen(buf)); + const char *crlf = "\r\n"; feedReplicationBacklog(crlf, 2); feedReplicationBacklog(szDbNum, cchDbNum); feedReplicationBacklog(szMvcc, cchMvcc); + + sdsfree(buf); } } +} - /* Write the command to every replica. */ - listRewind(slaves,&li); - while((ln = listNext(&li))) { - client *replica = (client*)ln->value; - - /* Don't feed slaves that are still waiting for BGSAVE to start */ - if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; - if (replica->flags & CLIENT_CLOSE_ASAP) continue; - std::unique_locklock)> lock(replica->lock, std::defer_lock); - // When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async() - if (FCorrectThread(replica)) - lock.lock(); - if (serverTL->current_client && FSameHost(serverTL->current_client, replica)) - { - replica->reploff_skipped += g_pserver->master_repl_offset - master_repl_offset_start; - continue; - } - - /* Feed slaves that are waiting for the initial SYNC (so these commands - * are queued in the output buffer until the initial SYNC completes), - * or are already in sync with the master. */ - - if (!fSendRaw) - addReplyProto(replica, proto, cchProto); - - addReplyProto(replica,fake->buf,fake->bufpos); - listRewind(fake->reply, &liReply); - while ((lnReply = listNext(&liReply))) - { - clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); - addReplyProto(replica, reply->buf(), reply->used); - } - - if (!fSendRaw) - { - addReply(replica,shared.crlf); - addReplyProto(replica, szDbNum, cchDbNum); - addReplyProto(replica, szMvcc, cchMvcc); - } - } - - // Cleanup cached fake client output buffers - fake->bufpos = 0; - fake->sentlen = 0; - fake->reply_bytes = 0; - listEmpty(fake->reply); +void replicationFeedSlaves(list *replicas, int dictid, robj **argv, int argc) { + runAndPropogateToReplicas(replicationFeedSlavesCore, replicas, dictid, argv, argc); } /* This is a debugging function that gets called when we detect something @@ -578,10 +586,7 @@ void showLatestBacklog(void) { /* This function is used in order to proxy what we receive from our master * to our sub-slaves. */ #include -void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) { - listNode *ln; - listIter li; - +void replicationFeedSlavesFromMasterStream(char *buf, size_t buflen) { /* Debugging: this is handy to see the stream sent from master * to slaves. Disabled with if(0). */ if (0) { @@ -593,23 +598,6 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle } if (g_pserver->repl_backlog) feedReplicationBacklog(buf,buflen); - listRewind(slaves,&li); - - while((ln = listNext(&li))) { - client *replica = (client*)ln->value; - std::unique_locklock)> ulock(replica->lock, std::defer_lock); - if (FCorrectThread(replica)) - ulock.lock(); - if (FMasterHost(replica)) - continue; // Active Active case, don't feed back - - /* Don't feed slaves that are still waiting for BGSAVE to start */ - if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; - addReplyProto(replica,buf,buflen); - } - - if (listLength(slaves)) - ProcessPendingAsyncWrites(); // flush them to their respective threads } void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { @@ -2269,6 +2257,8 @@ void readSyncBulkPayload(connection *conn) { * we are starting a new history. */ memcpy(g_pserver->replid,mi->master->replid,sizeof(g_pserver->replid)); g_pserver->master_repl_offset = mi->master->reploff; + if (serverTL->repl_batch_offStart >= 0) + serverTL->repl_batch_offStart = g_pserver->master_repl_offset; } clearReplicationId2(); @@ -4366,3 +4356,52 @@ static void propagateMasterStaleKeys() decrRefCount(rgobj[0]); } + +void flushReplBacklogToClients() +{ + serverAssert(GlobalLocksAcquired()); + + if (serverTL->repl_batch_offStart != g_pserver->master_repl_offset) { + bool fAsyncWrite = false; + // Ensure no overflow + serverAssert(serverTL->repl_batch_offStart < g_pserver->master_repl_offset); + serverAssert(g_pserver->master_repl_offset - serverTL->repl_batch_offStart <= g_pserver->repl_backlog_size); + serverAssert(serverTL->repl_batch_idxStart != g_pserver->repl_backlog_idx); + + listIter li; + listNode *ln; + listRewind(g_pserver->slaves, &li); + while ((ln = listNext(&li))) { + client *replica = (client*)listNodeValue(ln); + + if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + if (replica->flags & CLIENT_CLOSE_ASAP) continue; + + std::unique_lock ul(replica->lock, std::defer_lock); + if (FCorrectThread(replica)) + ul.lock(); + else + fAsyncWrite = true; + + if (g_pserver->repl_backlog_idx >= serverTL->repl_batch_idxStart) { + long long cbCopy = g_pserver->repl_backlog_idx - serverTL->repl_batch_idxStart; + serverAssert((g_pserver->master_repl_offset - serverTL->repl_batch_offStart) == cbCopy); + serverAssert((g_pserver->repl_backlog_size - serverTL->repl_batch_idxStart) >= (cbCopy)); + serverAssert((serverTL->repl_batch_idxStart + cbCopy) <= g_pserver->repl_backlog_size); + + addReplyProto(replica, g_pserver->repl_backlog + serverTL->repl_batch_idxStart, cbCopy); + } else { + auto cbPhase1 = g_pserver->repl_backlog_size - serverTL->repl_batch_idxStart; + addReplyProto(replica, g_pserver->repl_backlog + serverTL->repl_batch_idxStart, cbPhase1); + addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx); + serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - serverTL->repl_batch_offStart)); + } + } + if (fAsyncWrite) + ProcessPendingAsyncWrites(); + + // This may be called multiple times per "frame" so update with our progress flushing to clients + serverTL->repl_batch_idxStart = g_pserver->repl_backlog_idx; + serverTL->repl_batch_offStart = g_pserver->master_repl_offset; + } +} \ No newline at end of file diff --git a/src/server.cpp b/src/server.cpp index 9d7dc2ef3..9380943d3 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4007,8 +4007,18 @@ int processCommand(client *c, int callFlags) { queueMultiCommand(c); addReply(c,shared.queued); } else { + /* If the command was replication or admin related we *must* flush our buffers first. This is in case + something happens which would modify what we would send to replicas */ + + if (c->cmd->flags & (CMD_MODULE | CMD_ADMIN)) + flushReplBacklogToClients(); + call(c,callFlags); c->woff = g_pserver->master_repl_offset; + + if (c->cmd->flags & (CMD_MODULE | CMD_ADMIN)) + flushReplBacklogToClients(); + if (listLength(g_pserver->ready_keys)) handleClientsBlockedOnKeys(); } @@ -5306,6 +5316,8 @@ void loadDataFromDisk(void) { { memcpy(g_pserver->replid,rsi.repl_id,sizeof(g_pserver->replid)); g_pserver->master_repl_offset = rsi.repl_offset; + if (serverTL->repl_batch_offStart >= 0) + serverTL->repl_batch_offStart = g_pserver->master_repl_offset; listIter li; listNode *ln; diff --git a/src/server.h b/src/server.h index d8285e5f4..02400ef7f 100644 --- a/src/server.h +++ b/src/server.h @@ -1371,6 +1371,8 @@ struct redisServerThreadVars { char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ long unsigned commandsExecuted = 0; bool fRetrySetAofEvent = false; + long long repl_batch_offStart = -1; + long long repl_batch_idxStart = -1; std::vector vecclientsProcess; }; @@ -2185,7 +2187,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout); /* Replication */ void initMasterInfo(struct redisMaster *master); void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); -void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen); +void replicationFeedSlavesFromMasterStream(char *buf, size_t buflen); void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc); void updateSlavesWaitingBgsave(int bgsaveerr, int type); void replicationCron(void); @@ -2879,6 +2881,28 @@ inline int FCorrectThread(client *c) } #define AssertCorrectThread(c) serverAssert(FCorrectThread(c)) +void flushReplBacklogToClients(); + +template +void runAndPropogateToReplicas(FN_PTR *pfn, TARGS... args) { + // Store the replication backlog starting params, we use this to know how much data was written. + // these are TLS in case we need to expand the buffer and therefore need to update them + bool fNestedProcess = (serverTL->repl_batch_idxStart >= 0); + if (!fNestedProcess) { + serverTL->repl_batch_offStart = g_pserver->master_repl_offset; + serverTL->repl_batch_idxStart = g_pserver->repl_backlog_idx; + } + + pfn(args...); + + if (!fNestedProcess) { + flushReplBacklogToClients(); + serverTL->repl_batch_offStart = -1; + serverTL->repl_batch_idxStart = -1; + } +} + + /* TLS stuff */ void tlsInit(void); void tlsInitThread();