Improve CLUSTER SETSLOT replication handling to support older replica versions. (#686)

Ping Xie 2024-06-23 22:08:52 -07:00 committed by GitHub
parent ce79539047
commit 32ca6e5b38


@@ -6022,23 +6022,42 @@ void clusterCommandSetSlot(client *c) {
* a reliable slot ownership transfer even if the primary node went down during
* the process. */
if (nodeIsPrimary(myself) && myself->num_replicas != 0 && (c->flags & CLIENT_REPLICATION_DONE) == 0) {
forceCommandPropagation(c, PROPAGATE_REPL);
/* We are a primary and this is the first time we see this `SETSLOT`
* command. Force-replicate the command to all of our replicas
* first and only on success will we handle the command.
* Note that
* 1. All replicas are expected to ack the replication within the given timeout
* 2. The repl offset target is set to the primary's current repl offset + 1.
* There is no concern of partial replication because replicas always
* ack the repl offset at the command boundary. */
blockClientForReplicaAck(c, timeout_ms, server.primary_repl_offset + 1, myself->num_replicas, 0);
/* Mark client as pending command for execution after replication to replicas. */
c->flags |= CLIENT_PENDING_COMMAND;
replicationRequestAckFromReplicas();
return;
/* Iterate through the list of replicas to check if there are any running
* a version older than 8.0.0. Replicas with versions older than 8.0.0 do
* not support the CLUSTER SETSLOT command on replicas. If such a replica
* is found, we should skip the replication and fall back to the old
* non-replicated behavior. */
listIter li;
listNode *ln;
int legacy_replica_found = 0;
listRewind(server.replicas, &li);
while ((ln = listNext(&li))) {
client *r = ln->value;
if (r->replica_version < 0x80000 /* 8.0.0 */) {
legacy_replica_found++;
break;
}
}
if (!legacy_replica_found) {
forceCommandPropagation(c, PROPAGATE_REPL);
/* We are a primary and this is the first time we see this `SETSLOT`
* command. Force-replicate the command to all of our replicas
* first and only on success will we handle the command.
* Note that
* 1. All replicas are expected to ack the replication within the given timeout
* 2. The repl offset target is set to the primary's current repl offset + 1.
* There is no concern of partial replication because replicas always
* ack the repl offset at the command boundary. */
blockClientForReplicaAck(c, timeout_ms, server.primary_repl_offset + 1, myself->num_replicas, 0);
/* Mark client as pending command for execution after replication to replicas. */
c->flags |= CLIENT_PENDING_COMMAND;
replicationRequestAckFromReplicas();
return;
}
}
/* Slot states have been updated on the replicas (if any).
/* Slot states have been updated on the compatible replicas (if any).
* Now execute the command on the primary. */
if (!strcasecmp(c->argv[3]->ptr, "migrating")) {
serverLog(LL_NOTICE, "Migrating slot %d to node %.40s (%s)", slot, n->name, n->human_nodename);
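
The loop added in this diff compares each replica's version against 0x80000, annotated as 8.0.0. A minimal standalone sketch of that gate is below, assuming a packed (major << 16) | (minor << 8) | patch encoding inferred from that comparison; the helper names and the stand-in replica list are illustrative, not the server's real structures.

#include <stdbool.h>
#include <stdio.h>

/* Hypothetical helper: pack a dotted version into one comparable integer
 * (assumed encoding, inferred from the 0x80000 cutoff for 8.0.0). */
static int pack_version(int major, int minor, int patch) {
    return (major << 16) | (minor << 8) | patch;
}

/* Returns true if any replica reports a version older than 8.0.0, in which
 * case the primary would skip the replicated SETSLOT path and fall back to
 * the old non-replicated behavior, as in the diff above. */
static bool has_legacy_replica(const int *replica_versions, int count) {
    for (int i = 0; i < count; i++) {
        if (replica_versions[i] < 0x80000 /* 8.0.0 */) return true;
    }
    return false;
}

int main(void) {
    int replicas[] = {pack_version(8, 0, 1), pack_version(7, 2, 5)};
    printf("legacy replica present: %s\n",
           has_legacy_replica(replicas, 2) ? "yes" : "no"); /* prints "yes" */
    return 0;
}

Under this reading, a single pre-8.0.0 replica is enough to disable the replicated path for the whole shard, which matches the early break in the loop once one legacy replica is found.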
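The comment kept in the replicated branch explains that the client is blocked until every replica acknowledges the primary's current repl offset + 1. The sketch below illustrates only that ack-counting idea; the struct and function are stand-ins for illustration, not the server's actual blockClientForReplicaAck() machinery.

#include <stdbool.h>
#include <stddef.h>

struct replica_ack {
    long long acked_offset; /* last replication offset this replica acked */
};

/* True once at least `numreplicas` replicas have acked `target_offset`. */
static bool enough_replicas_acked(const struct replica_ack *replicas, size_t n,
                                  long long target_offset, size_t numreplicas) {
    size_t acked = 0;
    for (size_t i = 0; i < n; i++) {
        if (replicas[i].acked_offset >= target_offset) acked++;
    }
    return acked >= numreplicas;
}

Because replicas acknowledge offsets at command boundaries, reaching primary_repl_offset + 1 implies the entire SETSLOT command was applied on the replica, which is why the comment notes that partial replication is not a concern.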