From ad7d4c6b709ce94ed9a2fa3a1fcf9b8cf7843eff Mon Sep 17 00:00:00 2001 From: Wang Yuan Date: Wed, 27 Jan 2021 23:13:10 +0800 Subject: [PATCH] Implement rdb-only replication (#8303) In some scenarios, such as remote backup, we only want to get a snapshot of the remote redis server's db. Currently, redis-cli acts as a replica and sends SYNC to redis, but redis still accumulates the replication buffer in the replica client output buffer, which may result in using vast amounts of memory, or in failing to transfer the RDB because of client-output-buffer-limit. In this commit, we add 'replconf rdb-only 0|1'; redis doesn't send the incremental replication buffer to clients that send 'replconf rdb-only 1', so we can reduce memory usage and improve the success rate of getting the RDB. --- src/redis-cli.c | 6 ++++- src/replication.c | 57 +++++++++++++++++++++++++++++++++++++---------- src/server.h | 2 ++ 3 files changed, 52 insertions(+), 13 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index 2e8984c71..ed3075317 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -6940,6 +6940,10 @@ void sendCapa() { sendReplconf("capa", "eof"); } +void sendRdbOnly(void) { + sendReplconf("rdb-only", "1"); +} + /* Read raw bytes through a redisContext. The read operation is not greedy * and may not fill the buffer entirely. */ @@ -7137,7 +7141,6 @@ static void getRDB(clusterManagerNode *node) { node->context = NULL; fsync(fd); close(fd); - fprintf(stderr,"Transfer finished with success.\n"); if (node) { sdsfree(filename); return; @@ -8258,6 +8261,7 @@ int main(int argc, char **argv) { if (config.getrdb_mode) { if (cliConnect(0) == REDIS_ERR) exit(1); sendCapa(); + sendRdbOnly(); getRDB(NULL); } diff --git a/src/replication.c b/src/replication.c index 9fb19eaca..a9ec30e74 100644 --- a/src/replication.c +++ b/src/replication.c @@ -200,6 +200,16 @@ void feedReplicationBacklogWithObject(robj *o) { feedReplicationBacklog(p,len); } +int canFeedReplicaReplBuffer(client *replica) { + /* Don't feed replicas that only want the RDB. 
*/ + if (replica->flags & CLIENT_REPL_RDBONLY) return 0; + + /* Don't feed replicas that are still waiting for BGSAVE to start. */ + if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) return 0; + + return 1; +} + /* Propagate write commands to slaves, and populate the replication backlog * as well. This function is used if the instance is a master: we use * the commands received by our clients in order to create the replication @@ -249,7 +259,8 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listRewind(slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + + if (!canFeedReplicaReplBuffer(slave)) continue; addReply(slave,selectcmd); } @@ -290,8 +301,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { while((ln = listNext(&li))) { client *slave = ln->value; - /* Don't feed slaves that are still waiting for BGSAVE to start. */ - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + if (!canFeedReplicaReplBuffer(slave)) continue; /* Feed slaves that are waiting for the initial SYNC (so these commands * are queued in the output buffer until the initial SYNC completes), @@ -363,8 +373,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle while((ln = listNext(&li))) { client *slave = ln->value; - /* Don't feed slaves that are still waiting for BGSAVE to start. */ - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + if (!canFeedReplicaReplBuffer(slave)) continue; addReplyProto(slave,buf,buflen); } } @@ -799,14 +808,20 @@ void syncCommand(client *c) { listRewind(server.slaves,&li); while((ln = listNext(&li))) { slave = ln->value; - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break; + /* If the client needs a buffer of commands, we can't use + * a replica without replication buffer. 
*/ + if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && + (!(slave->flags & CLIENT_REPL_RDBONLY) || + (c->flags & CLIENT_REPL_RDBONLY))) + break; } /* To attach this slave, we check that it has at least all the * capabilities of the slave that triggered the current BGSAVE. */ if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) { /* Perfect, the server is already registering differences for - * another slave. Set the right state, and copy the buffer. */ - copyClientOutputBuffer(c,slave); + * another slave. Set the right state, and copy the buffer. + * We don't copy the buffer if the client doesn't want it. */ + if (!(c->flags & CLIENT_REPL_RDBONLY)) copyClientOutputBuffer(c,slave); replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { @@ -925,6 +940,15 @@ void replconfCommand(client *c) { * to the slave. */ if (server.masterhost && server.master) replicationSendAck(); return; + } else if (!strcasecmp(c->argv[j]->ptr,"rdb-only")) { + /* REPLCONF RDB-ONLY is used to indicate that the client only wants + * an RDB snapshot without the replication buffer. */ + long rdb_only = 0; + if (getRangeLongFromObjectOrReply(c,c->argv[j+1], + 0,1,&rdb_only,NULL) != C_OK) + return; + if (rdb_only == 1) c->flags |= CLIENT_REPL_RDBONLY; + else c->flags &= ~CLIENT_REPL_RDBONLY; } else { addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", (char*)c->argv[j]->ptr); @@ -939,19 +963,28 @@ void replconfCommand(client *c) { * we are finally ready to send the incremental stream of commands. * * It does a few things: - * - * 1) Put the slave in ONLINE state. Note that the function may also be called + * 1) Close the replica's connection async if it doesn't need replication + * commands buffer stream, since it actually isn't a valid replica. + * 2) Put the slave in ONLINE state. 
Note that the function may also be called * for a replicas that are already in ONLINE state, but having the flag * repl_put_online_on_ack set to true: we still have to install the write * handler in that case. This function will take care of that. - * 2) Make sure the writable event is re-installed, since calling the SYNC + * 3) Make sure the writable event is re-installed, since calling the SYNC * command disables it, so that we can accumulate output buffer without * sending it to the replica. - * 3) Update the count of "good replicas". */ + * 4) Update the count of "good replicas". */ void putSlaveOnline(client *slave) { slave->replstate = SLAVE_STATE_ONLINE; slave->repl_put_online_on_ack = 0; slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */ + + if (slave->flags & CLIENT_REPL_RDBONLY) { + serverLog(LL_NOTICE, + "Close the connection with replica %s as RDB transfer is complete", + replicationGetSlaveName(slave)); + freeClientAsync(slave); + return; + } if (connSetWriteHandler(slave->conn, sendReplyToClient) == C_ERR) { serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno)); freeClient(slave); diff --git a/src/server.h b/src/server.h index 8f1ca50b1..5568155f9 100644 --- a/src/server.h +++ b/src/server.h @@ -270,6 +270,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_DENY_BLOCKING (1ULL<<41) /* Indicate that the client should not be blocked. currently, turned on inside MULTI, Lua, RM_Call, and AOF client */ +#define CLIENT_REPL_RDBONLY (1ULL<<42) /* This client is a replica that only wants + RDB without replication buffer. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */