Process loss of slot ownership in cluster bus (#12344)
When a node no longer owns a slot, it clears the bit corresponding to the slot in the cluster bus messages. The receiving nodes currently don't record the fact that the sender stopped claiming a slot until some other node in the cluster starts claiming the slot. This can cause a slot to go missing during slot migration when it races with the addition of new shards or with a failover. This fix forces the receiving nodes to process the loss of ownership, to avoid spreading wrong information.
parent b35e20d1de
commit 1190f25ca7
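For orientation: "claiming" happens in the fixed-size slot bitmap that every cluster bus PING/PONG header carries, one bit per slot (the hdr->myslots field touched in the diff below). A node that hands off slot j simply stops setting bit j, and this commit makes receivers act on that cleared bit instead of waiting for another claimant. A minimal structural sketch, not the real clusterMsg layout:

    #define CLUSTER_SLOTS 16384

    /* Sketch of the relevant part of the cluster bus message header:
     * a one-bit-per-slot claim bitmap (2048 bytes). All other header
     * fields are elided; the real struct has many more members. */
    typedef struct {
        unsigned char myslots[CLUSTER_SLOTS / 8];
    } clusterMsgSketch;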
@@ -67,6 +67,8 @@ void clusterSetMaster(clusterNode *n);
 void clusterHandleSlaveFailover(void);
 void clusterHandleSlaveMigration(int max_slaves);
 int bitmapTestBit(unsigned char *bitmap, int pos);
+void bitmapSetBit(unsigned char *bitmap, int pos);
+void bitmapClearBit(unsigned char *bitmap, int pos);
 void clusterDoBeforeSleep(int flags);
 void clusterSendUpdate(clusterLink *link, clusterNode *node);
 void resetManualFailover(void);
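The declarations above expose the generic bitmap helpers to this code path. As a reference for reading the hunks below, here is how such one-bit-per-position helpers are conventionally implemented (a sketch; the exact Redis definitions may differ cosmetically):

    /* Generic one-bit-per-position bitmap helpers: a position maps to
     * byte pos/8 and bit pos&7 within that byte. */
    int bitmapTestBit(unsigned char *bitmap, int pos) {
        return (bitmap[pos / 8] & (1 << (pos & 7))) != 0;
    }

    void bitmapSetBit(unsigned char *bitmap, int pos) {
        bitmap[pos / 8] |= 1 << (pos & 7);
    }

    void bitmapClearBit(unsigned char *bitmap, int pos) {
        bitmap[pos / 8] &= ~(1 << (pos & 7));
    }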
@@ -124,6 +126,10 @@ static inline int defaultClientPort(void) {
 #define dictEntryPrevInSlot(de) \
     (((clusterDictEntryMetadata *)dictEntryMetadata(de))->prev)
 
+#define isSlotUnclaimed(slot) \
+    (server.cluster->slots[slot] == NULL || \
+     bitmapTestBit(server.cluster->owner_not_claiming_slot, slot))
+
 #define RCVBUF_INIT_LEN 1024
 #define RCVBUF_MAX_PREALLOC (1<<20) /* 1MB */
 
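The new isSlotUnclaimed() macro folds the two "nobody credibly owns it" cases into one predicate: the slot has no owner bound at all, or its current owner has stopped claiming it. A standalone demo of those semantics, with local stand-ins for server.cluster (hypothetical names, sketch only):

    #include <stdio.h>

    #define CLUSTER_SLOTS 16384

    static unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
    static void *slots[CLUSTER_SLOTS]; /* stands in for clusterNode *slots[] */

    static int bitmapTestBit(unsigned char *bitmap, int pos) {
        return (bitmap[pos / 8] & (1 << (pos & 7))) != 0;
    }

    static int isSlotUnclaimedDemo(int slot) {
        return slots[slot] == NULL ||
               bitmapTestBit(owner_not_claiming_slot, slot);
    }

    int main(void) {
        static int owner;                        /* dummy owner object */
        slots[5] = &owner;
        printf("%d\n", isSlotUnclaimedDemo(0));  /* 1: no owner bound */
        printf("%d\n", isSlotUnclaimedDemo(5));  /* 0: owned and claimed */
        owner_not_claiming_slot[0] |= 1 << 5;    /* owner stops claiming 5 */
        printf("%d\n", isSlotUnclaimedDemo(5));  /* 1: ownership uncertain */
        return 0;
    }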
@@ -2352,7 +2358,10 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
             sender_slots++;
 
             /* The slot is already bound to the sender of this message. */
-            if (server.cluster->slots[j] == sender) continue;
+            if (server.cluster->slots[j] == sender) {
+                bitmapClearBit(server.cluster->owner_not_claiming_slot, j);
+                continue;
+            }
 
             /* The slot is in importing state, it should be modified only
              * manually via redis-cli (example: a resharding is in progress
@@ -2361,10 +2370,10 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
             if (server.cluster->importing_slots_from[j]) continue;
 
             /* We rebind the slot to the new node claiming it if:
-             * 1) The slot was unassigned or the new node claims it with a
-             *    greater configEpoch.
+             * 1) The slot was unassigned or the previous owner no longer owns the slot or
+             *    the new node claims it with a greater configEpoch.
              * 2) We are not currently importing the slot. */
-            if (server.cluster->slots[j] == NULL ||
+            if (isSlotUnclaimed(j) ||
                 server.cluster->slots[j]->configEpoch < senderConfigEpoch)
             {
                 /* Was this slot mine, and still contains keys? Mark it as
@@ -2383,10 +2392,20 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
                 }
                 clusterDelSlot(j);
                 clusterAddSlot(sender,j);
+                bitmapClearBit(server.cluster->owner_not_claiming_slot, j);
                 clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
                                      CLUSTER_TODO_UPDATE_STATE|
                                      CLUSTER_TODO_FSYNC_CONFIG);
             }
-        }
+        } else if (server.cluster->slots[j] == sender) {
+            /* The slot is currently bound to the sender but the sender is no longer
+             * claiming it. We don't want to unbind the slot yet as it can cause the cluster
+             * to move to FAIL state and also throw client error. Keeping the slot bound to
+             * the previous owner will cause a few client side redirects, but won't throw
+             * any errors. We will keep track of the uncertainty in ownership to avoid
+             * propagating misinformation about this slot's ownership using UPDATE
+             * messages. */
+            bitmapSetBit(server.cluster->owner_not_claiming_slot, j);
+        }
     }
 
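Putting the three hunks above together, the receiver's per-slot decision can be summarized as follows. This is a condensed standalone rendition, not the actual function: names are local to the sketch, and the importing-slot early-continue plus epoch and dirty-slot bookkeeping are simplified away.

    #include <stddef.h>

    typedef struct Node { unsigned long long configEpoch; } Node;

    enum action { KEEP, REBIND_TO_SENDER, MARK_NOT_CLAIMED, CLEAR_NOT_CLAIMED };

    static int testBit(const unsigned char *bm, int pos) {
        return (bm[pos / 8] >> (pos & 7)) & 1;
    }

    enum action decide(Node **slots, unsigned char *not_claiming, int j,
                       Node *sender, int sender_claims_j,
                       unsigned long long senderEpoch) {
        if (sender_claims_j) {
            /* Sender re-asserted a slot we already bind to it: any earlier
             * "not claiming" mark is stale, so clear it. */
            if (slots[j] == sender) return CLEAR_NOT_CLAIMED;
            /* Rebind if nobody credibly owns the slot, or the sender wins
             * by configEpoch (maps to clusterDelSlot + clusterAddSlot). */
            if (slots[j] == NULL || testBit(not_claiming, j) ||
                slots[j]->configEpoch < senderEpoch)
                return REBIND_TO_SENDER;
        } else if (slots[j] == sender) {
            /* Sender stopped claiming a slot still bound to it. Don't
             * unbind yet (that could push the cluster to FAIL and error
             * out clients); just record the uncertainty. */
            return MARK_NOT_CLAIMED;
        }
        return KEEP;
    }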
@@ -3043,7 +3062,7 @@ int clusterProcessPacket(clusterLink *link) {
             for (j = 0; j < CLUSTER_SLOTS; j++) {
                 if (bitmapTestBit(hdr->myslots,j)) {
                     if (server.cluster->slots[j] == sender ||
-                        server.cluster->slots[j] == NULL) continue;
+                        isSlotUnclaimed(j)) continue;
                     if (server.cluster->slots[j]->configEpoch >
                         senderConfigEpoch)
                     {
@@ -3746,6 +3765,10 @@ void clusterSendUpdate(clusterLink *link, clusterNode *node) {
     memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
     hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
     memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
+    for (unsigned int i = 0; i < sizeof(node->slots); i++) {
+        /* Don't advertise slots that the node stopped claiming */
+        hdr->data.update.nodecfg.slots[i] = hdr->data.update.nodecfg.slots[i] & (~server.cluster->owner_not_claiming_slot[i]);
+    }
 
     clusterSendMessage(link,msgblock);
     clusterMsgSendBlockDecrRefCount(msgblock);
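The loop added to clusterSendUpdate() is a plain byte-wise mask: every byte of the advertised slot bitmap is ANDed with the complement of the corresponding owner_not_claiming_slot byte, so slots the owner stopped claiming are never advertised in UPDATE messages. A runnable illustration (standalone sketch; array names mirror the diff):

    #include <stdio.h>

    #define CLUSTER_SLOTS 16384

    int main(void) {
        unsigned char advertised[CLUSTER_SLOTS / 8] = {0};
        unsigned char not_claiming[CLUSTER_SLOTS / 8] = {0};

        advertised[0] = 0x07;   /* owner claims slots 0, 1 and 2 */
        not_claiming[0] = 0x02; /* ...but has stopped claiming slot 1 */

        /* Same operation as the loop added to clusterSendUpdate(). */
        for (unsigned int i = 0; i < sizeof(advertised); i++)
            advertised[i] &= ~not_claiming[i];

        printf("%#x\n", advertised[0]); /* prints 0x5: slots 0 and 2 only */
        return 0;
    }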
@@ -3951,7 +3974,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
      * slots in the current configuration. */
     for (j = 0; j < CLUSTER_SLOTS; j++) {
         if (bitmapTestBit(claimed_slots, j) == 0) continue;
-        if (server.cluster->slots[j] == NULL ||
+        if (isSlotUnclaimed(j) ||
             server.cluster->slots[j]->configEpoch <= requestConfigEpoch)
         {
             continue;
@@ -213,6 +213,13 @@ typedef struct clusterState {
     long long stats_pfail_nodes;    /* Number of nodes in PFAIL status,
                                        excluding nodes without address. */
     unsigned long long stat_cluster_links_buffer_limit_exceeded;  /* Total number of cluster links freed due to exceeding buffer limit */
+
+    /* Bit map for slots that are no longer claimed by the owner in cluster PING
+     * messages. During slot migration, the owner will stop claiming the slot after
+     * the ownership transfer. Set the bit corresponding to the slot when a node
+     * stops claiming the slot. This prevents spreading incorrect information (that
+     * source still owns the slot) using UPDATE messages. */
+    unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
 } clusterState;
 
 /* Redis cluster messages header */
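Cost-wise, the new clusterState field is a fixed bitmap with one bit per slot; at the standard CLUSTER_SLOTS value of 16384 that is 2048 bytes of extra state per node. A quick compile-time check (assumes C11 static_assert):

    #include <assert.h>

    #define CLUSTER_SLOTS 16384

    /* One bit per slot: 16384 / 8 = 2048 bytes per node. */
    static unsigned char owner_not_claiming_slot[CLUSTER_SLOTS / 8];
    static_assert(sizeof(owner_not_claiming_slot) == 2048, "2 KiB bitmap");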
@@ -77,6 +77,43 @@ test "slot migration is invalid from primary to replica" {
     assert_match "*Target node is not a master" $err
 }
 
+proc count_bound_slots {n} {
+    set slot_count 0
+    foreach slot_range_mapping [$n cluster slots] {
+        set start_slot [lindex $slot_range_mapping 0]
+        set end_slot [lindex $slot_range_mapping 1]
+        incr slot_count [expr $end_slot - $start_slot + 1]
+    }
+    return $slot_count
+}
+
+test "slot must be unbound on the owner when it is deleted" {
+    set node0 [Rn 0]
+    set node1 [Rn 1]
+    assert {[count_bound_slots $node0] eq 16384}
+    assert {[count_bound_slots $node1] eq 16384}
+
+    set slot_to_delete 0
+    # Delete
+    $node0 CLUSTER DELSLOTS $slot_to_delete
+
+    # Verify
+    # The node that owns the slot must unbind the slot that was deleted
+    wait_for_condition 1000 50 {
+        [count_bound_slots $node0] == 16383
+    } else {
+        fail "Cluster slot deletion was not recorded on the node that owns the slot"
+    }
+
+    # We don't propagate slot deletion across all nodes in the cluster.
+    # This can lead to extra redirect before the clients find out that the slot is unbound.
+    wait_for_condition 1000 50 {
+        [count_bound_slots $node1] == 16384
+    } else {
+        fail "Cluster slot deletion should not be propagated to all nodes in the cluster"
+    }
+}
+
 if {$::tls} {
     test {CLUSTER SLOTS from non-TLS client in TLS cluster} {
         set slots_tls [R 0 cluster slots]
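For readers following the Tcl helper in the test above: CLUSTER SLOTS reports inclusive [start, end] ranges, so each range contributes end - start + 1 slots. The same arithmetic in C (hypothetical types, sketch only):

    /* Each CLUSTER SLOTS entry is an inclusive [start, end] range, so it
     * covers end - start + 1 slots. */
    typedef struct { int start, end; } SlotRange;

    int count_bound_slots(const SlotRange *ranges, int n) {
        int total = 0;
        for (int i = 0; i < n; i++)
            total += ranges[i].end - ranges[i].start + 1;
        return total;
    }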