diff --git a/src/replication.c b/src/replication.c index 4977bba42..520c43fa4 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1334,8 +1334,8 @@ void shiftReplicationId(void) { /* Returns 1 if the given replication state is a handshake state, * 0 otherwise. */ int slaveIsInHandshakeState(void) { - return server.repl_state >= REPL_STATE_RECEIVE_PONG && - server.repl_state <= REPL_STATE_RECEIVE_PSYNC; + return server.repl_state >= REPL_STATE_RECEIVE_PING_REPLY && + server.repl_state <= REPL_STATE_RECEIVE_PSYNC_REPLY; } /* Avoid the master to detect the slave is timing out while loading the @@ -2159,7 +2159,7 @@ void syncWithMaster(connection *conn) { * registered and we can wait for the PONG reply. */ connSetReadHandler(conn, syncWithMaster); connSetWriteHandler(conn, NULL); - server.repl_state = REPL_STATE_RECEIVE_PONG; + server.repl_state = REPL_STATE_RECEIVE_PING_REPLY; /* Send the PING, don't check for errors at all, we have the timeout * that will take care about this. */ err = sendCommand(conn,"PING",NULL); @@ -2168,7 +2168,7 @@ void syncWithMaster(connection *conn) { } /* Receive the PONG command. */ - if (server.repl_state == REPL_STATE_RECEIVE_PONG) { + if (server.repl_state == REPL_STATE_RECEIVE_PING_REPLY) { err = receiveSynchronousResponse(conn); /* We accept only two replies as valid, a positive +PONG reply @@ -2189,11 +2189,12 @@ void syncWithMaster(connection *conn) { "Master replied to PING, replication can continue..."); } sdsfree(err); - server.repl_state = REPL_STATE_SEND_AUTH; + err = NULL; + server.repl_state = REPL_STATE_SEND_HANDSHAKE; } - /* AUTH with the master if required. */ - if (server.repl_state == REPL_STATE_SEND_AUTH) { + if (server.repl_state == REPL_STATE_SEND_HANDSHAKE) { + /* AUTH with the master if required. */ if (server.masterauth) { char *args[3] = {"AUTH",NULL,NULL}; size_t lens[3] = {4,0,0}; @@ -2208,15 +2209,53 @@ void syncWithMaster(connection *conn) { argc++; err = sendCommandArgv(conn, argc, args, lens); if (err) goto write_error; - server.repl_state = REPL_STATE_RECEIVE_AUTH; - return; - } else { - server.repl_state = REPL_STATE_SEND_PORT; } + + /* Set the slave port, so that Master's INFO command can list the + * slave listening port correctly. */ + { + int port; + if (server.slave_announce_port) + port = server.slave_announce_port; + else if (server.tls_replication && server.tls_port) + port = server.tls_port; + else + port = server.port; + sds portstr = sdsfromlonglong(port); + err = sendCommand(conn,"REPLCONF", + "listening-port",portstr, NULL); + sdsfree(portstr); + if (err) goto write_error; + } + + /* Set the slave ip, so that Master's INFO command can list the + * slave IP address port correctly in case of port forwarding or NAT. + * Skip REPLCONF ip-address if there is no slave-announce-ip option set. */ + if (server.slave_announce_ip) { + err = sendCommand(conn,"REPLCONF", + "ip-address",server.slave_announce_ip, NULL); + if (err) goto write_error; + } + + /* Inform the master of our (slave) capabilities. + * + * EOF: supports EOF-style RDB transfer for diskless replication. + * PSYNC2: supports PSYNC v2, so understands +CONTINUE . + * + * The master will ignore capabilities it does not understand. */ + err = sendCommand(conn,"REPLCONF", + "capa","eof","capa","psync2",NULL); + if (err) goto write_error; + + server.repl_state = REPL_STATE_RECEIVE_AUTH_REPLY; + return; } + if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY && !server.masterauth) + server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; + /* Receive AUTH reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_AUTH) { + if (server.repl_state == REPL_STATE_RECEIVE_AUTH_REPLY) { err = receiveSynchronousResponse(conn); if (err[0] == '-') { serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err); @@ -2224,28 +2263,13 @@ void syncWithMaster(connection *conn) { goto error; } sdsfree(err); - server.repl_state = REPL_STATE_SEND_PORT; - } - - /* Set the slave port, so that Master's INFO command can list the - * slave listening port correctly. */ - if (server.repl_state == REPL_STATE_SEND_PORT) { - int port; - if (server.slave_announce_port) port = server.slave_announce_port; - else if (server.tls_replication && server.tls_port) port = server.tls_port; - else port = server.port; - sds portstr = sdsfromlonglong(port); - err = sendCommand(conn,"REPLCONF", - "listening-port",portstr, NULL); - sdsfree(portstr); - if (err) goto write_error; - sdsfree(err); - server.repl_state = REPL_STATE_RECEIVE_PORT; + err = NULL; + server.repl_state = REPL_STATE_RECEIVE_PORT_REPLY; return; } /* Receive REPLCONF listening-port reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_PORT) { + if (server.repl_state == REPL_STATE_RECEIVE_PORT_REPLY) { err = receiveSynchronousResponse(conn); /* Ignore the error if any, not all the Redis versions support * REPLCONF listening-port. */ @@ -2254,29 +2278,15 @@ void syncWithMaster(connection *conn) { "REPLCONF listening-port: %s", err); } sdsfree(err); - server.repl_state = REPL_STATE_SEND_IP; - } - - /* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */ - if (server.repl_state == REPL_STATE_SEND_IP && - server.slave_announce_ip == NULL) - { - server.repl_state = REPL_STATE_SEND_CAPA; - } - - /* Set the slave ip, so that Master's INFO command can list the - * slave IP address port correctly in case of port forwarding or NAT. */ - if (server.repl_state == REPL_STATE_SEND_IP) { - err = sendCommand(conn,"REPLCONF", - "ip-address",server.slave_announce_ip, NULL); - if (err) goto write_error; - sdsfree(err); - server.repl_state = REPL_STATE_RECEIVE_IP; + server.repl_state = REPL_STATE_RECEIVE_IP_REPLY; return; } + if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY && !server.slave_announce_ip) + server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; + /* Receive REPLCONF ip-address reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_IP) { + if (server.repl_state == REPL_STATE_RECEIVE_IP_REPLY) { err = receiveSynchronousResponse(conn); /* Ignore the error if any, not all the Redis versions support * REPLCONF listening-port. */ @@ -2285,26 +2295,12 @@ void syncWithMaster(connection *conn) { "REPLCONF ip-address: %s", err); } sdsfree(err); - server.repl_state = REPL_STATE_SEND_CAPA; - } - - /* Inform the master of our (slave) capabilities. - * - * EOF: supports EOF-style RDB transfer for diskless replication. - * PSYNC2: supports PSYNC v2, so understands +CONTINUE . - * - * The master will ignore capabilities it does not understand. */ - if (server.repl_state == REPL_STATE_SEND_CAPA) { - err = sendCommand(conn,"REPLCONF", - "capa","eof","capa","psync2",NULL); - if (err) goto write_error; - sdsfree(err); - server.repl_state = REPL_STATE_RECEIVE_CAPA; + server.repl_state = REPL_STATE_RECEIVE_CAPA_REPLY; return; } /* Receive CAPA reply. */ - if (server.repl_state == REPL_STATE_RECEIVE_CAPA) { + if (server.repl_state == REPL_STATE_RECEIVE_CAPA_REPLY) { err = receiveSynchronousResponse(conn); /* Ignore the error if any, not all the Redis versions support * REPLCONF capa. */ @@ -2313,6 +2309,7 @@ void syncWithMaster(connection *conn) { "REPLCONF capa: %s", err); } sdsfree(err); + err = NULL; server.repl_state = REPL_STATE_SEND_PSYNC; } @@ -2326,12 +2323,12 @@ void syncWithMaster(connection *conn) { err = sdsnew("Write error sending the PSYNC command."); goto write_error; } - server.repl_state = REPL_STATE_RECEIVE_PSYNC; + server.repl_state = REPL_STATE_RECEIVE_PSYNC_REPLY; return; } /* If reached this point, we should be in REPL_STATE_RECEIVE_PSYNC. */ - if (server.repl_state != REPL_STATE_RECEIVE_PSYNC) { + if (server.repl_state != REPL_STATE_RECEIVE_PSYNC_REPLY) { serverLog(LL_WARNING,"syncWithMaster(): state machine error, " "state should be RECEIVE_PSYNC but is %d", server.repl_state); diff --git a/src/server.h b/src/server.h index 887f7752e..a19b5ad9e 100644 --- a/src/server.h +++ b/src/server.h @@ -299,24 +299,23 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; /* Slave replication state. Used in server.repl_state for slaves to remember * what to do next. */ -#define REPL_STATE_NONE 0 /* No active replication */ -#define REPL_STATE_CONNECT 1 /* Must connect to master */ -#define REPL_STATE_CONNECTING 2 /* Connecting to master */ -/* --- Handshake states, must be ordered --- */ -#define REPL_STATE_RECEIVE_PONG 3 /* Wait for PING reply */ -#define REPL_STATE_SEND_AUTH 4 /* Send AUTH to master */ -#define REPL_STATE_RECEIVE_AUTH 5 /* Wait for AUTH reply */ -#define REPL_STATE_SEND_PORT 6 /* Send REPLCONF listening-port */ -#define REPL_STATE_RECEIVE_PORT 7 /* Wait for REPLCONF reply */ -#define REPL_STATE_SEND_IP 8 /* Send REPLCONF ip-address */ -#define REPL_STATE_RECEIVE_IP 9 /* Wait for REPLCONF reply */ -#define REPL_STATE_SEND_CAPA 10 /* Send REPLCONF capa */ -#define REPL_STATE_RECEIVE_CAPA 11 /* Wait for REPLCONF reply */ -#define REPL_STATE_SEND_PSYNC 12 /* Send PSYNC */ -#define REPL_STATE_RECEIVE_PSYNC 13 /* Wait for PSYNC reply */ -/* --- End of handshake states --- */ -#define REPL_STATE_TRANSFER 14 /* Receiving .rdb from master */ -#define REPL_STATE_CONNECTED 15 /* Connected to master */ +typedef enum { + REPL_STATE_NONE = 0, /* No active replication */ + REPL_STATE_CONNECT, /* Must connect to master */ + REPL_STATE_CONNECTING, /* Connecting to master */ + /* --- Handshake states, must be ordered --- */ + REPL_STATE_RECEIVE_PING_REPLY, /* Wait for PING reply */ + REPL_STATE_SEND_HANDSHAKE, /* Send handshake sequance to master */ + REPL_STATE_RECEIVE_AUTH_REPLY, /* Wait for AUTH reply */ + REPL_STATE_RECEIVE_PORT_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_RECEIVE_IP_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_RECEIVE_CAPA_REPLY, /* Wait for REPLCONF reply */ + REPL_STATE_SEND_PSYNC, /* Send PSYNC */ + REPL_STATE_RECEIVE_PSYNC_REPLY, /* Wait for PSYNC reply */ + /* --- End of handshake states --- */ + REPL_STATE_TRANSFER, /* Receiving .rdb from master */ + REPL_STATE_CONNECTED, /* Connected to master */ +} repl_state; /* State of slaves from the POV of the master. Used in client->replstate. * In SEND_BULK and ONLINE state the slave receives new updates