From 0bf6c205dbb61aa7f7dafbcd0f27c70d1a4d18c6 Mon Sep 17 00:00:00 2001 From: guybe7 Date: Thu, 15 Apr 2021 16:18:51 +0200 Subject: [PATCH] Add a timeout mechanism for replicas stuck in fullsync (#8762) Starting redis 6.0 (part of the TLS feature), diskless master uses pipe from the fork child so that the parent is the one sending data to the replicas. This mechanism has an issue in which a hung replica will cause the master to wait for it to read the data sent to it forever, thus preventing the fork child from terminating and preventing the creations of any other forks. This PR adds a timeout mechanism, much like the ACK-based timeout, we disconnect replicas that aren't reading the RDB file fast enough. --- src/networking.c | 1 + src/replication.c | 34 +++++++++++++++++++++++-------- src/server.h | 1 + tests/integration/replication.tcl | 18 ++++++++++++++-- 4 files changed, 44 insertions(+), 10 deletions(-) diff --git a/src/networking.c b/src/networking.c index 977074df0..2355a376b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -154,6 +154,7 @@ client *createClient(connection *conn) { c->read_reploff = 0; c->repl_ack_off = 0; c->repl_ack_time = 0; + c->repl_last_partial_write = 0; c->slave_listening_port = 0; c->slave_addr = NULL; c->slave_capa = SLAVE_CAPA_NONE; diff --git a/src/replication.c b/src/replication.c index bba96b93e..005a350e0 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1153,6 +1153,8 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) { if (!connHasWriteHandler(conn)) return; connSetWriteHandler(conn, NULL); + client *slave = connGetPrivateData(conn); + slave->repl_last_partial_write = 0; server.rdb_pipe_numconns_writing--; /* if there are no more writes for now for this conn, or write error: */ if (server.rdb_pipe_numconns_writing == 0) { @@ -1180,8 +1182,10 @@ void rdbPipeWriteHandler(struct connection *conn) { } else { slave->repldboff += nwritten; atomicIncr(server.stat_net_output_bytes, nwritten); - if (slave->repldboff < server.rdb_pipe_bufflen) + if (slave->repldboff < server.rdb_pipe_bufflen) { + slave->repl_last_partial_write = server.unixtime; return; /* more data to write.. */ + } } rdbPipeWriteHandlerConnRemoved(conn); } @@ -1262,6 +1266,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, /* If we were unable to write all the data to one of the replicas, * setup write handler (and disable pipe read handler, below) */ if (nwritten != server.rdb_pipe_bufflen) { + slave->repl_last_partial_write = server.unixtime; server.rdb_pipe_numconns_writing++; connSetWriteHandler(conn, rdbPipeWriteHandler); } @@ -3390,13 +3395,26 @@ void replicationCron(void) { while((ln = listNext(&li))) { client *slave = ln->value; - if (slave->replstate != SLAVE_STATE_ONLINE) continue; - if (slave->flags & CLIENT_PRE_PSYNC) continue; - if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) - { - serverLog(LL_WARNING, "Disconnecting timedout replica: %s", - replicationGetSlaveName(slave)); - freeClient(slave); + if (slave->replstate == SLAVE_STATE_ONLINE) { + if (slave->flags & CLIENT_PRE_PSYNC) + continue; + if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) { + serverLog(LL_WARNING, "Disconnecting timedout replica (streaming sync): %s", + replicationGetSlaveName(slave)); + freeClient(slave); + } + } + /* We consider disconnecting only diskless replicas because disk-based replicas aren't fed + * by the fork child so if a disk-based replica is stuck it doesn't prevent the fork child + * from terminating. */ + if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) { + if (slave->repl_last_partial_write != 0 && + (server.unixtime - slave->repl_last_partial_write) > server.repl_timeout) + { + serverLog(LL_WARNING, "Disconnecting timedout replica (full sync): %s", + replicationGetSlaveName(slave)); + freeClient(slave); + } } } } diff --git a/src/server.h b/src/server.h index 22eea0ab8..03746775c 100644 --- a/src/server.h +++ b/src/server.h @@ -900,6 +900,7 @@ typedef struct client { long long reploff; /* Applied replication offset if this is a master. */ long long repl_ack_off; /* Replication ack offset, if this is a slave. */ long long repl_ack_time;/* Replication ack time, if this is a slave. */ + long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica */ long long psync_initial_offset; /* FULLRESYNC reply offset other slaves copying this slave output buffer should use. */ diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 47d83f24a..74936e2af 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -599,7 +599,7 @@ start_server {tags {"repl"}} { # If running on Linux, we also measure utime/stime to detect possible I/O handling issues set os [catch {exec unamee}] set measure_time [expr {$os == "Linux"} ? 1 : 0] - foreach all_drop {no slow fast all} { + foreach all_drop {no slow fast all timeout} { test "diskless $all_drop replicas drop during rdb pipe" { set replicas {} set replicas_alive {} @@ -647,6 +647,12 @@ start_server {tags {"repl"}} { exec kill [srv -1 pid] set replicas_alive [lreplace $replicas_alive 0 0] } + if {$all_drop == "timeout"} { + $master config set repl-timeout 1 + # we want this replica to hang on a key for very long so it'll reach repl-timeout + exec kill -SIGSTOP [srv -1 pid] + after 3000 + } # wait for rdb child to exit wait_for_condition 500 100 { @@ -665,6 +671,14 @@ start_server {tags {"repl"}} { if {$all_drop == "slow" || $all_drop == "fast"} { wait_for_log_messages -2 {"*Diskless rdb transfer, done reading from pipe, 1 replicas still up*"} $loglines 1 1 } + if {$all_drop == "timeout"} { + wait_for_log_messages -2 {"*Disconnecting timedout replica (full sync)*"} $loglines 1 1 + wait_for_log_messages -2 {"*Diskless rdb transfer, done reading from pipe, 1 replicas still up*"} $loglines 1 1 + # master disconnected the slow replica, remove from array + set replicas_alive [lreplace $replicas_alive 0 0] + # release it + exec kill -SIGCONT [srv -1 pid] + } # make sure we don't have a busy loop going thought epoll_wait if {$measure_time} { @@ -678,7 +692,7 @@ start_server {tags {"repl"}} { puts "master utime: $master_utime" puts "master stime: $master_stime" } - if {!$::no_latency && ($all_drop == "all" || $all_drop == "slow")} { + if {!$::no_latency && ($all_drop == "all" || $all_drop == "slow" || $all_drop == "timeout")} { assert {$master_utime < 70} assert {$master_stime < 70} }