PSYNC2: make partial sync possible after master reboot (#8015)
The main idea is how to allow a master to load replication info from RDB file when rebooting, if master can load replication info it means that replicas may have the chance to psync with master, it can save much traffic. The key point is we need guarantee safety and consistency, so there are two differences between master and replica: 1. master would load the replication info as secondary ID and offset, in case other masters have the same replid. 2. when master loading RDB, it would propagate expired keys as DEL command to replication backlog, then replica can receive these commands to delete stale keys. p.s. the expired keys when RDB loading is useful for users, so we show it as `rdb_last_load_keys_expired` and `rdb_last_load_keys_loaded` in info persistence. Moreover, after load replication info, master should update `no_replica_time` in case loading RDB cost too long time.
This commit is contained in:
parent
75dd230994
commit
794442b130
28
src/rdb.c
28
src/rdb.c
@ -2482,6 +2482,8 @@ void startLoading(size_t size, int rdbflags) {
|
||||
server.loading_loaded_bytes = 0;
|
||||
server.loading_total_bytes = size;
|
||||
server.loading_rdb_used_mem = 0;
|
||||
server.rdb_last_load_keys_expired = 0;
|
||||
server.rdb_last_load_keys_loaded = 0;
|
||||
blockingOperationStarts();
|
||||
|
||||
/* Fire the loading modules start event. */
|
||||
@ -2567,12 +2569,12 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
|
||||
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
|
||||
* otherwise C_ERR is returned and 'errno' is set accordingly. */
|
||||
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
||||
uint64_t dbid;
|
||||
uint64_t dbid = 0;
|
||||
int type, rdbver;
|
||||
redisDb *db = server.db+0;
|
||||
char buf[1024];
|
||||
int error;
|
||||
long long empty_keys_skipped = 0, expired_keys_skipped = 0, keys_loaded = 0;
|
||||
long long empty_keys_skipped = 0;
|
||||
|
||||
rdb->update_cksum = rdbLoadProgressCallback;
|
||||
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
|
||||
@ -2801,16 +2803,28 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
||||
!(rdbflags&RDBFLAGS_AOF_PREAMBLE) &&
|
||||
expiretime != -1 && expiretime < now)
|
||||
{
|
||||
if (rdbflags & RDBFLAGS_FEED_REPL) {
|
||||
/* Caller should have created replication backlog,
|
||||
* and now this path only works when rebooting,
|
||||
* so we don't have replicas yet. */
|
||||
serverAssert(server.repl_backlog != NULL && listLength(server.slaves) == 0);
|
||||
robj keyobj;
|
||||
initStaticStringObject(keyobj,key);
|
||||
robj *argv[2];
|
||||
argv[0] = server.lazyfree_lazy_expire ? shared.unlink : shared.del;
|
||||
argv[1] = &keyobj;
|
||||
replicationFeedSlaves(server.slaves,dbid,argv,2);
|
||||
}
|
||||
sdsfree(key);
|
||||
decrRefCount(val);
|
||||
expired_keys_skipped++;
|
||||
server.rdb_last_load_keys_expired++;
|
||||
} else {
|
||||
robj keyobj;
|
||||
initStaticStringObject(keyobj,key);
|
||||
|
||||
/* Add the new object in the hash table */
|
||||
int added = dbAddRDBLoad(db,key,val);
|
||||
keys_loaded++;
|
||||
server.rdb_last_load_keys_loaded++;
|
||||
if (!added) {
|
||||
if (rdbflags & RDBFLAGS_ALLOW_DUP) {
|
||||
/* This flag is useful for DEBUG RELOAD special modes.
|
||||
@ -2871,11 +2885,11 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
||||
if (empty_keys_skipped) {
|
||||
serverLog(LL_WARNING,
|
||||
"Done loading RDB, keys loaded: %lld, keys expired: %lld, empty keys skipped: %lld.",
|
||||
keys_loaded, expired_keys_skipped, empty_keys_skipped);
|
||||
server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired, empty_keys_skipped);
|
||||
} else {
|
||||
serverLog(LL_WARNING,
|
||||
serverLog(LL_NOTICE,
|
||||
"Done loading RDB, keys loaded: %lld, keys expired: %lld.",
|
||||
keys_loaded, expired_keys_skipped);
|
||||
server.rdb_last_load_keys_loaded, server.rdb_last_load_keys_expired);
|
||||
}
|
||||
return C_OK;
|
||||
|
||||
|
@ -128,6 +128,7 @@
|
||||
#define RDBFLAGS_AOF_PREAMBLE (1<<0) /* Load/save the RDB as AOF preamble. */
|
||||
#define RDBFLAGS_REPLICATION (1<<1) /* Load/save for SYNC. */
|
||||
#define RDBFLAGS_ALLOW_DUP (1<<2) /* Allow duplicated keys when loading.*/
|
||||
#define RDBFLAGS_FEED_REPL (1<<3) /* Feed replication stream when loading.*/
|
||||
|
||||
/* When rdbLoadObject() returns NULL, the err flag is
|
||||
* set to hold the type of error that occurred */
|
||||
|
47
src/server.c
47
src/server.c
@ -3304,6 +3304,8 @@ void initServer(void) {
|
||||
server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
|
||||
server.rdb_save_time_last = -1;
|
||||
server.rdb_save_time_start = -1;
|
||||
server.rdb_last_load_keys_expired = 0;
|
||||
server.rdb_last_load_keys_loaded = 0;
|
||||
server.dirty = 0;
|
||||
resetServerStats();
|
||||
/* A few stats we don't want to reset: server startup time, and peak mem. */
|
||||
@ -4924,6 +4926,8 @@ sds genRedisInfoString(const char *section) {
|
||||
"rdb_last_bgsave_time_sec:%jd\r\n"
|
||||
"rdb_current_bgsave_time_sec:%jd\r\n"
|
||||
"rdb_last_cow_size:%zu\r\n"
|
||||
"rdb_last_load_keys_expired:%lld\r\n"
|
||||
"rdb_last_load_keys_loaded:%lld\r\n"
|
||||
"aof_enabled:%d\r\n"
|
||||
"aof_rewrite_in_progress:%d\r\n"
|
||||
"aof_rewrite_scheduled:%d\r\n"
|
||||
@ -4949,6 +4953,8 @@ sds genRedisInfoString(const char *section) {
|
||||
(intmax_t)((server.child_type != CHILD_TYPE_RDB) ?
|
||||
-1 : time(NULL)-server.rdb_save_time_start),
|
||||
server.stat_rdb_cow_bytes,
|
||||
server.rdb_last_load_keys_expired,
|
||||
server.rdb_last_load_keys_loaded,
|
||||
server.aof_state != AOF_OFF,
|
||||
server.child_type == CHILD_TYPE_AOF,
|
||||
server.aof_rewrite_scheduled,
|
||||
@ -6072,28 +6078,45 @@ void loadDataFromDisk(void) {
|
||||
} else {
|
||||
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
|
||||
errno = 0; /* Prevent a stale value from affecting error checking */
|
||||
if (rdbLoad(server.rdb_filename,&rsi,RDBFLAGS_NONE) == C_OK) {
|
||||
int rdb_flags = RDBFLAGS_NONE;
|
||||
if (iAmMaster()) {
|
||||
/* Master may delete expired keys when loading, we should
|
||||
* propagate expire to replication backlog. */
|
||||
createReplicationBacklog();
|
||||
rdb_flags |= RDBFLAGS_FEED_REPL;
|
||||
}
|
||||
if (rdbLoad(server.rdb_filename,&rsi,rdb_flags) == C_OK) {
|
||||
serverLog(LL_NOTICE,"DB loaded from disk: %.3f seconds",
|
||||
(float)(ustime()-start)/1000000);
|
||||
|
||||
/* Restore the replication ID / offset from the RDB file. */
|
||||
if ((server.masterhost ||
|
||||
(server.cluster_enabled &&
|
||||
nodeIsSlave(server.cluster->myself))) &&
|
||||
rsi.repl_id_is_set &&
|
||||
if (rsi.repl_id_is_set &&
|
||||
rsi.repl_offset != -1 &&
|
||||
/* Note that older implementations may save a repl_stream_db
|
||||
* of -1 inside the RDB file in a wrong way, see more
|
||||
* information in function rdbPopulateSaveInfo. */
|
||||
rsi.repl_stream_db != -1)
|
||||
{
|
||||
memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
|
||||
server.master_repl_offset = rsi.repl_offset;
|
||||
/* If we are a slave, create a cached master from this
|
||||
* information, in order to allow partial resynchronization
|
||||
* with masters. */
|
||||
replicationCacheMasterUsingMyself();
|
||||
selectDb(server.cached_master,rsi.repl_stream_db);
|
||||
if (!iAmMaster()) {
|
||||
memcpy(server.replid,rsi.repl_id,sizeof(server.replid));
|
||||
server.master_repl_offset = rsi.repl_offset;
|
||||
/* If this is a replica, create a cached master from this
|
||||
* information, in order to allow partial resynchronizations
|
||||
* with masters. */
|
||||
replicationCacheMasterUsingMyself();
|
||||
selectDb(server.cached_master,rsi.repl_stream_db);
|
||||
} else {
|
||||
/* If this is a master, we can save the replication info
|
||||
* as secondary ID and offset, in order to allow replicas
|
||||
* to partial resynchronizations with masters. */
|
||||
memcpy(server.replid2,rsi.repl_id,sizeof(server.replid));
|
||||
server.second_replid_offset = rsi.repl_offset+1;
|
||||
/* Rebase master_repl_offset from rsi.repl_offset. */
|
||||
server.master_repl_offset += rsi.repl_offset;
|
||||
server.repl_backlog_off = server.master_repl_offset -
|
||||
server.repl_backlog_histlen + 1;
|
||||
server.repl_no_slaves_since = time(NULL);
|
||||
}
|
||||
}
|
||||
} else if (errno != ENOENT) {
|
||||
serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
|
||||
|
@ -1435,6 +1435,8 @@ struct redisServer {
|
||||
/* RDB persistence */
|
||||
long long dirty; /* Changes to DB from the last save */
|
||||
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
|
||||
long long rdb_last_load_keys_expired; /* number of expired keys when loading RDB */
|
||||
long long rdb_last_load_keys_loaded; /* number of loaded keys when loading RDB */
|
||||
struct saveparam *saveparams; /* Save points array for RDB */
|
||||
int saveparamslen; /* Number of saving points */
|
||||
char *rdb_filename; /* Name of RDB file */
|
||||
@ -2101,7 +2103,7 @@ long long getPsyncInitialOffset(void);
|
||||
int replicationSetupSlaveForFullResync(client *slave, long long offset);
|
||||
void changeReplicationId(void);
|
||||
void clearReplicationId2(void);
|
||||
void chopReplicationBacklog(void);
|
||||
void createReplicationBacklog(void);
|
||||
void replicationCacheMasterUsingMyself(void);
|
||||
void feedReplicationBacklog(void *ptr, size_t len);
|
||||
void showLatestBacklog(void);
|
||||
|
159
tests/integration/psync2-master-restart.tcl
Normal file
159
tests/integration/psync2-master-restart.tcl
Normal file
@ -0,0 +1,159 @@
|
||||
start_server {tags {"psync2 external:skip"}} {
|
||||
start_server {} {
|
||||
start_server {} {
|
||||
set master [srv 0 client]
|
||||
set master_host [srv 0 host]
|
||||
set master_port [srv 0 port]
|
||||
|
||||
set replica [srv -1 client]
|
||||
set replica_host [srv -1 host]
|
||||
set replica_port [srv -1 port]
|
||||
|
||||
set sub_replica [srv -2 client]
|
||||
|
||||
# Build replication chain
|
||||
$replica replicaof $master_host $master_port
|
||||
$sub_replica replicaof $replica_host $replica_port
|
||||
|
||||
wait_for_condition 50 100 {
|
||||
[status $replica master_link_status] eq {up} &&
|
||||
[status $sub_replica master_link_status] eq {up}
|
||||
} else {
|
||||
fail "Replication not started."
|
||||
}
|
||||
|
||||
# Avoid PINGs
|
||||
$master config set repl-ping-replica-period 3600
|
||||
$master config rewrite
|
||||
|
||||
# Generate some data
|
||||
createComplexDataset $master 1000
|
||||
|
||||
test "PSYNC2: Partial resync after Master restart using RDB aux fields" {
|
||||
wait_for_condition 500 100 {
|
||||
[status $master master_repl_offset] == [status $replica master_repl_offset] &&
|
||||
[status $master master_repl_offset] == [status $sub_replica master_repl_offset]
|
||||
} else {
|
||||
fail "Replicas and master offsets were unable to match *exactly*."
|
||||
}
|
||||
|
||||
set replid [status $master master_replid]
|
||||
set offset [status $master master_repl_offset]
|
||||
$replica config resetstat
|
||||
|
||||
catch {
|
||||
restart_server 0 true false
|
||||
set master [srv 0 client]
|
||||
}
|
||||
wait_for_condition 50 1000 {
|
||||
[status $replica master_link_status] eq {up} &&
|
||||
[status $sub_replica master_link_status] eq {up}
|
||||
} else {
|
||||
fail "Replicas didn't sync after master restart"
|
||||
}
|
||||
|
||||
# Make sure master restore replication info correctly
|
||||
assert {[status $master master_replid] != $replid}
|
||||
assert {[status $master master_repl_offset] == $offset}
|
||||
assert {[status $master master_replid2] eq $replid}
|
||||
assert {[status $master second_repl_offset] == [expr $offset+1]}
|
||||
|
||||
# Make sure master set replication backlog correctly
|
||||
assert {[status $master repl_backlog_active] == 1}
|
||||
assert {[status $master repl_backlog_first_byte_offset] == [expr $offset+1]}
|
||||
assert {[status $master repl_backlog_histlen] == 0}
|
||||
|
||||
# Partial resync after Master restart
|
||||
assert {[status $master sync_partial_ok] == 1}
|
||||
assert {[status $replica sync_partial_ok] == 1}
|
||||
}
|
||||
|
||||
test "PSYNC2: Partial resync after Master restart using RDB aux fields with expire" {
|
||||
$master debug set-active-expire 0
|
||||
for {set j 0} {$j < 1024} {incr j} {
|
||||
$master select [expr $j%16]
|
||||
$master set $j somevalue px 10
|
||||
}
|
||||
|
||||
after 20
|
||||
|
||||
wait_for_condition 500 100 {
|
||||
[status $master master_repl_offset] == [status $replica master_repl_offset] &&
|
||||
[status $master master_repl_offset] == [status $sub_replica master_repl_offset]
|
||||
} else {
|
||||
show_cluster_status
|
||||
fail "Replicas and master offsets were unable to match *exactly*."
|
||||
}
|
||||
|
||||
set offset [status $master master_repl_offset]
|
||||
$replica config resetstat
|
||||
|
||||
catch {
|
||||
restart_server 0 true false
|
||||
set master [srv 0 client]
|
||||
}
|
||||
wait_for_condition 50 1000 {
|
||||
[status $replica master_link_status] eq {up} &&
|
||||
[status $sub_replica master_link_status] eq {up}
|
||||
} else {
|
||||
fail "Replicas didn't sync after master restart"
|
||||
}
|
||||
|
||||
set expired_offset [status $master repl_backlog_histlen]
|
||||
# Stale keys expired and master_repl_offset grows correctly
|
||||
assert {[status $master rdb_last_load_keys_expired] == 1024}
|
||||
assert {[status $master master_repl_offset] == [expr $offset+$expired_offset]}
|
||||
|
||||
# Partial resync after Master restart
|
||||
assert {[status $master sync_partial_ok] == 1}
|
||||
assert {[status $replica sync_partial_ok] == 1}
|
||||
|
||||
set digest [$master debug digest]
|
||||
assert {$digest eq [$replica debug digest]}
|
||||
assert {$digest eq [$sub_replica debug digest]}
|
||||
}
|
||||
|
||||
test "PSYNC2: Full resync after Master restart when too many key expired" {
|
||||
$master config set repl-backlog-size 16384
|
||||
$master config rewrite
|
||||
|
||||
$master debug set-active-expire 0
|
||||
for {set j 0} {$j < 1024} {incr j} {
|
||||
$master select [expr $j%16]
|
||||
$master set $j somevalue px 10
|
||||
}
|
||||
|
||||
after 20
|
||||
|
||||
wait_for_condition 500 100 {
|
||||
[status $master master_repl_offset] == [status $replica master_repl_offset] &&
|
||||
[status $master master_repl_offset] == [status $sub_replica master_repl_offset]
|
||||
} else {
|
||||
fail "Replicas and master offsets were unable to match *exactly*."
|
||||
}
|
||||
|
||||
$replica config resetstat
|
||||
|
||||
catch {
|
||||
restart_server 0 true false
|
||||
set master [srv 0 client]
|
||||
}
|
||||
wait_for_condition 50 1000 {
|
||||
[status $replica master_link_status] eq {up} &&
|
||||
[status $sub_replica master_link_status] eq {up}
|
||||
} else {
|
||||
fail "Replicas didn't sync after master restart"
|
||||
}
|
||||
|
||||
# Replication backlog is full
|
||||
assert {[status $master repl_backlog_first_byte_offset] > [status $master second_repl_offset]}
|
||||
assert {[status $master sync_partial_ok] == 0}
|
||||
assert {[status $master sync_full] == 1}
|
||||
assert {[status $master rdb_last_load_keys_expired] == 1024}
|
||||
assert {[status $replica sync_full] == 1}
|
||||
|
||||
set digest [$master debug digest]
|
||||
assert {$digest eq [$replica debug digest]}
|
||||
assert {$digest eq [$sub_replica debug digest]}
|
||||
}
|
||||
}}}
|
@ -202,6 +202,30 @@ test {client freed during loading} {
|
||||
}
|
||||
}
|
||||
|
||||
start_server {} {
|
||||
test {Test RDB load info} {
|
||||
r debug populate 1000
|
||||
r save
|
||||
restart_server 0 true false
|
||||
wait_done_loading r
|
||||
assert {[s rdb_last_load_keys_expired] == 0}
|
||||
assert {[s rdb_last_load_keys_loaded] == 1000}
|
||||
|
||||
r debug set-active-expire 0
|
||||
for {set j 0} {$j < 1024} {incr j} {
|
||||
r select [expr $j%16]
|
||||
r set $j somevalue px 10
|
||||
}
|
||||
after 20
|
||||
|
||||
r save
|
||||
restart_server 0 true false
|
||||
wait_done_loading r
|
||||
assert {[s rdb_last_load_keys_expired] == 1024}
|
||||
assert {[s rdb_last_load_keys_loaded] == 1000}
|
||||
}
|
||||
}
|
||||
|
||||
# Our COW metrics (Private_Dirty) work only on Linux
|
||||
set system_name [string tolower [exec uname -s]]
|
||||
if {$system_name eq {linux}} {
|
||||
|
@ -54,6 +54,7 @@ set ::all_tests {
|
||||
integration/psync2
|
||||
integration/psync2-reg
|
||||
integration/psync2-pingoff
|
||||
integration/psync2-master-restart
|
||||
integration/failover
|
||||
integration/redis-cli
|
||||
integration/redis-benchmark
|
||||
|
Loading…
x
Reference in New Issue
Block a user