diff --git a/src/redis-cli.c b/src/redis-cli.c index 2e8984c71..ed3075317 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -6940,6 +6940,10 @@ void sendCapa() { sendReplconf("capa", "eof"); } +void sendRdbOnly(void) { + sendReplconf("rdb-only", "1"); +} + /* Read raw bytes through a redisContext. The read operation is not greedy * and may not fill the buffer entirely. */ @@ -7137,7 +7141,6 @@ static void getRDB(clusterManagerNode *node) { node->context = NULL; fsync(fd); close(fd); - fprintf(stderr,"Transfer finished with success.\n"); if (node) { sdsfree(filename); return; @@ -8258,6 +8261,7 @@ int main(int argc, char **argv) { if (config.getrdb_mode) { if (cliConnect(0) == REDIS_ERR) exit(1); sendCapa(); + sendRdbOnly(); getRDB(NULL); } diff --git a/src/replication.c b/src/replication.c index 9fb19eaca..a9ec30e74 100644 --- a/src/replication.c +++ b/src/replication.c @@ -200,6 +200,16 @@ void feedReplicationBacklogWithObject(robj *o) { feedReplicationBacklog(p,len); } +int canFeedReplicaReplBuffer(client *replica) { + /* Don't feed replicas that only want the RDB. */ + if (replica->flags & CLIENT_REPL_RDBONLY) return 0; + + /* Don't feed replicas that are still waiting for BGSAVE to start. */ + if (replica->replstate == SLAVE_STATE_WAIT_BGSAVE_START) return 0; + + return 1; +} + /* Propagate write commands to slaves, and populate the replication backlog * as well. This function is used if the instance is a master: we use * the commands received by our clients in order to create the replication @@ -249,7 +259,8 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listRewind(slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + + if (!canFeedReplicaReplBuffer(slave)) continue; addReply(slave,selectcmd); } @@ -290,8 +301,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { while((ln = listNext(&li))) { client *slave = ln->value; - /* Don't feed slaves that are still waiting for BGSAVE to start. */ - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + if (!canFeedReplicaReplBuffer(slave)) continue; /* Feed slaves that are waiting for the initial SYNC (so these commands * are queued in the output buffer until the initial SYNC completes), @@ -363,8 +373,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle while((ln = listNext(&li))) { client *slave = ln->value; - /* Don't feed slaves that are still waiting for BGSAVE to start. */ - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; + if (!canFeedReplicaReplBuffer(slave)) continue; addReplyProto(slave,buf,buflen); } } @@ -799,14 +808,20 @@ void syncCommand(client *c) { listRewind(server.slaves,&li); while((ln = listNext(&li))) { slave = ln->value; - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break; + /* If the client needs a buffer of commands, we can't use + * a replica without replication buffer. */ + if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && + (!(slave->flags & CLIENT_REPL_RDBONLY) || + (c->flags & CLIENT_REPL_RDBONLY))) + break; } /* To attach this slave, we check that it has at least all the * capabilities of the slave that triggered the current BGSAVE. */ if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) { /* Perfect, the server is already registering differences for - * another slave. Set the right state, and copy the buffer. */ - copyClientOutputBuffer(c,slave); + * another slave. Set the right state, and copy the buffer. + * We don't copy buffer if clients don't want. */ + if (!(c->flags & CLIENT_REPL_RDBONLY)) copyClientOutputBuffer(c,slave); replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); } else { @@ -925,6 +940,15 @@ void replconfCommand(client *c) { * to the slave. */ if (server.masterhost && server.master) replicationSendAck(); return; + } else if (!strcasecmp(c->argv[j]->ptr,"rdb-only")) { + /* REPLCONF RDB-ONLY is used to identify the client only wants + * RDB snapshot without replication buffer. */ + long rdb_only = 0; + if (getRangeLongFromObjectOrReply(c,c->argv[j+1], + 0,1,&rdb_only,NULL) != C_OK) + return; + if (rdb_only == 1) c->flags |= CLIENT_REPL_RDBONLY; + else c->flags &= ~CLIENT_REPL_RDBONLY; } else { addReplyErrorFormat(c,"Unrecognized REPLCONF option: %s", (char*)c->argv[j]->ptr); @@ -939,19 +963,28 @@ void replconfCommand(client *c) { * we are finally ready to send the incremental stream of commands. * * It does a few things: - * - * 1) Put the slave in ONLINE state. Note that the function may also be called + * 1) Close the replica's connection async if it doesn't need replication + * commands buffer stream, since it actually isn't a valid replica. + * 2) Put the slave in ONLINE state. Note that the function may also be called * for a replicas that are already in ONLINE state, but having the flag * repl_put_online_on_ack set to true: we still have to install the write * handler in that case. This function will take care of that. - * 2) Make sure the writable event is re-installed, since calling the SYNC + * 3) Make sure the writable event is re-installed, since calling the SYNC * command disables it, so that we can accumulate output buffer without * sending it to the replica. - * 3) Update the count of "good replicas". */ + * 4) Update the count of "good replicas". */ void putSlaveOnline(client *slave) { slave->replstate = SLAVE_STATE_ONLINE; slave->repl_put_online_on_ack = 0; slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */ + + if (slave->flags & CLIENT_REPL_RDBONLY) { + serverLog(LL_NOTICE, + "Close the connection with replica %s as RDB transfer is complete", + replicationGetSlaveName(slave)); + freeClientAsync(slave); + return; + } if (connSetWriteHandler(slave->conn, sendReplyToClient) == C_ERR) { serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno)); freeClient(slave); diff --git a/src/server.h b/src/server.h index 8f1ca50b1..5568155f9 100644 --- a/src/server.h +++ b/src/server.h @@ -270,6 +270,8 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_DENY_BLOCKING (1ULL<<41) /* Indicate that the client should not be blocked. currently, turned on inside MULTI, Lua, RM_Call, and AOF client */ +#define CLIENT_REPL_RDBONLY (1ULL<<42) /* This client is a replica that only wants + RDB without replication buffer. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */