diff --git a/src/networking.c b/src/networking.c index 977074df0..2355a376b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -154,6 +154,7 @@ client *createClient(connection *conn) { c->read_reploff = 0; c->repl_ack_off = 0; c->repl_ack_time = 0; + c->repl_last_partial_write = 0; c->slave_listening_port = 0; c->slave_addr = NULL; c->slave_capa = SLAVE_CAPA_NONE; diff --git a/src/replication.c b/src/replication.c index bba96b93e..005a350e0 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1153,6 +1153,8 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) { if (!connHasWriteHandler(conn)) return; connSetWriteHandler(conn, NULL); + client *slave = connGetPrivateData(conn); + slave->repl_last_partial_write = 0; server.rdb_pipe_numconns_writing--; /* if there are no more writes for now for this conn, or write error: */ if (server.rdb_pipe_numconns_writing == 0) { @@ -1180,8 +1182,10 @@ void rdbPipeWriteHandler(struct connection *conn) { } else { slave->repldboff += nwritten; atomicIncr(server.stat_net_output_bytes, nwritten); - if (slave->repldboff < server.rdb_pipe_bufflen) + if (slave->repldboff < server.rdb_pipe_bufflen) { + slave->repl_last_partial_write = server.unixtime; return; /* more data to write.. 
*/ + } } rdbPipeWriteHandlerConnRemoved(conn); } @@ -1262,6 +1266,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, /* If we were unable to write all the data to one of the replicas, * setup write handler (and disable pipe read handler, below) */ if (nwritten != server.rdb_pipe_bufflen) { + slave->repl_last_partial_write = server.unixtime; server.rdb_pipe_numconns_writing++; connSetWriteHandler(conn, rdbPipeWriteHandler); } @@ -3390,13 +3395,27 @@ void replicationCron(void) { while((ln = listNext(&li))) { client *slave = ln->value; - if (slave->replstate != SLAVE_STATE_ONLINE) continue; - if (slave->flags & CLIENT_PRE_PSYNC) continue; - if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) - { - serverLog(LL_WARNING, "Disconnecting timedout replica: %s", - replicationGetSlaveName(slave)); - freeClient(slave); + if (slave->replstate == SLAVE_STATE_ONLINE) { + if (slave->flags & CLIENT_PRE_PSYNC) + continue; + if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) { + serverLog(LL_WARNING, "Disconnecting timedout replica (streaming sync): %s", + replicationGetSlaveName(slave)); + freeClient(slave); + continue; /* 'slave' may have been released by freeClient(); do not read it in the checks below. */ + } + } + /* We consider disconnecting only diskless replicas because disk-based replicas aren't fed + * by the fork child so if a disk-based replica is stuck it doesn't prevent the fork child + * from terminating. */ + if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) { + if (slave->repl_last_partial_write != 0 && + (server.unixtime - slave->repl_last_partial_write) > server.repl_timeout) + { + serverLog(LL_WARNING, "Disconnecting timedout replica (full sync): %s", + replicationGetSlaveName(slave)); + freeClient(slave); + } } } } diff --git a/src/server.h b/src/server.h index 22eea0ab8..03746775c 100644 --- a/src/server.h +++ b/src/server.h @@ -900,6 +900,7 @@ typedef struct client { long long reploff; /* Applied replication offset if this is a master. 
*/ long long repl_ack_off; /* Replication ack offset, if this is a slave. */ long long repl_ack_time;/* Replication ack time, if this is a slave. */ + long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica */ long long psync_initial_offset; /* FULLRESYNC reply offset other slaves copying this slave output buffer should use. */ diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 47d83f24a..74936e2af 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -599,7 +599,7 @@ start_server {tags {"repl"}} { # If running on Linux, we also measure utime/stime to detect possible I/O handling issues set os [catch {exec unamee}] set measure_time [expr {$os == "Linux"} ? 1 : 0] - foreach all_drop {no slow fast all} { + foreach all_drop {no slow fast all timeout} { test "diskless $all_drop replicas drop during rdb pipe" { set replicas {} set replicas_alive {} @@ -647,6 +647,12 @@ start_server {tags {"repl"}} { exec kill [srv -1 pid] set replicas_alive [lreplace $replicas_alive 0 0] } + if {$all_drop == "timeout"} { + $master config set repl-timeout 1 + # we want this replica to hang on a key for very long so it'll reach repl-timeout + exec kill -SIGSTOP [srv -1 pid] + after 3000 + } # wait for rdb child to exit wait_for_condition 500 100 { @@ -665,6 +671,14 @@ start_server {tags {"repl"}} { if {$all_drop == "slow" || $all_drop == "fast"} { wait_for_log_messages -2 {"*Diskless rdb transfer, done reading from pipe, 1 replicas still up*"} $loglines 1 1 } + if {$all_drop == "timeout"} { + wait_for_log_messages -2 {"*Disconnecting timedout replica (full sync)*"} $loglines 1 1 + wait_for_log_messages -2 {"*Diskless rdb transfer, done reading from pipe, 1 replicas still up*"} $loglines 1 1 + # master disconnected the slow replica, remove from array + set replicas_alive [lreplace $replicas_alive 0 0] + # release it + exec kill -SIGCONT [srv -1 pid] + 
} # make sure we don't have a busy loop going thought epoll_wait if {$measure_time} { @@ -678,7 +692,7 @@ start_server {tags {"repl"}} { puts "master utime: $master_utime" puts "master stime: $master_stime" } - if {!$::no_latency && ($all_drop == "all" || $all_drop == "slow")} { + if {!$::no_latency && ($all_drop == "all" || $all_drop == "slow" || $all_drop == "timeout")} { assert {$master_utime < 70} assert {$master_stime < 70} }