Fix PONG message processing for primary-ship tracking during failovers (#13055)

This commit updates the processing of PONG gossip messages in the
cluster. When a node (B) becomes a replica due to a failover, its PONG
messages include its new primary node's (A) information and B's
configuration epoch is aligned with A's. This allows observer nodes to
identify changes in primary-ship, addressing issues of intermediate
states and enhancing cluster state consistency during topology changes.

Fix #13018
This commit is contained in:
Ping Xie 2024-03-04 17:32:25 -08:00 committed by GitHub
parent ad12730333
commit 28976a9003
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -65,6 +65,7 @@ list *clusterGetNodesInMyShard(clusterNode *node);
int clusterNodeAddSlave(clusterNode *master, clusterNode *slave);
int clusterAddSlot(clusterNode *n, int slot);
int clusterDelSlot(int slot);
int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node);
int clusterDelNodeSlots(clusterNode *node);
int clusterNodeSetSlotBit(clusterNode *n, int slot);
void clusterSetMaster(clusterNode *n);
@ -2990,7 +2991,50 @@ int clusterProcessPacket(clusterLink *link) {
if (clusterNodeIsMaster(sender)) {
/* Master turned into a slave! Reconfigure the node. */
clusterDelNodeSlots(sender);
if (master && !memcmp(master->shard_id, sender->shard_id, CLUSTER_NAMELEN)) {
/* `sender` was a primary and was in the same shard as `master`, its new primary */
if (sender->configEpoch > senderConfigEpoch) {
serverLog(LL_NOTICE,
"Ignore stale message from %.40s (%s) in shard %.40s;"
" gossip config epoch: %llu, current config epoch: %llu",
sender->name,
sender->human_nodename,
sender->shard_id,
(unsigned long long)senderConfigEpoch,
(unsigned long long)sender->configEpoch);
} else {
/* A failover occurred in the shard where `sender` belongs to and `sender` is no longer
* a primary. Update slot assignment to `master`, which is the new primary in the shard */
int slots = clusterMoveNodeSlots(sender, master);
/* `master` is still a `slave` in this observer node's view; update its role and configEpoch */
clusterSetNodeAsMaster(master);
master->configEpoch = senderConfigEpoch;
serverLog(LL_NOTICE, "A failover occurred in shard %.40s; node %.40s (%s)"
" lost %d slot(s) to node %.40s (%s) with a config epoch of %llu",
sender->shard_id,
sender->name,
sender->human_nodename,
slots,
master->name,
master->human_nodename,
(unsigned long long) master->configEpoch);
}
} else {
/* `sender` was moved to another shard and has become a replica, remove its slot assignment */
int slots = clusterDelNodeSlots(sender);
serverLog(LL_NOTICE, "Node %.40s (%s) is no longer master of shard %.40s;"
" removed all %d slot(s) it used to own",
sender->name,
sender->human_nodename,
sender->shard_id,
slots);
if (master != NULL) {
serverLog(LL_NOTICE, "Node %.40s (%s) is now part of shard %.40s",
sender->name,
sender->human_nodename,
master->shard_id);
}
}
sender->flags &= ~(CLUSTER_NODE_MASTER|
CLUSTER_NODE_MIGRATE_TO);
sender->flags |= CLUSTER_NODE_SLAVE;
@ -4938,6 +4982,22 @@ int clusterDelSlot(int slot) {
return C_OK;
}
/* Transfer slots from `from_node` to `to_node`.
* Iterates over all cluster slots, transferring each slot covered by `from_node` to `to_node`.
* Counts and returns the number of slots transferred. */
int clusterMoveNodeSlots(clusterNode *from_node, clusterNode *to_node) {
int processed = 0;
for (int j = 0; j < CLUSTER_SLOTS; j++) {
if (clusterNodeCoversSlot(from_node, j)) {
clusterDelSlot(j);
clusterAddSlot(to_node, j);
processed++;
}
}
return processed;
}
/* Delete all the slots associated with the specified node.
* The number of deleted slots is returned. */
int clusterDelNodeSlots(clusterNode *node) {