From 27fce29500745899d4fc7322a40569567fef5685 Mon Sep 17 00:00:00 2001 From: naglera <58042354+naglera@users.noreply.github.com> Date: Mon, 12 Aug 2024 23:03:12 +0300 Subject: [PATCH] Fix dual-channel-replication related issues (#837) - Fix TLS bug where connections were shut down by primary's main process while the child process was still writing, causing the main process to be blocked. - TLS connection fix - file descriptors are set to blocking mode in the main thread, followed by a blocking write. This sets the file descriptors to non-blocking if TLS is used (see `connTLSSyncWrite()`) (@xbasel). - Improve the reliability of dual-channel tests. Modify the pause mechanism to verify process status directly, rather than relying on the log. - Ensure that `server.primary_repl_offset` and `server.replid` are updated correctly when dual channel synchronization completes successfully. Failing to do so led to failures in replication tests that validate replication IDs or compare replication offsets. --------- Signed-off-by: naglera Signed-off-by: naglera <58042354+naglera@users.noreply.github.com> Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> Signed-off-by: Madelyn Olson Signed-off-by: Binbin Co-authored-by: ranshid <88133677+ranshid@users.noreply.github.com> Co-authored-by: xbasel <103044017+xbasel@users.noreply.github.com> Co-authored-by: Madelyn Olson Co-authored-by: Binbin --- src/networking.c | 6 +- src/rdb.c | 4 +- src/replication.c | 8 +- .../integration/dual-channel-replication.tcl | 81 ++++++++++--------- 4 files changed, 58 insertions(+), 41 deletions(-) diff --git a/src/networking.c b/src/networking.c index f39df9d95..aec03c29c 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1518,7 +1518,11 @@ void unlinkClient(client *c) { } } /* Only use shutdown when the fork is active and we are the parent. 
*/ - if (server.child_type) connShutdown(c->conn); + if (server.child_type && !c->flag.repl_rdb_channel) { + connShutdown(c->conn); + } else if (c->flag.repl_rdb_channel) { + shutdown(c->conn->fd, SHUT_RDWR); + } connClose(c->conn); c->conn = NULL; } diff --git a/src/rdb.c b/src/rdb.c index a344dd696..32b5210ca 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -3557,14 +3557,14 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) { conns[connsnum++] = replica->conn; if (dual_channel) { - /* Put the socket in blocking mode to simplify RDB transfer. */ - connBlock(replica->conn); connSendTimeout(replica->conn, server.repl_timeout * 1000); /* This replica uses diskless dual channel sync, hence we need * to inform it with the save end offset.*/ sendCurrentOffsetToReplica(replica); /* Make sure repl traffic is appended to the replication backlog */ addRdbReplicaToPsyncWait(replica); + /* Put the socket in blocking mode to simplify RDB transfer. */ + connBlock(replica->conn); } else { server.rdb_pipe_numconns++; } diff --git a/src/replication.c b/src/replication.c index 6be8d3f9d..8b751d3a7 100644 --- a/src/replication.c +++ b/src/replication.c @@ -391,7 +391,7 @@ void freeReplicaReferencedReplBuffer(client *replica) { uint64_t rdb_cid = htonu64(replica->id); if (raxRemove(server.replicas_waiting_psync, (unsigned char *)&rdb_cid, sizeof(rdb_cid), NULL)) { serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu from replicas rax.", - replicationGetReplicaName(replica), (long long unsigned int)replica->associated_rdb_client_id); + replicationGetReplicaName(replica), (long long unsigned int)replica->id); } } if (replica->ref_repl_buf_node != NULL) { @@ -956,7 +956,9 @@ int startBgsaveForReplication(int mincapa, int req) { /* `SYNC` should have failed with error if we don't support socket and require a filter, assert this here */ serverAssert(socket_target || !(req & REPLICA_REQ_RDB_MASK)); - serverLog(LL_NOTICE, "Starting BGSAVE for SYNC with target: %s", 
socket_target ? "replicas sockets" : "disk"); + serverLog(LL_NOTICE, "Starting BGSAVE for SYNC with target: %s using: %s", + socket_target ? "replicas sockets" : "disk", + (req & REPLICA_REQ_RDB_CHANNEL) ? "dual-channel" : "normal sync"); rdbSaveInfo rsi, *rsiptr; rsiptr = rdbPopulateSaveInfo(&rsi); @@ -4159,6 +4161,8 @@ void replicationResurrectProvisionalPrimary(void) { memcpy(server.primary->replid, server.repl_provisional_primary.replid, CONFIG_RUN_ID_SIZE); server.primary->reploff = server.repl_provisional_primary.reploff; server.primary->read_reploff = server.repl_provisional_primary.read_reploff; + server.primary_repl_offset = server.primary->reploff; + memcpy(server.replid, server.primary->replid, sizeof(server.primary->replid)); establishPrimaryConnection(); } diff --git a/tests/integration/dual-channel-replication.tcl b/tests/integration/dual-channel-replication.tcl index 5baa2960f..004225b14 100644 --- a/tests/integration/dual-channel-replication.tcl +++ b/tests/integration/dual-channel-replication.tcl @@ -23,9 +23,14 @@ proc get_client_id_by_last_cmd {r cmd} { return $client_id } +# Wait until the process enters a paused state, then resume the process. 
proc wait_and_resume_process idx { set pid [srv $idx pid] - wait_for_log_messages $idx {"*Process is about to stop.*"} 0 2000 1 + wait_for_condition 50 1000 { + [string match "T*" [exec ps -o state= -p $pid]] + } else { + fail "Process $pid didn't stop, current state is [exec ps -o state= -p $pid]" + } resume_process $pid } @@ -315,13 +320,12 @@ start_server {tags {"dual-channel-replication external:skip"}} { } $replica1 replicaof no one - $primary set key3 val3 - + test "Test replica's buffer limit reached" { $primary config set repl-diskless-sync-delay 0 - $primary config set rdb-key-save-delay 500 + $primary config set rdb-key-save-delay 10000 # At this point we have about 10k keys in the db, - # We expect that the next full sync will take 5 seconds (10k*500)ms + # We expect that the next full sync will take 100 seconds (10k*10000)ms # It will give us enough time to fill the replica buffer. $replica1 config set dual-channel-replication-enabled yes $replica1 config set client-output-buffer-limit "replica 16383 16383 0" @@ -343,19 +347,25 @@ start_server {tags {"dual-channel-replication external:skip"}} { } assert {[s -2 replicas_replication_buffer_size] <= 16385*2} - # Wait for sync to succeed + # Primary replication buffer should grow wait_for_condition 50 1000 { - [status $replica1 master_link_status] == "up" + [status $primary mem_total_replication_buffers] >= 81915 } else { - fail "Replica is not synced" + fail "Primary should take the load" } - wait_for_value_to_propegate_to_replica $primary $replica1 "key3" } $replica1 replicaof no one $replica1 config set client-output-buffer-limit "replica 256mb 256mb 0"; # remove repl buffer limitation + $primary config set rdb-key-save-delay 0 - $primary set key4 val4 + wait_for_condition 500 1000 { + [s 0 rdb_bgsave_in_progress] eq 0 + } else { + fail "can't kill rdb child" + } + + $primary set key3 val3 test "dual-channel-replication fails when primary diskless disabled" { set cur_psync [status $primary 
sync_partial_ok] @@ -370,7 +380,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { } else { fail "Replica is not synced" } - wait_for_value_to_propegate_to_replica $primary $replica1 "key4" + wait_for_value_to_propegate_to_replica $primary $replica1 "key3" # Verify that we did not use dual-channel-replication sync assert {[status $primary sync_partial_ok] == $cur_psync} @@ -921,8 +931,8 @@ start_server {tags {"dual-channel-replication external:skip"}} { fail "replica didn't start sync session in time" } - $primary debug log "killing replica rdb connection" set replica_rdb_channel_id [get_client_id_by_last_cmd $primary "sync"] + $primary debug log "killing replica rdb connection $replica_rdb_channel_id" assert {$replica_rdb_channel_id != ""} set loglines [count_log_lines -1] $primary client kill id $replica_rdb_channel_id @@ -956,6 +966,7 @@ start_server {tags {"dual-channel-replication external:skip"}} { $primary debug log "killing replica rdb connection $replica_rdb_channel_id" $primary client kill id $replica_rdb_channel_id # Wait for primary to abort the sync + wait_and_resume_process 0 wait_for_condition 10000000 10 { [s -1 rdb_bgsave_in_progress] eq 0 && [string match {*replicas_waiting_psync:0*} [$primary info replication]] @@ -965,7 +976,6 @@ start_server {tags {"dual-channel-replication external:skip"}} { # Verify primary reject replconf set-rdb-client-id set res [catch {$primary replconf set-rdb-client-id $replica_rdb_channel_id} err] assert [string match *ERR* $err] - wait_and_resume_process 0 } stop_write_load $load_handle } @@ -982,9 +992,9 @@ start_server {tags {"dual-channel-replication external:skip"}} { $primary config set loglevel debug $primary config set repl-diskless-sync-delay 0; # don't wait for other replicas - # Generating RDB will cost 5s(10000 * 0.0001s) + # Generating RDB will cost 100s $primary debug populate 10000 primary 1 - $primary config set rdb-key-save-delay 100 + $primary config set rdb-key-save-delay 10000 
start_server {} { set replica_1 [srv 0 client] @@ -1016,11 +1026,6 @@ start_server {tags {"dual-channel-replication external:skip"}} { } $replica_2 replicaof $primary_host $primary_port wait_for_log_messages -2 {"*Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC*"} $loglines 100 1000 - $primary config set rdb-key-save-delay 0 - # Verify second replica needed new session - wait_for_sync $replica_2 - assert {[s -2 sync_partial_ok] eq 2} - assert {[s -2 sync_full] eq 2} } stop_write_load $load_handle } @@ -1038,9 +1043,9 @@ start_server {tags {"dual-channel-replication external:skip"}} { $primary config set loglevel debug $primary config set repl-diskless-sync-delay 5; # allow catch failed sync before retry - # Generating RDB will cost 5s(10000 * 0.0001s) + # Generating RDB will cost 100 sec to generate $primary debug populate 10000 primary 1 - $primary config set rdb-key-save-delay 100 + $primary config set rdb-key-save-delay 10000 start_server {} { set replica [srv 0 client] @@ -1051,8 +1056,8 @@ start_server {tags {"dual-channel-replication external:skip"}} { $replica config set dual-channel-replication-enabled yes $replica config set loglevel debug $replica config set repl-timeout 10 + set load_handle [start_one_key_write_load $primary_host $primary_port 100 "mykey"] test "Replica recover rdb-connection killed" { - set load_handle [start_one_key_write_load $primary_host $primary_port 100 "mykey"] $replica replicaof $primary_host $primary_port # Wait for sync session to start wait_for_condition 500 1000 { @@ -1076,18 +1081,21 @@ start_server {tags {"dual-channel-replication external:skip"}} { } wait_for_log_messages -1 {"*Background RDB transfer error*"} $loglines 1000 10 # Replica should retry - verify_replica_online $primary 0 500 - stop_write_load $load_handle - wait_for_condition 1000 100 { - [s -1 master_repl_offset] eq [s master_repl_offset] + wait_for_condition 500 1000 { + [string match "*slave*,state=wait_bgsave*,type=rdb-channel*" 
[$primary info replication]] && + [string match "*slave*,state=bg_transfer*,type=main-channel*" [$primary info replication]] && + [s -1 rdb_bgsave_in_progress] eq 1 } else { - fail "Replica offset didn't catch up with the primary after too long time" + fail "replica didn't retry after connection close" } } $replica replicaof no one - + wait_for_condition 500 1000 { + [s -1 rdb_bgsave_in_progress] eq 0 + } else { + fail "Primary should abort sync" + } test "Replica recover main-connection killed" { - set load_handle [start_one_key_write_load $primary_host $primary_port 100 "mykey"] $replica replicaof $primary_host $primary_port # Wait for sync session to start wait_for_condition 500 1000 { @@ -1111,13 +1119,14 @@ start_server {tags {"dual-channel-replication external:skip"}} { } wait_for_log_messages -1 {"*Background RDB transfer error*"} $loglines 1000 10 # Replica should retry - verify_replica_online $primary 0 500 - stop_write_load $load_handle - wait_for_condition 1000 100 { - [s -1 master_repl_offset] eq [s master_repl_offset] + wait_for_condition 500 1000 { + [string match "*slave*,state=wait_bgsave*,type=rdb-channel*" [$primary info replication]] && + [string match "*slave*,state=bg_transfer*,type=main-channel*" [$primary info replication]] && + [s -1 rdb_bgsave_in_progress] eq 1 } else { - fail "Replica offset didn't catch up with the primary after too long time" - } + fail "replica didn't retry after connection close" + } } + stop_write_load $load_handle } }