Replica that asks for rdb only should be closed right after the rdb part (#11296)
The bug is that the the server keeps on sending newlines to the client. As a result, the receiver might not find the EOF marker since it searches for it only on the end of each payload it reads from the socket. The but only affects `CLIENT_REPL_RDBONLY`. This affects `redis-cli --rdb` (depending on timing) The fixed consist of two steps: 1. The `CLIENT_REPL_RDBONLY` should be closed ASAP (we cannot always call to `freeClient` so we use `freeClientAsync`) 2. Add new replication state `SLAVE_STATE_RDB_TRANSMITTED`
This commit is contained in:
parent
bb6513cbba
commit
e53bf65245
@ -45,7 +45,7 @@
|
|||||||
void replicationDiscardCachedMaster(void);
|
void replicationDiscardCachedMaster(void);
|
||||||
void replicationResurrectCachedMaster(connection *conn);
|
void replicationResurrectCachedMaster(connection *conn);
|
||||||
void replicationSendAck(void);
|
void replicationSendAck(void);
|
||||||
void replicaPutOnline(client *slave);
|
int replicaPutOnline(client *slave);
|
||||||
void replicaStartCommandStream(client *slave);
|
void replicaStartCommandStream(client *slave);
|
||||||
int cancelReplicationHandshake(int reconnect);
|
int cancelReplicationHandshake(int reconnect);
|
||||||
|
|
||||||
@ -1260,12 +1260,19 @@ void replconfCommand(client *c) {
|
|||||||
* It does a few things:
|
* It does a few things:
|
||||||
* 1) Put the slave in ONLINE state.
|
* 1) Put the slave in ONLINE state.
|
||||||
* 2) Update the count of "good replicas".
|
* 2) Update the count of "good replicas".
|
||||||
* 3) Trigger the module event. */
|
* 3) Trigger the module event.
|
||||||
void replicaPutOnline(client *slave) {
|
*
|
||||||
|
* the return value indicates that the replica should be disconnected.
|
||||||
|
* */
|
||||||
|
int replicaPutOnline(client *slave) {
|
||||||
if (slave->flags & CLIENT_REPL_RDBONLY) {
|
if (slave->flags & CLIENT_REPL_RDBONLY) {
|
||||||
return;
|
slave->replstate = SLAVE_STATE_RDB_TRANSMITTED;
|
||||||
|
/* The client asked for RDB only so we should close it ASAP */
|
||||||
|
serverLog(LL_NOTICE,
|
||||||
|
"RDB transfer completed, rdb only replica (%s) should be disconnected asap",
|
||||||
|
replicationGetSlaveName(slave));
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
slave->replstate = SLAVE_STATE_ONLINE;
|
slave->replstate = SLAVE_STATE_ONLINE;
|
||||||
slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
|
slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
|
||||||
|
|
||||||
@ -1276,6 +1283,7 @@ void replicaPutOnline(client *slave) {
|
|||||||
NULL);
|
NULL);
|
||||||
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
|
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
|
||||||
replicationGetSlaveName(slave));
|
replicationGetSlaveName(slave));
|
||||||
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function should be called just after a replica received the RDB file
|
/* This function should be called just after a replica received the RDB file
|
||||||
@ -1290,14 +1298,8 @@ void replicaPutOnline(client *slave) {
|
|||||||
* accumulate output buffer data without sending it to the replica so it
|
* accumulate output buffer data without sending it to the replica so it
|
||||||
* won't get mixed with the RDB stream. */
|
* won't get mixed with the RDB stream. */
|
||||||
void replicaStartCommandStream(client *slave) {
|
void replicaStartCommandStream(client *slave) {
|
||||||
|
serverAssert(!(slave->flags & CLIENT_REPL_RDBONLY));
|
||||||
slave->repl_start_cmd_stream_on_ack = 0;
|
slave->repl_start_cmd_stream_on_ack = 0;
|
||||||
if (slave->flags & CLIENT_REPL_RDBONLY) {
|
|
||||||
serverLog(LL_NOTICE,
|
|
||||||
"Close the connection with replica %s as RDB transfer is complete",
|
|
||||||
replicationGetSlaveName(slave));
|
|
||||||
freeClientAsync(slave);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
putClientInPendingWriteQueue(slave);
|
putClientInPendingWriteQueue(slave);
|
||||||
}
|
}
|
||||||
@ -1400,7 +1402,10 @@ void sendBulkToSlave(connection *conn) {
|
|||||||
close(slave->repldbfd);
|
close(slave->repldbfd);
|
||||||
slave->repldbfd = -1;
|
slave->repldbfd = -1;
|
||||||
connSetWriteHandler(slave->conn,NULL);
|
connSetWriteHandler(slave->conn,NULL);
|
||||||
replicaPutOnline(slave);
|
if (!replicaPutOnline(slave)) {
|
||||||
|
freeClient(slave);
|
||||||
|
return;
|
||||||
|
}
|
||||||
replicaStartCommandStream(slave);
|
replicaStartCommandStream(slave);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1603,7 +1608,10 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
|
|||||||
* after such final EOF. So we don't want to glue the end of
|
* after such final EOF. So we don't want to glue the end of
|
||||||
* the RDB transfer with the start of the other replication
|
* the RDB transfer with the start of the other replication
|
||||||
* data. */
|
* data. */
|
||||||
replicaPutOnline(slave);
|
if (!replicaPutOnline(slave)) {
|
||||||
|
freeClientAsync(slave);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
slave->repl_start_cmd_stream_on_ack = 1;
|
slave->repl_start_cmd_stream_on_ack = 1;
|
||||||
} else {
|
} else {
|
||||||
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
|
if ((slave->repldbfd = open(server.rdb_filename,O_RDONLY)) == -1 ||
|
||||||
|
@ -433,6 +433,8 @@ typedef enum {
|
|||||||
#define SLAVE_STATE_WAIT_BGSAVE_END 7 /* Waiting RDB file creation to finish. */
|
#define SLAVE_STATE_WAIT_BGSAVE_END 7 /* Waiting RDB file creation to finish. */
|
||||||
#define SLAVE_STATE_SEND_BULK 8 /* Sending RDB file to slave. */
|
#define SLAVE_STATE_SEND_BULK 8 /* Sending RDB file to slave. */
|
||||||
#define SLAVE_STATE_ONLINE 9 /* RDB file transmitted, sending just updates. */
|
#define SLAVE_STATE_ONLINE 9 /* RDB file transmitted, sending just updates. */
|
||||||
|
#define SLAVE_STATE_RDB_TRANSMITTED 10 /* RDB file transmitted - This state is used only for
|
||||||
|
* a replica that only wants RDB without replication buffer */
|
||||||
|
|
||||||
/* Slave capabilities. */
|
/* Slave capabilities. */
|
||||||
#define SLAVE_CAPA_NONE 0
|
#define SLAVE_CAPA_NONE 0
|
||||||
|
Loading…
x
Reference in New Issue
Block a user