Replica flushes the old data only after the RDB file is ok in disk-based replication (#926)
Call emptyData right before rdbLoad, so that if we encounter an error in the middle of the sync and drop the replication stream, we do not leave an empty database behind. The real change is in the disk-based part; the rest is just code movement. Signed-off-by: Binbin <binloveplay1314@qq.com>
This commit is contained in:
parent
09def3cf03
commit
17390383b5
@ -2180,11 +2180,6 @@ void readSyncBulkPayload(connection *conn) {
|
||||
temp_functions_lib_ctx = functionsLibCtxCreate();
|
||||
|
||||
moduleFireServerEvent(VALKEYMODULE_EVENT_REPL_ASYNC_LOAD, VALKEYMODULE_SUBEVENT_REPL_ASYNC_LOAD_STARTED, NULL);
|
||||
} else {
|
||||
replicationAttachToNewPrimary();
|
||||
|
||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
|
||||
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
|
||||
}
|
||||
|
||||
/* Before loading the DB into memory we need to delete the readable
|
||||
@ -2193,7 +2188,6 @@ void readSyncBulkPayload(connection *conn) {
|
||||
* time for non blocking loading. */
|
||||
connSetReadHandler(conn, NULL);
|
||||
|
||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
|
||||
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
|
||||
if (use_diskless_load) {
|
||||
rio rdb;
|
||||
@ -2213,6 +2207,14 @@ void readSyncBulkPayload(connection *conn) {
|
||||
dbarray = diskless_load_tempDb;
|
||||
functions_lib_ctx = temp_functions_lib_ctx;
|
||||
} else {
|
||||
/* We will soon start loading the RDB from socket, the replication history is changed,
|
||||
* we must discard the cached primary structure and force resync of sub-replicas. */
|
||||
replicationAttachToNewPrimary();
|
||||
|
||||
/* Even though we are on-empty-db and the database is empty, we still call emptyData. */
|
||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
|
||||
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
|
||||
|
||||
dbarray = server.db;
|
||||
functions_lib_ctx = functionsLibCtxGetCurrent();
|
||||
functionsLibCtxClear(functions_lib_ctx);
|
||||
@ -2224,6 +2226,8 @@ void readSyncBulkPayload(connection *conn) {
|
||||
* We'll restore it when the RDB is received. */
|
||||
connBlock(conn);
|
||||
connRecvTimeout(conn, server.repl_timeout * 1000);
|
||||
|
||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
|
||||
startLoading(server.repl_transfer_size, RDBFLAGS_REPLICATION, asyncLoading);
|
||||
|
||||
int loadingFailed = 0;
|
||||
@ -2256,6 +2260,7 @@ void readSyncBulkPayload(connection *conn) {
|
||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding temporary DB in background");
|
||||
} else {
|
||||
/* Remove the half-loaded data in case we started with an empty replica. */
|
||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
|
||||
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
|
||||
}
|
||||
|
||||
@ -2332,6 +2337,17 @@ void readSyncBulkPayload(connection *conn) {
|
||||
return;
|
||||
}
|
||||
|
||||
/* We will soon start loading the RDB from disk, the replication history is changed,
|
||||
* we must discard the cached primary structure and force resync of sub-replicas. */
|
||||
replicationAttachToNewPrimary();
|
||||
|
||||
/* Empty the databases only after the RDB file is ok, that is, before the RDB file
|
||||
* is actually loaded, in case we encounter an error and drop the replication stream
|
||||
* and leave an empty database. */
|
||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Flushing old data");
|
||||
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
|
||||
|
||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Loading DB in memory");
|
||||
if (rdbLoad(server.rdb_filename, &rsi, RDBFLAGS_REPLICATION) != RDB_OK) {
|
||||
serverLog(LL_WARNING, "Failed trying to load the PRIMARY synchronization "
|
||||
"DB from disk, check server logs.");
|
||||
@ -2344,6 +2360,7 @@ void readSyncBulkPayload(connection *conn) {
|
||||
}
|
||||
|
||||
/* If disk-based RDB loading fails, remove the half-loaded dataset. */
|
||||
serverLog(LL_NOTICE, "PRIMARY <-> REPLICA sync: Discarding the half-loaded data");
|
||||
emptyData(-1, empty_db_flags, replicationEmptyDbCallback);
|
||||
|
||||
/* Note that there's no point in restarting the AOF on sync failure,
|
||||
|
@ -1477,3 +1477,43 @@ start_server {tags {"repl external:skip"}} {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
start_server {tags {"repl external:skip"}} {
|
||||
set replica [srv 0 client]
|
||||
$replica set replica_key replica_value
|
||||
|
||||
start_server {} {
|
||||
set primary [srv 0 client]
|
||||
set primary_host [srv 0 host]
|
||||
set primary_port [srv 0 port]
|
||||
$primary set primary_key primary_value
|
||||
|
||||
test {Replica keep the old data if RDB file save fails in disk-based replication} {
|
||||
# Create a folder called 'dump.rdb' to trigger temp-rdb rename failure
|
||||
# and it will cause RDB file save to fail at the rename.
|
||||
set dump_rdb [file join [lindex [$replica config get dir] 1] dump.rdb]
|
||||
if {[file exists $dump_rdb]} { exec rm -f $dump_rdb }
|
||||
exec mkdir -p $dump_rdb
|
||||
|
||||
$replica replicaof $primary_host $primary_port
|
||||
|
||||
# Waiting for the rename to fail.
|
||||
wait_for_log_messages -1 {"*Failed trying to rename the temp DB into dump.rdb*"} 0 1000 10
|
||||
|
||||
# Make sure the replica has not completed the sync and still keeps its old data.
|
||||
assert_equal {} [$replica get primary_key]
|
||||
assert_equal {replica_value} [$replica get replica_key]
|
||||
|
||||
# Remove the test folder so that the rename succeeds.
|
||||
exec rm -rf $dump_rdb
|
||||
wait_for_condition 500 100 {
|
||||
[$replica get primary_key] == {primary_value} &&
|
||||
[$replica get replica_key] == {}
|
||||
} else {
|
||||
puts [$primary keys *]
|
||||
puts [$replica keys *]
|
||||
fail "Replication failed."
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user