Ensure only primary sender drives slot ownership updates (#754)
Fixes a regression introduced in PR #445, which allowed a message from a replica to update the slot ownership of its primary. The regression results in a `replicaof` cycle, and the cycle detection assert then crashes the server. The fix restores the previous behavior where only primary senders can trigger `clusterUpdateSlotsConfigWith`.

Additional changes:

* Handling of primaries without slots is obsoleted by the new handling of a sender that was a replica announcing that it is now a primary.
* Replication loop detection code is unchanged but shifted downwards.
* Some variables are renamed for readability, and some are introduced to avoid repeated memcmp() calls.

Fixes #753.

---------

Signed-off-by: Ping Xie <pingxie@google.com>
Parent: 1a8bd045f3
Commit: 66d0f7d9a1
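To make the gist of the fix concrete before the diff, here is a minimal, self-contained sketch of the rule it restores. Everything in it (the `node_view` struct, `may_update_slot_ownership()`, the 16-byte bitmap) is an invented stand-in for illustration, not the real cluster code: a gossip message may only drive slot ownership updates when the sender claims to be a primary, and only if it was last seen as a replica (a role switch was just processed) or its claimed slot bitmap differs from our view.

```c
/* Minimal, self-contained sketch of the gating rule this commit restores.
 * The struct, field, and function names below are illustrative stand-ins,
 * not the real cluster.c types. */
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

typedef struct {
    bool is_primary;         /* our last recorded view of the sender's role */
    unsigned char slots[16]; /* slot-ownership bitmap as we currently know it */
} node_view;

/* A gossip message may drive slot ownership updates only when the sender
 * claims to be a primary, and only if either we still had it recorded as a
 * replica (a role switch was just processed) or its claimed slot bitmap
 * differs from ours. A message from a node that does not claim to be a
 * primary can never reassign slot ownership. */
static bool may_update_slot_ownership(const node_view *sender,
                                      bool sender_claims_to_be_primary,
                                      const unsigned char *claimed_slots) {
    if (!sender_claims_to_be_primary) return false;
    bool was_replica = !sender->is_primary;
    bool slots_changed = memcmp(sender->slots, claimed_slots, sizeof(sender->slots)) != 0;
    return was_replica || slots_changed;
}

int main(void) {
    node_view replica_view = {.is_primary = false, .slots = {0}};
    unsigned char claimed[16] = {0xff}; /* gossip claiming the primary's slots */

    /* The regression: gossip from a node still acting as a replica must not
     * be allowed to reassign its primary's slots. */
    printf("replica gossip may update slots: %d\n",
           may_update_slot_ownership(&replica_view, false, claimed)); /* prints 0 */

    /* After a failover the same node announces itself as a primary, so it may. */
    printf("promoted node may update slots:  %d\n",
           may_update_slot_ownership(&replica_view, true, claimed)); /* prints 1 */
    return 0;
}
```

The actual change in the hunks below additionally asserts that the sender's role has already been switched to primary (`serverAssert(nodeIsPrimary(sender))`) before `clusterUpdateSlotsConfigWith()` is called.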
```diff
@@ -2927,8 +2927,11 @@ int clusterProcessPacket(clusterLink *link) {
     mstime_t now = mstime();
     uint16_t flags = ntohs(hdr->flags);
-    uint64_t senderCurrentEpoch = 0, senderConfigEpoch = 0;
+    uint64_t sender_claimed_current_epoch = 0, sender_claimed_config_epoch = 0;
     clusterNode *sender = getNodeFromLinkAndMsg(link, hdr);
+    int sender_claims_to_be_primary = !memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, CLUSTER_NAMELEN);
+    int sender_last_reported_as_replica = sender && nodeIsReplica(sender);
+    int sender_last_reported_as_primary = sender && nodeIsPrimary(sender);

     if (sender && (hdr->mflags[0] & CLUSTERMSG_FLAG0_EXT_DATA)) {
         sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
```
```diff
@@ -2942,13 +2945,13 @@ int clusterProcessPacket(clusterLink *link) {

     if (sender && !nodeInHandshake(sender)) {
         /* Update our currentEpoch if we see a newer epoch in the cluster. */
-        senderCurrentEpoch = ntohu64(hdr->currentEpoch);
-        senderConfigEpoch = ntohu64(hdr->configEpoch);
-        if (senderCurrentEpoch > server.cluster->currentEpoch) server.cluster->currentEpoch = senderCurrentEpoch;
+        sender_claimed_current_epoch = ntohu64(hdr->currentEpoch);
+        sender_claimed_config_epoch = ntohu64(hdr->configEpoch);
+        if (sender_claimed_current_epoch > server.cluster->currentEpoch)
+            server.cluster->currentEpoch = sender_claimed_current_epoch;
         /* Update the sender configEpoch if it is a primary publishing a newer one. */
-        if (!memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof)) &&
-            senderConfigEpoch > sender->configEpoch) {
-            sender->configEpoch = senderConfigEpoch;
+        if (sender_claims_to_be_primary && sender_claimed_config_epoch > sender->configEpoch) {
+            sender->configEpoch = sender_claimed_config_epoch;
             clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_FSYNC_CONFIG);
         }
         /* Update the replication offset info for this node. */
```
```diff
@@ -3110,36 +3113,36 @@ int clusterProcessPacket(clusterLink *link) {
         /* Check for role switch: replica -> primary or primary -> replica. */
         if (sender) {
             serverLog(LL_DEBUG, "node %.40s (%s) announces that it is a %s in shard %.40s", sender->name,
-                      sender->human_nodename,
-                      !memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof)) ? "primary" : "replica",
-                      sender->shard_id);
-            if (!memcmp(hdr->replicaof, CLUSTER_NODE_NULL_NAME, sizeof(hdr->replicaof))) {
+                      sender->human_nodename, sender_claims_to_be_primary ? "primary" : "replica", sender->shard_id);
+            if (sender_claims_to_be_primary) {
                 /* Node is a primary. */
                 clusterSetNodeAsPrimary(sender);
             } else {
                 /* Node is a replica. */
-                clusterNode *primary = clusterLookupNode(hdr->replicaof, CLUSTER_NAMELEN);
+                clusterNode *sender_claimed_primary = clusterLookupNode(hdr->replicaof, CLUSTER_NAMELEN);

-                if (clusterNodeIsPrimary(sender)) {
+                if (sender_last_reported_as_primary) {
                     /* Primary turned into a replica! Reconfigure the node. */
-                    if (primary && areInSameShard(primary, sender)) {
+                    if (sender_claimed_primary && areInSameShard(sender_claimed_primary, sender)) {
                         /* `sender` was a primary and was in the same shard as its new primary */
-                        if (sender->configEpoch > senderConfigEpoch) {
+                        if (sender->configEpoch > sender_claimed_config_epoch) {
                             serverLog(LL_NOTICE,
                                       "Ignore stale message from %.40s (%s) in shard %.40s;"
                                       " gossip config epoch: %llu, current config epoch: %llu",
                                       sender->name, sender->human_nodename, sender->shard_id,
-                                      (unsigned long long)senderConfigEpoch, (unsigned long long)sender->configEpoch);
+                                      (unsigned long long)sender_claimed_config_epoch,
+                                      (unsigned long long)sender->configEpoch);
                         } else {
                             /* `primary` is still a `replica` in this observer node's view;
                              * update its role and configEpoch */
-                            clusterSetNodeAsPrimary(primary);
-                            primary->configEpoch = senderConfigEpoch;
+                            clusterSetNodeAsPrimary(sender_claimed_primary);
+                            sender_claimed_primary->configEpoch = sender_claimed_config_epoch;
                             serverLog(LL_NOTICE,
                                       "A failover occurred in shard %.40s; node %.40s (%s)"
                                       " failed over to node %.40s (%s) with a config epoch of %llu",
-                                      sender->shard_id, sender->name, sender->human_nodename, primary->name,
-                                      primary->human_nodename, (unsigned long long)primary->configEpoch);
+                                      sender->shard_id, sender->name, sender->human_nodename,
+                                      sender_claimed_primary->name, sender_claimed_primary->human_nodename,
+                                      (unsigned long long)sender_claimed_primary->configEpoch);
                         }
                     } else {
                         /* `sender` was moved to another shard and has become a replica, remove its slot assignment */
```
```diff
@@ -3148,9 +3151,9 @@ int clusterProcessPacket(clusterLink *link) {
                                   "Node %.40s (%s) is no longer primary of shard %.40s;"
                                   " removed all %d slot(s) it used to own",
                                   sender->name, sender->human_nodename, sender->shard_id, slots);
-                        if (primary != NULL) {
+                        if (sender_claimed_primary != NULL) {
                             serverLog(LL_NOTICE, "Node %.40s (%s) is now part of shard %.40s", sender->name,
-                                      sender->human_nodename, primary->shard_id);
+                                      sender->human_nodename, sender_claimed_primary->shard_id);
                         }
                     }

```
```diff
@@ -3162,17 +3165,17 @@ int clusterProcessPacket(clusterLink *link) {
                 }

                 /* Primary node changed for this replica? */
-                if (primary && sender->replicaof != primary) {
+                if (sender_claimed_primary && sender->replicaof != sender_claimed_primary) {
                     if (sender->replicaof) clusterNodeRemoveReplica(sender->replicaof, sender);
                     serverLog(LL_NOTICE, "Node %.40s (%s) is now a replica of node %.40s (%s) in shard %.40s",
-                              sender->name, sender->human_nodename, primary->name, primary->human_nodename,
-                              sender->shard_id);
-                    clusterNodeAddReplica(primary, sender);
-                    sender->replicaof = primary;
+                              sender->name, sender->human_nodename, sender_claimed_primary->name,
+                              sender_claimed_primary->human_nodename, sender->shard_id);
+                    clusterNodeAddReplica(sender_claimed_primary, sender);
+                    sender->replicaof = sender_claimed_primary;

                     /* Update the shard_id when a replica is connected to its
                      * primary in the very first time. */
-                    updateShardId(sender, primary->shard_id);
+                    updateShardId(sender, sender_claimed_primary->shard_id);

                     /* Update config. */
                     clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG);
```
```diff
@@ -3187,66 +3190,41 @@ int clusterProcessPacket(clusterLink *link) {

         /* Many checks are only needed if the set of served slots this
          * instance claims is different compared to the set of slots we have
-         * for it. Check this ASAP to avoid other computational expansive
-         * checks later. */
-        clusterNode *sender_primary = NULL; /* Sender or its primary if replica. */
-        int dirty_slots = 0;                /* Sender claimed slots don't match my view? */
-
-        if (sender) {
-            sender_primary = clusterNodeIsPrimary(sender) ? sender : sender->replicaof;
-            if (sender_primary) {
-                dirty_slots = memcmp(sender_primary->slots, hdr->myslots, sizeof(hdr->myslots)) != 0;
-
-                /* Force dirty when the sending shard owns no slots so that
-                 * we have a chance to examine and repair slot migrating/importing
-                 * states that involve empty shards. */
-                dirty_slots |= sender_primary->numslots == 0;
-            }
-        }
-
-        /* 1) If the sender of the message is a primary, and we detected that
-         * the set of slots it claims changed, scan the slots to see if we
-         * need to update our configuration. */
-        if (sender_primary && dirty_slots)
-            clusterUpdateSlotsConfigWith(sender_primary, senderConfigEpoch, hdr->myslots);
-
-        /* Explicitly check for a replication loop before attempting the replication
-         * chain folding logic. */
-        if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
-            /* Safeguard against sub-replicas. A replica's primary can turn itself
-             * into a replica if its last slot is removed. If no other node takes
-             * over the slot, there is nothing else to trigger replica migration. */
-            serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
-                      myself->replicaof->replicaof->name, myself->replicaof->name);
-            clusterSetPrimary(myself->replicaof->replicaof, 1);
-            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
-        }
-
-        /* 2) We also check for the reverse condition, that is, the sender
-         * claims to serve slots we know are served by a primary with a
-         * greater configEpoch. If this happens we inform the sender.
-         *
-         * This is useful because sometimes after a partition heals, a
-         * reappearing primary may be the last one to claim a given set of
-         * hash slots, but with a configuration that other instances know to
-         * be deprecated. Example:
-         *
-         * A and B are primary and replica for slots 1,2,3.
-         * A is partitioned away, B gets promoted.
-         * B is partitioned away, and A returns available.
-         *
-         * Usually B would PING A publishing its set of served slots and its
-         * configEpoch, but because of the partition B can't inform A of the
-         * new configuration, so other nodes that have an updated table must
-         * do it. In this way A will stop to act as a primary (or can try to
-         * failover if there are the conditions to win the election). */
-        if (sender && dirty_slots) {
-            int j;
-
-            for (j = 0; j < CLUSTER_SLOTS; j++) {
+         * for it or if there was a failover in the sender's shard. Check
+         * this ASAP to avoid other computational expensive checks later.*/
+        if (sender && sender_claims_to_be_primary &&
+            (sender_last_reported_as_replica || memcmp(sender->slots, hdr->myslots, sizeof(hdr->myslots)))) {
+            /* Make sure CLUSTER_NODE_PRIMARY has already been set by now on sender */
+            serverAssert(nodeIsPrimary(sender));
+
+            /* 1) If the sender of the message is a primary, and we detected that
+             * the set of slots it claims changed, scan the slots to see if we
+             * need to update our configuration. */
+            clusterUpdateSlotsConfigWith(sender, sender_claimed_config_epoch, hdr->myslots);
+
+            /* 2) We also check for the reverse condition, that is, the sender
+             * claims to serve slots we know are served by a primary with a
+             * greater configEpoch. If this happens we inform the sender.
+             *
+             * This is useful because sometimes after a partition heals, a
+             * reappearing primary may be the last one to claim a given set of
+             * hash slots, but with a configuration that other instances know to
+             * be deprecated. Example:
+             *
+             * A and B are primary and replica for slots 1,2,3.
+             * A is partitioned away, B gets promoted.
+             * B is partitioned away, and A returns available.
+             *
+             * Usually B would PING A publishing its set of served slots and its
+             * configEpoch, but because of the partition B can't inform A of the
+             * new configuration, so other nodes that have an updated table must
+             * do it. In this way A will stop to act as a primary (or can try to
+             * failover if there are the conditions to win the election). */
+            for (int j = 0; j < CLUSTER_SLOTS; j++) {
                 if (bitmapTestBit(hdr->myslots, j)) {
                     if (server.cluster->slots[j] == sender || isSlotUnclaimed(j)) continue;
-                    if (server.cluster->slots[j]->configEpoch > senderConfigEpoch) {
+                    if (server.cluster->slots[j]->configEpoch > sender_claimed_config_epoch) {
                         serverLog(LL_VERBOSE,
                                   "Node %.40s has old slots configuration, sending "
                                   "an UPDATE message about %.40s",
```
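The per-slot arbitration in the loop above comes down to comparing config epochs. The following toy model, with invented names and an 8-slot table instead of the real 16384-slot bitmap, shows the decision made for each slot a sender claims; in the real code the "claim wins" side is handled by `clusterUpdateSlotsConfigWith()` and the "stale" side results in an UPDATE message to the sender:

```c
/* Toy model of the per-slot "higher configEpoch wins" rule. The names and the
 * 8-slot table are invented for illustration; the real loop walks the
 * 16384-slot claim bitmap from the message header. */
#include <stdint.h>
#include <stdio.h>

#define TOY_SLOTS 8

typedef struct {
    int owner;            /* node id we believe owns the slot, -1 if unclaimed */
    uint64_t owner_epoch; /* configEpoch of that owner in our view */
} toy_slot;

static void judge_claims(const toy_slot *table, int sender, uint64_t sender_epoch,
                         const int *claimed, int nclaimed) {
    for (int i = 0; i < nclaimed; i++) {
        int s = claimed[i];
        /* Slots we already attribute to the sender, or that nobody owns,
         * need no arbitration here. */
        if (table[s].owner == sender || table[s].owner == -1) continue;
        if (table[s].owner_epoch > sender_epoch) {
            printf("slot %d: claim is stale (owner epoch %llu > %llu) -> send UPDATE\n", s,
                   (unsigned long long)table[s].owner_epoch, (unsigned long long)sender_epoch);
        } else {
            printf("slot %d: claim wins (epoch %llu >= %llu) -> accept new owner\n", s,
                   (unsigned long long)sender_epoch, (unsigned long long)table[s].owner_epoch);
        }
    }
}

int main(void) {
    toy_slot table[TOY_SLOTS] = {
        [0] = {.owner = 2, .owner_epoch = 7}, /* held with a newer epoch: sender is stale */
        [1] = {.owner = 2, .owner_epoch = 3}, /* held with an older epoch: sender wins */
        [2] = {.owner = -1},                  /* unclaimed: skipped by the arbitration */
    };
    int claimed[] = {0, 1, 2}; /* slots the sender says it owns */
    judge_claims(table, 1, 5, claimed, 3);
    return 0;
}
```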
```diff
@@ -3262,10 +3240,22 @@ int clusterProcessPacket(clusterLink *link) {
                 }
             }

+        /* Explicitly check for a replication loop before attempting the replication
+         * chain folding logic. */
+        if (myself->replicaof && myself->replicaof->replicaof && myself->replicaof->replicaof != myself) {
+            /* Safeguard against sub-replicas. A replica's primary can turn itself
+             * into a replica if its last slot is removed. If no other node takes
+             * over the slot, there is nothing else to trigger replica migration. */
+            serverLog(LL_NOTICE, "I'm a sub-replica! Reconfiguring myself as a replica of %.40s from %.40s",
+                      myself->replicaof->replicaof->name, myself->replicaof->name);
+            clusterSetPrimary(myself->replicaof->replicaof, 1);
+            clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
+        }
+
         /* If our config epoch collides with the sender's try to fix
          * the problem. */
-        if (sender && clusterNodeIsPrimary(myself) && clusterNodeIsPrimary(sender) &&
-            senderConfigEpoch == myself->configEpoch) {
+        if (sender && nodeIsPrimary(myself) && nodeIsPrimary(sender) &&
+            sender_claimed_config_epoch == myself->configEpoch) {
             clusterHandleConfigEpochCollision(sender);
         }

```
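The block relocated in this hunk guards against sub-replica chains: if this node's primary has itself become a replica, the node re-attaches to the grandparent primary instead of replicating through a chain. Below is a standalone sketch of that pointer check, with invented struct and function names (the real code calls `clusterSetPrimary()` and schedules a config save). The `!= myself` guard skips the degenerate case where the chain would loop back to this node.

```c
/* Standalone sketch of the sub-replica check relocated in this hunk: if my
 * primary has itself become a replica of some other node, re-point myself at
 * that node. The struct and function names are invented; the real code calls
 * clusterSetPrimary() and schedules a config save via clusterDoBeforeSleep(). */
#include <stdio.h>
#include <stddef.h>

typedef struct toy_node {
    const char *name;
    struct toy_node *replicaof; /* NULL when the node is a primary */
} toy_node;

static void fold_replication_chain(toy_node *myself) {
    /* Same shape as the real condition: my primary exists, it has a primary
     * of its own, and that grandparent is not me (the degenerate loop case). */
    if (myself->replicaof && myself->replicaof->replicaof &&
        myself->replicaof->replicaof != myself) {
        printf("%s: sub-replica detected, reattaching to %s (was replicating from %s)\n",
               myself->name, myself->replicaof->replicaof->name, myself->replicaof->name);
        myself->replicaof = myself->replicaof->replicaof;
    }
}

int main(void) {
    toy_node a = {"A", NULL}; /* primary */
    toy_node b = {"B", &a};   /* former primary, now a replica of A */
    toy_node c = {"C", &b};   /* this node: currently a sub-replica through B */
    fold_replication_chain(&c);
    printf("C now replicates from %s\n", c.replicaof->name); /* prints A */
    return 0;
}
```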
```diff
@@ -3317,7 +3307,7 @@ int clusterProcessPacket(clusterLink *link) {
         /* We consider this vote only if the sender is a primary serving
          * a non zero number of slots, and its currentEpoch is greater or
          * equal to epoch where this node started the election. */
-        if (clusterNodeIsVotingPrimary(sender) && senderCurrentEpoch >= server.cluster->failover_auth_epoch) {
+        if (clusterNodeIsVotingPrimary(sender) && sender_claimed_current_epoch >= server.cluster->failover_auth_epoch) {
             server.cluster->failover_auth_count++;
             /* Maybe we reached a quorum here, set a flag to make sure
              * we check ASAP. */
```
```diff
@@ -156,18 +156,18 @@ test "Verify the nodes configured with prefer hostname only show hostname for ne
     # to accept our isolated nodes connections. At this point they will
     # start showing up in cluster slots.
     wait_for_condition 50 100 {
-        [llength [R 6 CLUSTER SLOTS]] eq 3
+        [llength [R 6 CLUSTER SLOTS]] eq 2
     } else {
         fail "Node did not learn about the 2 shards it can talk to"
     }
     wait_for_condition 50 100 {
-        [lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-1.com"
+        [lindex [get_slot_field [R 6 CLUSTER SLOTS] 0 2 3] 1] eq "shard-1.com"
     } else {
         fail "hostname for shard-1 didn't reach node 6"
     }

     wait_for_condition 50 100 {
-        [lindex [get_slot_field [R 6 CLUSTER SLOTS] 2 2 3] 1] eq "shard-2.com"
+        [lindex [get_slot_field [R 6 CLUSTER SLOTS] 1 2 3] 1] eq "shard-2.com"
     } else {
         fail "hostname for shard-2 didn't reach node 6"
     }
```