Replica flush the old data after RDB file is ok in disk-based replication (#926)

Call emptyData right before rdbLoad to prevent errors in the middle
and we drop the replication stream and leaving an empty database.
The real changes is in disk-based part, the rest is just code movement.

Signed-off-by: Binbin <binloveplay1314@qq.com>
Signed-off-by: Ping Xie <pingxie@google.com>
This commit is contained in:
Binbin 2024-09-14 11:49:49 +08:00 committed by Ping Xie
parent cf51462b5b
commit 6f9786b9b3
2 changed files with 63 additions and 6 deletions

View File

@ -2180,11 +2180,6 @@ void readSyncBulkPayload(connection *conn) {
temp_functions_lib_ctx = functionsLibCtxCreate();
moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, NULL);
} else {
replicationAttachToNewPrimary();
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
}
/* Before loading the DB into memory we need to delete the readable
@ -2193,7 +2188,6 @@ void readSyncBulkPayload(connection *conn) {
* time for non blocking loading. */
connSetReadHandler(conn, NULL);
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
if (use_diskless_load) {
rio rdb;
@ -2213,6 +2207,14 @@ void readSyncBulkPayload(connection *conn) {
dbarray = diskless_load_tempDb;
functions_lib_ctx = temp_functions_lib_ctx;
} else {
/* We will soon start loading the RDB from socket, the replication history is changed,
* we must discard the cached primary structure and force resync of sub-replicas. */
replicationAttachToNewPrimary();
/* Even though we are on-empty-db and the database is empty, we still call emptyData. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
dbarray = server.db;
functions_lib_ctx = functionsLibCtxGetCurrent();
functionsLibCtxClear(functions_lib_ctx);
@ -2224,6 +2226,8 @@ void readSyncBulkPayload(connection *conn) {
* We'll restore it when the RDB is received. */
connBlock(conn);
connRecvTimeout(conn, server.repl_timeout * 1000);
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading);
int loadingFailed = 0;
@ -2256,6 +2260,7 @@ void readSyncBulkPayload(connection *conn) {
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background");
} else {
/* Remove the half-loaded data in case we started with an empty replica. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
}
@ -2332,6 +2337,17 @@ void readSyncBulkPayload(connection *conn) {
return;
}
/* We will soon start loading the RDB from disk, the replication history is changed,
* we must discard the cached primary structure and force resync of sub-replicas. */
replicationAttachToNewPrimary();
/* Empty the databases only after the RDB file is ok, that is, before the RDB file
* is actually loaded, in case we encounter an error and drop the replication stream
* and leave an empty database. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
if (rdbLoad(server.rdb_filename, &rsi, RDBFLAGS_REPLICATION) != RDB_OK) {
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization "
"DB from disk, check server logs.");
@ -2344,6 +2360,7 @@ void readSyncBulkPayload(connection *conn) {
}
/* If disk-based RDB loading fails, remove the half-loaded dataset. */
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
/* Note that there's no point in restarting the AOF on sync failure,

View File

@ -1477,3 +1477,43 @@ start_server {tags {"repl external:skip"}} {
}
}
}
start_server {tags {"repl external:skip"}} {
set replica [srv 0 client]
$replica set replica_key replica_value
start_server {} {
set primary [srv 0 client]
set primary_host [srv 0 host]
set primary_port [srv 0 port]
$primary set primary_key primary_value
test {Replica keep the old data if RDB file save fails in disk-based replication} {
# Create a folder called 'dump.rdb' to trigger temp-rdb rename failure
# and it will cause RDB file save to fail at the rename.
set dump_rdb [file join [lindex [$replica config get dir] 1] dump.rdb]
if {[file exists $dump_rdb]} { exec rm -f $dump_rdb }
exec mkdir -p $dump_rdb
$replica replicaof $primary_host $primary_port
# Waiting for the rename to fail.
wait_for_log_messages -1 {"*Failed trying to rename the temp DB into dump.rdb*"} 0 1000 10
# Make sure the replica has not completed sync and keep the old data.
assert_equal {} [$replica get primary_key]
assert_equal {replica_value} [$replica get replica_key]
# Remove the test folder and make the rename success
exec rm -rf $dump_rdb
wait_for_condition 500 100 {
[$replica get primary_key] == {primary_value} &&
[$replica get replica_key] == {}
} else {
puts [$primary keys *]
puts [$replica keys *]
fail "Replication failed."
}
}
}
}