Improve replication performance in multithreaded scenarios

Former-commit-id: 96e0b2a2b19df220975e61131cbc535b0c34a828
This commit is contained in:
John Sully 2020-10-23 20:24:01 +00:00
parent 104164aa14
commit 973b769b21
5 changed files with 221 additions and 142 deletions

View File

@ -304,7 +304,7 @@ void pexpireMemberAtCommand(client *c)
* executed, where the time limit is a percentage of the REDIS_HZ period * executed, where the time limit is a percentage of the REDIS_HZ period
* as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */ * as specified by the ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC define. */
void activeExpireCycle(int type) { void activeExpireCycleCore(int type) {
/* This function has some global state in order to continue the work /* This function has some global state in order to continue the work
* incrementally across calls. */ * incrementally across calls. */
static unsigned int current_db = 0; /* Last DB tested. */ static unsigned int current_db = 0; /* Last DB tested. */
@ -420,6 +420,11 @@ void activeExpireCycle(int type) {
(g_pserver->stat_expired_stale_perc*0.95); (g_pserver->stat_expired_stale_perc*0.95);
} }
void activeExpireCycle(int type)
{
runAndPropogateToReplicas(activeExpireCycleCore, type);
}
/*----------------------------------------------------------------------------- /*-----------------------------------------------------------------------------
* Expires of keys created in writable slaves * Expires of keys created in writable slaves
* *

View File

@ -2187,8 +2187,7 @@ void commandProcessed(client *c, int flags) {
if (applied) { if (applied) {
if (!g_pserver->fActiveReplica && (flags & CMD_CALL_PROPAGATE)) if (!g_pserver->fActiveReplica && (flags & CMD_CALL_PROPAGATE))
{ {
replicationFeedSlavesFromMasterStream(g_pserver->slaves, replicationFeedSlavesFromMasterStream(c->pending_querybuf, applied);
c->pending_querybuf, applied);
} }
sdsrange(c->pending_querybuf,applied,-1); sdsrange(c->pending_querybuf,applied,-1);
} }

View File

@ -196,6 +196,10 @@ void createReplicationBacklog(void) {
* byte we have is the next byte that will be generated for the * byte we have is the next byte that will be generated for the
* replication stream. */ * replication stream. */
g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1; g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1;
/* Allow transmission to clients */
serverTL->repl_batch_idxStart = 0;
serverTL->repl_batch_offStart = g_pserver->master_repl_offset;
} }
/* This function is called when the user modifies the replication backlog /* This function is called when the user modifies the replication backlog
@ -209,20 +213,44 @@ void resizeReplicationBacklog(long long newsize) {
newsize = CONFIG_REPL_BACKLOG_MIN_SIZE; newsize = CONFIG_REPL_BACKLOG_MIN_SIZE;
if (g_pserver->repl_backlog_size == newsize) return; if (g_pserver->repl_backlog_size == newsize) return;
g_pserver->repl_backlog_size = newsize;
if (g_pserver->repl_backlog != NULL) { if (g_pserver->repl_backlog != NULL) {
/* What we actually do is to flush the old buffer and realloc a new /* What we actually do is to flush the old buffer and realloc a new
* empty one. It will refill with new data incrementally. * empty one. It will refill with new data incrementally.
* The reason is that copying a few gigabytes adds latency and even * The reason is that copying a few gigabytes adds latency and even
* worse often we need to alloc additional space before freeing the * worse often we need to alloc additional space before freeing the
* old buffer. */ * old buffer. */
zfree(g_pserver->repl_backlog);
g_pserver->repl_backlog = (char*)zmalloc(g_pserver->repl_backlog_size, MALLOC_LOCAL); if (serverTL->repl_batch_idxStart >= 0) {
g_pserver->repl_backlog_histlen = 0; // We need to keep critical data so we can't shrink less than the hot data in the buffer
g_pserver->repl_backlog_idx = 0; newsize = std::max(newsize, g_pserver->master_repl_offset - serverTL->repl_batch_offStart);
/* Next byte we have is... the next since the buffer is empty. */ char *backlog = (char*)zmalloc(newsize);
g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1; g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - serverTL->repl_batch_offStart;
if (g_pserver->repl_backlog_idx >= serverTL->repl_batch_idxStart) {
auto cbActiveBacklog = g_pserver->repl_backlog_idx - serverTL->repl_batch_idxStart;
memcpy(backlog, g_pserver->repl_backlog + serverTL->repl_batch_idxStart, cbActiveBacklog);
serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
} else {
auto cbPhase1 = g_pserver->repl_backlog_size - serverTL->repl_batch_idxStart;
memcpy(backlog, g_pserver->repl_backlog + serverTL->repl_batch_idxStart, cbPhase1);
memcpy(backlog + cbPhase1, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
auto cbActiveBacklog = cbPhase1 + g_pserver->repl_backlog_idx;
serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
}
zfree(g_pserver->repl_backlog);
g_pserver->repl_backlog = backlog;
g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen;
serverTL->repl_batch_idxStart = 0;
} else {
zfree(g_pserver->repl_backlog);
g_pserver->repl_backlog = (char*)zmalloc(newsize);
g_pserver->repl_backlog_histlen = 0;
g_pserver->repl_backlog_idx = 0;
/* Next byte we have is... the next since the buffer is empty. */
g_pserver->repl_backlog_off = g_pserver->master_repl_offset+1;
}
} }
g_pserver->repl_backlog_size = newsize;
} }
void freeReplicationBacklog(void) { void freeReplicationBacklog(void) {
@ -247,6 +275,21 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
const unsigned char *p = (const unsigned char*)ptr; const unsigned char *p = (const unsigned char*)ptr;
if (serverTL->repl_batch_idxStart >= 0) {
long long minimumsize = g_pserver->master_repl_offset + len - serverTL->repl_batch_offStart+1;
if (minimumsize > g_pserver->repl_backlog_size) {
flushReplBacklogToClients();
minimumsize = g_pserver->master_repl_offset + len - serverTL->repl_batch_offStart+1;
if (minimumsize > g_pserver->repl_backlog_size) {
// This is an emergency overflow, we better resize to fit
long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize);
serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize);
resizeReplicationBacklog(newsize);
}
}
}
g_pserver->master_repl_offset += len; g_pserver->master_repl_offset += len;
/* This is a circular buffer, so write as much data we can at every /* This is a circular buffer, so write as much data we can at every
@ -370,12 +413,10 @@ static int writeProtoNum(char *dst, const size_t cchdst, long long num)
* the commands received by our clients in order to create the replication * the commands received by our clients in order to create the replication
* stream. Instead if the instance is a replica and has sub-slaves attached, * stream. Instead if the instance is a replica and has sub-slaves attached,
* we use replicationFeedSlavesFromMaster() */ * we use replicationFeedSlavesFromMaster() */
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { void replicationFeedSlavesCore(list *slaves, int dictid, robj **argv, int argc) {
listNode *ln, *lnReply; int j;
listIter li, liReply;
int j, len;
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
static client *fake = nullptr; serverAssert(serverTL->repl_batch_offStart >= 0);
if (dictid < 0) if (dictid < 0)
dictid = 0; // this can happen if we send a PING before any real operation dictid = 0; // this can happen if we send a PING before any real operation
@ -394,58 +435,34 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* We can't have slaves attached and no backlog. */ /* We can't have slaves attached and no backlog. */
serverAssert(!(listLength(slaves) != 0 && g_pserver->repl_backlog == NULL)); serverAssert(!(listLength(slaves) != 0 && g_pserver->repl_backlog == NULL));
if (fake == nullptr)
{
fake = createClient(nullptr, serverTL - g_pserver->rgthreadvar);
fake->flags |= CLIENT_FORCE_REPLY;
}
bool fSendRaw = !g_pserver->fActiveReplica; bool fSendRaw = !g_pserver->fActiveReplica;
replicationFeedSlave(fake, dictid, argv, argc, fSendRaw); // Note: updates the repl log, keep above the repl update code below
/* Send SELECT command to every replica if needed. */
if (g_pserver->replicaseldb != dictid) {
char llstr[LONG_STR_SIZE];
robj *selectcmd;
long long cchbuf = fake->bufpos; /* For a few DBs we have pre-computed SELECT command. */
listRewind(fake->reply, &liReply); if (dictid >= 0 && dictid < PROTO_SHARED_SELECT_CMDS) {
while ((lnReply = listNext(&liReply))) selectcmd = shared.select[dictid];
{ } else {
clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); int dictid_len;
cchbuf += reply->used;
dictid_len = ll2string(llstr,sizeof(llstr),dictid);
selectcmd = createObject(OBJ_STRING,
sdscatprintf(sdsempty(),
"*2\r\n$6\r\nSELECT\r\n$%d\r\n%s\r\n",
dictid_len, llstr));
}
/* Add the SELECT command into the backlog. */
/* We don't do this for advanced replication because this will be done later when it adds the whole RREPLAY command */
if (g_pserver->repl_backlog && fSendRaw) feedReplicationBacklogWithObject(selectcmd);
if (dictid < 0 || dictid >= PROTO_SHARED_SELECT_CMDS)
decrRefCount(selectcmd);
} }
g_pserver->replicaseldb = dictid;
serverAssert(argc > 0);
serverAssert(cchbuf > 0);
// The code below used to be: snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf);
// but that was much too slow
static const char *protoRREPLAY = "*5\r\n$7\r\nRREPLAY\r\n$36\r\n00000000-0000-0000-0000-000000000000\r\n$";
char proto[1024];
int cchProto = 0;
if (!fSendRaw)
{
char uuid[37];
uuid_unparse(cserver.uuid, uuid);
cchProto = strlen(protoRREPLAY);
memcpy(proto, protoRREPLAY, strlen(protoRREPLAY));
memcpy(proto + 22, uuid, 36); // Note UUID_STR_LEN includes the \0 trailing byte which we don't want
cchProto += ll2string(proto + cchProto, sizeof(proto)-cchProto, cchbuf);
memcpy(proto + cchProto, "\r\n", 3);
cchProto += 2;
}
long long master_repl_offset_start = g_pserver->master_repl_offset;
char szDbNum[128];
int cchDbNum = 0;
if (!fSendRaw)
cchDbNum = writeProtoNum(szDbNum, sizeof(szDbNum), dictid);
char szMvcc[128];
int cchMvcc = 0;
incrementMvccTstamp(); // Always increment MVCC tstamp so we're consistent with active and normal replication
if (!fSendRaw)
cchMvcc = writeProtoNum(szMvcc, sizeof(szMvcc), getMvccTstamp());
/* Write the command to the replication backlog if any. */ /* Write the command to the replication backlog if any. */
if (g_pserver->repl_backlog) if (g_pserver->repl_backlog)
@ -456,10 +473,11 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* Add the multi bulk reply length. */ /* Add the multi bulk reply length. */
aux[0] = '*'; aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc); int multilen = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r'; aux[multilen+1] = '\r';
aux[len+2] = '\n'; aux[multilen+2] = '\n';
feedReplicationBacklog(aux,len+3);
feedReplicationBacklog(aux,multilen+3);
for (j = 0; j < argc; j++) { for (j = 0; j < argc; j++) {
long objlen = stringObjectLen(argv[j]); long objlen = stringObjectLen(argv[j]);
@ -468,7 +486,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
* not just as a plain string, so create the $..CRLF payload len * not just as a plain string, so create the $..CRLF payload len
* and add the final CRLF */ * and add the final CRLF */
aux[0] = '$'; aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen); int len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r'; aux[len+1] = '\r';
aux[len+2] = '\n'; aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3); feedReplicationBacklog(aux,len+3);
@ -478,67 +496,57 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
} }
else else
{ {
feedReplicationBacklog(proto, cchProto); char szDbNum[128];
feedReplicationBacklog(fake->buf, fake->bufpos); int cchDbNum = 0;
listRewind(fake->reply, &liReply); if (!fSendRaw)
while ((lnReply = listNext(&liReply))) cchDbNum = writeProtoNum(szDbNum, sizeof(szDbNum), dictid);
char szMvcc[128];
int cchMvcc = 0;
incrementMvccTstamp(); // Always increment MVCC tstamp so we're consistent with active and normal replication
if (!fSendRaw)
cchMvcc = writeProtoNum(szMvcc, sizeof(szMvcc), getMvccTstamp());
//size_t cchlen = multilen+3;
struct redisCommand *cmd = lookupCommand(szFromObj(argv[0]));
sds buf = catCommandForAofAndActiveReplication(sdsempty(), cmd, argv, argc);
size_t cchlen = sdslen(buf);
// The code below used to be: snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf);
// but that was much too slow
static const char *protoRREPLAY = "*5\r\n$7\r\nRREPLAY\r\n$36\r\n00000000-0000-0000-0000-000000000000\r\n$";
char proto[1024];
int cchProto = 0;
if (!fSendRaw)
{ {
clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); char uuid[37];
feedReplicationBacklog(reply->buf(), reply->used); uuid_unparse(cserver.uuid, uuid);
cchProto = strlen(protoRREPLAY);
memcpy(proto, protoRREPLAY, strlen(protoRREPLAY));
memcpy(proto + 22, uuid, 36); // Note UUID_STR_LEN includes the \0 trailing byte which we don't want
cchProto += ll2string(proto + cchProto, sizeof(proto)-cchProto, cchlen);
memcpy(proto + cchProto, "\r\n", 3);
cchProto += 2;
} }
feedReplicationBacklog(proto, cchProto);
feedReplicationBacklog(buf, sdslen(buf));
const char *crlf = "\r\n"; const char *crlf = "\r\n";
feedReplicationBacklog(crlf, 2); feedReplicationBacklog(crlf, 2);
feedReplicationBacklog(szDbNum, cchDbNum); feedReplicationBacklog(szDbNum, cchDbNum);
feedReplicationBacklog(szMvcc, cchMvcc); feedReplicationBacklog(szMvcc, cchMvcc);
sdsfree(buf);
} }
} }
}
/* Write the command to every replica. */ void replicationFeedSlaves(list *replicas, int dictid, robj **argv, int argc) {
listRewind(slaves,&li); runAndPropogateToReplicas(replicationFeedSlavesCore, replicas, dictid, argv, argc);
while((ln = listNext(&li))) {
client *replica = (client*)ln->value;
/* Don't feed slaves that are still waiting for BGSAVE to start */
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
std::unique_lock<decltype(replica->lock)> lock(replica->lock, std::defer_lock);
// When writing to clients on other threads the global lock is sufficient provided we only use AddReply*Async()
if (FCorrectThread(replica))
lock.lock();
if (serverTL->current_client && FSameHost(serverTL->current_client, replica))
{
replica->reploff_skipped += g_pserver->master_repl_offset - master_repl_offset_start;
continue;
}
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
if (!fSendRaw)
addReplyProto(replica, proto, cchProto);
addReplyProto(replica,fake->buf,fake->bufpos);
listRewind(fake->reply, &liReply);
while ((lnReply = listNext(&liReply)))
{
clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply);
addReplyProto(replica, reply->buf(), reply->used);
}
if (!fSendRaw)
{
addReply(replica,shared.crlf);
addReplyProto(replica, szDbNum, cchDbNum);
addReplyProto(replica, szMvcc, cchMvcc);
}
}
// Cleanup cached fake client output buffers
fake->bufpos = 0;
fake->sentlen = 0;
fake->reply_bytes = 0;
listEmpty(fake->reply);
} }
/* This is a debugging function that gets called when we detect something /* This is a debugging function that gets called when we detect something
@ -578,10 +586,7 @@ void showLatestBacklog(void) {
/* This function is used in order to proxy what we receive from our master /* This function is used in order to proxy what we receive from our master
* to our sub-slaves. */ * to our sub-slaves. */
#include <ctype.h> #include <ctype.h>
void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen) { void replicationFeedSlavesFromMasterStream(char *buf, size_t buflen) {
listNode *ln;
listIter li;
/* Debugging: this is handy to see the stream sent from master /* Debugging: this is handy to see the stream sent from master
* to slaves. Disabled with if(0). */ * to slaves. Disabled with if(0). */
if (0) { if (0) {
@ -593,23 +598,6 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
} }
if (g_pserver->repl_backlog) feedReplicationBacklog(buf,buflen); if (g_pserver->repl_backlog) feedReplicationBacklog(buf,buflen);
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *replica = (client*)ln->value;
std::unique_lock<decltype(replica->lock)> ulock(replica->lock, std::defer_lock);
if (FCorrectThread(replica))
ulock.lock();
if (FMasterHost(replica))
continue; // Active Active case, don't feed back
/* Don't feed slaves that are still waiting for BGSAVE to start */
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
addReplyProto(replica,buf,buflen);
}
if (listLength(slaves))
ProcessPendingAsyncWrites(); // flush them to their respective threads
} }
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) {
@ -2269,6 +2257,8 @@ void readSyncBulkPayload(connection *conn) {
* we are starting a new history. */ * we are starting a new history. */
memcpy(g_pserver->replid,mi->master->replid,sizeof(g_pserver->replid)); memcpy(g_pserver->replid,mi->master->replid,sizeof(g_pserver->replid));
g_pserver->master_repl_offset = mi->master->reploff; g_pserver->master_repl_offset = mi->master->reploff;
if (serverTL->repl_batch_offStart >= 0)
serverTL->repl_batch_offStart = g_pserver->master_repl_offset;
} }
clearReplicationId2(); clearReplicationId2();
@ -4366,3 +4356,52 @@ static void propagateMasterStaleKeys()
decrRefCount(rgobj[0]); decrRefCount(rgobj[0]);
} }
void flushReplBacklogToClients()
{
serverAssert(GlobalLocksAcquired());
if (serverTL->repl_batch_offStart != g_pserver->master_repl_offset) {
bool fAsyncWrite = false;
// Ensure no overflow
serverAssert(serverTL->repl_batch_offStart < g_pserver->master_repl_offset);
serverAssert(g_pserver->master_repl_offset - serverTL->repl_batch_offStart <= g_pserver->repl_backlog_size);
serverAssert(serverTL->repl_batch_idxStart != g_pserver->repl_backlog_idx);
listIter li;
listNode *ln;
listRewind(g_pserver->slaves, &li);
while ((ln = listNext(&li))) {
client *replica = (client*)listNodeValue(ln);
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
std::unique_lock<fastlock> ul(replica->lock, std::defer_lock);
if (FCorrectThread(replica))
ul.lock();
else
fAsyncWrite = true;
if (g_pserver->repl_backlog_idx >= serverTL->repl_batch_idxStart) {
long long cbCopy = g_pserver->repl_backlog_idx - serverTL->repl_batch_idxStart;
serverAssert((g_pserver->master_repl_offset - serverTL->repl_batch_offStart) == cbCopy);
serverAssert((g_pserver->repl_backlog_size - serverTL->repl_batch_idxStart) >= (cbCopy));
serverAssert((serverTL->repl_batch_idxStart + cbCopy) <= g_pserver->repl_backlog_size);
addReplyProto(replica, g_pserver->repl_backlog + serverTL->repl_batch_idxStart, cbCopy);
} else {
auto cbPhase1 = g_pserver->repl_backlog_size - serverTL->repl_batch_idxStart;
addReplyProto(replica, g_pserver->repl_backlog + serverTL->repl_batch_idxStart, cbPhase1);
addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - serverTL->repl_batch_offStart));
}
}
if (fAsyncWrite)
ProcessPendingAsyncWrites();
// This may be called multiple times per "frame" so update with our progress flushing to clients
serverTL->repl_batch_idxStart = g_pserver->repl_backlog_idx;
serverTL->repl_batch_offStart = g_pserver->master_repl_offset;
}
}

View File

@ -4007,8 +4007,18 @@ int processCommand(client *c, int callFlags) {
queueMultiCommand(c); queueMultiCommand(c);
addReply(c,shared.queued); addReply(c,shared.queued);
} else { } else {
/* If the command was replication or admin related we *must* flush our buffers first. This is in case
something happens which would modify what we would send to replicas */
if (c->cmd->flags & (CMD_MODULE | CMD_ADMIN))
flushReplBacklogToClients();
call(c,callFlags); call(c,callFlags);
c->woff = g_pserver->master_repl_offset; c->woff = g_pserver->master_repl_offset;
if (c->cmd->flags & (CMD_MODULE | CMD_ADMIN))
flushReplBacklogToClients();
if (listLength(g_pserver->ready_keys)) if (listLength(g_pserver->ready_keys))
handleClientsBlockedOnKeys(); handleClientsBlockedOnKeys();
} }
@ -5306,6 +5316,8 @@ void loadDataFromDisk(void) {
{ {
memcpy(g_pserver->replid,rsi.repl_id,sizeof(g_pserver->replid)); memcpy(g_pserver->replid,rsi.repl_id,sizeof(g_pserver->replid));
g_pserver->master_repl_offset = rsi.repl_offset; g_pserver->master_repl_offset = rsi.repl_offset;
if (serverTL->repl_batch_offStart >= 0)
serverTL->repl_batch_offStart = g_pserver->master_repl_offset;
listIter li; listIter li;
listNode *ln; listNode *ln;

View File

@ -1371,6 +1371,8 @@ struct redisServerThreadVars {
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
long unsigned commandsExecuted = 0; long unsigned commandsExecuted = 0;
bool fRetrySetAofEvent = false; bool fRetrySetAofEvent = false;
long long repl_batch_offStart = -1;
long long repl_batch_idxStart = -1;
std::vector<client*> vecclientsProcess; std::vector<client*> vecclientsProcess;
}; };
@ -2185,7 +2187,7 @@ ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
/* Replication */ /* Replication */
void initMasterInfo(struct redisMaster *master); void initMasterInfo(struct redisMaster *master);
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc); void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc);
void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t buflen); void replicationFeedSlavesFromMasterStream(char *buf, size_t buflen);
void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc); void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc);
void updateSlavesWaitingBgsave(int bgsaveerr, int type); void updateSlavesWaitingBgsave(int bgsaveerr, int type);
void replicationCron(void); void replicationCron(void);
@ -2879,6 +2881,28 @@ inline int FCorrectThread(client *c)
} }
#define AssertCorrectThread(c) serverAssert(FCorrectThread(c)) #define AssertCorrectThread(c) serverAssert(FCorrectThread(c))
void flushReplBacklogToClients();
template<typename FN_PTR, class ...TARGS>
void runAndPropogateToReplicas(FN_PTR *pfn, TARGS... args) {
// Store the replication backlog starting params, we use this to know how much data was written.
// these are TLS in case we need to expand the buffer and therefore need to update them
bool fNestedProcess = (serverTL->repl_batch_idxStart >= 0);
if (!fNestedProcess) {
serverTL->repl_batch_offStart = g_pserver->master_repl_offset;
serverTL->repl_batch_idxStart = g_pserver->repl_backlog_idx;
}
pfn(args...);
if (!fNestedProcess) {
flushReplBacklogToClients();
serverTL->repl_batch_offStart = -1;
serverTL->repl_batch_idxStart = -1;
}
}
/* TLS stuff */ /* TLS stuff */
void tlsInit(void); void tlsInit(void);
void tlsInitThread(); void tlsInitThread();