diff --git a/src/replication.c b/src/replication.c index a8f46be95..05d450558 100644 --- a/src/replication.c +++ b/src/replication.c @@ -43,7 +43,7 @@ void replicationDiscardCachedMaster(void); void replicationResurrectCachedMaster(connection *conn); void replicationSendAck(void); void putSlaveOnline(client *slave); -int cancelReplicationHandshake(void); +int cancelReplicationHandshake(int reconnect); /* We take a global flag to remember if this instance generated an RDB * because of replication, so that we can remove the RDB file in case @@ -832,6 +832,8 @@ void syncCommand(client *c) { * few seconds to wait for more slaves to arrive. */ if (server.repl_diskless_sync_delay) serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC"); + else + startBgsaveForReplication(c->slave_capa); } else { /* Target is disk (or the slave is not capable of supporting * diskless replication) and we don't have a BGSAVE in progress, @@ -1225,19 +1227,13 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, * (if it had a disk or socket target). */ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { listNode *ln; - int startbgsave = 0; - int mincapa = -1; listIter li; listRewind(server.slaves,&li); while((ln = listNext(&li))) { client *slave = ln->value; - if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) { - startbgsave = 1; - mincapa = (mincapa == -1) ? slave->slave_capa : - (mincapa & slave->slave_capa); - } else if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) { + if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) { struct redis_stat buf; if (bgsaveerr != C_OK) { @@ -1304,7 +1300,6 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { } } } - if (startbgsave) startBgsaveForReplication(mincapa); } /* Change the current instance replication ID with a new, random one. @@ -1568,7 +1563,7 @@ void readSyncBulkPayload(connection *conn) { } serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", (nread == -1) ? strerror(errno) : "connection lost"); - cancelReplicationHandshake(); + cancelReplicationHandshake(1); return; } server.stat_net_input_bytes += nread; @@ -1695,7 +1690,7 @@ void readSyncBulkPayload(connection *conn) { serverLog(LL_WARNING, "Failed trying to load the MASTER synchronization DB " "from socket"); - cancelReplicationHandshake(); + cancelReplicationHandshake(1); rioFreeConn(&rdb, NULL); if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { /* Restore the backed up databases. */ @@ -1728,7 +1723,7 @@ void readSyncBulkPayload(connection *conn) { memcmp(buf,eofmark,CONFIG_RUN_ID_SIZE) != 0) { serverLog(LL_WARNING,"Replication stream EOF marker is broken"); - cancelReplicationHandshake(); + cancelReplicationHandshake(1); rioFreeConn(&rdb, NULL); return; } @@ -1758,7 +1753,7 @@ void readSyncBulkPayload(connection *conn) { "Failed trying to rename the temp DB into %s in " "MASTER <-> REPLICA synchronization: %s", server.rdb_filename, strerror(errno)); - cancelReplicationHandshake(); + cancelReplicationHandshake(1); if (old_rdb_fd != -1) close(old_rdb_fd); return; } @@ -1769,7 +1764,7 @@ void readSyncBulkPayload(connection *conn) { serverLog(LL_WARNING, "Failed trying to load the MASTER synchronization " "DB from disk"); - cancelReplicationHandshake(); + cancelReplicationHandshake(1); if (server.rdb_del_sync_files && allPersistenceDisabled()) { serverLog(LL_NOTICE,"Removing the RDB file obtained from " "the master. This replica has persistence " @@ -1824,6 +1819,9 @@ void readSyncBulkPayload(connection *conn) { redisCommunicateSystemd("READY=1\n"); } + /* Send the initial ACK immediately to put this replica in online state. */ + if (usemark) replicationSendAck(); + /* Restart the AOF subsystem now that we finished the sync. This * will trigger an AOF rewrite, and when done will start appending * to the new file. */ @@ -1831,7 +1829,7 @@ void readSyncBulkPayload(connection *conn) { return; error: - cancelReplicationHandshake(); + cancelReplicationHandshake(1); return; } @@ -2421,6 +2419,7 @@ int connectWithMaster(void) { server.repl_transfer_lastio = server.unixtime; server.repl_state = REPL_STATE_CONNECTING; + serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started"); return C_OK; } @@ -2456,7 +2455,7 @@ void replicationAbortSyncTransfer(void) { * the replication state (server.repl_state) set to REPL_STATE_CONNECT. * * Otherwise zero is returned and no operation is perforemd at all. */ -int cancelReplicationHandshake(void) { +int cancelReplicationHandshake(int reconnect) { if (server.repl_state == REPL_STATE_TRANSFER) { replicationAbortSyncTransfer(); server.repl_state = REPL_STATE_CONNECT; @@ -2468,6 +2467,16 @@ int cancelReplicationHandshake(void) { } else { return 0; } + + if (!reconnect) + return 1; + + /* try to re-connect without waiting for replicationCron, this is needed + * for the "diskless loading short read" test. */ + serverLog(LL_NOTICE,"Reconnecting to MASTER %s:%d after failure", + server.masterhost, server.masterport); + connectWithMaster(); + return 1; } @@ -2476,17 +2485,22 @@ void replicationSetMaster(char *ip, int port) { int was_master = server.masterhost == NULL; sdsfree(server.masterhost); - server.masterhost = sdsnew(ip); - server.masterport = port; + server.masterhost = NULL; if (server.master) { freeClient(server.master); } disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */ + /* Setting masterhost only after the call to freeClient since it calls + * replicationHandleMasterDisconnection which can trigger a re-connect + * directly from within that call. */ + server.masterhost = sdsnew(ip); + server.masterport = port; + /* Force our slaves to resync with us as well. They may hopefully be able * to partially resync with us, but we can notify the replid change. */ disconnectSlaves(); - cancelReplicationHandshake(); + cancelReplicationHandshake(0); /* Before destroying our master state, create a cached master using * our own parameters, to later PSYNC with the new master. */ if (was_master) { @@ -2506,6 +2520,9 @@ void replicationSetMaster(char *ip, int port) { NULL); server.repl_state = REPL_STATE_CONNECT; + serverLog(LL_NOTICE,"Connecting to MASTER %s:%d", + server.masterhost, server.masterport); + connectWithMaster(); } /* Cancel replication, setting the instance as a master itself. */ @@ -2518,11 +2535,13 @@ void replicationUnsetMaster(void) { REDISMODULE_SUBEVENT_MASTER_LINK_DOWN, NULL); + /* Clear masterhost first, since the freeClient calls + * replicationHandleMasterDisconnection which can attempt to re-connect. */ sdsfree(server.masterhost); server.masterhost = NULL; if (server.master) freeClient(server.master); replicationDiscardCachedMaster(); - cancelReplicationHandshake(); + cancelReplicationHandshake(0); /* When a slave is turned into a master, the current replication ID * (that was inherited from the master at synchronization time) is * used as secondary ID up to the current offset, and a new replication @@ -2576,6 +2595,14 @@ void replicationHandleMasterDisconnection(void) { /* We lost connection with our master, don't disconnect slaves yet, * maybe we'll be able to PSYNC with our master later. We'll disconnect * the slaves only if we'll have to do a full resync with our master. */ + + /* Try to re-connect immediately rather than wait for replicationCron + * waiting 1 second may risk backlog being recycled. */ + if (server.masterhost) { + serverLog(LL_NOTICE,"Reconnecting to MASTER %s:%d", + server.masterhost, server.masterport); + connectWithMaster(); + } } void replicaofCommand(client *c) { @@ -3121,7 +3148,7 @@ void replicationCron(void) { (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) { serverLog(LL_WARNING,"Timeout connecting to the MASTER..."); - cancelReplicationHandshake(); + cancelReplicationHandshake(1); } /* Bulk transfer I/O timeout? */ @@ -3129,7 +3156,7 @@ void replicationCron(void) { (time(NULL)-server.repl_transfer_lastio) > server.repl_timeout) { serverLog(LL_WARNING,"Timeout receiving bulk data from MASTER... If the problem persists try to set the 'repl-timeout' parameter in redis.conf to a larger value."); - cancelReplicationHandshake(); + cancelReplicationHandshake(1); } /* Timed out master when we are an already connected slave? */ @@ -3144,9 +3171,7 @@ void replicationCron(void) { if (server.repl_state == REPL_STATE_CONNECT) { serverLog(LL_NOTICE,"Connecting to MASTER %s:%d", server.masterhost, server.masterport); - if (connectWithMaster() == C_OK) { - serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started"); - } + connectWithMaster(); } /* Send ACK to master from time to time. @@ -3280,6 +3305,18 @@ void replicationCron(void) { replicationScriptCacheFlush(); } + replicationStartPendingFork(); + + /* Remove the RDB file used for replication if Redis is not running + * with any persistence. */ + removeRDBUsedToSyncReplicas(); + + /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ + refreshGoodSlavesCount(); + replication_cron_loops++; /* Incremented with frequency 1 HZ. */ +} + +void replicationStartPendingFork(void) { /* Start a BGSAVE good for replication if we have slaves in * WAIT_BGSAVE_START state. * @@ -3307,7 +3344,7 @@ void replicationCron(void) { if (slaves_waiting && (!server.repl_diskless_sync || - max_idle > server.repl_diskless_sync_delay)) + max_idle >= server.repl_diskless_sync_delay)) { /* Start the BGSAVE. The called function may start a * BGSAVE with socket target or disk target depending on the @@ -3315,12 +3352,4 @@ void replicationCron(void) { startBgsaveForReplication(mincapa); } } - - /* Remove the RDB file used for replication if Redis is not running - * with any persistence. */ - removeRDBUsedToSyncReplicas(); - - /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ - refreshGoodSlavesCount(); - replication_cron_loops++; /* Incremented with frequency 1 HZ. */ } diff --git a/src/server.c b/src/server.c index 45db1cbcf..c9d995e0b 100644 --- a/src/server.c +++ b/src/server.c @@ -1828,6 +1828,9 @@ void checkChildrenDone(void) { } updateDictResizePolicy(); closeChildInfoPipe(); + + /* start any pending forks immediately. */ + replicationStartPendingFork(); } } diff --git a/src/server.h b/src/server.h index efb7d2ee8..71bbfec7b 100644 --- a/src/server.h +++ b/src/server.h @@ -1807,6 +1807,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc); void updateSlavesWaitingBgsave(int bgsaveerr, int type); void replicationCron(void); +void replicationStartPendingFork(void); void replicationHandleMasterDisconnection(void); void replicationCacheMaster(client *c); void resizeReplicationBacklog(long long newsize); diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 1052fbdd5..a2f9e75bd 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -407,7 +407,12 @@ test {diskless loading short read} { $master config set repl-diskless-sync yes $master config set rdbcompression no $replica config set repl-diskless-load swapdb + $master config set hz 500 + $replica config set hz 500 + $master config set dynamic-hz no + $replica config set dynamic-hz no # Try to fill the master with all types of data types / encodings + set start [clock clicks -milliseconds] for {set k 0} {$k < 3} {incr k} { for {set i 0} {$i < 10} {incr i} { r set "$k int_$i" [expr {int(rand()*10000)}] @@ -429,14 +434,21 @@ test {diskless loading short read} { } } + if {$::verbose} { + set end [clock clicks -milliseconds] + set duration [expr $end - $start] + puts "filling took $duration ms (TODO: use pipeline)" + set start [clock clicks -milliseconds] + } + # Start the replication process... set loglines [count_log_lines -1] $master config set repl-diskless-sync-delay 0 $replica replicaof $master_host $master_port # kill the replication at various points - set attempts 3 - if {$::accurate} { set attempts 10 } + set attempts 100 + if {$::accurate} { set attempts 500 } for {set i 0} {$i < $attempts} {incr i} { # wait for the replica to start reading the rdb # using the log file since the replica only responds to INFO once in 2mb @@ -469,6 +481,11 @@ test {diskless loading short read} { fail "Replica didn't disconnect" } } + if {$::verbose} { + set end [clock clicks -milliseconds] + set duration [expr $end - $start] + puts "test took $duration ms" + } # enable fast shutdown $master config set rdb-key-save-delay 0 } diff --git a/tests/unit/moduleapi/testrdb.tcl b/tests/unit/moduleapi/testrdb.tcl index 02c82c7c3..0d26b0a26 100644 --- a/tests/unit/moduleapi/testrdb.tcl +++ b/tests/unit/moduleapi/testrdb.tcl @@ -62,18 +62,30 @@ tags "modules" { $master config set repl-diskless-sync yes $master config set rdbcompression no $replica config set repl-diskless-load swapdb + $master config set hz 500 + $replica config set hz 500 + $master config set dynamic-hz no + $replica config set dynamic-hz no + set start [clock clicks -milliseconds] for {set k 0} {$k < 30} {incr k} { r testrdb.set.key key$k [string repeat A [expr {int(rand()*1000000)}]] } + if {$::verbose} { + set end [clock clicks -milliseconds] + set duration [expr $end - $start] + puts "filling took $duration ms (TODO: use pipeline)" + set start [clock clicks -milliseconds] + } + # Start the replication process... set loglines [count_log_lines -1] $master config set repl-diskless-sync-delay 0 $replica replicaof $master_host $master_port # kill the replication at various points - set attempts 3 - if {$::accurate} { set attempts 10 } + set attempts 100 + if {$::accurate} { set attempts 500 } for {set i 0} {$i < $attempts} {incr i} { # wait for the replica to start reading the rdb # using the log file since the replica only responds to INFO once in 2mb @@ -106,6 +118,11 @@ tags "modules" { fail "Replica didn't disconnect" } } + if {$::verbose} { + set end [clock clicks -milliseconds] + set duration [expr $end - $start] + puts "test took $duration ms" + } # enable fast shutdown $master config set rdb-key-save-delay 0 }