Fix dual-channel-replication related issues (#837)
- Fix TLS bug where connections were shut down by the primary's main process while the child process was still writing, causing the main process to be blocked. - TLS connection fix: file descriptors are set to blocking mode in the main thread, followed by a blocking write. This sets the file descriptors to non-blocking if TLS is used (see `connTLSSyncWrite()`) (@xbasel). - Improve the reliability of dual-channel tests. Modify the pause mechanism to verify the process status directly, rather than relying on the log. - Ensure that `server.primary_repl_offset` and `server.replid` are updated correctly when dual-channel synchronization completes successfully. Previously they were not updated, which led to failures in replication tests that validate replication IDs or compare replication offsets. --------- Signed-off-by: naglera <anagler123@gmail.com> Signed-off-by: naglera <58042354+naglera@users.noreply.github.com> Signed-off-by: xbasel <103044017+xbasel@users.noreply.github.com> Signed-off-by: Madelyn Olson <madelyneolson@gmail.com> Signed-off-by: Binbin <binloveplay1314@qq.com> Co-authored-by: ranshid <88133677+ranshid@users.noreply.github.com> Co-authored-by: xbasel <103044017+xbasel@users.noreply.github.com> Co-authored-by: Madelyn Olson <madelyneolson@gmail.com> Co-authored-by: Binbin <binloveplay1314@qq.com>
This commit is contained in:
parent
1c198a95ac
commit
27fce29500
@ -1518,7 +1518,11 @@ void unlinkClient(client *c) {
|
||||
}
|
||||
}
|
||||
/* Only use shutdown when the fork is active and we are the parent. */
|
||||
if (server.child_type) connShutdown(c->conn);
|
||||
if (server.child_type && !c->flag.repl_rdb_channel) {
|
||||
connShutdown(c->conn);
|
||||
} else if (c->flag.repl_rdb_channel) {
|
||||
shutdown(c->conn->fd, SHUT_RDWR);
|
||||
}
|
||||
connClose(c->conn);
|
||||
c->conn = NULL;
|
||||
}
|
||||
|
@ -3557,14 +3557,14 @@ int rdbSaveToReplicasSockets(int req, rdbSaveInfo *rsi) {
|
||||
|
||||
conns[connsnum++] = replica->conn;
|
||||
if (dual_channel) {
|
||||
/* Put the socket in blocking mode to simplify RDB transfer. */
|
||||
connBlock(replica->conn);
|
||||
connSendTimeout(replica->conn, server.repl_timeout * 1000);
|
||||
/* This replica uses diskless dual channel sync, hence we need
|
||||
* to inform it with the save end offset.*/
|
||||
sendCurrentOffsetToReplica(replica);
|
||||
/* Make sure repl traffic is appended to the replication backlog */
|
||||
addRdbReplicaToPsyncWait(replica);
|
||||
/* Put the socket in blocking mode to simplify RDB transfer. */
|
||||
connBlock(replica->conn);
|
||||
} else {
|
||||
server.rdb_pipe_numconns++;
|
||||
}
|
||||
|
@ -391,7 +391,7 @@ void freeReplicaReferencedReplBuffer(client *replica) {
|
||||
uint64_t rdb_cid = htonu64(replica->id);
|
||||
if (raxRemove(server.replicas_waiting_psync, (unsigned char *)&rdb_cid, sizeof(rdb_cid), NULL)) {
|
||||
serverLog(LL_DEBUG, "Remove psync waiting replica %s with cid %llu from replicas rax.",
|
||||
replicationGetReplicaName(replica), (long long unsigned int)replica->associated_rdb_client_id);
|
||||
replicationGetReplicaName(replica), (long long unsigned int)replica->id);
|
||||
}
|
||||
}
|
||||
if (replica->ref_repl_buf_node != NULL) {
|
||||
@ -956,7 +956,9 @@ int startBgsaveForReplication(int mincapa, int req) {
|
||||
/* `SYNC` should have failed with error if we don't support socket and require a filter, assert this here */
|
||||
serverAssert(socket_target || !(req & REPLICA_REQ_RDB_MASK));
|
||||
|
||||
serverLog(LL_NOTICE, "Starting BGSAVE for SYNC with target: %s", socket_target ? "replicas sockets" : "disk");
|
||||
serverLog(LL_NOTICE, "Starting BGSAVE for SYNC with target: %s using: %s",
|
||||
socket_target ? "replicas sockets" : "disk",
|
||||
(req & REPLICA_REQ_RDB_CHANNEL) ? "dual-channel" : "normal sync");
|
||||
|
||||
rdbSaveInfo rsi, *rsiptr;
|
||||
rsiptr = rdbPopulateSaveInfo(&rsi);
|
||||
@ -4159,6 +4161,8 @@ void replicationResurrectProvisionalPrimary(void) {
|
||||
memcpy(server.primary->replid, server.repl_provisional_primary.replid, CONFIG_RUN_ID_SIZE);
|
||||
server.primary->reploff = server.repl_provisional_primary.reploff;
|
||||
server.primary->read_reploff = server.repl_provisional_primary.read_reploff;
|
||||
server.primary_repl_offset = server.primary->reploff;
|
||||
memcpy(server.replid, server.primary->replid, sizeof(server.primary->replid));
|
||||
establishPrimaryConnection();
|
||||
}
|
||||
|
||||
|
@ -23,9 +23,14 @@ proc get_client_id_by_last_cmd {r cmd} {
|
||||
return $client_id
|
||||
}
|
||||
|
||||
# Wait until the process enters a paused state, then resume the process.
|
||||
proc wait_and_resume_process idx {
|
||||
set pid [srv $idx pid]
|
||||
wait_for_log_messages $idx {"*Process is about to stop.*"} 0 2000 1
|
||||
wait_for_condition 50 1000 {
|
||||
[string match "T*" [exec ps -o state= -p $pid]]
|
||||
} else {
|
||||
fail "Process $pid didn't stop, current state is [exec ps -o state= -p $pid]"
|
||||
}
|
||||
resume_process $pid
|
||||
}
|
||||
|
||||
@ -315,13 +320,12 @@ start_server {tags {"dual-channel-replication external:skip"}} {
|
||||
}
|
||||
|
||||
$replica1 replicaof no one
|
||||
$primary set key3 val3
|
||||
|
||||
|
||||
test "Test replica's buffer limit reached" {
|
||||
$primary config set repl-diskless-sync-delay 0
|
||||
$primary config set rdb-key-save-delay 500
|
||||
$primary config set rdb-key-save-delay 10000
|
||||
# At this point we have about 10k keys in the db,
|
||||
# We expect that the next full sync will take 5 seconds (10k*500)ms
|
||||
# We expect that the next full sync will take 100 seconds (10k*10000)ms
|
||||
# It will give us enough time to fill the replica buffer.
|
||||
$replica1 config set dual-channel-replication-enabled yes
|
||||
$replica1 config set client-output-buffer-limit "replica 16383 16383 0"
|
||||
@ -343,19 +347,25 @@ start_server {tags {"dual-channel-replication external:skip"}} {
|
||||
}
|
||||
assert {[s -2 replicas_replication_buffer_size] <= 16385*2}
|
||||
|
||||
# Wait for sync to succeed
|
||||
# Primary replication buffer should grow
|
||||
wait_for_condition 50 1000 {
|
||||
[status $replica1 master_link_status] == "up"
|
||||
[status $primary mem_total_replication_buffers] >= 81915
|
||||
} else {
|
||||
fail "Replica is not synced"
|
||||
fail "Primary should take the load"
|
||||
}
|
||||
wait_for_value_to_propegate_to_replica $primary $replica1 "key3"
|
||||
}
|
||||
|
||||
$replica1 replicaof no one
|
||||
$replica1 config set client-output-buffer-limit "replica 256mb 256mb 0"; # remove repl buffer limitation
|
||||
$primary config set rdb-key-save-delay 0
|
||||
|
||||
$primary set key4 val4
|
||||
wait_for_condition 500 1000 {
|
||||
[s 0 rdb_bgsave_in_progress] eq 0
|
||||
} else {
|
||||
fail "can't kill rdb child"
|
||||
}
|
||||
|
||||
$primary set key3 val3
|
||||
|
||||
test "dual-channel-replication fails when primary diskless disabled" {
|
||||
set cur_psync [status $primary sync_partial_ok]
|
||||
@ -370,7 +380,7 @@ start_server {tags {"dual-channel-replication external:skip"}} {
|
||||
} else {
|
||||
fail "Replica is not synced"
|
||||
}
|
||||
wait_for_value_to_propegate_to_replica $primary $replica1 "key4"
|
||||
wait_for_value_to_propegate_to_replica $primary $replica1 "key3"
|
||||
|
||||
# Verify that we did not use dual-channel-replication sync
|
||||
assert {[status $primary sync_partial_ok] == $cur_psync}
|
||||
@ -921,8 +931,8 @@ start_server {tags {"dual-channel-replication external:skip"}} {
|
||||
fail "replica didn't start sync session in time"
|
||||
}
|
||||
|
||||
$primary debug log "killing replica rdb connection"
|
||||
set replica_rdb_channel_id [get_client_id_by_last_cmd $primary "sync"]
|
||||
$primary debug log "killing replica rdb connection $replica_rdb_channel_id"
|
||||
assert {$replica_rdb_channel_id != ""}
|
||||
set loglines [count_log_lines -1]
|
||||
$primary client kill id $replica_rdb_channel_id
|
||||
@ -956,6 +966,7 @@ start_server {tags {"dual-channel-replication external:skip"}} {
|
||||
$primary debug log "killing replica rdb connection $replica_rdb_channel_id"
|
||||
$primary client kill id $replica_rdb_channel_id
|
||||
# Wait for primary to abort the sync
|
||||
wait_and_resume_process 0
|
||||
wait_for_condition 10000000 10 {
|
||||
[s -1 rdb_bgsave_in_progress] eq 0 &&
|
||||
[string match {*replicas_waiting_psync:0*} [$primary info replication]]
|
||||
@ -965,7 +976,6 @@ start_server {tags {"dual-channel-replication external:skip"}} {
|
||||
# Verify primary reject replconf set-rdb-client-id
|
||||
set res [catch {$primary replconf set-rdb-client-id $replica_rdb_channel_id} err]
|
||||
assert [string match *ERR* $err]
|
||||
wait_and_resume_process 0
|
||||
}
|
||||
stop_write_load $load_handle
|
||||
}
|
||||
@ -982,9 +992,9 @@ start_server {tags {"dual-channel-replication external:skip"}} {
|
||||
$primary config set loglevel debug
|
||||
$primary config set repl-diskless-sync-delay 0; # don't wait for other replicas
|
||||
|
||||
# Generating RDB will cost 5s(10000 * 0.0001s)
|
||||
# Generating RDB will cost 100s
|
||||
$primary debug populate 10000 primary 1
|
||||
$primary config set rdb-key-save-delay 100
|
||||
$primary config set rdb-key-save-delay 10000
|
||||
|
||||
start_server {} {
|
||||
set replica_1 [srv 0 client]
|
||||
@ -1016,11 +1026,6 @@ start_server {tags {"dual-channel-replication external:skip"}} {
|
||||
}
|
||||
$replica_2 replicaof $primary_host $primary_port
|
||||
wait_for_log_messages -2 {"*Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC*"} $loglines 100 1000
|
||||
$primary config set rdb-key-save-delay 0
|
||||
# Verify second replica needed new session
|
||||
wait_for_sync $replica_2
|
||||
assert {[s -2 sync_partial_ok] eq 2}
|
||||
assert {[s -2 sync_full] eq 2}
|
||||
}
|
||||
stop_write_load $load_handle
|
||||
}
|
||||
@ -1038,9 +1043,9 @@ start_server {tags {"dual-channel-replication external:skip"}} {
|
||||
$primary config set loglevel debug
|
||||
$primary config set repl-diskless-sync-delay 5; # allow catch failed sync before retry
|
||||
|
||||
# Generating RDB will cost 5s(10000 * 0.0001s)
|
||||
# Generating RDB will cost 100 sec to generate
|
||||
$primary debug populate 10000 primary 1
|
||||
$primary config set rdb-key-save-delay 100
|
||||
$primary config set rdb-key-save-delay 10000
|
||||
|
||||
start_server {} {
|
||||
set replica [srv 0 client]
|
||||
@ -1051,8 +1056,8 @@ start_server {tags {"dual-channel-replication external:skip"}} {
|
||||
$replica config set dual-channel-replication-enabled yes
|
||||
$replica config set loglevel debug
|
||||
$replica config set repl-timeout 10
|
||||
set load_handle [start_one_key_write_load $primary_host $primary_port 100 "mykey"]
|
||||
test "Replica recover rdb-connection killed" {
|
||||
set load_handle [start_one_key_write_load $primary_host $primary_port 100 "mykey"]
|
||||
$replica replicaof $primary_host $primary_port
|
||||
# Wait for sync session to start
|
||||
wait_for_condition 500 1000 {
|
||||
@ -1076,18 +1081,21 @@ start_server {tags {"dual-channel-replication external:skip"}} {
|
||||
}
|
||||
wait_for_log_messages -1 {"*Background RDB transfer error*"} $loglines 1000 10
|
||||
# Replica should retry
|
||||
verify_replica_online $primary 0 500
|
||||
stop_write_load $load_handle
|
||||
wait_for_condition 1000 100 {
|
||||
[s -1 master_repl_offset] eq [s master_repl_offset]
|
||||
wait_for_condition 500 1000 {
|
||||
[string match "*slave*,state=wait_bgsave*,type=rdb-channel*" [$primary info replication]] &&
|
||||
[string match "*slave*,state=bg_transfer*,type=main-channel*" [$primary info replication]] &&
|
||||
[s -1 rdb_bgsave_in_progress] eq 1
|
||||
} else {
|
||||
fail "Replica offset didn't catch up with the primary after too long time"
|
||||
fail "replica didn't retry after connection close"
|
||||
}
|
||||
}
|
||||
$replica replicaof no one
|
||||
|
||||
wait_for_condition 500 1000 {
|
||||
[s -1 rdb_bgsave_in_progress] eq 0
|
||||
} else {
|
||||
fail "Primary should abort sync"
|
||||
}
|
||||
test "Replica recover main-connection killed" {
|
||||
set load_handle [start_one_key_write_load $primary_host $primary_port 100 "mykey"]
|
||||
$replica replicaof $primary_host $primary_port
|
||||
# Wait for sync session to start
|
||||
wait_for_condition 500 1000 {
|
||||
@ -1111,13 +1119,14 @@ start_server {tags {"dual-channel-replication external:skip"}} {
|
||||
}
|
||||
wait_for_log_messages -1 {"*Background RDB transfer error*"} $loglines 1000 10
|
||||
# Replica should retry
|
||||
verify_replica_online $primary 0 500
|
||||
stop_write_load $load_handle
|
||||
wait_for_condition 1000 100 {
|
||||
[s -1 master_repl_offset] eq [s master_repl_offset]
|
||||
wait_for_condition 500 1000 {
|
||||
[string match "*slave*,state=wait_bgsave*,type=rdb-channel*" [$primary info replication]] &&
|
||||
[string match "*slave*,state=bg_transfer*,type=main-channel*" [$primary info replication]] &&
|
||||
[s -1 rdb_bgsave_in_progress] eq 1
|
||||
} else {
|
||||
fail "Replica offset didn't catch up with the primary after too long time"
|
||||
}
|
||||
fail "replica didn't retry after connection close"
|
||||
}
|
||||
}
|
||||
stop_write_load $load_handle
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user