Fix: Active replication falls behind under high load

Former-commit-id: 3d0de946bb73a34f96cd773e85c102c2743e6480
This commit is contained in:
John Sully 2019-05-18 19:52:43 -04:00
parent 6205dee978
commit 1d0036882c

View File

@ -229,7 +229,7 @@ void feedReplicationBacklogWithObject(robj *o) {
feedReplicationBacklog(p,len); feedReplicationBacklog(p,len);
} }
void replicationFeedSlave(client *slave, int dictid, robj **argv, int argc) void replicationFeedSlave(client *slave, int dictid, robj **argv, int argc, bool fSendRaw)
{ {
char llstr[LONG_STR_SIZE]; char llstr[LONG_STR_SIZE];
std::unique_lock<decltype(slave->lock)> lock(slave->lock); std::unique_lock<decltype(slave->lock)> lock(slave->lock);
@ -252,7 +252,8 @@ void replicationFeedSlave(client *slave, int dictid, robj **argv, int argc)
} }
/* Add the SELECT command into the backlog. */ /* Add the SELECT command into the backlog. */
if (g_pserver->repl_backlog) feedReplicationBacklogWithObject(selectcmd); /* We don't do this for advanced replication because this will be done later when it adds the whole RREPLAY command */
if (g_pserver->repl_backlog && fSendRaw) feedReplicationBacklogWithObject(selectcmd);
/* Send it to slaves */ /* Send it to slaves */
addReply(slave,selectcmd); addReply(slave,selectcmd);
@ -302,34 +303,9 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
client *fake = createClient(-1, serverTL - g_pserver->rgthreadvar); client *fake = createClient(-1, serverTL - g_pserver->rgthreadvar);
fake->flags |= CLIENT_FORCE_REPLY; fake->flags |= CLIENT_FORCE_REPLY;
replicationFeedSlave(fake, dictid, argv, argc); // Note: updates the repl log, keep above the repl update code below bool fSendRaw = !g_pserver->fActiveReplica;
replicationFeedSlave(fake, dictid, argv, argc, fSendRaw); // Note: updates the repl log, keep above the repl update code below
/* Write the command to the replication backlog if any. */
if (g_pserver->repl_backlog) {
char aux[LONG_STR_SIZE+3];
/* Add the multi bulk reply length. */
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
for (j = 0; j < argc; j++) {
long objlen = stringObjectLen(argv[j]);
/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..CRLF payload len
* and add the final CRLF */
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
feedReplicationBacklogWithObject(argv[j]);
feedReplicationBacklog(aux+len+1,2);
}
}
long long cchbuf = fake->bufpos; long long cchbuf = fake->bufpos;
listRewind(fake->reply, &liReply); listRewind(fake->reply, &liReply);
@ -339,8 +315,6 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
cchbuf += reply->used; cchbuf += reply->used;
} }
bool fSendRaw = !g_pserver->fActiveReplica;
serverAssert(argc > 0); serverAssert(argc > 0);
serverAssert(cchbuf > 0); serverAssert(cchbuf > 0);
@ -348,7 +322,51 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
uuid_unparse(cserver.uuid, uuid); uuid_unparse(cserver.uuid, uuid);
char proto[1024]; char proto[1024];
int cchProto = snprintf(proto, sizeof(proto), "*3\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); int cchProto = snprintf(proto, sizeof(proto), "*3\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf);
cchProto = std::min((int)sizeof(proto), cchProto); cchProto = std::min((int)sizeof(proto), cchProto);
/* Write the command to the replication backlog if any. */
if (g_pserver->repl_backlog)
{
if (fSendRaw)
{
char aux[LONG_STR_SIZE+3];
/* Add the multi bulk reply length. */
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
for (j = 0; j < argc; j++) {
long objlen = stringObjectLen(argv[j]);
/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..CRLF payload len
* and add the final CRLF */
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
feedReplicationBacklogWithObject(argv[j]);
feedReplicationBacklog(aux+len+1,2);
}
}
else
{
feedReplicationBacklog(proto, cchProto);
feedReplicationBacklog(fake->buf, fake->bufpos);
listRewind(fake->reply, &liReply);
while ((lnReply = listNext(&liReply)))
{
clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply);
feedReplicationBacklog(reply->buf(), reply->used);
}
const char *crlf = "\r\n";
feedReplicationBacklog(crlf, 2);
}
}
/* Write the command to every slave. */ /* Write the command to every slave. */
listRewind(slaves,&li); listRewind(slaves,&li);
@ -3235,6 +3253,7 @@ void replicaReplayCommand(client *c)
return; return;
// OK We've recieved a command lets execute // OK We've recieved a command lets execute
client *current_clientSave = serverTL->current_client;
client *cFake = createClient(-1, c->iel); client *cFake = createClient(-1, c->iel);
cFake->lock.lock(); cFake->lock.lock();
cFake->authenticated = c->authenticated; cFake->authenticated = c->authenticated;
@ -3245,6 +3264,7 @@ void replicaReplayCommand(client *c)
cFake->lock.unlock(); cFake->lock.unlock();
addReply(c, shared.ok); addReply(c, shared.ok);
freeClient(cFake); freeClient(cFake);
serverTL->current_client = current_clientSave;
// call() will not propogate this for us, so we do so here // call() will not propogate this for us, so we do so here
if (!s_pstate->FCancelled() && s_pstate->FFirst()) if (!s_pstate->FCancelled() && s_pstate->FFirst())