Merge 80c9e1967ff1135c5bc074e5a41cfd831d17c625 into 26c6f1af9b29d525831c7fa9840ab3e47ed7b700

This commit is contained in:
nitaicaro 2025-02-02 11:18:06 +02:00 committed by GitHub
commit ffdb5c3f7e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -57,6 +57,7 @@ void replicationSteadyStateInit(void);
void dualChannelSetupMainConnForPsync(connection *conn);
void dualChannelSyncHandleRdbLoadCompletion(void);
static void dualChannelFullSyncWithPrimary(connection *conn);
void syncWithPrimary(connection *conn);
/* We take a global flag to remember if this instance generated an RDB
* because of replication, so that we can remove the RDB file in case
@ -3066,54 +3067,17 @@ void dualChannelSyncHandleRdbLoadCompletion(void) {
return;
}
/* Try a partial resynchronization with the primary if we are about to reconnect.
* If there is no cached primary structure, at least try to issue a
* "PSYNC ? -1" command in order to trigger a full resync using the PSYNC
* command in order to obtain the primary replid and the primary replication
* global offset.
/* Handles the initial step of the partial resynchronization process by
* preparing and sending a PSYNC command to the primary server.
* This function determines the appropriate replication ID (replid)
* and replication offset based on the server's state. If successful,
* the function signals readiness for the reply processing phase, which is
* processed in replicaProcessPsyncReply().
*
* This function is designed to be called from syncWithPrimary(), so the
* following assumptions are made:
*
* 1) We pass the function an already connected socket "fd".
* 2) This function does not close the file descriptor "fd". However in case
* of successful partial resynchronization, the function will reuse
* 'fd' as file descriptor of the server.primary client structure.
*
* The function is split in two halves: if read_reply is 0, the function
* writes the PSYNC command on the socket, and a new function call is
* needed, with read_reply set to 1, in order to read the reply of the
* command. This is useful in order to support non blocking operations, so
* that we write, return into the event loop, and read when there are data.
*
* When read_reply is 0 the function returns PSYNC_WRITE_ERR if there
* was a write error, or PSYNC_WAIT_REPLY to signal we need another call
* with read_reply set to 1. However even when read_reply is set to 1
* the function may return PSYNC_WAIT_REPLY again to signal there were
* insufficient data to read to complete its work. We should re-enter
* into the event loop and wait in such a case.
*
* The function returns:
*
* PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue.
* PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
* In this case the primary replid and global replication
* offset is saved.
* PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
* the caller should fall back to SYNC.
* PSYNC_WRITE_ERROR: There was an error writing the command to the socket.
* PSYNC_WAIT_REPLY: Call again the function with read_reply set to 1.
* PSYNC_TRY_LATER: Primary is currently in a transient error condition.
*
* Notable side effects:
*
* 1) As a side effect of the function call the function removes the readable
* event handler from "fd", unless the return value is PSYNC_WAIT_REPLY.
* 2) server.primary_initial_offset is set to the right value according
* to the primary reply. This will be used to populate the 'server.primary'
* structure replication offset.
*/
* Return Values:
* - PSYNC_WRITE_ERROR: There was an error writing the command to the socket.
* - PSYNC_WAIT_REPLY: PSYNC was successfully sent, awaiting a reply. The next
* step is to call replicaProcessPsyncReply(). */
#define PSYNC_WRITE_ERROR 0
#define PSYNC_WAIT_REPLY 1
#define PSYNC_CONTINUE 2
@ -3121,55 +3085,75 @@ void dualChannelSyncHandleRdbLoadCompletion(void) {
#define PSYNC_NOT_SUPPORTED 4
#define PSYNC_TRY_LATER 5
#define PSYNC_FULLRESYNC_DUAL_CHANNEL 6
int replicaTryPartialResynchronization(connection *conn, int read_reply) {
int replicaSendPsyncCommand(connection *conn) {
char *psync_replid;
char psync_offset[32];
sds reply;
/* Writing half */
if (!read_reply) {
/* Initially set primary_initial_offset to -1 to mark the current
* primary replid and offset as not valid. Later if we'll be able to do
* a FULL resync using the PSYNC command we'll set the offset at the
* right value, so that this information will be propagated to the
* client structure representing the primary into server.primary. */
server.primary_initial_offset = -1;
/* Initially set primary_initial_offset to -1 to mark the current
* primary replid and offset as not valid. Later if we'll be able to do
* a FULL resync using the PSYNC command we'll set the offset at the
* right value, so that this information will be propagated to the
* client structure representing the primary into server.primary. */
server.primary_initial_offset = -1;
if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) {
/* While in dual channel replication, we should use our prepared repl id and offset. */
psync_replid = server.repl_provisional_primary.replid;
snprintf(psync_offset, sizeof(psync_offset), "%lld", server.repl_provisional_primary.reploff + 1);
dualChannelServerLog(LL_NOTICE,
"Trying a partial resynchronization using main channel (request %s:%s).",
psync_replid, psync_offset);
} else if (server.cached_primary) {
psync_replid = server.cached_primary->repl_data->replid;
snprintf(psync_offset, sizeof(psync_offset), "%lld", server.cached_primary->repl_data->reploff + 1);
serverLog(LL_NOTICE, "Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
} else {
serverLog(LL_NOTICE, "Partial resynchronization not possible (no cached primary)");
psync_replid = "?";
memcpy(psync_offset, "-1", 3);
}
/* Issue the PSYNC command, if this is a primary with a failover in
* progress then send the failover argument to the replica to cause it
* to become a primary */
if (server.failover_state == FAILOVER_IN_PROGRESS) {
reply = sendCommand(conn, "PSYNC", psync_replid, psync_offset, "FAILOVER", NULL);
} else {
reply = sendCommand(conn, "PSYNC", psync_replid, psync_offset, NULL);
}
if (reply != NULL) {
serverLog(LL_WARNING, "Unable to send PSYNC to primary: %s", reply);
sdsfree(reply);
connSetReadHandler(conn, NULL);
return PSYNC_WRITE_ERROR;
}
return PSYNC_WAIT_REPLY;
if (server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE) {
/* While in dual channel replication, we should use our prepared repl id and offset. */
psync_replid = server.repl_provisional_primary.replid;
snprintf(psync_offset, sizeof(psync_offset), "%lld", server.repl_provisional_primary.reploff + 1);
dualChannelServerLog(LL_NOTICE,
"Trying a partial resynchronization using main channel (request %s:%s).",
psync_replid, psync_offset);
} else if (server.cached_primary) {
psync_replid = server.cached_primary->repl_data->replid;
snprintf(psync_offset, sizeof(psync_offset), "%lld", server.cached_primary->repl_data->reploff + 1);
serverLog(LL_NOTICE, "Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
} else {
serverLog(LL_NOTICE, "Partial resynchronization not possible (no cached primary)");
psync_replid = "?";
memcpy(psync_offset, "-1", 3);
}
/* Issue the PSYNC command, if this is a primary with a failover in
* progress then send the failover argument to the replica to cause it
* to become a primary */
if (server.failover_state == FAILOVER_IN_PROGRESS) {
reply = sendCommand(conn, "PSYNC", psync_replid, psync_offset, "FAILOVER", NULL);
} else {
reply = sendCommand(conn, "PSYNC", psync_replid, psync_offset, NULL);
}
if (reply != NULL) {
serverLog(LL_WARNING, "Unable to send PSYNC to primary: %s", reply);
sdsfree(reply);
connSetReadHandler(conn, NULL);
return PSYNC_WRITE_ERROR;
}
return PSYNC_WAIT_REPLY;
}
/* Processes the reply from the primary server following a PSYNC command that
* was sent with replicaSendPsyncCommand().
* This function interprets the reply to determine if partial resynchronization
* can proceed or if a full synchronization is required.
*
* Return Values:
* - PSYNC_TRY_LATER: Primary is currently in a transient error condition.
* - PSYNC_FULLRESYNC: If PSYNC is supported but a full resync is needed.
* In this case the primary replid and global replication
* offset is saved.
* - PSYNC_CONTINUE: If the PSYNC command succeeded and we can continue
* - PSYNC_WAIT_REPLY: Still awaiting a reply, call this function again.
* - PSYNC_NOT_SUPPORTED: If the server does not understand PSYNC at all and
* the caller should fall back to SYNC.
* - PSYNC_FULLRESYNC_DUAL_CHANNEL: If partial synchronization is not possible
* but the primary supports full synchronization using
* a dedicated RDB channel. In this case, the RDB channel
* is initialized, and full synchronization will continue
* via the dual-channel approach. */
int replicaProcessPsyncReply(connection *conn) {
sds reply;
/* Reading half */
reply = receiveSynchronousResponse(conn);
/* Primary did not reply to PSYNC */
@ -3340,7 +3324,7 @@ int dualChannelReplMainConnRecvCapaReply(connection *conn, sds *err) {
int dualChannelReplMainConnSendPsync(connection *conn, sds *err) {
if (server.debug_pause_after_fork) debugPauseProcess();
if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) {
if (replicaSendPsyncCommand(conn) == PSYNC_WRITE_ERROR) {
dualChannelServerLog(LL_WARNING, "Aborting dual channel sync. Write error.");
*err = sdsnew(connGetLastError(conn));
return C_ERR;
@ -3349,7 +3333,7 @@ int dualChannelReplMainConnSendPsync(connection *conn, sds *err) {
}
int dualChannelReplMainConnRecvPsyncReply(connection *conn, sds *err) {
int psync_result = replicaTryPartialResynchronization(conn, 1);
int psync_result = replicaProcessPsyncReply(conn);
if (psync_result == PSYNC_WAIT_REPLY) return C_OK; /* Try again later... */
if (psync_result == PSYNC_CONTINUE) {
@ -3408,6 +3392,209 @@ void dualChannelSetupMainConnForPsync(connection *conn) {
sdsfree(err);
}
int syncWithPrimaryHandleConnectingState(connection *conn, sds *err) {
serverLog(LL_NOTICE, "Non blocking connect for SYNC fired the event.");
/* Delete the writable event so that the readable event remains
* registered and we can wait for the PONG reply. */
connSetReadHandler(conn, syncWithPrimary);
connSetWriteHandler(conn, NULL);
/* Send the PING, don't check for errors at all, we have the timeout
* that will take care about this. */
*err = sendCommand(conn, "PING", NULL);
if (*err) return C_ERR;
return C_OK;
}
int syncWithPrimaryHandleReceivePingReplyState(connection *conn, sds *err) {
*err = receiveSynchronousResponse(conn);
/* The primary did not reply */
if (*err == NULL) return C_ERR;
/* We accept only two replies as valid, a positive +PONG reply
* (we just check for "+") or an authentication error.
* Note that older versions of Redis OSS replied with "operation not
* permitted" instead of using a proper error code, so we test
* both. */
if (*err[0] != '+' && strncmp(*err, "-NOAUTH", 7) != 0 && strncmp(*err, "-NOPERM", 7) != 0 &&
strncmp(*err, "-ERR operation not permitted", 28) != 0) {
serverLog(LL_WARNING, "Error reply to PING from primary: '%s'", *err);
sdsfree(*err);
return C_ERR;
} else {
serverLog(LL_NOTICE, "Primary replied to PING, replication can continue...");
}
sdsfree(*err);
*err = NULL;
return C_OK;
}
int syncWithPrimaryHandleSendHandshakeState(connection *conn, sds *err) {
/* AUTH with the primary if required. */
if (server.primary_auth) {
char *args[3] = {"AUTH", NULL, NULL};
size_t lens[3] = {4, 0, 0};
int argc = 1;
if (server.primary_user) {
args[argc] = server.primary_user;
lens[argc] = strlen(server.primary_user);
argc++;
}
args[argc] = server.primary_auth;
lens[argc] = sdslen(server.primary_auth);
argc++;
*err = sendCommandArgv(conn, argc, args, lens);
if (*err) return C_ERR;
}
/* Set the replica port, so that primary's INFO command can list the
* replica listening port correctly. */
{
sds portstr = getReplicaPortString();
*err = sendCommand(conn, "REPLCONF", "listening-port", portstr, NULL);
sdsfree(portstr);
if (*err) return C_ERR;
}
/* Set the replica ip, so that primary's INFO command can list the
* replica IP address port correctly in case of port forwarding or NAT.
* Skip REPLCONF ip-address if there is no replica-announce-ip option set. */
if (server.replica_announce_ip) {
*err = sendCommand(conn, "REPLCONF", "ip-address", server.replica_announce_ip, NULL);
if (*err) return C_ERR;
}
/* Inform the primary of our (replica) capabilities.
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
* The primary will ignore capabilities it does not understand. */
*err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2",
server.dual_channel_replication ? "capa" : NULL,
server.dual_channel_replication ? "dual-channel" : NULL, NULL);
if (*err) return C_ERR;
/* Inform the primary of our (replica) version. */
*err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL);
if (*err) return C_ERR;
return C_OK;
}
int syncWithPrimaryHandleReceiveAuthReplyState(connection *conn, sds *err) {
if (server.primary_auth) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) return C_ERR;
if (*err[0] == '-') {
serverLog(LL_WARNING, "Unable to AUTH to PRIMARY: %s", *err);
sdsfree(*err);
return C_ERR;
}
sdsfree(*err);
*err = NULL;
}
return C_OK;
}
int syncWithPrimaryHandleReceivePortReplyState(connection *conn, sds *err) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) return C_ERR;
/* Ignore the error if any, not all the Redis OSS versions support
* REPLCONF listening-port. */
if (*err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF listening-port: %s",
*err);
}
sdsfree(*err);
return C_OK;
}
int syncWithPrimaryHandleReceiveIPReplyState(connection *conn, sds *err) {
if (server.replica_announce_ip) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) return C_ERR;
/* Ignore the error if any, not all the Redis OSS versions support
* REPLCONF ip-address. */
if (*err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF ip-address: %s",
*err);
}
sdsfree(*err);
}
return C_OK;
}
int syncWithPrimaryHandleReceiveCapaReplyState(connection *conn, sds *err) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) return C_ERR;
/* Ignore the error if any, not all the Redis OSS versions support
* REPLCONF capa. */
if (*err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF capa: %s",
*err);
}
sdsfree(*err);
*err = NULL;
return C_OK;
}
int syncWithPrimaryHandleReceiveVersionReplyState(connection *conn, sds *err) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) return C_ERR;
/* Ignore the error if any. Valkey >= 8 supports REPLCONF VERSION. */
if (*err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF VERSION: %s",
*err);
}
sdsfree(*err);
*err = NULL;
return C_OK;
}
int syncWithPrimaryHandleSendPsyncState(connection *conn, sds *err) {
if (replicaSendPsyncCommand(conn) == PSYNC_WRITE_ERROR) {
*err = sdsnew("Write error sending the PSYNC command.");
abortFailover("Write error to failover target");
return C_ERR;
}
return C_OK;
}
void syncWithPrimaryHandleError(connection *conn) {
connClose(conn);
server.repl_transfer_s = NULL;
if (server.repl_rdb_transfer_s) {
connClose(server.repl_rdb_transfer_s);
server.repl_rdb_transfer_s = NULL;
}
if (server.repl_transfer_fd != -1) close(server.repl_transfer_fd);
if (server.repl_transfer_tmpfile) zfree(server.repl_transfer_tmpfile);
server.repl_transfer_tmpfile = NULL;
server.repl_transfer_fd = -1;
server.repl_state = REPL_STATE_CONNECT;
}
void syncWithPrimaryHandleWriteError(connection *conn, sds *err) {
serverLog(LL_WARNING, "Sending command to primary in replication handshake: %s", *err);
sdsfree(*err);
syncWithPrimaryHandleError(conn);
}
void syncWithPrimaryHandleNoResponseError(connection *conn) {
serverLog(LL_WARNING, "Primary did not respond to command during SYNC handshake");
syncWithPrimaryHandleError(conn);
}
/*
* Dual channel for full sync
*
@ -3501,217 +3688,106 @@ void syncWithPrimary(connection *conn) {
* may find that the socket is in error state. */
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING, "Error condition on socket for SYNC: %s", connGetLastError(conn));
goto error;
}
/* Send a PING to check the primary is able to reply without errors. */
if (server.repl_state == REPL_STATE_CONNECTING) {
serverLog(LL_NOTICE, "Non blocking connect for SYNC fired the event.");
/* Delete the writable event so that the readable event remains
* registered and we can wait for the PONG reply. */
connSetReadHandler(conn, syncWithPrimary);
connSetWriteHandler(conn, NULL);
server.repl_state = REPL_STATE_RECEIVE_PING_REPLY;
/* Send the PING, don't check for errors at all, we have the timeout
* that will take care about this. */
err = sendCommand(conn, "PING", NULL);
if (err) goto write_error;
syncWithPrimaryHandleError(conn);
return;
}
switch (server.repl_state) {
/* Send a PING to check the primary is able to reply without errors. */
case REPL_STATE_CONNECTING:
if (syncWithPrimaryHandleConnectingState(conn, &err) == C_ERR) {
syncWithPrimaryHandleWriteError(conn, &err);
return;
}
server.repl_state = REPL_STATE_RECEIVE_PING_REPLY;
return;
/* Receive the PONG command. */
if (server.repl_state == REPL_STATE_RECEIVE_PING_REPLY) {
err = receiveSynchronousResponse(conn);
/* The primary did not reply */
if (err == NULL) goto no_response_error;
/* We accept only two replies as valid, a positive +PONG reply
* (we just check for "+") or an authentication error.
* Note that older versions of Redis OSS replied with "operation not
* permitted" instead of using a proper error code, so we test
* both. */
if (err[0] != '+' && strncmp(err, "-NOAUTH", 7) != 0 && strncmp(err, "-NOPERM", 7) != 0 &&
strncmp(err, "-ERR operation not permitted", 28) != 0) {
serverLog(LL_WARNING, "Error reply to PING from primary: '%s'", err);
sdsfree(err);
goto error;
} else {
serverLog(LL_NOTICE, "Primary replied to PING, replication can continue...");
case REPL_STATE_RECEIVE_PING_REPLY:
if (syncWithPrimaryHandleReceivePingReplyState(conn, &err) == C_ERR) {
if (err == NULL)
syncWithPrimaryHandleNoResponseError(conn);
else
syncWithPrimaryHandleError(conn);
return;
}
sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_SEND_HANDSHAKE;
}
if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) {
/* AUTH with the primary if required. */
if (server.primary_auth) {
char *args[3] = {"AUTH", NULL, NULL};
size_t lens[3] = {4, 0, 0};
int argc = 1;
if (server.primary_user) {
args[argc] = server.primary_user;
lens[argc] = strlen(server.primary_user);
argc++;
}
args[argc] = server.primary_auth;
lens[argc] = sdslen(server.primary_auth);
argc++;
err = sendCommandArgv(conn, argc, args, lens);
if (err) goto write_error;
/* fall through */
case REPL_STATE_SEND_HANDSHAKE:
if (syncWithPrimaryHandleSendHandshakeState(conn, &err) == C_ERR) {
syncWithPrimaryHandleWriteError(conn, &err);
return;
}
/* Set the replica port, so that primary's INFO command can list the
* replica listening port correctly. */
{
sds portstr = getReplicaPortString();
err = sendCommand(conn, "REPLCONF", "listening-port", portstr, NULL);
sdsfree(portstr);
if (err) goto write_error;
}
/* Set the replica ip, so that primary's INFO command can list the
* replica IP address port correctly in case of port forwarding or NAT.
* Skip REPLCONF ip-address if there is no replica-announce-ip option set. */
if (server.replica_announce_ip) {
err = sendCommand(conn, "REPLCONF", "ip-address", server.replica_announce_ip, NULL);
if (err) goto write_error;
}
/* Inform the primary of our (replica) capabilities.
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
* The primary will ignore capabilities it does not understand. */
err = sendCommand(conn, "REPLCONF", "capa", "eof", "capa", "psync2",
server.dual_channel_replication ? "capa" : NULL,
server.dual_channel_replication ? "dual-channel" : NULL, NULL);
if (err) goto write_error;
/* Inform the primary of our (replica) version. */
err = sendCommand(conn, "REPLCONF", "version", VALKEY_VERSION, NULL);
if (err) goto write_error;
server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY;
return;
}
if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY && !server.primary_auth)
server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY;
/* Receive AUTH reply. */
if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;
if (err[0] == '-') {
serverLog(LL_WARNING, "Unable to AUTH to PRIMARY: %s", err);
sdsfree(err);
goto error;
case REPL_STATE_RECEIVE_AUTH_REPLY:
if (syncWithPrimaryHandleReceiveAuthReplyState(conn, &err) == C_ERR) {
if (err == NULL)
syncWithPrimaryHandleNoResponseError(conn);
else
syncWithPrimaryHandleError(conn);
return;
}
sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY;
return;
}
if (server.primary_auth) return;
/* fall through */
/* Receive REPLCONF listening-port reply. */
if (server.repl_state == REPL_STATE_RECEIVE_PORT_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;
/* Ignore the error if any, not all the Redis OSS versions support
* REPLCONF listening-port. */
if (err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF listening-port: %s",
err);
case REPL_STATE_RECEIVE_PORT_REPLY:
if (syncWithPrimaryHandleReceivePortReplyState(conn, &err) == C_ERR) {
syncWithPrimaryHandleNoResponseError(conn);
return;
}
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_IP_REPLY;
return;
}
if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY && !server.replica_announce_ip)
server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
/* Receive REPLCONF ip-address reply. */
if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;
/* Ignore the error if any, not all the Redis OSS versions support
* REPLCONF ip-address. */
if (err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF ip-address: %s",
err);
case REPL_STATE_RECEIVE_IP_REPLY:
if (syncWithPrimaryHandleReceiveIPReplyState(conn, &err) == C_ERR) {
syncWithPrimaryHandleNoResponseError(conn);
return;
}
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
return;
}
if (server.replica_announce_ip) return;
/* fall through */
/* Receive CAPA reply. */
if (server.repl_state == REPL_STATE_RECEIVE_CAPA_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;
/* Ignore the error if any, not all the Redis OSS versions support
* REPLCONF capa. */
if (err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF capa: %s",
err);
case REPL_STATE_RECEIVE_CAPA_REPLY:
if (syncWithPrimaryHandleReceiveCapaReplyState(conn, &err) == C_ERR) {
syncWithPrimaryHandleNoResponseError(conn);
return;
}
sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_RECEIVE_VERSION_REPLY;
return;
}
/* Receive VERSION reply. */
if (server.repl_state == REPL_STATE_RECEIVE_VERSION_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) goto no_response_error;
/* Ignore the error if any. Valkey >= 8 supports REPLCONF VERSION. */
if (err[0] == '-') {
serverLog(LL_NOTICE,
"(Non critical) Primary does not understand "
"REPLCONF VERSION: %s",
err);
case REPL_STATE_RECEIVE_VERSION_REPLY:
if (syncWithPrimaryHandleReceiveVersionReplyState(conn, &err) == C_ERR) {
syncWithPrimaryHandleNoResponseError(conn);
return;
}
sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_SEND_PSYNC;
}
/* fall through */
/* Try a partial resynchronization. If we don't have a cached primary
* replicaTryPartialResynchronization() will at least try to use PSYNC
* replicaSendPsyncCommand() will at least try to use PSYNC
* to start a full resynchronization so that we get the primary replid
* and the global offset, to try a partial resync at the next
* reconnection attempt. */
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) {
err = sdsnew("Write error sending the PSYNC command.");
abortFailover("Write error to failover target");
goto write_error;
case REPL_STATE_SEND_PSYNC:
if (syncWithPrimaryHandleSendPsyncState(conn, &err) == C_ERR) {
syncWithPrimaryHandleWriteError(conn, &err);
return;
}
server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY;
return;
}
/* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC_REPLY. */
if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) {
serverLog(LL_WARNING,
"syncWithPrimary(): state machine error, "
"state should be RECEIVE_PSYNC but is %d",
server.repl_state);
goto error;
default:
if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) {
serverLog(LL_WARNING,
"syncWithPrimary(): state machine error, "
"state should be RECEIVE_PSYNC but is %d",
server.repl_state);
syncWithPrimaryHandleError(conn);
return;
}
}
psync_result = replicaTryPartialResynchronization(conn, 1);
psync_result = replicaProcessPsyncReply(conn);
if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */
/* Check the status of the planned failover. We expect PSYNC_CONTINUE,
@ -3730,7 +3806,10 @@ void syncWithPrimary(connection *conn) {
* from scratch later, so go to the error path. This happens when
* the server is loading the dataset or is not connected with its
* primary and so forth. */
if (psync_result == PSYNC_TRY_LATER) goto error;
if (psync_result == PSYNC_TRY_LATER) {
syncWithPrimaryHandleError(conn);
return;
}
/* Note: if PSYNC does not return WAIT_REPLY, it will take care of
* uninstalling the read handler from the file descriptor. */
@ -3751,7 +3830,8 @@ void syncWithPrimary(connection *conn) {
serverLog(LL_NOTICE, "Retrying with SYNC...");
if (connSyncWrite(conn, "SYNC\r\n", 6, server.repl_syncio_timeout * 1000) == -1) {
serverLog(LL_WARNING, "I/O error writing to PRIMARY: %s", connGetLastError(conn));
goto error;
syncWithPrimaryHandleError(conn);
return;
}
}
@ -3771,7 +3851,8 @@ void syncWithPrimary(connection *conn) {
if (dfd == -1) {
serverLog(LL_WARNING, "Opening the temp file needed for PRIMARY <-> REPLICA synchronization: %s",
strerror(errno));
goto error;
syncWithPrimaryHandleError(conn);
return;
}
server.repl_transfer_tmpfile = zstrdup(tmpfile);
server.repl_transfer_fd = dfd;
@ -3787,13 +3868,15 @@ void syncWithPrimary(connection *conn) {
serverLog(LL_WARNING, "Unable to connect to Primary: %s", connGetLastError(server.repl_transfer_s));
connClose(server.repl_rdb_transfer_s);
server.repl_rdb_transfer_s = NULL;
goto error;
syncWithPrimaryHandleError(conn);
return;
}
if (connSetReadHandler(conn, NULL) == C_ERR) {
char conninfo[CONN_INFO_LEN];
dualChannelServerLog(LL_WARNING, "Can't clear main connection handler: %s (%s)", strerror(errno),
connGetInfo(conn, conninfo, sizeof(conninfo)));
goto error;
syncWithPrimaryHandleError(conn);
return;
}
server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_SEND_HANDSHAKE;
return;
@ -3803,7 +3886,8 @@ void syncWithPrimary(connection *conn) {
char conninfo[CONN_INFO_LEN];
serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno),
connGetInfo(conn, conninfo, sizeof(conninfo)));
goto error;
syncWithPrimaryHandleError(conn);
return;
}
server.repl_state = REPL_STATE_TRANSFER;
@ -3811,30 +3895,6 @@ void syncWithPrimary(connection *conn) {
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_lastio = server.unixtime;
return;
no_response_error: /* Handle receiveSynchronousResponse() error when primary has no reply */
serverLog(LL_WARNING, "Primary did not respond to command during SYNC handshake");
/* Fall through to regular error handling */
error:
connClose(conn);
server.repl_transfer_s = NULL;
if (server.repl_rdb_transfer_s) {
connClose(server.repl_rdb_transfer_s);
server.repl_rdb_transfer_s = NULL;
}
if (server.repl_transfer_fd != -1) close(server.repl_transfer_fd);
if (server.repl_transfer_tmpfile) zfree(server.repl_transfer_tmpfile);
server.repl_transfer_tmpfile = NULL;
server.repl_transfer_fd = -1;
server.repl_state = REPL_STATE_CONNECT;
return;
write_error: /* Handle sendCommand() errors. */
serverLog(LL_WARNING, "Sending command to primary in replication handshake: %s", err);
sdsfree(err);
goto error;
}
int connectWithPrimary(void) {