Implement rdb-only replication (#8303)
In some scenarios, such as remote backup, we only want to get remote redis server db snapshot. Currently, redis-cli acts as a replica and sends SYNC to redis, but redis still accumulates replication buffer in the replica client output buffer, that may result in using vast memory, or failing to transfer RDB because of client-output-buffer-limit. In this commit, we add 'replconf rdb-only 0|1', redis doesn't send incremental replication buffer to them if they send 'replconf rdb-only 1', so we can reduce used memory and improve success of getting RDB.
This commit is contained in:
parent
9e56d3969a
commit
ad7d4c6b70
@ -6940,6 +6940,10 @@ void sendCapa() {
|
||||
sendReplconf("capa", "eof");
|
||||
}
|
||||
|
||||
void sendRdbOnly(void) {
|
||||
sendReplconf("rdb-only", "1");
|
||||
}
|
||||
|
||||
/* Read raw bytes through a redisContext. The read operation is not greedy
|
||||
* and may not fill the buffer entirely.
|
||||
*/
|
||||
@ -7137,7 +7141,6 @@ static void getRDB(clusterManagerNode *node) {
|
||||
node->context = NULL;
|
||||
fsync(fd);
|
||||
close(fd);
|
||||
fprintf(stderr,"Transfer finished with success.\n");
|
||||
if (node) {
|
||||
sdsfree(filename);
|
||||
return;
|
||||
@ -8258,6 +8261,7 @@ int main(int argc, char **argv) {
|
||||
if (config.getrdb_mode) {
|
||||
if (cliConnect(0) == REDIS_ERR) exit(1);
|
||||
sendCapa();
|
||||
sendRdbOnly();
|
||||
getRDB(NULL);
|
||||
}
|
||||
|
||||
|
@ -200,6 +200,16 @@ void feedReplicationBacklogWithObject(robj *o) {
|
||||
feedReplicationBacklog(p,len);
|
||||
}
|
||||
|
||||
int canFeedReplicaReplBuffer(client *replica) {
|
||||
/* Don't feed replicas that only want the RDB. */
|
||||
if (replica->flags & CLIENT_REPL_RDBONLY) return 0;
|
||||
|
||||
/* Don't feed replicas that are still waiting for BGSAVE to start. */
|
||||
if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) return 0;
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
/* Propagate write commands to slaves, and populate the replication backlog
|
||||
* as well. This function is used if the instance is a master: we use
|
||||
* the commands received by our clients in order to create the replication
|
||||
@ -249,7 +259,8 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
listRewind(slaves,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
client *slave = ln->value;
|
||||
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
|
||||
|
||||
if (!canFeedReplicaReplBuffer(slave)) continue;
|
||||
addReply(slave,selectcmd);
|
||||
}
|
||||
|
||||
@ -290,8 +301,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
|
||||
while((ln = listNext(&li))) {
|
||||
client *slave = ln->value;
|
||||
|
||||
/* Don't feed slaves that are still waiting for BGSAVE to start. */
|
||||
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
|
||||
if (!canFeedReplicaReplBuffer(slave)) continue;
|
||||
|
||||
/* Feed slaves that are waiting for the initial SYNC (so these commands
|
||||
* are queued in the output buffer until the initial SYNC completes),
|
||||
@ -363,8 +373,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle
|
||||
while((ln = listNext(&li))) {
|
||||
client *slave = ln->value;
|
||||
|
||||
/* Don't feed slaves that are still waiting for BGSAVE to start. */
|
||||
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
|
||||
if (!canFeedReplicaReplBuffer(slave)) continue;
|
||||
addReplyProto(slave,buf,buflen);
|
||||
}
|
||||
}
|
||||
@ -799,14 +808,20 @@ void syncCommand(client *c) {
|
||||
listRewind(server.slaves,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
slave = ln->value;
|
||||
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
|
||||
/* If the client needs a buffer of commands, we can't use
|
||||
* a replica without replication buffer. */
|
||||
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END &&
|
||||
(!(slave->flags & CLIENT_REPL_RDBONLY) ||
|
||||
(c->flags & CLIENT_REPL_RDBONLY)))
|
||||
break;
|
||||
}
|
||||
/* To attach this slave, we check that it has at least all the
|
||||
* capabilities of the slave that triggered the current BGSAVE. */
|
||||
if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
|
||||
/* Perfect, the server is already registering differences for
|
||||
* another slave. Set the right state, and copy the buffer. */
|
||||
copyClientOutputBuffer(c,slave);
|
||||
* another slave. Set the right state, and copy the buffer.
|
||||
* We don't copy buffer if clients don't want. */
|
||||
if (!(c->flags & CLIENT_REPL_RDBONLY)) copyClientOutputBuffer(c,slave);
|
||||
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
|
||||
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
|
||||
} else {
|
||||
@ -925,6 +940,15 @@ void replconfCommand(client *c) {
|
||||
* to the slave. */
|
||||
if (server.masterhost && server.master) replicationSendAck();
|
||||
return;
|
||||
} else if (!strcasecmp(c->argv[j]->ptr,"rdb-only")) {
|
||||
/* REPLCONF RDB-ONLY is used to identify the client only wants
|
||||
* RDB snapshot without replication buffer. */
|
||||
long rdb_only = 0;
|
||||
if (getRangeLongFromObjectOrReply(c,c->argv[j+1],
|
||||
0,1,&rdb_only,NULL) != C_OK)
|
||||
return;
|
||||
if (rdb_only == 1) c->flags |= CLIENT_REPL_RDBONLY;
|
||||
else c->flags &= ~CLIENT_REPL_RDBONLY;
|
||||
} else {
|
||||
addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s",
|
||||
(char*)c->argv[j]->ptr);
|
||||
@ -939,19 +963,28 @@ void replconfCommand(client *c) {
|
||||
* we are finally ready to send the incremental stream of commands.
|
||||
*
|
||||
* It does a few things:
|
||||
*
|
||||
* 1) Put the slave in ONLINE state. Note that the function may also be called
|
||||
* 1) Close the replica's connection async if it doesn't need replication
|
||||
* commands buffer stream, since it actually isn't a valid replica.
|
||||
* 2) Put the slave in ONLINE state. Note that the function may also be called
|
||||
* for a replicas that are already in ONLINE state, but having the flag
|
||||
* repl_put_online_on_ack set to true: we still have to install the write
|
||||
* handler in that case. This function will take care of that.
|
||||
* 2) Make sure the writable event is re-installed, since calling the SYNC
|
||||
* 3) Make sure the writable event is re-installed, since calling the SYNC
|
||||
* command disables it, so that we can accumulate output buffer without
|
||||
* sending it to the replica.
|
||||
* 3) Update the count of "good replicas". */
|
||||
* 4) Update the count of "good replicas". */
|
||||
void putSlaveOnline(client *slave) {
|
||||
slave->replstate = SLAVE_STATE_ONLINE;
|
||||
slave->repl_put_online_on_ack = 0;
|
||||
slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */
|
||||
|
||||
if (slave->flags & CLIENT_REPL_RDBONLY) {
|
||||
serverLog(LL_NOTICE,
|
||||
"Close the connection with replica %s as RDB transfer is complete",
|
||||
replicationGetSlaveName(slave));
|
||||
freeClientAsync(slave);
|
||||
return;
|
||||
}
|
||||
if (connSetWriteHandler(slave->conn, sendReplyToClient) == C_ERR) {
|
||||
serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno));
|
||||
freeClient(slave);
|
||||
|
@ -270,6 +270,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
|
||||
#define CLIENT_DENY_BLOCKING (1ULL<<41) /* Indicate that the client should not be blocked.
|
||||
currently, turned on inside MULTI, Lua, RM_Call,
|
||||
and AOF client */
|
||||
#define CLIENT_REPL_RDBONLY (1ULL<<42) /* This client is a replica that only wants
|
||||
RDB without replication buffer. */
|
||||
|
||||
/* Client block type (btype field in client structure)
|
||||
* if CLIENT_BLOCKED flag is set. */
|
||||
|
Loading…
x
Reference in New Issue
Block a user