Add a timeout mechanism for replicas stuck in fullsync (#8762)

Starting redis 6.0 (part of the TLS feature), diskless master uses pipe from the fork
child so that the parent is the one sending data to the replicas.
This mechanism has an issue in which a hung replica will cause the master to wait
for it to read the data sent to it forever, thus preventing the fork child from terminating
and preventing the creations of any other forks.

This PR adds a timeout mechanism, much like the ACK-based timeout,
we disconnect replicas that aren't reading the RDB file fast enough.
This commit is contained in:
guybe7 2021-04-15 16:18:51 +02:00 committed by GitHub
parent eb08b431fe
commit 0bf6c205db
4 changed files with 44 additions and 10 deletions

View File

@ -154,6 +154,7 @@ client *createClient(connection *conn) {
c->read_reploff = 0; c->read_reploff = 0;
c->repl_ack_off = 0; c->repl_ack_off = 0;
c->repl_ack_time = 0; c->repl_ack_time = 0;
c->repl_last_partial_write = 0;
c->slave_listening_port = 0; c->slave_listening_port = 0;
c->slave_addr = NULL; c->slave_addr = NULL;
c->slave_capa = SLAVE_CAPA_NONE; c->slave_capa = SLAVE_CAPA_NONE;

View File

@ -1153,6 +1153,8 @@ void rdbPipeWriteHandlerConnRemoved(struct connection *conn) {
if (!connHasWriteHandler(conn)) if (!connHasWriteHandler(conn))
return; return;
connSetWriteHandler(conn, NULL); connSetWriteHandler(conn, NULL);
client *slave = connGetPrivateData(conn);
slave->repl_last_partial_write = 0;
server.rdb_pipe_numconns_writing--; server.rdb_pipe_numconns_writing--;
/* if there are no more writes for now for this conn, or write error: */ /* if there are no more writes for now for this conn, or write error: */
if (server.rdb_pipe_numconns_writing == 0) { if (server.rdb_pipe_numconns_writing == 0) {
@ -1180,9 +1182,11 @@ void rdbPipeWriteHandler(struct connection *conn) {
} else { } else {
slave->repldboff += nwritten; slave->repldboff += nwritten;
atomicIncr(server.stat_net_output_bytes, nwritten); atomicIncr(server.stat_net_output_bytes, nwritten);
if (slave->repldboff < server.rdb_pipe_bufflen) if (slave->repldboff < server.rdb_pipe_bufflen) {
slave->repl_last_partial_write = server.unixtime;
return; /* more data to write.. */ return; /* more data to write.. */
} }
}
rdbPipeWriteHandlerConnRemoved(conn); rdbPipeWriteHandlerConnRemoved(conn);
} }
@ -1262,6 +1266,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
/* If we were unable to write all the data to one of the replicas, /* If we were unable to write all the data to one of the replicas,
* setup write handler (and disable pipe read handler, below) */ * setup write handler (and disable pipe read handler, below) */
if (nwritten != server.rdb_pipe_bufflen) { if (nwritten != server.rdb_pipe_bufflen) {
slave->repl_last_partial_write = server.unixtime;
server.rdb_pipe_numconns_writing++; server.rdb_pipe_numconns_writing++;
connSetWriteHandler(conn, rdbPipeWriteHandler); connSetWriteHandler(conn, rdbPipeWriteHandler);
} }
@ -3390,15 +3395,28 @@ void replicationCron(void) {
while((ln = listNext(&li))) { while((ln = listNext(&li))) {
client *slave = ln->value; client *slave = ln->value;
if (slave->replstate != SLAVE_STATE_ONLINE) continue; if (slave->replstate == SLAVE_STATE_ONLINE) {
if (slave->flags & CLIENT_PRE_PSYNC) continue; if (slave->flags & CLIENT_PRE_PSYNC)
if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) continue;
{ if ((server.unixtime - slave->repl_ack_time) > server.repl_timeout) {
serverLog(LL_WARNING, "Disconnecting timedout replica: %s", serverLog(LL_WARNING, "Disconnecting timedout replica (streaming sync): %s",
replicationGetSlaveName(slave)); replicationGetSlaveName(slave));
freeClient(slave); freeClient(slave);
} }
} }
/* We consider disconnecting only diskless replicas because disk-based replicas aren't fed
* by the fork child so if a disk-based replica is stuck it doesn't prevent the fork child
* from terminating. */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END && server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) {
if (slave->repl_last_partial_write != 0 &&
(server.unixtime - slave->repl_last_partial_write) > server.repl_timeout)
{
serverLog(LL_WARNING, "Disconnecting timedout replica (full sync): %s",
replicationGetSlaveName(slave));
freeClient(slave);
}
}
}
} }
/* If this is a master without attached slaves and there is a replication /* If this is a master without attached slaves and there is a replication

View File

@ -900,6 +900,7 @@ typedef struct client {
long long reploff; /* Applied replication offset if this is a master. */ long long reploff; /* Applied replication offset if this is a master. */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */ long long repl_ack_off; /* Replication ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */ long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long repl_last_partial_write; /* The last time the server did a partial write from the RDB child pipe to this replica */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer copying this slave output buffer
should use. */ should use. */

View File

@ -599,7 +599,7 @@ start_server {tags {"repl"}} {
# If running on Linux, we also measure utime/stime to detect possible I/O handling issues # If running on Linux, we also measure utime/stime to detect possible I/O handling issues
set os [catch {exec unamee}] set os [catch {exec unamee}]
set measure_time [expr {$os == "Linux"} ? 1 : 0] set measure_time [expr {$os == "Linux"} ? 1 : 0]
foreach all_drop {no slow fast all} { foreach all_drop {no slow fast all timeout} {
test "diskless $all_drop replicas drop during rdb pipe" { test "diskless $all_drop replicas drop during rdb pipe" {
set replicas {} set replicas {}
set replicas_alive {} set replicas_alive {}
@ -647,6 +647,12 @@ start_server {tags {"repl"}} {
exec kill [srv -1 pid] exec kill [srv -1 pid]
set replicas_alive [lreplace $replicas_alive 0 0] set replicas_alive [lreplace $replicas_alive 0 0]
} }
if {$all_drop == "timeout"} {
$master config set repl-timeout 1
# we want this replica to hang on a key for very long so it'll reach repl-timeout
exec kill -SIGSTOP [srv -1 pid]
after 3000
}
# wait for rdb child to exit # wait for rdb child to exit
wait_for_condition 500 100 { wait_for_condition 500 100 {
@ -665,6 +671,14 @@ start_server {tags {"repl"}} {
if {$all_drop == "slow" || $all_drop == "fast"} { if {$all_drop == "slow" || $all_drop == "fast"} {
wait_for_log_messages -2 {"*Diskless rdb transfer, done reading from pipe, 1 replicas still up*"} $loglines 1 1 wait_for_log_messages -2 {"*Diskless rdb transfer, done reading from pipe, 1 replicas still up*"} $loglines 1 1
} }
if {$all_drop == "timeout"} {
wait_for_log_messages -2 {"*Disconnecting timedout replica (full sync)*"} $loglines 1 1
wait_for_log_messages -2 {"*Diskless rdb transfer, done reading from pipe, 1 replicas still up*"} $loglines 1 1
# master disconnected the slow replica, remove from array
set replicas_alive [lreplace $replicas_alive 0 0]
# release it
exec kill -SIGCONT [srv -1 pid]
}
# make sure we don't have a busy loop going thought epoll_wait # make sure we don't have a busy loop going thought epoll_wait
if {$measure_time} { if {$measure_time} {
@ -678,7 +692,7 @@ start_server {tags {"repl"}} {
puts "master utime: $master_utime" puts "master utime: $master_utime"
puts "master stime: $master_stime" puts "master stime: $master_stime"
} }
if {!$::no_latency && ($all_drop == "all" || $all_drop == "slow")} { if {!$::no_latency && ($all_drop == "all" || $all_drop == "slow" || $all_drop == "timeout")} {
assert {$master_utime < 70} assert {$master_utime < 70}
assert {$master_stime < 70} assert {$master_stime < 70}
} }