Make sure we re-emit SELECT after each new slave full sync setup.
In previous commits we moved the FULLRESYNC to the moment we start the BGSAVE, so that the offset we provide is the right one. However this also means that we need to re-emit the SELECT statement every time a new slave starts to accumulate the changes. To obtian this effect in a more clean way, the function that sends the FULLRESYNC reply was overloaded with a more important role of also doing this and chanigng the slave state. So it was renamed to replicationSetupSlaveForFullResync() to better reflect what it does now.
This commit is contained in:
parent
175707e550
commit
15de6b108b
@ -1572,7 +1572,7 @@ int rdbSaveToSlavesSockets(void) {
|
|||||||
clientids[numfds] = slave->id;
|
clientids[numfds] = slave->id;
|
||||||
fds[numfds++] = slave->fd;
|
fds[numfds++] = slave->fd;
|
||||||
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
|
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
|
||||||
replicationSendFullresyncReply(slave,getPsyncInitialOffset());
|
replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
|
||||||
/* Put the socket in non-blocking mode to simplify RDB transfer.
|
/* Put the socket in non-blocking mode to simplify RDB transfer.
|
||||||
* We'll restore it when the children returns (since duped socket
|
* We'll restore it when the children returns (since duped socket
|
||||||
* will share the O_NONBLOCK attribute with the parent). */
|
* will share the O_NONBLOCK attribute with the parent). */
|
||||||
|
@ -362,16 +362,30 @@ long long getPsyncInitialOffset(void) {
|
|||||||
return psync_offset;
|
return psync_offset;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Send a PSYNC reply in the specific case of a full resynchronization.
|
/* Send a FULLRESYNC reply in the specific case of a full resynchronization,
|
||||||
* As a side effect, set into the slave client structure the offset
|
* as a side effect setup the slave for a full sync in different ways:
|
||||||
* we sent here, so that if new slaves will later attach to the same
|
*
|
||||||
* background RDB saving process (by duplicating this client output
|
* 1) Remember, into the slave client structure, the offset we sent
|
||||||
* buffer), we can get the right offset from this slave. */
|
* here, so that if new slaves will later attach to the same
|
||||||
int replicationSendFullresyncReply(client *slave, long long offset) {
|
* background RDB saving process (by duplicating this client output
|
||||||
|
* buffer), we can get the right offset from this slave.
|
||||||
|
* 2) Set the replication state of the slave to WAIT_BGSAVE_END so that
|
||||||
|
* we start accumulating differences from this point.
|
||||||
|
* 3) Force the replication stream to re-emit a SELECT statement so
|
||||||
|
* the new slave incremental differences will start selecting the
|
||||||
|
* right database number.
|
||||||
|
*/
|
||||||
|
int replicationSetupSlaveForFullResync(client *slave, long long offset) {
|
||||||
char buf[128];
|
char buf[128];
|
||||||
int buflen;
|
int buflen;
|
||||||
|
|
||||||
slave->psync_initial_offset = offset;
|
slave->psync_initial_offset = offset;
|
||||||
|
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
|
||||||
|
/* We are going to accumulate the incremental changes for this
|
||||||
|
* slave as well. Set slaveseldb to -1 in order to force to re-emit
|
||||||
|
* a SLEECT statement in the replication stream. */
|
||||||
|
server.slaveseldb = -1;
|
||||||
|
|
||||||
/* Don't send this reply to slaves that approached us with
|
/* Don't send this reply to slaves that approached us with
|
||||||
* the old SYNC command. */
|
* the old SYNC command. */
|
||||||
if (!(slave->flags & CLIENT_PRE_PSYNC)) {
|
if (!(slave->flags & CLIENT_PRE_PSYNC)) {
|
||||||
@ -566,8 +580,7 @@ void syncCommand(client *c) {
|
|||||||
/* Perfect, the server is already registering differences for
|
/* Perfect, the server is already registering differences for
|
||||||
* another slave. Set the right state, and copy the buffer. */
|
* another slave. Set the right state, and copy the buffer. */
|
||||||
copyClientOutputBuffer(c,slave);
|
copyClientOutputBuffer(c,slave);
|
||||||
c->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
|
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
|
||||||
replicationSendFullresyncReply(c,slave->psync_initial_offset);
|
|
||||||
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
|
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
|
||||||
} else {
|
} else {
|
||||||
/* No way, we need to wait for the next BGSAVE in order to
|
/* No way, we need to wait for the next BGSAVE in order to
|
||||||
@ -603,8 +616,7 @@ void syncCommand(client *c) {
|
|||||||
addReplyError(c,"Unable to perform background save");
|
addReplyError(c,"Unable to perform background save");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
c->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
|
replicationSetupSlaveForFullResync(c,getPsyncInitialOffset());
|
||||||
replicationSendFullresyncReply(c,getPsyncInitialOffset());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -612,7 +624,6 @@ void syncCommand(client *c) {
|
|||||||
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
|
anetDisableTcpNoDelay(NULL, c->fd); /* Non critical if it fails. */
|
||||||
c->repldbfd = -1;
|
c->repldbfd = -1;
|
||||||
c->flags |= CLIENT_SLAVE;
|
c->flags |= CLIENT_SLAVE;
|
||||||
server.slaveseldb = -1; /* Force to re-emit the SELECT command. */
|
|
||||||
listAddNodeTail(server.slaves,c);
|
listAddNodeTail(server.slaves,c);
|
||||||
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
|
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL)
|
||||||
createReplicationBacklog();
|
createReplicationBacklog();
|
||||||
@ -791,8 +802,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) {
|
|||||||
|
|
||||||
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
|
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
|
||||||
startbgsave = 1;
|
startbgsave = 1;
|
||||||
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
|
replicationSetupSlaveForFullResync(slave,getPsyncInitialOffset());
|
||||||
replicationSendFullresyncReply(slave,getPsyncInitialOffset());
|
|
||||||
} else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
|
} else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) {
|
||||||
struct redis_stat buf;
|
struct redis_stat buf;
|
||||||
|
|
||||||
@ -2156,8 +2166,7 @@ void replicationCron(void) {
|
|||||||
while((ln = listNext(&li))) {
|
while((ln = listNext(&li))) {
|
||||||
client *slave = ln->value;
|
client *slave = ln->value;
|
||||||
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
|
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) {
|
||||||
slave->replstate = SLAVE_STATE_WAIT_BGSAVE_END;
|
replicationSetupSlaveForFullResync(slave,
|
||||||
replicationSendFullresyncReply(slave,
|
|
||||||
getPsyncInitialOffset());
|
getPsyncInitialOffset());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1202,7 +1202,7 @@ void replicationSendNewlineToMaster(void);
|
|||||||
long long replicationGetSlaveOffset(void);
|
long long replicationGetSlaveOffset(void);
|
||||||
char *replicationGetSlaveName(client *c);
|
char *replicationGetSlaveName(client *c);
|
||||||
long long getPsyncInitialOffset(void);
|
long long getPsyncInitialOffset(void);
|
||||||
int replicationSendFullresyncReply(client *slave, long long offset);
|
int replicationSetupSlaveForFullResync(client *slave, long long offset);
|
||||||
|
|
||||||
/* Generic persistence functions */
|
/* Generic persistence functions */
|
||||||
void startLoading(FILE *fp);
|
void startLoading(FILE *fp);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user