diff --git a/src/replication.cpp b/src/replication.cpp index eef679ea7..f37473fc5 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -229,7 +229,7 @@ void feedReplicationBacklogWithObject(robj *o) { feedReplicationBacklog(p,len); } -void replicationFeedSlave(client *slave, int dictid, robj **argv, int argc) +void replicationFeedSlave(client *slave, int dictid, robj **argv, int argc, bool fSendRaw) { char llstr[LONG_STR_SIZE]; std::unique_locklock)> lock(slave->lock); @@ -252,7 +252,8 @@ void replicationFeedSlave(client *slave, int dictid, robj **argv, int argc) } /* Add the SELECT command into the backlog. */ - if (g_pserver->repl_backlog) feedReplicationBacklogWithObject(selectcmd); + /* We don't do this for advanced replication because this will be done later when it adds the whole RREPLAY command */ + if (g_pserver->repl_backlog && fSendRaw) feedReplicationBacklogWithObject(selectcmd); /* Send it to slaves */ addReply(slave,selectcmd); @@ -302,34 +303,9 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { client *fake = createClient(-1, serverTL - g_pserver->rgthreadvar); fake->flags |= CLIENT_FORCE_REPLY; - replicationFeedSlave(fake, dictid, argv, argc); // Note: updates the repl log, keep above the repl update code below + bool fSendRaw = !g_pserver->fActiveReplica; + replicationFeedSlave(fake, dictid, argv, argc, fSendRaw); // Note: updates the repl log, keep above the repl update code below - /* Write the command to the replication backlog if any. */ - if (g_pserver->repl_backlog) { - char aux[LONG_STR_SIZE+3]; - - /* Add the multi bulk reply length. */ - aux[0] = '*'; - len = ll2string(aux+1,sizeof(aux)-1,argc); - aux[len+1] = '\r'; - aux[len+2] = '\n'; - feedReplicationBacklog(aux,len+3); - - for (j = 0; j < argc; j++) { - long objlen = stringObjectLen(argv[j]); - - /* We need to feed the buffer with the object as a bulk reply - * not just as a plain string, so create the $..CRLF payload len - * and add the final CRLF */ - aux[0] = '$'; - len = ll2string(aux+1,sizeof(aux)-1,objlen); - aux[len+1] = '\r'; - aux[len+2] = '\n'; - feedReplicationBacklog(aux,len+3); - feedReplicationBacklogWithObject(argv[j]); - feedReplicationBacklog(aux+len+1,2); - } - } long long cchbuf = fake->bufpos; listRewind(fake->reply, &liReply); @@ -339,8 +315,6 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { cchbuf += reply->used; } - bool fSendRaw = !g_pserver->fActiveReplica; - serverAssert(argc > 0); serverAssert(cchbuf > 0); @@ -348,7 +322,51 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { uuid_unparse(cserver.uuid, uuid); char proto[1024]; int cchProto = snprintf(proto, sizeof(proto), "*3\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); - cchProto = std::min((int)sizeof(proto), cchProto); + cchProto = std::min((int)sizeof(proto), cchProto); + + /* Write the command to the replication backlog if any. */ + if (g_pserver->repl_backlog) + { + if (fSendRaw) + { + char aux[LONG_STR_SIZE+3]; + + /* Add the multi bulk reply length. */ + aux[0] = '*'; + len = ll2string(aux+1,sizeof(aux)-1,argc); + aux[len+1] = '\r'; + aux[len+2] = '\n'; + feedReplicationBacklog(aux,len+3); + + for (j = 0; j < argc; j++) { + long objlen = stringObjectLen(argv[j]); + + /* We need to feed the buffer with the object as a bulk reply + * not just as a plain string, so create the $..CRLF payload len + * and add the final CRLF */ + aux[0] = '$'; + len = ll2string(aux+1,sizeof(aux)-1,objlen); + aux[len+1] = '\r'; + aux[len+2] = '\n'; + feedReplicationBacklog(aux,len+3); + feedReplicationBacklogWithObject(argv[j]); + feedReplicationBacklog(aux+len+1,2); + } + } + else + { + feedReplicationBacklog(proto, cchProto); + feedReplicationBacklog(fake->buf, fake->bufpos); + listRewind(fake->reply, &liReply); + while ((lnReply = listNext(&liReply))) + { + clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); + feedReplicationBacklog(reply->buf(), reply->used); + } + const char *crlf = "\r\n"; + feedReplicationBacklog(crlf, 2); + } + } /* Write the command to every slave. */ listRewind(slaves,&li); @@ -3235,6 +3253,7 @@ void replicaReplayCommand(client *c) return; // OK We've recieved a command lets execute + client *current_clientSave = serverTL->current_client; client *cFake = createClient(-1, c->iel); cFake->lock.lock(); cFake->authenticated = c->authenticated; @@ -3245,6 +3264,7 @@ void replicaReplayCommand(client *c) cFake->lock.unlock(); addReply(c, shared.ok); freeClient(cFake); + serverTL->current_client = current_clientSave; // call() will not propogate this for us, so we do so here if (!s_pstate->FCancelled() && s_pstate->FFirst())