Cache fake client in replicaFeedSlaves
Former-commit-id: 8e81e5f29e718395b32a60ff263808305d0b5818
This commit is contained in:
parent
4ed8f38a13
commit
8fb3be9ce1
@ -340,6 +340,8 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
|||||||
listIter li, liReply;
|
listIter li, liReply;
|
||||||
int j, len;
|
int j, len;
|
||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
|
static client *fake = nullptr;
|
||||||
|
|
||||||
if (dictid < 0)
|
if (dictid < 0)
|
||||||
dictid = 0; // this can happen if we send a PING before any real operation
|
dictid = 0; // this can happen if we send a PING before any real operation
|
||||||
|
|
||||||
@ -357,8 +359,12 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
|||||||
/* We can't have slaves attached and no backlog. */
|
/* We can't have slaves attached and no backlog. */
|
||||||
serverAssert(!(listLength(slaves) != 0 && g_pserver->repl_backlog == NULL));
|
serverAssert(!(listLength(slaves) != 0 && g_pserver->repl_backlog == NULL));
|
||||||
|
|
||||||
client *fake = createClient(-1, serverTL - g_pserver->rgthreadvar);
|
if (fake == nullptr)
|
||||||
fake->flags |= CLIENT_FORCE_REPLY;
|
{
|
||||||
|
fake = createClient(-1, serverTL - g_pserver->rgthreadvar);
|
||||||
|
fake->flags |= CLIENT_FORCE_REPLY;
|
||||||
|
}
|
||||||
|
|
||||||
bool fSendRaw = !g_pserver->fActiveReplica;
|
bool fSendRaw = !g_pserver->fActiveReplica;
|
||||||
replicationFeedSlave(fake, dictid, argv, argc, fSendRaw); // Note: updates the repl log, keep above the repl update code below
|
replicationFeedSlave(fake, dictid, argv, argc, fSendRaw); // Note: updates the repl log, keep above the repl update code below
|
||||||
|
|
||||||
@ -374,10 +380,6 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
|||||||
serverAssert(argc > 0);
|
serverAssert(argc > 0);
|
||||||
serverAssert(cchbuf > 0);
|
serverAssert(cchbuf > 0);
|
||||||
|
|
||||||
char uuid[37];
|
|
||||||
uuid_unparse(cserver.uuid, uuid);
|
|
||||||
|
|
||||||
|
|
||||||
// The code below used to be: snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf);
|
// The code below used to be: snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf);
|
||||||
// but that was much too slow
|
// but that was much too slow
|
||||||
static const char *protoRREPLAY = "*5\r\n$7\r\nRREPLAY\r\n$36\r\n00000000-0000-0000-0000-000000000000\r\n$";
|
static const char *protoRREPLAY = "*5\r\n$7\r\nRREPLAY\r\n$36\r\n00000000-0000-0000-0000-000000000000\r\n$";
|
||||||
@ -385,8 +387,11 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
|||||||
int cchProto = 0;
|
int cchProto = 0;
|
||||||
if (!fSendRaw)
|
if (!fSendRaw)
|
||||||
{
|
{
|
||||||
|
char uuid[37];
|
||||||
|
uuid_unparse(cserver.uuid, uuid);
|
||||||
|
|
||||||
cchProto = strlen(protoRREPLAY);
|
cchProto = strlen(protoRREPLAY);
|
||||||
memcpy(proto, protoRREPLAY, strlen(protoRREPLAY));
|
memcpy(proto, protoRREPLAY, strlen(protoRREPLAY));
|
||||||
memcpy(proto + 22, uuid, 36); // Note UUID_STR_LEN includes the \0 trailing byte which we don't want
|
memcpy(proto + 22, uuid, 36); // Note UUID_STR_LEN includes the \0 trailing byte which we don't want
|
||||||
cchProto += ll2string(proto + cchProto, sizeof(proto)-cchProto, cchbuf);
|
cchProto += ll2string(proto + cchProto, sizeof(proto)-cchProto, cchbuf);
|
||||||
memcpy(proto + cchProto, "\r\n", 3);
|
memcpy(proto + cchProto, "\r\n", 3);
|
||||||
@ -405,9 +410,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
|||||||
int cchMvcc = 0;
|
int cchMvcc = 0;
|
||||||
incrementMvccTstamp(); // Always increment MVCC tstamp so we're consistent with active and normal replication
|
incrementMvccTstamp(); // Always increment MVCC tstamp so we're consistent with active and normal replication
|
||||||
if (!fSendRaw)
|
if (!fSendRaw)
|
||||||
{
|
|
||||||
cchMvcc = writeProtoNum(szMvcc, sizeof(szMvcc), getMvccTstamp());
|
cchMvcc = writeProtoNum(szMvcc, sizeof(szMvcc), getMvccTstamp());
|
||||||
}
|
|
||||||
|
|
||||||
/* Write the command to the replication backlog if any. */
|
/* Write the command to the replication backlog if any. */
|
||||||
if (g_pserver->repl_backlog)
|
if (g_pserver->repl_backlog)
|
||||||
@ -492,7 +495,11 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
freeClient(fake);
|
// Cleanup cached fake client output buffers
|
||||||
|
fake->bufpos = 0;
|
||||||
|
fake->sentlen = 0;
|
||||||
|
fake->reply_bytes = 0;
|
||||||
|
listEmpty(fake->reply);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function is used in order to proxy what we receive from our master
|
/* This function is used in order to proxy what we receive from our master
|
||||||
|
Loading…
x
Reference in New Issue
Block a user