Broadcast a PONG to all nodes in the cluster when the role changes (#1295)
When a node's role changes, we should broadcast the change so that the other nodes are notified. For example, with one primary and one replica: after a failover, the replica becomes the new primary and the old primary becomes a replica. If we then trigger a second cluster failover on the new replica, it sends an MFSTART to its primary, i.e., the new primary. But the new primary may reject the MFSTART because of this logic:

```
} else if (type == CLUSTERMSG_TYPE_MFSTART) {
    if (!sender || sender->replicaof != myself) return 1;
```

In the new primary's view, the sender is still a primary and sender->replicaof is NULL, so we return early and the manual failover times out.

Another possibility is that the other primaries refuse to vote after receiving the FAILOVER_AUTH_REQUEST: in their view the sender is still a primary, so they refuse to vote and the manual failover times out as well.

```
void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) {
    ...
    if (clusterNodeIsPrimary(node)) {
        serverLog(LL_WARNING, "Failover auth denied to...
```

The root cause is that we currently only update node->replicaof when we receive a PING/PONG from the sender (see clusterProcessPacket for details). Therefore, in some scenarios, such as clusters with many nodes and a large cluster-ping-interval (that is, cluster-node-timeout), the node's role change propagates very slowly.

Also added a DEBUG DISABLE-CLUSTER-RANDOM-PING command, which disables the cluster ping that is sent to a random node every second (see clusterCron).

Signed-off-by: Binbin <binloveplay1314@qq.com>
parent 979f4c1ceb
commit b9d224097a
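Before the hunks, a quick orientation: rather than calling clusterBroadcastPong() inline at every role-change site, the patch records a new CLUSTER_TODO_BROADCAST_ALL flag via clusterDoBeforeSleep() and lets clusterBeforeSleep() send a single broadcast per event-loop iteration; a DEBUG DISABLE-CLUSTER-RANDOM-PING switch is added so the test can suppress the periodic random ping and rely on that broadcast alone. The sketch below is a minimal, self-contained model of this deferred-todo pattern; the flag names and values are taken from the diff, but everything else is a toy stand-in, not valkey's implementation.

```c
/* Standalone toy model of the pattern this commit leans on: event handlers do
 * not broadcast inline; they OR a "todo" bit into a pending set, and a single
 * beforeSleep() hook flushes that set once per event-loop iteration. The names
 * mirror the diff (CLUSTER_TODO_*, clusterDoBeforeSleep, clusterBeforeSleep,
 * clusterBroadcastPong), but the bodies are stand-ins, not valkey code. */
#include <stdio.h>

#define CLUSTER_TODO_SAVE_CONFIG (1 << 2)
#define CLUSTER_TODO_BROADCAST_ALL (1 << 5)
#define CLUSTER_BROADCAST_ALL 0 /* toy placeholder for "target every known node" */

static int todo_before_sleep = 0;

/* Record work to be performed right before the event loop goes to sleep. */
static void clusterDoBeforeSleep(int flags) {
    todo_before_sleep |= flags;
}

static void clusterBroadcastPong(int target) {
    (void)target;
    puts("PONG broadcast to all known nodes: role change propagated immediately");
}

/* Runs once per event-loop iteration, after all packets/commands are handled. */
static void clusterBeforeSleep(void) {
    int flags = todo_before_sleep;
    todo_before_sleep = 0;
    if (flags & CLUSTER_TODO_SAVE_CONFIG) puts("cluster config saved");
    if (flags & CLUSTER_TODO_BROADCAST_ALL) clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
}

/* A role-change site: it only sets flags, so several role/config changes in the
 * same iteration collapse into one config save and one broadcast. */
static void onRoleChange(void) {
    clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_BROADCAST_ALL);
}

int main(void) {
    onRoleChange();
    onRoleChange();       /* duplicate change in the same iteration: nothing extra queued */
    clusterBeforeSleep(); /* exactly one save and one broadcast */
    return 0;
}
```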
@@ -2669,7 +2669,8 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
          *
          * If the sender and myself are in the same shard, try psync. */
         clusterSetPrimary(sender, !are_in_same_shard, !are_in_same_shard);
-        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
+        clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG |
+                             CLUSTER_TODO_BROADCAST_ALL);
     } else if (nodeIsPrimary(myself) && (sender_slots >= migrated_our_slots) && !are_in_same_shard) {
         /* When all our slots are lost to the sender and the sender belongs to
          * a different shard, this is likely due to a client triggered slot
@@ -4538,7 +4539,7 @@ void clusterFailoverReplaceYourPrimary(void) {
 
     /* 4) Pong all the other nodes so that they can update the state
      * accordingly and detect that we switched to primary role. */
-    clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
+    clusterDoBeforeSleep(CLUSTER_TODO_BROADCAST_ALL);
 
     /* 5) If there was a manual failover in progress, clear the state. */
     resetManualFailover();
@@ -5029,7 +5030,7 @@ void clusterCron(void) {
 
     /* Ping some random node 1 time every 10 iterations, so that we usually ping
      * one random node every second. */
-    if (!(iteration % 10)) {
+    if (!server.debug_cluster_disable_random_ping && !(iteration % 10)) {
         int j;
 
         /* Check a few random nodes and ping the one with the oldest
@@ -5206,6 +5207,13 @@ void clusterBeforeSleep(void) {
         int fsync = flags & CLUSTER_TODO_FSYNC_CONFIG;
         clusterSaveConfigOrDie(fsync);
     }
+
+    if (flags & CLUSTER_TODO_BROADCAST_ALL) {
+        /* Broadcast a pong to all known nodes. This is useful when something changes
+         * in the configuration and we want to make the cluster aware it before the
+         * regular ping. */
+        clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
+    }
 }
 
 void clusterDoBeforeSleep(int flags) {
@@ -6556,7 +6564,7 @@ void clusterCommandSetSlot(client *c) {
             }
             /* After importing this slot, let the other nodes know as
              * soon as possible. */
-            clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
+            clusterDoBeforeSleep(CLUSTER_TODO_BROADCAST_ALL);
         }
     }
 }
@@ -6748,8 +6756,7 @@ int clusterCommandSpecial(client *c) {
          * If the instance is a replica, it had a totally different replication history.
          * In these both cases, myself as a replica has to do a full sync. */
         clusterSetPrimary(n, 1, 1);
-        clusterBroadcastPong(CLUSTER_BROADCAST_ALL);
-        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG);
+        clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_BROADCAST_ALL);
         addReply(c, shared.ok);
     } else if (!strcasecmp(c->argv[1]->ptr, "count-failure-reports") && c->argc == 3) {
         /* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
@@ -25,6 +25,7 @@
 #define CLUSTER_TODO_SAVE_CONFIG (1 << 2)
 #define CLUSTER_TODO_FSYNC_CONFIG (1 << 3)
 #define CLUSTER_TODO_HANDLE_MANUALFAILOVER (1 << 4)
+#define CLUSTER_TODO_BROADCAST_ALL (1 << 5)
 
 /* clusterLink encapsulates everything needed to talk with a remote node. */
 typedef struct clusterLink {
@@ -436,6 +436,8 @@ void debugCommand(client *c) {
         "CLOSE-CLUSTER-LINK-ON-PACKET-DROP <0|1>",
         "    This is valid only when DROP-CLUSTER-PACKET-FILTER is set to a valid packet type.",
         "    When set to 1, the cluster link is closed after dropping a packet based on the filter.",
+        "DISABLE-CLUSTER-RANDOM-PING <0|1>",
+        "    Disable sending cluster ping to a random node every second.",
         "OOM",
         "    Crash the server simulating an out-of-memory error.",
         "PANIC",
@@ -607,6 +609,9 @@ void debugCommand(client *c) {
     } else if (!strcasecmp(c->argv[1]->ptr, "close-cluster-link-on-packet-drop") && c->argc == 3) {
         server.debug_cluster_close_link_on_packet_drop = atoi(c->argv[2]->ptr);
         addReply(c, shared.ok);
+    } else if (!strcasecmp(c->argv[1]->ptr, "disable-cluster-random-ping") && c->argc == 3) {
+        server.debug_cluster_disable_random_ping = atoi(c->argv[2]->ptr);
+        addReply(c, shared.ok);
     } else if (!strcasecmp(c->argv[1]->ptr, "object") && (c->argc == 3 || c->argc == 4)) {
         dictEntry *de;
         robj *val;
@@ -2693,6 +2693,7 @@ void initServer(void) {
     server.blocking_op_nesting = 0;
     server.thp_enabled = 0;
     server.cluster_drop_packet_filter = -1;
+    server.debug_cluster_disable_random_ping = 0;
     server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME;
     server.reply_buffer_resizing_enabled = 1;
     server.client_mem_usage_buckets = NULL;
@@ -2194,6 +2194,8 @@ struct valkeyServer {
     int cluster_slot_stats_enabled; /* Cluster slot usage statistics tracking enabled. */
     /* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */
     uint32_t debug_cluster_close_link_on_packet_drop : 1;
+    /* Debug config to control the random ping. When set, we will disable the random ping in clusterCron. */
+    uint32_t debug_cluster_disable_random_ping : 1;
     sds cached_cluster_slot_info[CACHE_CONN_TYPE_MAX]; /* Index in array is a bitwise or of CACHE_CONN_TYPE_* */
     /* Scripting */
     mstime_t busy_reply_threshold; /* Script / module timeout in milliseconds */
@@ -313,3 +313,64 @@ start_cluster 3 1 {tags {external:skip cluster} overrides {cluster-ping-interval
         verify_no_log_message -3 "*Manual failover timed out*" $loglines2
     }
 } ;# start_cluster
+
+start_cluster 3 1 {tags {external:skip cluster} overrides {cluster-ping-interval 1000 cluster-node-timeout 1000}} {
+    test "Broadcast PONG to the cluster when the node role changes" {
+        # R0 is a primary and R3 is a replica, we will do multiple cluster failover
+        # and then check their role and flags.
+        set R0_nodeid [R 0 cluster myid]
+        set R3_nodeid [R 3 cluster myid]
+
+        # Make sure we don't send PINGs for a short period of time.
+        for {set j 0} {$j < [llength $::servers]} {incr j} {
+            R $j debug disable-cluster-random-ping 1
+            R $j config set cluster-ping-interval 300000
+        }
+
+        R 3 cluster failover
+        wait_for_condition 1000 50 {
+            [s 0 role] eq {slave} &&
+            [s -3 role] eq {master}
+        } else {
+            fail "Failover does not happened"
+        }
+
+        # Get the node information of R0 and R3 in my view from CLUSTER NODES
+        # R0 should be a replica and R3 should be a primary in all views.
+        for {set j 0} {$j < [llength $::servers]} {incr j} {
+            wait_for_condition 1000 50 {
+                [check_cluster_node_mark slave $j $R0_nodeid] &&
+                [check_cluster_node_mark master $j $R3_nodeid]
+            } else {
+                puts "R0_nodeid: $R0_nodeid"
+                puts "R3_nodeid: $R3_nodeid"
+                puts "R $j cluster nodes:"
+                puts [R $j cluster nodes]
+                fail "Node role does not changed in the first failover"
+            }
+        }
+
+        R 0 cluster failover
+        wait_for_condition 1000 50 {
+            [s 0 role] eq {master} &&
+            [s -3 role] eq {slave}
+        } else {
+            fail "The second failover does not happened"
+        }
+
+        # Get the node information of R0 and R3 in my view from CLUSTER NODES
+        # R0 should be a primary and R3 should be a replica in all views.
+        for {set j 0} {$j < [llength $::servers]} {incr j} {
+            wait_for_condition 1000 50 {
+                [check_cluster_node_mark master $j $R0_nodeid] &&
+                [check_cluster_node_mark slave $j $R3_nodeid]
+            } else {
+                puts "R0_nodeid: $R0_nodeid"
+                puts "R3_nodeid: $R3_nodeid"
+                puts "R $j cluster nodes:"
+                puts [R $j cluster nodes]
+                fail "Node role does not changed in the second failover"
+            }
+        }
+    }
+} ;# start_cluster