Refactor return and goto statements (#945)

Consolidate the cleanup of local variables to a single point within the
method, ensuring proper resource management and p
reventing memory leaks or double-free issues.

Previoslly descused here:
- https://github.com/valkey-io/valkey/pull/60#discussion_r1667872633
- https://github.com/valkey-io/valkey/pull/60#discussion_r1668045666

---------

Signed-off-by: naglera <anagler123@gmail.com>
Signed-off-by: Amit Nagler <58042354+naglera@users.noreply.github.com>
Co-authored-by: Ping Xie <pingxie@outlook.com>
This commit is contained in:
Amit Nagler 2024-10-15 19:26:42 +03:00 committed by GitHub
parent 247a8f23c5
commit b0f23df165
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 246 additions and 177 deletions

View File

@ -53,8 +53,9 @@ int replicaPutOnline(client *replica);
void replicaStartCommandStream(client *replica);
int cancelReplicationHandshake(int reconnect);
void replicationSteadyStateInit(void);
void setupMainConnForPsync(connection *conn);
void dualChannelSetupMainConnForPsync(connection *conn);
void dualChannelSyncHandleRdbLoadCompletion(void);
static void dualChannelFullSyncWithPrimary(connection *conn);
/* We take a global flag to remember if this instance generated an RDB
* because of replication, so that we can remove the RDB file in case
@ -2588,13 +2589,135 @@ int sendCurrentOffsetToReplica(client *replica) {
return C_OK;
}
static int dualChannelReplHandleHandshake(connection *conn, sds *err) {
serverLog(LL_DEBUG, "Received first reply from primary using rdb connection.");
/* AUTH with the primary if required. */
if (server.primary_auth) {
char *args[] = {"AUTH", NULL, NULL};
size_t lens[] = {4, 0, 0};
int argc = 1;
if (server.primary_user) {
args[argc] = server.primary_user;
lens[argc] = strlen(server.primary_user);
argc++;
}
args[argc] = server.primary_auth;
lens[argc] = sdslen(server.primary_auth);
argc++;
*err = sendCommandArgv(conn, argc, args, lens);
if (*err) {
serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", *err);
return C_ERR;
}
}
/* Send replica listening port to primary for clarification */
sds portstr = getReplicaPortString();
*err = sendCommand(conn, "REPLCONF", "capa", "eof", "rdb-only", "1", "rdb-channel", "1", "listening-port", portstr,
NULL);
sdsfree(portstr);
if (*err) {
serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", *err);
return C_ERR;
}
if (connSetReadHandler(conn, dualChannelFullSyncWithPrimary) == C_ERR) {
char conninfo[CONN_INFO_LEN];
serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno),
connGetInfo(conn, conninfo, sizeof(conninfo)));
return C_ERR;
}
return C_OK;
}
static int dualChannelReplHandleAuthReply(connection *conn, sds *err) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) {
serverLog(LL_WARNING, "Primary did not respond to auth command during SYNC handshake");
return C_ERR;
}
if ((*err)[0] == '-') {
serverLog(LL_WARNING, "Unable to AUTH to Primary: %s", *err);
return C_ERR;
}
server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY;
return C_OK;
}
static int dualChannelReplHandleReplconfReply(connection *conn, sds *err) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) {
serverLog(LL_WARNING, "Primary did not respond to replconf command during SYNC handshake");
return C_ERR;
}
if (*err[0] == '-') {
serverLog(LL_NOTICE, "Server does not support sync with offset, dual channel sync approach cannot be used: %s",
*err);
return C_ERR;
}
if (connSyncWrite(conn, "SYNC\r\n", 6, server.repl_syncio_timeout * 1000) == -1) {
serverLog(LL_WARNING, "I/O error writing to Primary: %s", connGetLastError(conn));
return C_ERR;
}
return C_OK;
}
static int dualChannelReplHandleEndOffsetResponse(connection *conn, sds *err) {
uint64_t rdb_client_id;
*err = receiveSynchronousResponse(conn);
if (*err == NULL) {
return C_ERR;
}
if (*err[0] == '\0') {
/* Retry again later */
serverLog(LL_DEBUG, "Received empty $ENDOFF response");
return C_RETRY;
}
long long reploffset;
char primary_replid[CONFIG_RUN_ID_SIZE + 1];
int dbid;
/* Parse end offset response */
char *endoff_format = "$ENDOFF:%lld %40s %d %llu";
if (sscanf(*err, endoff_format, &reploffset, primary_replid, &dbid, &rdb_client_id) != 4) {
serverLog(LL_WARNING, "Received unexpected $ENDOFF response: %s", *err);
return C_ERR;
}
server.rdb_client_id = rdb_client_id;
server.primary_initial_offset = reploffset;
/* Initiate repl_provisional_primary to act as this replica temp primary until RDB is loaded */
server.repl_provisional_primary.conn = server.repl_transfer_s;
memcpy(server.repl_provisional_primary.replid, primary_replid, CONFIG_RUN_ID_SIZE);
server.repl_provisional_primary.reploff = reploffset;
server.repl_provisional_primary.read_reploff = reploffset;
server.repl_provisional_primary.dbid = dbid;
/* Now that we have the snapshot end-offset, we can ask for psync from that offset. Prepare the
* main connection accordingly.*/
server.repl_transfer_s->state = CONN_STATE_CONNECTED;
server.repl_state = REPL_STATE_SEND_HANDSHAKE;
serverAssert(connSetReadHandler(server.repl_transfer_s, dualChannelSetupMainConnForPsync) != C_ERR);
dualChannelSetupMainConnForPsync(server.repl_transfer_s);
/* As the next block we will receive using this connection is the rdb, we need to prepare
* the connection accordingly */
serverAssert(connSetReadHandler(server.repl_rdb_transfer_s, readSyncBulkPayload) != C_ERR);
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_lastio = server.unixtime;
return C_OK;
}
/* Replication: Replica side.
* This connection handler is used to initialize the RDB connection (dual-channel-replication).
* Once a replica with dual-channel-replication enabled, denied from PSYNC with its primary,
* fullSyncWithPrimary begins its role. The connection handler prepares server.repl_rdb_transfer_s
* dualChannelFullSyncWithPrimary begins its role. The connection handler prepares server.repl_rdb_transfer_s
* for a rdb stream, and server.repl_transfer_s for incremental replication data stream. */
static void fullSyncWithPrimary(connection *conn) {
static void dualChannelFullSyncWithPrimary(connection *conn) {
char *err = NULL;
int ret = 0;
serverAssert(conn == server.repl_rdb_transfer_s);
/* If this event fired after the user turned the instance into a primary
* with REPLICAOF NO ONE we must just return ASAP. */
@ -2607,138 +2730,40 @@ static void fullSyncWithPrimary(connection *conn) {
serverLog(LL_WARNING, "Error condition on socket for dual channel replication: %s", connGetLastError(conn));
goto error;
}
/* Send replica capabilities */
if (server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_SEND_HANDSHAKE) {
serverLog(LL_DEBUG, "Received first reply from primary using rdb connection.");
/* AUTH with the primary if required. */
switch (server.repl_rdb_channel_state) {
case REPL_DUAL_CHANNEL_SEND_HANDSHAKE:
ret = dualChannelReplHandleHandshake(conn, &err);
if (ret == C_OK) server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY;
break;
case REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY:
if (server.primary_auth) {
char *args[] = {"AUTH", NULL, NULL};
size_t lens[] = {4, 0, 0};
int argc = 1;
if (server.primary_user) {
args[argc] = server.primary_user;
lens[argc] = strlen(server.primary_user);
argc++;
}
args[argc] = server.primary_auth;
lens[argc] = sdslen(server.primary_auth);
argc++;
err = sendCommandArgv(conn, argc, args, lens);
if (err) {
serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", err);
return;
}
ret = dualChannelReplHandleAuthReply(conn, &err);
if (ret == C_OK) server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY;
/* Wait for next bulk before trying to read replconf reply. */
break;
}
/* Send replica listening port to primary for clarification */
sds portstr = getReplicaPortString();
err = sendCommand(conn, "REPLCONF", "capa", "eof", "rdb-only", "1", "rdb-channel", "1", "listening-port",
portstr, NULL);
sdsfree(portstr);
if (err) {
serverLog(LL_WARNING, "Sending command to primary in dual channel replication handshake: %s", err);
return;
}
server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY;
if (connSetReadHandler(conn, fullSyncWithPrimary) == C_ERR) {
char conninfo[CONN_INFO_LEN];
serverLog(LL_WARNING, "Can't create readable event for SYNC: %s (%s)", strerror(errno),
connGetInfo(conn, conninfo, sizeof(conninfo)));
goto error;
}
return;
}
if (server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY && !server.primary_auth) {
server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY;
/* fall through */
case REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY:
ret = dualChannelReplHandleReplconfReply(conn, &err);
if (ret == C_OK) server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_ENDOFF;
break;
case REPL_DUAL_CHANNEL_RECEIVE_ENDOFF:
ret = dualChannelReplHandleEndOffsetResponse(conn, &err);
if (ret == C_OK) server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RDB_LOAD;
break;
default:
serverPanic("Unexpected dual replication state: %d", server.repl_rdb_channel_state);
}
/* Receive AUTH reply. */
if (server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_RECEIVE_AUTH_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) {
serverLog(LL_WARNING, "Primary did not respond to auth command during SYNC handshake");
goto error;
}
if (err[0] == '-') {
serverLog(LL_WARNING, "Unable to AUTH to Primary: %s", err);
goto error;
}
sdsfree(err);
server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY;
return;
}
/* Receive replconf response */
if (server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_RECEIVE_REPLCONF_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) {
serverLog(LL_WARNING, "Primary did not respond to replconf command during SYNC handshake");
goto error;
}
if (err[0] == '-') {
serverLog(LL_NOTICE,
"Server does not support sync with offset, dual channel sync approach cannot be used: %s", err);
goto error;
}
if (connSyncWrite(conn, "SYNC\r\n", 6, server.repl_syncio_timeout * 1000) == -1) {
serverLog(LL_WARNING, "I/O error writing to Primary: %s", connGetLastError(conn));
goto error;
}
sdsfree(err);
server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RECEIVE_ENDOFF;
return;
}
/* Receive end offset response */
if (server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_RECEIVE_ENDOFF) {
uint64_t rdb_client_id;
err = receiveSynchronousResponse(conn);
if (err == NULL) goto error;
if (err[0] == '\0') {
/* Retry again later */
serverLog(LL_DEBUG, "Received empty $ENDOFF response");
sdsfree(err);
return;
}
long long reploffset;
char primary_replid[CONFIG_RUN_ID_SIZE + 1];
int dbid;
/* Parse end offset response */
char *endoff_format = "$ENDOFF:%lld %40s %d %llu";
if (sscanf(err, endoff_format, &reploffset, primary_replid, &dbid, &rdb_client_id) != 4) {
serverLog(LL_WARNING, "Received unexpected $ENDOFF response: %s", err);
goto error;
}
sdsfree(err);
server.rdb_client_id = rdb_client_id;
server.primary_initial_offset = reploffset;
/* Initiate repl_provisional_primary to act as this replica temp primary until RDB is loaded */
server.repl_provisional_primary.conn = server.repl_transfer_s;
memcpy(server.repl_provisional_primary.replid, primary_replid, CONFIG_RUN_ID_SIZE);
server.repl_provisional_primary.reploff = reploffset;
server.repl_provisional_primary.read_reploff = reploffset;
server.repl_provisional_primary.dbid = dbid;
/* Now that we have the snapshot end-offset, we can ask for psync from that offset. Prepare the
* main connection accordingly.*/
server.repl_transfer_s->state = CONN_STATE_CONNECTED;
server.repl_state = REPL_STATE_SEND_HANDSHAKE;
serverAssert(connSetReadHandler(server.repl_transfer_s, setupMainConnForPsync) != C_ERR);
setupMainConnForPsync(server.repl_transfer_s);
/* As the next block we will receive using this connection is the rdb, we need to prepare
* the connection accordingly */
serverAssert(connSetReadHandler(server.repl_rdb_transfer_s, readSyncBulkPayload) != C_ERR);
server.repl_transfer_size = -1;
server.repl_transfer_read = 0;
server.repl_transfer_last_fsync_off = 0;
server.repl_transfer_lastio = server.unixtime;
server.repl_rdb_channel_state = REPL_DUAL_CHANNEL_RDB_LOAD;
return;
}
if (ret == C_ERR) goto error;
sdsfree(err);
return;
error:
sdsfree(err);
if (err) {
serverLog(LL_WARNING, "Dual channel sync failed with error %s", err);
sdsfree(err);
}
if (server.repl_transfer_s) {
connClose(server.repl_transfer_s);
server.repl_transfer_s = NULL;
@ -2751,7 +2776,6 @@ error:
server.repl_transfer_fd = -1;
server.repl_state = REPL_STATE_CONNECT;
replicationAbortDualChannelSyncTransfer();
return;
}
/* Replication: Replica side.
@ -2920,24 +2944,23 @@ void dualChannelSyncSuccess(void) {
/* Replication: Replica side.
* Main channel successfully established psync with primary. Check whether the rdb channel
* has completed its part and act accordingly. */
void dualChannelSyncHandlePsync(void) {
int dualChannelSyncHandlePsync(void) {
serverAssert(server.repl_state == REPL_STATE_RECEIVE_PSYNC_REPLY);
if (server.repl_rdb_channel_state < REPL_DUAL_CHANNEL_RDB_LOADED) {
/* RDB is still loading */
if (connSetReadHandler(server.repl_provisional_primary.conn, bufferReplData) == C_ERR) {
serverLog(LL_WARNING, "Error while setting readable handler: %s", strerror(errno));
cancelReplicationHandshake(1);
return;
return C_ERR;
}
replDataBufInit();
server.repl_state = REPL_STATE_TRANSFER;
return;
return C_OK;
}
serverAssert(server.repl_rdb_channel_state == REPL_DUAL_CHANNEL_RDB_LOADED);
/* RDB is loaded */
serverLog(LL_DEBUG, "Dual channel sync - psync established after rdb load");
dualChannelSyncSuccess();
return;
return C_OK;
}
/* Replication: Replica side.
@ -3195,46 +3218,54 @@ int replicaTryPartialResynchronization(connection *conn, int read_reply) {
return PSYNC_NOT_SUPPORTED;
}
/* Replication: Replica side.
* This connection handler fires after rdb-connection was initialized. We use it
* to adjust the replica main for loading incremental changes into the local buffer. */
void setupMainConnForPsync(connection *conn) {
int psync_result = -1;
sds getTryPsyncString(int result) {
switch (result) {
case PSYNC_WRITE_ERROR: return sdsnew("PSYNC_WRITE_ERROR");
case PSYNC_WAIT_REPLY: return sdsnew("PSYNC_WAIT_REPLY");
case PSYNC_CONTINUE: return sdsnew("PSYNC_CONTINUE");
case PSYNC_FULLRESYNC: return sdsnew("PSYNC_FULLRESYNC");
case PSYNC_NOT_SUPPORTED: return sdsnew("PSYNC_NOT_SUPPORTED");
case PSYNC_TRY_LATER: return sdsnew("PSYNC_TRY_LATER");
case PSYNC_FULLRESYNC_DUAL_CHANNEL: return sdsnew("PSYNC_FULLRESYNC_DUAL_CHANNEL");
default: return sdsnew("Unknown result");
}
}
int dualChannelReplMainConnSendHandshake(connection *conn, sds *err) {
char llstr[LONG_STR_SIZE];
char *err = NULL;
if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) {
/* We already have an initialized connection at primary side, we only need to associate it with RDB connection */
ull2string(llstr, sizeof(llstr), server.rdb_client_id);
err = sendCommand(conn, "REPLCONF", "set-rdb-client-id", llstr, NULL);
if (err) goto error;
server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
sdsfree(err);
return;
}
ull2string(llstr, sizeof(llstr), server.rdb_client_id);
*err = sendCommand(conn, "REPLCONF", "set-rdb-client-id", llstr, NULL);
if (*err) return C_ERR;
server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
return C_OK;
}
if (server.repl_state == REPL_STATE_RECEIVE_CAPA_REPLY) {
err = receiveSynchronousResponse(conn);
if (err == NULL) goto error;
if (err[0] == '-') {
serverLog(LL_NOTICE, "Primary does not understand REPLCONF identify: %s", err);
goto error;
}
sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_SEND_PSYNC;
int dualChannelReplMainConnRecvCapaReply(connection *conn, sds *err) {
*err = receiveSynchronousResponse(conn);
if (*err == NULL) return C_ERR;
if ((*err)[0] == '-') {
serverLog(LL_NOTICE, "Primary does not understand REPLCONF identify: %s", *err);
return C_ERR;
}
server.repl_state = REPL_STATE_SEND_PSYNC;
return C_OK;
}
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
if (server.debug_pause_after_fork) debugPauseProcess();
if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) {
serverLog(LL_WARNING, "Aborting dual channel sync. Write error.");
cancelReplicationHandshake(1);
}
server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY;
return;
int dualChannelReplMainConnSendPsync(connection *conn, sds *err) {
if (server.debug_pause_after_fork) debugPauseProcess();
if (replicaTryPartialResynchronization(conn, 0) == PSYNC_WRITE_ERROR) {
serverLog(LL_WARNING, "Aborting dual channel sync. Write error.");
*err = sdsnew(connGetLastError(conn));
return C_ERR;
}
psync_result = replicaTryPartialResynchronization(conn, 1);
if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */
server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY;
return C_OK;
}
int dualChannelReplMainConnRecvPsyncReply(connection *conn, sds *err) {
int psync_result = replicaTryPartialResynchronization(conn, 1);
if (psync_result == PSYNC_WAIT_REPLY) return C_OK; /* Try again later... */
if (psync_result == PSYNC_CONTINUE) {
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Primary accepted a Partial Resynchronization%s",
@ -3244,15 +3275,52 @@ void setupMainConnForPsync(connection *conn) {
"accept connections in read-write mode.\n");
}
dualChannelSyncHandlePsync();
return;
return C_OK;
}
*err = getTryPsyncString(psync_result);
return C_ERR;
}
/* Replication: Replica side.
* This connection handler fires after rdb-connection was initialized. We use it
* to adjust the replica main for loading incremental changes into the local buffer. */
void dualChannelSetupMainConnForPsync(connection *conn) {
char *err = NULL;
int ret;
switch (server.repl_state) {
case REPL_STATE_SEND_HANDSHAKE:
ret = dualChannelReplMainConnSendHandshake(conn, &err);
if (ret == C_OK) server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
break;
case REPL_STATE_RECEIVE_CAPA_REPLY:
ret = dualChannelReplMainConnRecvCapaReply(conn, &err);
if (ret == C_ERR) {
break;
}
if (ret == C_OK) server.repl_state = REPL_STATE_SEND_PSYNC;
sdsfree(err);
err = NULL;
/* fall through */
case REPL_STATE_SEND_PSYNC:
ret = dualChannelReplMainConnSendPsync(conn, &err);
if (ret == C_OK) server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY;
break;
case REPL_STATE_RECEIVE_PSYNC_REPLY:
ret = dualChannelReplMainConnRecvPsyncReply(conn, &err);
if (ret == C_OK && server.repl_rdb_channel_state != REPL_DUAL_CHANNEL_STATE_NONE)
server.repl_state = REPL_STATE_TRANSFER;
/* In case the RDB is already loaded, the repl_state will be set during establishPrimaryConnection. */
break;
default:
serverPanic("Unexpected replication state: %d", server.repl_state);
}
error:
if (ret == C_ERR) {
serverLog(LL_WARNING, "Aborting dual channel sync. Main channel psync result %d %s", ret, err ? err : "");
cancelReplicationHandshake(1);
}
sdsfree(err);
/* The dual-channel sync session must be aborted for any psync_result other than PSYNC_CONTINUE or PSYNC_WAIT_REPLY.
*/
serverLog(LL_WARNING, "Aborting dual channel sync. Main channel psync result %d", psync_result);
cancelReplicationHandshake(1);
}
/*
@ -3625,7 +3693,7 @@ void syncWithPrimary(connection *conn) {
/* Create RDB connection */
server.repl_rdb_transfer_s = connCreate(connTypeOfReplication());
if (connConnect(server.repl_rdb_transfer_s, server.primary_host, server.primary_port, server.bind_source_addr,
fullSyncWithPrimary) == C_ERR) {
dualChannelFullSyncWithPrimary) == C_ERR) {
serverLog(LL_WARNING, "Unable to connect to Primary: %s", connGetLastError(server.repl_transfer_s));
connClose(server.repl_rdb_transfer_s);
server.repl_rdb_transfer_s = NULL;

View File

@ -110,6 +110,7 @@ struct hdr_histogram;
/* Error codes */
#define C_OK 0
#define C_ERR -1
#define C_RETRY -2
/* Static server configuration */
#define CONFIG_DEFAULT_HZ 10 /* Time interrupt calls/sec. */