From 28976a9003c6dd5cdd7225c5bc90743b4fcde13c Mon Sep 17 00:00:00 2001 From: Ping Xie Date: Mon, 4 Mar 2024 17:32:25 -0800 Subject: [PATCH] Fix PONG message processing for primary-ship tracking during failovers (#13055) This commit updates the processing of PONG gossip messages in the cluster. When a node (B) becomes a replica due to a failover, its PONG messages include its new primary node's (A) information and B's configuration epoch is aligned with A's. This allows observer nodes to identify changes in primary-ship, addressing issues of intermediate states and enhancing cluster state consistency during topology changes. Fix #13018 --- src/cluster_legacy.c | 62 +++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 61 insertions(+), 1 deletion(-) diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index bbaddf464..4e4f9425d 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -65,6 +65,7 @@ list *clusterGetNodesInMyShard(clusterNode *node); int clusterNodeAddSlave(clusterNode *master, clusterNode *slave); int clusterAddSlot(clusterNode *n, int slot); int clusterDelSlot(int slot); +int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node); int clusterDelNodeSlots(clusterNode *node); int clusterNodeSetSlotBit(clusterNode *n, int slot); void clusterSetMaster(clusterNode *n); @@ -2990,7 +2991,50 @@ int clusterProcessPacket(clusterLink *link) { if (clusterNodeIsMaster(sender)) { /* Master turned into a slave! Reconfigure the node. */ - clusterDelNodeSlots(sender); + if (master && !memcmp(master->shard_id, sender->shard_id, CLUSTER_NAMELEN)) { + /* `sender` was a primary and was in the same shard as `master`, its new primary */ + if (sender->configEpoch > senderConfigEpoch) { + serverLog(LL_NOTICE, + "Ignore stale message from %.40s (%s) in shard %.40s;" + " gossip config epoch: %llu, current config epoch: %llu", + sender->name, + sender->human_nodename, + sender->shard_id, + (unsigned long long)senderConfigEpoch, + (unsigned long long)sender->configEpoch); + } else { + /* A failover occurred in the shard where `sender` belongs to and `sender` is no longer + * a primary. Update slot assignment to `master`, which is the new primary in the shard */ + int slots = clusterMoveNodeSlots(sender, master); + /* `master` is still a `slave` in this observer node's view; update its role and configEpoch */ + clusterSetNodeAsMaster(master); + master->configEpoch = senderConfigEpoch; + serverLog(LL_NOTICE, "A failover occurred in shard %.40s; node %.40s (%s)" + " lost %d slot(s) to node %.40s (%s) with a config epoch of %llu", + sender->shard_id, + sender->name, + sender->human_nodename, + slots, + master->name, + master->human_nodename, + (unsigned long long) master->configEpoch); + } + } else { + /* `sender` was moved to another shard and has become a replica, remove its slot assignment */ + int slots = clusterDelNodeSlots(sender); + serverLog(LL_NOTICE, "Node %.40s (%s) is no longer master of shard %.40s;" + " removed all %d slot(s) it used to own", + sender->name, + sender->human_nodename, + sender->shard_id, + slots); + if (master != NULL) { + serverLog(LL_NOTICE, "Node %.40s (%s) is now part of shard %.40s", + sender->name, + sender->human_nodename, + master->shard_id); + } + } sender->flags &= ~(CLUSTER_NODE_MASTER| CLUSTER_NODE_MIGRATE_TO); sender->flags |= CLUSTER_NODE_SLAVE; @@ -4938,6 +4982,22 @@ int clusterDelSlot(int slot) { return C_OK; } +/* Transfer slots from `from_node` to `to_node`. + * Iterates over all cluster slots, transferring each slot covered by `from_node` to `to_node`. + * Counts and returns the number of slots transferred. */ +int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node) { + int processed = 0; + + for (int j = 0; j < CLUSTER_SLOTS; j++) { + if (clusterNodeCoversSlot(from_node, j)) { + clusterDelSlot(j); + clusterAddSlot(to_node, j); + processed++; + } + } + return processed; +} + /* Delete all the slots associated with the specified node. * The number of deleted slots is returned. */ int clusterDelNodeSlots(clusterNode *node) {