Ensure multi-thread handling of cached master is equivalent to single thread case

Former-commit-id: 7ee766aee13f501923afc127a74c79b29c0b0aa2
This commit is contained in:
John Sully 2021-05-26 17:29:47 +00:00
parent c091050d4a
commit d414b5ee60

View File

@ -1889,6 +1889,11 @@ void replicationCreateMasterClient(redisMaster *mi, connection *conn, int dbid)
void replicationCreateCachedMasterClone(redisMaster *mi) { void replicationCreateCachedMasterClone(redisMaster *mi) {
serverAssert(mi->master != nullptr); serverAssert(mi->master != nullptr);
serverLog(LL_NOTICE, "Creating cache clone of our master"); serverLog(LL_NOTICE, "Creating cache clone of our master");
if ((mi->master->flags & (CLIENT_PROTOCOL_ERROR|CLIENT_BLOCKED))) {
freeClientAsync(mi->master);
mi->master = nullptr;
return;
}
client *c = createClient(nullptr, ielFromEventLoop(serverTL->el)); client *c = createClient(nullptr, ielFromEventLoop(serverTL->el));
c->flags |= mi->master->flags; c->flags |= mi->master->flags;
@ -1897,6 +1902,12 @@ void replicationCreateCachedMasterClone(redisMaster *mi) {
c->read_reploff = mi->master->read_reploff; c->read_reploff = mi->master->read_reploff;
c->user = mi->master->user; c->user = mi->master->user;
c->replstate = mi->master->replstate;
c->master_error = mi->master->master_error;
c->psync_initial_offset = mi->master->psync_initial_offset;
c->repldboff = mi->master->repldboff;
c->repldbsize = mi->master->repldbsize;
memcpy(c->uuid, mi->master->uuid, UUID_BINARY_LEN); memcpy(c->uuid, mi->master->uuid, UUID_BINARY_LEN);
memcpy(c->replid, mi->master->replid, memcpy(c->replid, mi->master->replid,
sizeof(mi->master->replid)); sizeof(mi->master->replid));
@ -3166,6 +3177,21 @@ int cancelReplicationHandshake(redisMaster *mi, int reconnect) {
return 1; return 1;
} }
void disconnectMaster(redisMaster *mi)
{
if (mi->master) {
if (FCorrectThread(mi->master)) {
// This will cache the master and do all that fancy stuff
if (!freeClient(mi->master) && mi->master)
replicationCreateCachedMasterClone(mi);
} else {
// We're not on the same thread so we can't use the freeClient method, instead we have to clone the master
// and cache that clone
replicationCreateCachedMasterClone(mi);
}
}
}
/* Set replication to the specified master address and port. */ /* Set replication to the specified master address and port. */
struct redisMaster *replicationAddMaster(char *ip, int port) { struct redisMaster *replicationAddMaster(char *ip, int port) {
// pre-reqs: We must not already have a replica in the list with the same tuple // pre-reqs: We must not already have a replica in the list with the same tuple
@ -3195,17 +3221,7 @@ struct redisMaster *replicationAddMaster(char *ip, int port) {
sdsfree(mi->masterhost); sdsfree(mi->masterhost);
mi->masterhost = nullptr; mi->masterhost = nullptr;
if (mi->master) { disconnectMaster(mi);
if (FCorrectThread(mi->master)) {
// This will cache the master and do all that fancy stuff
if (!freeClient(mi->master) && mi->master)
replicationCreateCachedMasterClone(mi);
} else {
// We're not on the same thread so we can't use the freeClient method, instead we have to clone the master
// and cache that clone
replicationCreateCachedMasterClone(mi);
}
}
serverAssert(mi->master == nullptr); serverAssert(mi->master == nullptr);
if (!g_pserver->fActiveReplica) if (!g_pserver->fActiveReplica)
disconnectAllBlockedClients(); /* Clients blocked in master, now replica. */ disconnectAllBlockedClients(); /* Clients blocked in master, now replica. */
@ -3280,17 +3296,7 @@ void replicationUnsetMaster(redisMaster *mi) {
* replicationHandleMasterDisconnection which can attempt to re-connect. */ * replicationHandleMasterDisconnection which can attempt to re-connect. */
sdsfree(mi->masterhost); sdsfree(mi->masterhost);
mi->masterhost = NULL; mi->masterhost = NULL;
if (mi->master) { disconnectMaster(mi);
if (FCorrectThread(mi->master)) {
// This will cache the master and do all that fancy stuff
if (!freeClient(mi->master) && mi->master)
replicationCreateCachedMasterClone(mi);
} else {
// We're not on the same thread so we can't use the freeClient method, instead we have to clone the master
// and cache that clone
replicationCreateCachedMasterClone(mi);
}
}
replicationDiscardCachedMaster(mi); replicationDiscardCachedMaster(mi);
cancelReplicationHandshake(mi,false); cancelReplicationHandshake(mi,false);
/* When a slave is turned into a master, the current replication ID /* When a slave is turned into a master, the current replication ID
@ -4038,15 +4044,7 @@ void replicationCron(void) {
(time(NULL)-mi->master->lastinteraction) > g_pserver->repl_timeout) (time(NULL)-mi->master->lastinteraction) > g_pserver->repl_timeout)
{ {
serverLog(LL_WARNING,"MASTER timeout: no data nor PING received..."); serverLog(LL_WARNING,"MASTER timeout: no data nor PING received...");
if (FCorrectThread(mi->master)) { disconnectMaster(mi);
// This will cache the master and do all that fancy stuff
if (!freeClient(mi->master) && mi->master)
replicationCreateCachedMasterClone(mi);
} else {
// We're not on the same thread so we can't use the freeClient method, instead we have to clone the master
// and cache that clone
replicationCreateCachedMasterClone(mi);
}
} }
/* Check if we should connect to a MASTER */ /* Check if we should connect to a MASTER */