From d414b5ee60f507d1b84816a1706bfeaede003a9c Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 26 May 2021 17:29:47 +0000 Subject: [PATCH] Ensure multi-thread handling of cached master is equivalent to single thread case Former-commit-id: 7ee766aee13f501923afc127a74c79b29c0b0aa2 --- src/replication.cpp | 60 ++++++++++++++++++++++----------------------- 1 file changed, 29 insertions(+), 31 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index ac412368b..109af53e5 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1889,6 +1889,11 @@ void replicationCreateMasterClient(redisMaster *mi, connection *conn, int dbid) void replicationCreateCachedMasterClone(redisMaster *mi) { serverAssert(mi->master != nullptr); serverLog(LL_NOTICE, "Creating cache clone of our master"); + if ((mi->master->flags & (CLIENT_PROTOCOL_ERROR|CLIENT_BLOCKED))) { + freeClientAsync(mi->master); + mi->master = nullptr; + return; + } client *c = createClient(nullptr, ielFromEventLoop(serverTL->el)); c->flags |= mi->master->flags; @@ -1897,6 +1902,12 @@ void replicationCreateCachedMasterClone(redisMaster *mi) { c->read_reploff = mi->master->read_reploff; c->user = mi->master->user; + c->replstate = mi->master->replstate; + c->master_error = mi->master->master_error; + c->psync_initial_offset = mi->master->psync_initial_offset; + c->repldboff = mi->master->repldboff; + c->repldbsize = mi->master->repldbsize; + memcpy(c->uuid, mi->master->uuid, UUID_BINARY_LEN); memcpy(c->replid, mi->master->replid, sizeof(mi->master->replid)); @@ -3166,6 +3177,21 @@ int cancelReplicationHandshake(redisMaster *mi, int reconnect) { return 1; } +void disconnectMaster(redisMaster *mi) +{ + if (mi->master) { + if (FCorrectThread(mi->master)) { + // This will cache the master and do all that fancy stuff + if (!freeClient(mi->master) && mi->master) + replicationCreateCachedMasterClone(mi); + } else { + // We're not on the same thread so we can't use the freeClient method, instead we have to clone the master + // and cache that clone + replicationCreateCachedMasterClone(mi); + } + } +} + /* Set replication to the specified master address and port. */ struct redisMaster *replicationAddMaster(char *ip, int port) { // pre-reqs: We must not already have a replica in the list with the same tuple @@ -3195,17 +3221,7 @@ struct redisMaster *replicationAddMaster(char *ip, int port) { sdsfree(mi->masterhost); mi->masterhost = nullptr; - if (mi->master) { - if (FCorrectThread(mi->master)) { - // This will cache the master and do all that fancy stuff - if (!freeClient(mi->master) && mi->master) - replicationCreateCachedMasterClone(mi); - } else { - // We're not on the same thread so we can't use the freeClient method, instead we have to clone the master - // and cache that clone - replicationCreateCachedMasterClone(mi); - } - } + disconnectMaster(mi); serverAssert(mi->master == nullptr); if (!g_pserver->fActiveReplica) disconnectAllBlockedClients(); /* Clients blocked in master, now replica. */ @@ -3280,17 +3296,7 @@ void replicationUnsetMaster(redisMaster *mi) { * replicationHandleMasterDisconnection which can attempt to re-connect. */ sdsfree(mi->masterhost); mi->masterhost = NULL; - if (mi->master) { - if (FCorrectThread(mi->master)) { - // This will cache the master and do all that fancy stuff - if (!freeClient(mi->master) && mi->master) - replicationCreateCachedMasterClone(mi); - } else { - // We're not on the same thread so we can't use the freeClient method, instead we have to clone the master - // and cache that clone - replicationCreateCachedMasterClone(mi); - } - } + disconnectMaster(mi); replicationDiscardCachedMaster(mi); cancelReplicationHandshake(mi,false); /* When a slave is turned into a master, the current replication ID @@ -4038,15 +4044,7 @@ void replicationCron(void) { (time(NULL)-mi->master->lastinteraction) > g_pserver->repl_timeout) { serverLog(LL_WARNING,"MASTER timeout: no data nor PING received..."); - if (FCorrectThread(mi->master)) { - // This will cache the master and do all that fancy stuff - if (!freeClient(mi->master) && mi->master) - replicationCreateCachedMasterClone(mi); - } else { - // We're not on the same thread so we can't use the freeClient method, instead we have to clone the master - // and cache that clone - replicationCreateCachedMasterClone(mi); - } + disconnectMaster(mi); } /* Check if we should connect to a MASTER */