syncWithMaster: use pipeline for AUTH+REPLCONF*3

This commit deals with syncWithMaster and the ugly state machine in it.
It attempts to clean it up a bit, but more importantly it uses pipelining for
part of the work (rather than 7 round trips, we now have 4).
That is, the connect and the PING remain separate round trips, then AUTH and
the 3 REPLCONF commands are sent in one pipeline, and finally the PSYNC is sent
on its own (it must be separate since the master has to have an empty output
buffer).
This commit is contained in:
Oran Agra 2020-12-18 22:10:31 +02:00
parent 9bd212cf24
commit e87c31de66
2 changed files with 81 additions and 85 deletions

View File

@ -1334,8 +1334,8 @@ void shiftReplicationId(void) {
/* Returns 1 if the given replication state is a handshake state, /* Returns 1 if the given replication state is a handshake state,
* 0 otherwise. */ * 0 otherwise. */
int slaveIsInHandshakeState(void) { int slaveIsInHandshakeState(void) {
return server.repl_state >= REPL_STATE_RECEIVE_PONG && return server.repl_state >= REPL_STATE_RECEIVE_PING_REPLY &&
server.repl_state <= REPL_STATE_RECEIVE_PSYNC; server.repl_state <= REPL_STATE_RECEIVE_PSYNC_REPLY;
} }
/* Avoid the master to detect the slave is timing out while loading the /* Avoid the master to detect the slave is timing out while loading the
@ -2159,7 +2159,7 @@ void syncWithMaster(connection *conn) {
* registered and we can wait for the PONG reply. */ * registered and we can wait for the PONG reply. */
connSetReadHandler(conn, syncWithMaster); connSetReadHandler(conn, syncWithMaster);
connSetWriteHandler(conn, NULL); connSetWriteHandler(conn, NULL);
server.repl_state = REPL_STATE_RECEIVE_PONG; server.repl_state = REPL_STATE_RECEIVE_PING_REPLY;
/* Send the PING, don't check for errors at all, we have the timeout /* Send the PING, don't check for errors at all, we have the timeout
* that will take care about this. */ * that will take care about this. */
err = sendCommand(conn,"PING",NULL); err = sendCommand(conn,"PING",NULL);
@ -2168,7 +2168,7 @@ void syncWithMaster(connection *conn) {
} }
/* Receive the PONG command. */ /* Receive the PONG command. */
if (server.repl_state == REPL_STATE_RECEIVE_PONG) { if (server.repl_state == REPL_STATE_RECEIVE_PING_REPLY) {
err = receiveSynchronousResponse(conn); err = receiveSynchronousResponse(conn);
/* We accept only two replies as valid, a positive +PONG reply /* We accept only two replies as valid, a positive +PONG reply
@ -2189,11 +2189,12 @@ void syncWithMaster(connection *conn) {
"Master replied to PING, replication can continue..."); "Master replied to PING, replication can continue...");
} }
sdsfree(err); sdsfree(err);
server.repl_state = REPL_STATE_SEND_AUTH; err = NULL;
server.repl_state = REPL_STATE_SEND_HANDSHAKE;
} }
/* AUTH with the master if required. */ if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) {
if (server.repl_state == REPL_STATE_SEND_AUTH) { /* AUTH with the master if required. */
if (server.masterauth) { if (server.masterauth) {
char *args[3] = {"AUTH",NULL,NULL}; char *args[3] = {"AUTH",NULL,NULL};
size_t lens[3] = {4,0,0}; size_t lens[3] = {4,0,0};
@ -2208,15 +2209,53 @@ void syncWithMaster(connection *conn) {
argc++; argc++;
err = sendCommandArgv(conn, argc, args, lens); err = sendCommandArgv(conn, argc, args, lens);
if (err) goto write_error; if (err) goto write_error;
server.repl_state = REPL_STATE_RECEIVE_AUTH;
return;
} else {
server.repl_state = REPL_STATE_SEND_PORT;
} }
/* Set the slave port, so that Master's INFO command can list the
* slave listening port correctly. */
{
int port;
if (server.slave_announce_port)
port = server.slave_announce_port;
else if (server.tls_replication && server.tls_port)
port = server.tls_port;
else
port = server.port;
sds portstr = sdsfromlonglong(port);
err = sendCommand(conn,"REPLCONF",
"listening-port",portstr, NULL);
sdsfree(portstr);
if (err) goto write_error;
}
/* Set the slave ip, so that Master's INFO command can list the
* slave IP address port correctly in case of port forwarding or NAT.
* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */
if (server.slave_announce_ip) {
err = sendCommand(conn,"REPLCONF",
"ip-address",server.slave_announce_ip, NULL);
if (err) goto write_error;
}
/* Inform the master of our (slave) capabilities.
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
* The master will ignore capabilities it does not understand. */
err = sendCommand(conn,"REPLCONF",
"capa","eof","capa","psync2",NULL);
if (err) goto write_error;
server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY;
return;
} }
if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY && !server.masterauth)
server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY;
/* Receive AUTH reply. */ /* Receive AUTH reply. */
if (server.repl_state == REPL_STATE_RECEIVE_AUTH) { if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY) {
err = receiveSynchronousResponse(conn); err = receiveSynchronousResponse(conn);
if (err[0] == '-') { if (err[0] == '-') {
serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err); serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
@ -2224,28 +2263,13 @@ void syncWithMaster(connection *conn) {
goto error; goto error;
} }
sdsfree(err); sdsfree(err);
server.repl_state = REPL_STATE_SEND_PORT; err = NULL;
} server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY;
/* Set the slave port, so that Master's INFO command can list the
* slave listening port correctly. */
if (server.repl_state == REPL_STATE_SEND_PORT) {
int port;
if (server.slave_announce_port) port = server.slave_announce_port;
else if (server.tls_replication && server.tls_port) port = server.tls_port;
else port = server.port;
sds portstr = sdsfromlonglong(port);
err = sendCommand(conn,"REPLCONF",
"listening-port",portstr, NULL);
sdsfree(portstr);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_PORT;
return; return;
} }
/* Receive REPLCONF listening-port reply. */ /* Receive REPLCONF listening-port reply. */
if (server.repl_state == REPL_STATE_RECEIVE_PORT) { if (server.repl_state == REPL_STATE_RECEIVE_PORT_REPLY) {
err = receiveSynchronousResponse(conn); err = receiveSynchronousResponse(conn);
/* Ignore the error if any, not all the Redis versions support /* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */ * REPLCONF listening-port. */
@ -2254,29 +2278,15 @@ void syncWithMaster(connection *conn) {
"REPLCONF listening-port: %s", err); "REPLCONF listening-port: %s", err);
} }
sdsfree(err); sdsfree(err);
server.repl_state = REPL_STATE_SEND_IP; server.repl_state = REPL_STATE_RECEIVE_IP_REPLY;
}
/* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */
if (server.repl_state == REPL_STATE_SEND_IP &&
server.slave_announce_ip == NULL)
{
server.repl_state = REPL_STATE_SEND_CAPA;
}
/* Set the slave ip, so that Master's INFO command can list the
* slave IP address port correctly in case of port forwarding or NAT. */
if (server.repl_state == REPL_STATE_SEND_IP) {
err = sendCommand(conn,"REPLCONF",
"ip-address",server.slave_announce_ip, NULL);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_IP;
return; return;
} }
if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY && !server.slave_announce_ip)
server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
/* Receive REPLCONF ip-address reply. */ /* Receive REPLCONF ip-address reply. */
if (server.repl_state == REPL_STATE_RECEIVE_IP) { if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY) {
err = receiveSynchronousResponse(conn); err = receiveSynchronousResponse(conn);
/* Ignore the error if any, not all the Redis versions support /* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */ * REPLCONF listening-port. */
@ -2285,26 +2295,12 @@ void syncWithMaster(connection *conn) {
"REPLCONF ip-address: %s", err); "REPLCONF ip-address: %s", err);
} }
sdsfree(err); sdsfree(err);
server.repl_state = REPL_STATE_SEND_CAPA; server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY;
}
/* Inform the master of our (slave) capabilities.
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
* The master will ignore capabilities it does not understand. */
if (server.repl_state == REPL_STATE_SEND_CAPA) {
err = sendCommand(conn,"REPLCONF",
"capa","eof","capa","psync2",NULL);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_CAPA;
return; return;
} }
/* Receive CAPA reply. */ /* Receive CAPA reply. */
if (server.repl_state == REPL_STATE_RECEIVE_CAPA) { if (server.repl_state == REPL_STATE_RECEIVE_CAPA_REPLY) {
err = receiveSynchronousResponse(conn); err = receiveSynchronousResponse(conn);
/* Ignore the error if any, not all the Redis versions support /* Ignore the error if any, not all the Redis versions support
* REPLCONF capa. */ * REPLCONF capa. */
@ -2313,6 +2309,7 @@ void syncWithMaster(connection *conn) {
"REPLCONF capa: %s", err); "REPLCONF capa: %s", err);
} }
sdsfree(err); sdsfree(err);
err = NULL;
server.repl_state = REPL_STATE_SEND_PSYNC; server.repl_state = REPL_STATE_SEND_PSYNC;
} }
@ -2326,12 +2323,12 @@ void syncWithMaster(connection *conn) {
err = sdsnew("Write error sending the PSYNC command."); err = sdsnew("Write error sending the PSYNC command.");
goto write_error; goto write_error;
} }
server.repl_state = REPL_STATE_RECEIVE_PSYNC; server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY;
return; return;
} }
/* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */ /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */
if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) { if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) {
serverLog(LL_WARNING,"syncWithMaster(): state machine error, " serverLog(LL_WARNING,"syncWithMaster(): state machine error, "
"state should be RECEIVE_PSYNC but is %d", "state should be RECEIVE_PSYNC but is %d",
server.repl_state); server.repl_state);

View File

@ -299,24 +299,23 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
/* Slave replication state. Used in server.repl_state for slaves to remember /* Slave replication state. Used in server.repl_state for slaves to remember
* what to do next. */ * what to do next. */
#define REPL_STATE_NONE 0 /* No active replication */ typedef enum {
#define REPL_STATE_CONNECT 1 /* Must connect to master */ REPL_STATE_NONE = 0, /* No active replication */
#define REPL_STATE_CONNECTING 2 /* Connecting to master */ REPL_STATE_CONNECT, /* Must connect to master */
/* --- Handshake states, must be ordered --- */ REPL_STATE_CONNECTING, /* Connecting to master */
#define REPL_STATE_RECEIVE_PONG 3 /* Wait for PING reply */ /* --- Handshake states, must be ordered --- */
#define REPL_STATE_SEND_AUTH 4 /* Send AUTH to master */ REPL_STATE_RECEIVE_PING_REPLY, /* Wait for PING reply */
#define REPL_STATE_RECEIVE_AUTH 5 /* Wait for AUTH reply */ REPL_STATE_SEND_HANDSHAKE, /* Send handshake sequance to master */
#define REPL_STATE_SEND_PORT 6 /* Send REPLCONF listening-port */ REPL_STATE_RECEIVE_AUTH_REPLY, /* Wait for AUTH reply */
#define REPL_STATE_RECEIVE_PORT 7 /* Wait for REPLCONF reply */ REPL_STATE_RECEIVE_PORT_REPLY, /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_IP 8 /* Send REPLCONF ip-address */ REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */
#define REPL_STATE_RECEIVE_IP 9 /* Wait for REPLCONF reply */ REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_CAPA 10 /* Send REPLCONF capa */ REPL_STATE_SEND_PSYNC, /* Send PSYNC */
#define REPL_STATE_RECEIVE_CAPA 11 /* Wait for REPLCONF reply */ REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */
#define REPL_STATE_SEND_PSYNC 12 /* Send PSYNC */ /* --- End of handshake states --- */
#define REPL_STATE_RECEIVE_PSYNC 13 /* Wait for PSYNC reply */ REPL_STATE_TRANSFER, /* Receiving .rdb from master */
/* --- End of handshake states --- */ REPL_STATE_CONNECTED, /* Connected to master */
#define REPL_STATE_TRANSFER 14 /* Receiving .rdb from master */ } repl_state;
#define REPL_STATE_CONNECTED 15 /* Connected to master */
/* State of slaves from the POV of the master. Used in client->replstate. /* State of slaves from the POV of the master. Used in client->replstate.
* In SEND_BULK and ONLINE state the slave receives new updates * In SEND_BULK and ONLINE state the slave receives new updates