diff --git a/src/expire.cpp b/src/expire.cpp index 28b42c6fa..e1d4cb29f 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -304,7 +304,7 @@ void pexpireMemberAtCommand(client *c) * executed, where the time limit is a percentage of the REDIS_HZ period * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */ -void activeExpireCycle(int type) { +void activeExpireCycleCore(int type) { /* This function has some global state in order to continue the work * incrementally across calls. */ static unsigned int current_db = 0; /* Last DB tested. */ @@ -420,6 +420,11 @@ void activeExpireCycle(int type) { (g_pserver->stat_expired_stale_perc*0.95); } +void activeExpireCycle(int type) +{ + runAndPropogateToReplicas(activeExpireCycleCore, type); +} + /*----------------------------------------------------------------------------- * Expires of keys created in writable slaves * diff --git a/src/networking.cpp b/src/networking.cpp index 5241a3f70..58b97af6c 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1777,6 +1777,13 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { int processed = 0; serverAssert(iel == (serverTL - g_pserver->rgthreadvar)); + if (listLength(serverTL->clients_pending_asyncwrite)) + { + AeLocker locker; + locker.arm(nullptr); + ProcessPendingAsyncWrites(); + } + int ae_flags = AE_WRITABLE|AE_WRITE_THREADSAFE; /* For the fsync=always policy, we want that a given FD is never * served for reading and writing in the same event loop iteration, @@ -1825,13 +1832,6 @@ int handleClientsWithPendingWrites(int iel, int aof_state) { } } - if (listLength(serverTL->clients_pending_asyncwrite)) - { - AeLocker locker; - locker.arm(nullptr); - ProcessPendingAsyncWrites(); - } - return processed; } @@ -2187,8 +2187,7 @@ void commandProcessed(client *c, int flags) { if (applied) { if (!g_pserver->fActiveReplica && (flags & CMD_CALL_PROPAGATE)) { - replicationFeedSlavesFromMasterStream(g_pserver->slaves, - c->pending_querybuf, applied); + replicationFeedSlavesFromMasterStream(c->pending_querybuf, applied); } sdsrange(c->pending_querybuf,applied,-1); } @@ -3338,6 +3337,13 @@ void processEventsWhileBlocked(int iel) { } } + /* Since we're about to release our lock we need to flush the repl backlog queue */ + bool fReplBacklog = g_pserver->repl_batch_offStart >= 0; + if (fReplBacklog) { + flushReplBacklogToClients(); + g_pserver->repl_batch_idxStart = -1; + g_pserver->repl_batch_offStart = -1; + } aeReleaseLock(); serverAssert(!GlobalLocksAcquired()); @@ -3370,6 +3376,12 @@ void processEventsWhileBlocked(int iel) { locker.arm(nullptr); locker.release(); + // Restore it so the calling code is not confused + if (fReplBacklog) { + g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; + g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; + } + for (client *c : vecclients) c->lock.lock(); diff --git a/src/replication.cpp b/src/replication.cpp index 46ade807d..6c8e2868a 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -196,6 +196,10 @@ void createReplicationBacklog(void) { * byte we have is the next byte that will be generated for the * replication stream. */ g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1; + + /* Allow transmission to clients */ + g_pserver->repl_batch_idxStart = 0; + g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; } /* This function is called when the user modifies the replication backlog @@ -209,20 +213,44 @@ void resizeReplicationBacklog(long long newsize) { newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; if (g_pserver->repl_backlog_size == newsize) return; - g_pserver->repl_backlog_size = newsize; if (g_pserver->repl_backlog != NULL) { /* What we actually do is to flush the old buffer and realloc a new * empty one. It will refill with new data incrementally. * The reason is that copying a few gigabytes adds latency and even * worse often we need to alloc additional space before freeing the * old buffer. */ - zfree(g_pserver->repl_backlog); - g_pserver->repl_backlog = (char*)zmalloc(g_pserver->repl_backlog_size, MALLOC_LOCAL); - g_pserver->repl_backlog_histlen = 0; - g_pserver->repl_backlog_idx = 0; - /* Next byte we have is... the next since the buffer is empty. */ - g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1; + + if (g_pserver->repl_batch_idxStart >= 0) { + // We need to keep critical data so we can't shrink less than the hot data in the buffer + newsize = std::max(newsize, g_pserver->master_repl_offset - g_pserver->repl_batch_offStart); + char *backlog = (char*)zmalloc(newsize); + g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - g_pserver->repl_batch_offStart; + + if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) { + auto cbActiveBacklog = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart; + memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbActiveBacklog); + serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog); + } else { + auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart; + memcpy(backlog, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1); + memcpy(backlog + cbPhase1, g_pserver->repl_backlog, g_pserver->repl_backlog_idx); + auto cbActiveBacklog = cbPhase1 + g_pserver->repl_backlog_idx; + serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog); + } + zfree(g_pserver->repl_backlog); + g_pserver->repl_backlog = backlog; + g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen; + g_pserver->repl_batch_idxStart = 0; + } else { + zfree(g_pserver->repl_backlog); + g_pserver->repl_backlog = (char*)zmalloc(newsize); + g_pserver->repl_backlog_histlen = 0; + g_pserver->repl_backlog_idx = 0; + /* Next byte we have is... the next since the buffer is empty. */ + g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1; + } } + g_pserver->repl_backlog_size = newsize; } void freeReplicationBacklog(void) { @@ -247,6 +275,21 @@ void feedReplicationBacklog(const void *ptr, size_t len) { serverAssert(GlobalLocksAcquired()); const unsigned char *p = (const unsigned char*)ptr; + if (g_pserver->repl_batch_idxStart >= 0) { + long long minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1; + if (minimumsize > g_pserver->repl_backlog_size) { + flushReplBacklogToClients(); + minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1; + + if (minimumsize > g_pserver->repl_backlog_size) { + // This is an emergency overflow, we better resize to fit + long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize); + serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize); + resizeReplicationBacklog(newsize); + } + } + } + g_pserver->master_repl_offset += len; /* This is a circular buffer, so write as much data we can at every @@ -370,12 +413,10 @@ static int writeProtoNum(char *dst, const size_t cchdst, long long num) * the commands received by our clients in order to create the replication * stream. Instead if the instance is a replica and has sub-slaves attached, * we use replicationFeedSlavesFromMaster() */ -void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { - listNode *ln, *lnReply; - listIter li, liReply; - int j, len; +void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc) { + int j; serverAssert(GlobalLocksAcquired()); - static client *fake = nullptr; + serverAssert(g_pserver->repl_batch_offStart >= 0); if (dictid < 0) dictid = 0; // this can happen if we send a PING before any real operation @@ -394,58 +435,34 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* We can't have slaves attached and no backlog. */ serverAssert(!(listLength(slaves) != 0 && g_pserver->repl_backlog == NULL)); - if (fake == nullptr) - { - fake = createClient(nullptr, serverTL - g_pserver->rgthreadvar); - fake->flags |= CLIENT_FORCE_REPLY; - } - bool fSendRaw = !g_pserver->fActiveReplica; - replicationFeedSlave(fake, dictid, argv, argc, fSendRaw); // Note: updates the repl log, keep above the repl update code below + /* Send SELECT command to every replica if needed. */ + if (g_pserver->replicaseldb != dictid) { + char llstr[LONG_STR_SIZE]; + robj *selectcmd; - long long cchbuf = fake->bufpos; - listRewind(fake->reply, &liReply); - while ((lnReply = listNext(&liReply))) - { - clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); - cchbuf += reply->used; + /* For a few DBs we have pre-computed SELECT command. */ + if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) { + selectcmd = shared.select[dictid]; + } else { + int dictid_len; + + dictid_len = ll2string(llstr,sizeof(llstr),dictid); + selectcmd = createObject(OBJ_STRING, + sdscatprintf(sdsempty(), + "*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n", + dictid_len, llstr)); + } + + /* Add the SELECT command into the backlog. */ + /* We don't do this for advanced replication because this will be done later when it adds the whole RREPLAY command */ + if (g_pserver->repl_backlog && fSendRaw) feedReplicationBacklogWithObject(selectcmd); + + if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS) + decrRefCount(selectcmd); } - - serverAssert(argc > 0); - serverAssert(cchbuf > 0); - - // The code below used to be: snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); - // but that was much too slow - static const char *protoRREPLAY = "*5\r\n$7\r\nRREPLAY\r\n$36\r\n00000000-0000-0000-0000-000000000000\r\n$"; - char proto[1024]; - int cchProto = 0; - if (!fSendRaw) - { - char uuid[37]; - uuid_unparse(cserver.uuid, uuid); - - cchProto = strlen(protoRREPLAY); - memcpy(proto, protoRREPLAY, strlen(protoRREPLAY)); - memcpy(proto + 22, uuid, 36); // Note UUID_STR_LEN includes the \0 trailing byte which we don't want - cchProto += ll2string(proto + cchProto, sizeof(proto)-cchProto, cchbuf); - memcpy(proto + cchProto, "\r\n", 3); - cchProto += 2; - } - - long long master_repl_offset_start = g_pserver->master_repl_offset; - - char szDbNum[128]; - int cchDbNum = 0; - if (!fSendRaw) - cchDbNum = writeProtoNum(szDbNum, sizeof(szDbNum), dictid); - - - char szMvcc[128]; - int cchMvcc = 0; - incrementMvccTstamp(); // Always increment MVCC tstamp so we're consistent with active and normal replication - if (!fSendRaw) - cchMvcc = writeProtoNum(szMvcc, sizeof(szMvcc), getMvccTstamp()); + g_pserver->replicaseldb = dictid; /* Write the command to the replication backlog if any. */ if (g_pserver->repl_backlog) @@ -456,10 +473,11 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* Add the multi bulk reply length. */ aux[0] = '*'; - len = ll2string(aux+1,sizeof(aux)-1,argc); - aux[len+1] = '\r'; - aux[len+2] = '\n'; - feedReplicationBacklog(aux,len+3); + int multilen = ll2string(aux+1,sizeof(aux)-1,argc); + aux[multilen+1] = '\r'; + aux[multilen+2] = '\n'; + + feedReplicationBacklog(aux,multilen+3); for (j = 0; j < argc; j++) { long objlen = stringObjectLen(argv[j]); @@ -468,7 +486,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { * not just as a plain string, so create the $..CRLF payload len * and add the final CRLF */ aux[0] = '$'; - len = ll2string(aux+1,sizeof(aux)-1,objlen); + int len = ll2string(aux+1,sizeof(aux)-1,objlen); aux[len+1] = '\r'; aux[len+2] = '\n'; feedReplicationBacklog(aux,len+3); @@ -478,67 +496,57 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { } else { - feedReplicationBacklog(proto, cchProto); - feedReplicationBacklog(fake->buf, fake->bufpos); - listRewind(fake->reply, &liReply); - while ((lnReply = listNext(&liReply))) + char szDbNum[128]; + int cchDbNum = 0; + if (!fSendRaw) + cchDbNum = writeProtoNum(szDbNum, sizeof(szDbNum), dictid); + + + char szMvcc[128]; + int cchMvcc = 0; + incrementMvccTstamp(); // Always increment MVCC tstamp so we're consistent with active and normal replication + if (!fSendRaw) + cchMvcc = writeProtoNum(szMvcc, sizeof(szMvcc), getMvccTstamp()); + + //size_t cchlen = multilen+3; + struct redisCommand *cmd = lookupCommand(szFromObj(argv[0])); + sds buf = catCommandForAofAndActiveReplication(sdsempty(), cmd, argv, argc); + size_t cchlen = sdslen(buf); + + // The code below used to be: snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); + // but that was much too slow + static const char *protoRREPLAY = "*5\r\n$7\r\nRREPLAY\r\n$36\r\n00000000-0000-0000-0000-000000000000\r\n$"; + char proto[1024]; + int cchProto = 0; + if (!fSendRaw) { - clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); - feedReplicationBacklog(reply->buf(), reply->used); + char uuid[37]; + uuid_unparse(cserver.uuid, uuid); + + cchProto = strlen(protoRREPLAY); + memcpy(proto, protoRREPLAY, strlen(protoRREPLAY)); + memcpy(proto + 22, uuid, 36); // Note UUID_STR_LEN includes the \0 trailing byte which we don't want + cchProto += ll2string(proto + cchProto, sizeof(proto)-cchProto, cchlen); + memcpy(proto + cchProto, "\r\n", 3); + cchProto += 2; } + + + feedReplicationBacklog(proto, cchProto); + feedReplicationBacklog(buf, sdslen(buf)); + const char *crlf = "\r\n"; feedReplicationBacklog(crlf, 2); feedReplicationBacklog(szDbNum, cchDbNum); feedReplicationBacklog(szMvcc, cchMvcc); + + sdsfree(buf); } } +} - /* Write the command to every replica. */ - listRewind(slaves,&li); - while((ln = listNext(&li))) { - client *replica = (client*)ln->value; - - /* Don't feed slaves that are still waiting for BGSAVE to start */ - if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; - if (replica->flags & CLIENT_CLOSE_ASAP) continue; - std::unique_locklock)> lock(replica->lock, std::defer_lock); - // When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async() - if (FCorrectThread(replica)) - lock.lock(); - if (serverTL->current_client && FSameHost(serverTL->current_client, replica)) - { - replica->reploff_skipped += g_pserver->master_repl_offset - master_repl_offset_start; - continue; - } - - /* Feed slaves that are waiting for the initial SYNC (so these commands - * are queued in the output buffer until the initial SYNC completes), - * or are already in sync with the master. */ - - if (!fSendRaw) - addReplyProto(replica, proto, cchProto); - - addReplyProto(replica,fake->buf,fake->bufpos); - listRewind(fake->reply, &liReply); - while ((lnReply = listNext(&liReply))) - { - clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); - addReplyProto(replica, reply->buf(), reply->used); - } - - if (!fSendRaw) - { - addReply(replica,shared.crlf); - addReplyProto(replica, szDbNum, cchDbNum); - addReplyProto(replica, szMvcc, cchMvcc); - } - } - - // Cleanup cached fake client output buffers - fake->bufpos = 0; - fake->sentlen = 0; - fake->reply_bytes = 0; - listEmpty(fake->reply); +void replicationFeedSlaves(list *replicas, int dictid, robj **argv, int argc) { + runAndPropogateToReplicas(replicationFeedSlavesCore, replicas, dictid, argv, argc); } /* This is a debugging function that gets called when we detect something @@ -578,10 +586,7 @@ void showLatestBacklog(void) { /* This function is used in order to proxy what we receive from our master * to our sub-slaves. */ #include -void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) { - listNode *ln; - listIter li; - +void replicationFeedSlavesFromMasterStream(char *buf, size_t buflen) { /* Debugging: this is handy to see the stream sent from master * to slaves. Disabled with if(0). */ if (0) { @@ -593,23 +598,6 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle } if (g_pserver->repl_backlog) feedReplicationBacklog(buf,buflen); - listRewind(slaves,&li); - - while((ln = listNext(&li))) { - client *replica = (client*)ln->value; - std::unique_locklock)> ulock(replica->lock, std::defer_lock); - if (FCorrectThread(replica)) - ulock.lock(); - if (FMasterHost(replica)) - continue; // Active Active case, don't feed back - - /* Don't feed slaves that are still waiting for BGSAVE to start */ - if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; - addReplyProto(replica,buf,buflen); - } - - if (listLength(slaves)) - ProcessPendingAsyncWrites(); // flush them to their respective threads } void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { @@ -2269,6 +2257,8 @@ void readSyncBulkPayload(connection *conn) { * we are starting a new history. */ memcpy(g_pserver->replid,mi->master->replid,sizeof(g_pserver->replid)); g_pserver->master_repl_offset = mi->master->reploff; + if (g_pserver->repl_batch_offStart >= 0) + g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; } clearReplicationId2(); @@ -2276,7 +2266,7 @@ void readSyncBulkPayload(connection *conn) { * accumulate the backlog regardless of the fact they have sub-slaves * or not, in order to behave correctly if they are promoted to * masters after a failover. */ - if (g_pserver->repl_backlog == NULL) createReplicationBacklog(); + if (g_pserver->repl_backlog == NULL) runAndPropogateToReplicas(createReplicationBacklog); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success"); if (cserver.supervised_mode == SUPERVISED_SYSTEMD) { @@ -2536,7 +2526,7 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read /* If this instance was restarted and we read the metadata to * PSYNC from the persistence file, our replication backlog could * be still not initialized. Create it. */ - if (g_pserver->repl_backlog == NULL) createReplicationBacklog(); + if (g_pserver->repl_backlog == NULL) runAndPropogateToReplicas(createReplicationBacklog); return PSYNC_CONTINUE; } @@ -4402,4 +4392,55 @@ void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long replicationFeedSlaves(g_pserver->slaves, db - g_pserver->db, argv, 4); sdsfree(szFromObj(&objTtl)); -} \ No newline at end of file +} + +void flushReplBacklogToClients() +{ + serverAssert(GlobalLocksAcquired()); + if (g_pserver->repl_batch_offStart < 0) + return; + + if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) { + bool fAsyncWrite = false; + // Ensure no overflow + serverAssert(g_pserver->repl_batch_offStart < g_pserver->master_repl_offset); + serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size); + serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx); + + listIter li; + listNode *ln; + listRewind(g_pserver->slaves, &li); + while ((ln = listNext(&li))) { + client *replica = (client*)listNodeValue(ln); + + if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + if (replica->flags & CLIENT_CLOSE_ASAP) continue; + + std::unique_lock ul(replica->lock, std::defer_lock); + if (FCorrectThread(replica)) + ul.lock(); + else + fAsyncWrite = true; + + if (g_pserver->repl_backlog_idx >= g_pserver->repl_batch_idxStart) { + long long cbCopy = g_pserver->repl_backlog_idx - g_pserver->repl_batch_idxStart; + serverAssert((g_pserver->master_repl_offset - g_pserver->repl_batch_offStart) == cbCopy); + serverAssert((g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart) >= (cbCopy)); + serverAssert((g_pserver->repl_batch_idxStart + cbCopy) <= g_pserver->repl_backlog_size); + + addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy); + } else { + auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart; + addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1); + addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx); + serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart)); + } + } + if (fAsyncWrite) + ProcessPendingAsyncWrites(); + + // This may be called multiple times per "frame" so update with our progress flushing to clients + g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; + g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; + } +} diff --git a/src/server.cpp b/src/server.cpp index cecf715c2..4f126f947 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2239,7 +2239,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) { int iel = ielFromEventLoop(eventLoop); locker.arm(); - processClients(); + serverAssert(g_pserver->repl_batch_offStart < 0); + runAndPropogateToReplicas(processClients); /* Handle precise timeouts of blocked clients. */ handleBlockedClientsTimeout(); @@ -2325,6 +2326,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this * time. */ + serverAssert(g_pserver->repl_batch_offStart < 0); locker.disarm(); if (!fSentReplies) handleClientsWithPendingWrites(iel, aof_state); @@ -3559,6 +3561,7 @@ void call(client *c, int flags) { /* We need to transfer async writes before a client's repl state gets changed. Otherwise we won't be able to propogate them correctly. */ if (c->cmd->flags & CMD_CATEGORY_REPLICATION) { + flushReplBacklogToClients(); ProcessPendingAsyncWrites(); } @@ -4013,8 +4016,18 @@ int processCommand(client *c, int callFlags) { queueMultiCommand(c); addReply(c,shared.queued); } else { + /* If the command was replication or admin related we *must* flush our buffers first. This is in case + something happens which would modify what we would send to replicas */ + + if (c->cmd->flags & (CMD_MODULE | CMD_ADMIN)) + flushReplBacklogToClients(); + call(c,callFlags); c->woff = g_pserver->master_repl_offset; + + if (c->cmd->flags & (CMD_MODULE | CMD_ADMIN)) + flushReplBacklogToClients(); + if (listLength(g_pserver->ready_keys)) handleClientsBlockedOnKeys(); } @@ -5312,6 +5325,8 @@ void loadDataFromDisk(void) { { memcpy(g_pserver->replid,rsi.repl_id,sizeof(g_pserver->replid)); g_pserver->master_repl_offset = rsi.repl_offset; + if (g_pserver->repl_batch_offStart >= 0) + g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; listIter li; listNode *ln; @@ -5562,6 +5577,7 @@ int main(int argc, char **argv) { srand(time(NULL)^getpid()); gettimeofday(&tv,NULL); crc64_init(); + serverAssert(g_pserver->repl_batch_offStart < 0); uint8_t hashseed[16]; getRandomHexChars((char*)hashseed,sizeof(hashseed)); diff --git a/src/server.h b/src/server.h index 32581fb00..e9fe7a504 100644 --- a/src/server.h +++ b/src/server.h @@ -1827,6 +1827,10 @@ struct redisServer { char *bio_cpulist; /* cpu affinity list of bio thread. */ char *aof_rewrite_cpulist; /* cpu affinity list of aof rewrite process. */ char *bgsave_cpulist; /* cpu affinity list of bgsave process. */ + + + long long repl_batch_offStart = -1; + long long repl_batch_idxStart = -1; }; typedef struct pubsubPattern { @@ -2186,7 +2190,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout); /* Replication */ void initMasterInfo(struct redisMaster *master); void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); -void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen); +void replicationFeedSlavesFromMasterStream(char *buf, size_t buflen); void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc); void updateSlavesWaitingBgsave(int bgsaveerr, int type); void replicationCron(void); @@ -2884,6 +2888,28 @@ inline int FCorrectThread(client *c) } #define AssertCorrectThread(c) serverAssert(FCorrectThread(c)) +void flushReplBacklogToClients(); + +template +void runAndPropogateToReplicas(FN_PTR *pfn, TARGS... args) { + // Store the replication backlog starting params, we use this to know how much data was written. + // these are TLS in case we need to expand the buffer and therefore need to update them + bool fNestedProcess = (g_pserver->repl_batch_idxStart >= 0); + if (!fNestedProcess) { + g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; + g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; + } + + pfn(args...); + + if (!fNestedProcess) { + flushReplBacklogToClients(); + g_pserver->repl_batch_offStart = -1; + g_pserver->repl_batch_idxStart = -1; + } +} + + /* TLS stuff */ void tlsInit(void); void tlsInitThread();