Merge c8037a1f1a5e6c91bc2772f48ede3c20177f183e into 26c6f1af9b29d525831c7fa9840ab3e47ed7b700

This commit is contained in:
Binbin 2025-02-02 18:31:24 +01:00 committed by GitHub
commit 8a33646b1b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 225 additions and 14 deletions

View File

@ -7,7 +7,6 @@
#define CLUSTER_SLOT_MASK_BITS 14 /* Number of bits used for slot id. */
#define CLUSTER_SLOTS (1 << CLUSTER_SLOT_MASK_BITS) /* Total number of slots in cluster mode, which is 16384. */
#define CLUSTER_SLOT_MASK ((unsigned long long)(CLUSTER_SLOTS - 1)) /* Bit mask for slot id stored in LSB. */
#define CLUSTER_OK 0 /* Everything looks ok */
#define CLUSTER_FAIL 1 /* The cluster can't work */
#define CLUSTER_NAMELEN 40 /* sha1 hex length */

View File

@ -1234,6 +1234,48 @@ void clusterInitLast(void) {
/* Called when a cluster node receives SHUTDOWN. */
void clusterHandleServerShutdown(void) {
if (nodeIsPrimary(myself) && server.auto_failover_on_shutdown) {
/* Find the first best replica, that is, the replica with the largest offset. */
client *best_replica = NULL;
listIter replicas_iter;
listNode *replicas_list_node;
listRewind(server.replicas, &replicas_iter);
while ((replicas_list_node = listNext(&replicas_iter)) != NULL) {
client *replica = listNodeValue(replicas_list_node);
/* This is done only when the replica offset is caught up, to avoid data loss.
* And 0x800ff is 8.0.255, we only support new versions for this feature. */
if (replica->repl_data->repl_state == REPLICA_STATE_ONLINE &&
// replica->repl_data->replica_version > 0x800ff &&
replica->name && sdslen(replica->name->ptr) == CLUSTER_NAMELEN &&
replica->repl_data->repl_ack_off == server.primary_repl_offset) {
best_replica = replica;
break;
}
}
if (best_replica) {
/* Send the CLUSTER FAILOVER FORCE REPLICAID node-id to all replicas since
* it is a shared replication buffer, but only the replica with the matching
* node-id will execute it. The caller will call flushReplicasOutputBuffers,
* so in here it is a best effort. */
char buf[128];
size_t buflen = snprintf(buf, sizeof(buf),
"*5\r\n$7\r\nCLUSTER\r\n"
"$8\r\nFAILOVER\r\n"
"$5\r\nFORCE\r\n"
"$9\r\nREPLICAID\r\n"
"$%d\r\n%s\r\n",
CLUSTER_NAMELEN,
(char *)best_replica->name->ptr);
/* Must install write handler for all replicas first before feeding
* replication stream. */
prepareReplicasToWrite();
feedReplicationBuffer(buf, buflen);
} else {
serverLog(LL_NOTICE, "Unable to find a replica to perform an auto failover on shutdown.");
}
}
/* The error logs have been logged in the save function if the save fails. */
serverLog(LL_NOTICE, "Saving the cluster configuration file before exiting.");
clusterSaveConfig(1);
@ -7001,32 +7043,47 @@ int clusterCommandSpecial(client *c) {
} else {
addReplyLongLong(c, clusterNodeFailureReportsCount(n));
}
} else if (!strcasecmp(c->argv[1]->ptr, "failover") && (c->argc == 2 || c->argc == 3)) {
/* CLUSTER FAILOVER [FORCE|TAKEOVER] */
} else if (!strcasecmp(c->argv[1]->ptr, "failover") && (c->argc >= 2)) {
/* CLUSTER FAILOVER [FORCE|TAKEOVER] [REPLICAID <NODE ID>]
* REPLICAID is currently available only for internal so we won't
* put it into the JSON file. */
int force = 0, takeover = 0;
robj *replicaid = NULL;
if (c->argc == 3) {
if (!strcasecmp(c->argv[2]->ptr, "force")) {
for (int j = 2; j < c->argc; j++) {
int moreargs = (c->argc - 1) - j;
if (!strcasecmp(c->argv[j]->ptr, "force")) {
force = 1;
} else if (!strcasecmp(c->argv[2]->ptr, "takeover")) {
} else if (!strcasecmp(c->argv[j]->ptr, "takeover")) {
takeover = 1;
force = 1; /* Takeover also implies force. */
} else if (c == server.primary && !strcasecmp(c->argv[j]->ptr, "replicaid") && moreargs) {
/* This option is currently available only for primary. */
j++;
replicaid = c->argv[j];
} else {
addReplyErrorObject(c, shared.syntaxerr);
return 1;
}
}
/* Check if it should be executed by myself. */
if (replicaid != NULL && memcmp(replicaid->ptr, myself->name, CLUSTER_NAMELEN) != 0) {
/* Ignore this command, including the sanity check and the process. */
serverLog(LL_NOTICE, "return ok");
addReply(c, shared.ok);
return 1;
}
/* Check preconditions. */
if (clusterNodeIsPrimary(myself)) {
addReplyError(c, "You should send CLUSTER FAILOVER to a replica");
if (replicaid == NULL) addReplyError(c, "You should send CLUSTER FAILOVER to a replica");
return 1;
} else if (myself->replicaof == NULL) {
addReplyError(c, "I'm a replica but my master is unknown to me");
if (replicaid == NULL) addReplyError(c, "I'm a replica but my master is unknown to me");
return 1;
} else if (!force && (nodeFailed(myself->replicaof) || myself->replicaof->link == NULL)) {
addReplyError(c, "Master is down or failed, "
"please use CLUSTER FAILOVER FORCE");
if (replicaid == NULL) addReplyError(c, "Master is down or failed, please use CLUSTER FAILOVER FORCE");
return 1;
}
resetManualFailover();
@ -7045,7 +7102,11 @@ int clusterCommandSpecial(client *c) {
/* If this is a forced failover, we don't need to talk with our
* primary to agree about the offset. We just failover taking over
* it without coordination. */
serverLog(LL_NOTICE, "Forced failover user request accepted (user request from '%s').", client);
if (c == server.primary) {
serverLog(LL_NOTICE, "Forced failover primary request accepted (primary request from '%s').", client);
} else {
serverLog(LL_NOTICE, "Forced failover user request accepted (user request from '%s').", client);
}
manualFailoverCanStart();
/* We can start a manual failover as soon as possible, setting a flag
* here so that we don't need to waiting for the cron to kick in. */

View File

@ -3192,6 +3192,7 @@ standardConfig static_configs[] = {
createBoolConfig("cluster-slot-stats-enabled", NULL, MODIFIABLE_CONFIG, server.cluster_slot_stats_enabled, 0, NULL, NULL),
createBoolConfig("hide-user-data-from-log", NULL, MODIFIABLE_CONFIG, server.hide_user_data_from_log, 1, NULL, NULL),
createBoolConfig("import-mode", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.import_mode, 0, NULL, NULL),
createBoolConfig("auto-failover-on-shutdown", NULL, MODIFIABLE_CONFIG, server.auto_failover_on_shutdown, 0, NULL, NULL),
/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.acl_filename, "", NULL, NULL),

View File

@ -3594,6 +3594,14 @@ void syncWithPrimary(connection *conn) {
err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL);
if (err) goto write_error;
/* Inform the primary of our (replica) node name. */
if (server.cluster_enabled) {
char *argv[] = {"CLIENT", "SETNAME", server.cluster->myself->name};
size_t lens[] = {6, 7, CLUSTER_NAMELEN};
err = sendCommandArgv(conn, 3, argv, lens);
if (err) goto write_error;
}
server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY;
return;
}
@ -3684,6 +3692,24 @@ void syncWithPrimary(connection *conn) {
}
sdsfree(err);
err = NULL;
if (server.cluster_enabled) {
server.repl_state = REPL_STATE_RECEIVE_SETNAME_REPLY;
return;
} else {
server.repl_state = REPL_STATE_SEND_PSYNC;
}
}
/* Receive CLIENT SETNAME reply. */
if (server.repl_state == REPL_STATE_RECEIVE_SETNAME_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;
/* Ignore the error if any, we don't care if it failed, it is best effort. */
if (err[0] == '-') {
serverLog(LL_NOTICE, "(Non critical) Primary does not understand CLIENT SETNAME: %s", err);
}
sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_SEND_PSYNC;
}

View File

@ -4576,6 +4576,9 @@ int finishShutdown(void) {
unlink(server.pidfile);
}
/* Handle cluster-related matters when shutdown. */
if (server.cluster_enabled) clusterHandleServerShutdown();
/* Best effort flush of replica output buffers, so that we hopefully
* send them pending writes. */
flushReplicasOutputBuffers();
@ -4583,9 +4586,6 @@ int finishShutdown(void) {
/* Close the listening sockets. Apparently this allows faster restarts. */
closeListeningSockets(1);
/* Handle cluster-related matters when shutdown. */
if (server.cluster_enabled) clusterHandleServerShutdown();
serverLog(LL_WARNING, "%s is now ready to exit, bye bye...", server.sentinel_mode ? "Sentinel" : "Valkey");
return C_OK;

View File

@ -412,6 +412,7 @@ typedef enum {
REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_VERSION_REPLY, /* Wait for REPLCONF reply */
REPL_STATE_RECEIVE_SETNAME_REPLY, /* Wait for CLIENT SETNAME reply */
REPL_STATE_SEND_PSYNC, /* Send PSYNC */
REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
/* --- End of handshake states --- */
@ -2084,6 +2085,7 @@ struct valkeyServer {
unsigned long cluster_blacklist_ttl; /* Duration in seconds that a node is denied re-entry into
* the cluster after it is forgotten with CLUSTER FORGET. */
int cluster_slot_stats_enabled; /* Cluster slot usage statistics tracking enabled. */
int auto_failover_on_shutdown; /* Trigger manual failover on shutdown to primary. */
/* Debug config that goes along with cluster_drop_packet_filter. When set, the link is closed on packet drop. */
uint32_t debug_cluster_close_link_on_packet_drop : 1;
/* Debug config to control the random ping. When set, we will disable the random ping in clusterCron. */
@ -2906,6 +2908,7 @@ ssize_t syncRead(int fd, char *ptr, ssize_t size, long long timeout);
ssize_t syncReadLine(int fd, char *ptr, ssize_t size, long long timeout);
/* Replication */
int prepareReplicasToWrite(void);
void replicationFeedReplicas(int dictid, robj **argv, int argc);
void replicationFeedStreamFromPrimaryStream(char *buf, size_t buflen);
void resetReplicationBuffer(void);

View File

@ -126,6 +126,31 @@ proc wait_replica_online r {
}
}
proc check_replica_acked_ofs {primary replica_ip replica_port} {
set infostr [$primary info replication]
set master_repl_offset [getInfoProperty $infostr master_repl_offset]
if {[regexp -lineanchor "^slave\\d:ip=$replica_ip,port=$replica_port,.*,offset=(\\d+).*\r\n" $infostr _ offset]} {
if {$master_repl_offset == $offset} {
return 1
}
return 0
}
return 0
}
proc wait_replica_acked_ofs {primary replica replica_ip replica_port} {
$primary config set repl-ping-replica-period 3600
$replica config set hz 500
wait_for_condition 1000 50 {
[check_replica_acked_ofs $primary $replica_ip $replica_port] eq 1
} else {
puts "INFO REPLICATION: [$primary info replication]"
fail "replica acked offset didn't match in time"
}
$primary config set repl-ping-replica-period 10
$replica config set hz 10
}
proc wait_for_ofs_sync {r1 r2} {
wait_for_condition 50 100 {
[status $r1 master_repl_offset] eq [status $r2 master_repl_offset]

View File

@ -0,0 +1,92 @@
proc shutdown_how {srv_id how} {
if {$how == "shutdown"} {
catch {R $srv_id shutdown nosave}
} elseif {$how == "sigterm"} {
exec kill -SIGTERM [s -$srv_id process_id]
}
}
# We will start a cluster with 3 primary nodes and 4 replicas, the primary 1 will have 2 replicas.
# We will pause the replica 1, and then shutdown the primary 1, and making replica 2 to become
# the new primary.
proc test_main {how shutdown_timeout} {
test "auto-failover-on-shutdown will always pick a best replica and send CLUSTER FAILOVER - $how - shutdown-timeout: $shutdown_timeout" {
set primary [srv 0 client]
set replica1 [srv -3 client]
set replica1_pid [s -3 process_id]
set replica2 [srv -6 client]
set replica2_ip [srv -6 host]
set replica2_port [srv -6 port]
$primary config set auto-failover-on-shutdown yes
$primary config set shutdown-timeout $shutdown_timeout
$primary config set repl-ping-replica-period 3600
# To avoid failover kick in.
$replica2 config set cluster-replica-no-failover yes
# Pause a replica so it has no chance to catch up with the offset.
pause_process $replica1_pid
# Primary write some data to increase the offset.
for {set i 0} {$i < 10} {incr i} {
$primary incr key_991803
}
if {$shutdown_timeout == 0} {
# Wait the replica2 catch up with the offset
wait_for_ofs_sync $primary $replica2
wait_replica_acked_ofs $primary $replica2 $replica2_ip $replica2_port
} else {
# If shutdown-timeout is enable, we expect the primary to pause writing
# and wait for the replica to catch up with the offset.
}
# Shutdown the primary.
shutdown_how 0 $how
# Wait for the replica2 to become a primary.
wait_for_condition 1000 50 {
[s -6 role] eq {master}
} else {
puts "s -6 role: [s -6 role]"
fail "Failover does not happened"
}
# Make sure that the expected logs are printed.
verify_log_message -6 "*Forced failover primary request accepted*" 0
resume_process $replica1_pid
}
test "Unable to find a replica to perform an auto failover - $how" {
set primary [srv -6 client]
set replica1 [srv -3 client]
set replica1_pid [s -3 process_id]
pause_process $replica1_pid
$primary config set auto-failover-on-shutdown yes
$primary client kill type replica
shutdown_how 6 $how
wait_for_log_messages -6 {"*Unable to find a replica to perform an auto failover on shutdown*"} 0 1000 10
resume_process $replica1_pid
}
}
start_cluster 3 4 {tags {external:skip cluster}} {
test_main "shutdown" 0
}
start_cluster 3 4 {tags {external:skip cluster}} {
test_main "sigterm" 0
}
start_cluster 3 4 {tags {external:skip cluster}} {
test_main "shutdown" 10
}
start_cluster 3 4 {tags {external:skip cluster}} {
test_main "sigterm" 10
}

View File

@ -1692,6 +1692,10 @@ aof-timestamp-enabled no
# shutdown-on-sigint default
# shutdown-on-sigterm default
# TODO
#
# auto-failover-on-shutdown no
################ NON-DETERMINISTIC LONG BLOCKING COMMANDS #####################
# Maximum time in milliseconds for EVAL scripts, functions and in some cases