Merge pull request #66 from Snapchat/psync_multimaster_fixes

Psync multimaster fixes
This commit is contained in:
John Sully 2022-05-02 20:24:28 -04:00 committed by GitHub Enterprise
commit 07ffc4b380
9 changed files with 412 additions and 121 deletions

View File

@ -1218,37 +1218,15 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
== -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",rsi->master_repl_offset)
== -1) return -1;
if (g_pserver->fActiveReplica && listLength(g_pserver->masters) > 0) {
if (g_pserver->fActiveReplica) {
sdsstring val = sdsstring(sdsempty());
listNode *ln;
listIter li;
redisMaster* mi;
listRewind(g_pserver->masters,&li);
while ((ln = listNext(&li)) != NULL) {
mi = (redisMaster*)listNodeValue(ln);
if (!mi->master) {
// If master client is not available, use info from master struct - better than nothing
serverLog(LL_NOTICE, "saving master %s", mi->master_replid);
if (mi->master_replid[0] == 0) {
// if replid is null, there's no reason to save it
continue;
}
val = val.catfmt("%s:%I:%s:%i;", mi->master_replid,
mi->master_initial_offset,
mi->masterhost,
mi->masterport);
}
else {
serverLog(LL_NOTICE, "saving master %s", mi->master->replid);
if (mi->master->replid[0] == 0) {
// if replid is null, there's no reason to save it
continue;
}
val = val.catfmt("%s:%I:%s:%i;", mi->master->replid,
mi->master->reploff,
mi->masterhost,
mi->masterport);
}
for (auto &msi : rsi->vecmastersaveinfo) {
val = val.catfmt("%s:%I:%s:%i:%i;", msi.master_replid,
msi.master_initial_offset,
msi.masterhost.get(),
msi.masterport,
msi.selected_db);
}
if (rdbSaveAuxFieldStrStr(rdb, "repl-masters",val.get()) == -1) return -1;
}
@ -1661,11 +1639,12 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi)
return rdbSaveBackgroundFork(rsi);
} else
{
rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL);
rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zcalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL);
// Placement new
rdbSaveInfo rsiT;
if (rsi == nullptr)
rsi = &rsiT;
args->rsi = *(new (args) rdbSaveInfo(*rsi));
args->rsi = *rsi;
memcpy(&args->rsi.repl_id, g_pserver->replid, sizeof(g_pserver->replid));
args->rsi.master_repl_offset = g_pserver->master_repl_offset;
@ -2922,11 +2901,11 @@ public:
* snapshot taken by the master may not be reflected on the replica. */
bool fExpiredKey = iAmMaster() && !(this->rdbflags&RDBFLAGS_AOF_PREAMBLE) && job.expiretime != INVALID_EXPIRE && job.expiretime < this->now;
if (fStaleMvccKey || fExpiredKey) {
if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->masters != nullptr && this->rsi->masters->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) {
if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->mi != nullptr && this->rsi->mi->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) {
// We have a key that we've already deleted and is not back in our database.
// We'll need to inform the sending master of the delete if it is also a replica of us
robj_sharedptr objKeyDup(createStringObject(job.key, sdslen(job.key)));
this->rsi->masters->staleKeyMap->operator[](job.db->id).push_back(objKeyDup);
this->rsi->mi->staleKeyMap->operator[](job.db->id).push_back(objKeyDup);
}
sdsfree(job.key);
job.key = nullptr;
@ -3242,19 +3221,26 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
}
} else if (!strcasecmp(szFromObj(auxkey),"repl-masters")) {
if (rsi) {
struct redisMaster mi;
MasterSaveInfo msi;
char *masters = szFromObj(auxval);
char *entry = strtok(masters, ":");
char *saveptr;
char *entry = strtok_r(masters, ":", &saveptr);
while (entry != NULL) {
memcpy(mi.master_replid, entry, sizeof(mi.master_replid));
entry = strtok(NULL, ":");
mi.master_initial_offset = atoi(entry);
entry = strtok(NULL, ":");
mi.masterhost = entry;
entry = strtok(NULL, ";");
mi.masterport = atoi(entry);
entry = strtok(NULL, ":");
rsi->addMaster(mi);
memcpy(msi.master_replid, entry, sizeof(msi.master_replid));
entry = strtok_r(NULL, ":", &saveptr);
if (entry == nullptr) break;
msi.master_initial_offset = atoll(entry);
entry = strtok_r(NULL, ":", &saveptr);
if (entry == nullptr) break;
msi.masterhost = sdsstring(sdsnew(entry));
entry = strtok_r(NULL, ":", &saveptr);
if (entry == nullptr) break;
msi.masterport = atoi(entry);
entry = strtok_r(NULL, ";", &saveptr);
if (entry == nullptr) break;
msi.selected_db = atoi(entry);
entry = strtok_r(NULL, ":", &saveptr);
rsi->addMaster(msi);
}
}
} else if (!strcasecmp(szFromObj(auxkey),"repl-offset")) {
@ -3533,6 +3519,32 @@ eoferr:
return C_ERR;
}
void updateActiveReplicaMastersFromRsi(rdbSaveInfo *rsi) {
if (rsi != nullptr && g_pserver->fActiveReplica) {
serverLog(LL_NOTICE, "RDB contains information on %d masters", (int)rsi->numMasters());
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
while ((ln = listNext(&li)))
{
redisMaster *mi = (redisMaster*)listNodeValue(ln);
if (mi->master != nullptr) {
continue; //ignore connected masters
}
for (size_t i = 0; i < rsi->numMasters(); i++) {
if (!sdscmp(mi->masterhost, (sds)rsi->vecmastersaveinfo[i].masterhost.get()) && mi->masterport == rsi->vecmastersaveinfo[i].masterport) {
memcpy(mi->master_replid, rsi->vecmastersaveinfo[i].master_replid, sizeof(mi->master_replid));
mi->master_initial_offset = rsi->vecmastersaveinfo[i].master_initial_offset;
replicationCacheMasterUsingMaster(mi);
serverLog(LL_NOTICE, "Cached master recovered from RDB for %s:%d", mi->masterhost, mi->masterport);
break;
}
}
}
}
}
int rdbLoad(rdbSaveInfo *rsi, int rdbflags)
{
int err = C_ERR;
@ -3913,11 +3925,22 @@ void bgsaveCommand(client *c) {
* information. */
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
rdbSaveInfo rsi_init;
*rsi = std::move(rsi_init);
*rsi = rsi_init;
memcpy(rsi->repl_id, g_pserver->replid, sizeof(g_pserver->replid));
rsi->master_repl_offset = g_pserver->master_repl_offset;
if (g_pserver->fActiveReplica) {
listIter li;
listNode *ln = nullptr;
listRewind(g_pserver->masters, &li);
while ((ln = listNext(&li))) {
redisMaster *mi = (redisMaster*)listNodeValue(ln);
MasterSaveInfo msi(*mi);
rsi->addMaster(msi);
}
}
/* If the instance is a master, we can populate the replication info
* only when repl_backlog is not NULL. If the repl_backlog is NULL,
* it means that the instance isn't in any replication chains. In this
@ -3935,11 +3958,6 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
return rsi;
}
if (listLength(g_pserver->masters) > 1)
{
// BUGBUG, warn user about this incomplete implementation
serverLog(LL_WARNING, "Warning: Only backing up first master's information in RDB");
}
struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? listNodeValue(listFirst(g_pserver->masters)) : NULL);
/* If the instance is a replica we need a connected master

View File

@ -2856,6 +2856,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi,
* gets promoted. */
return false;
}
if (g_pserver->fActiveReplica) updateActiveReplicaMastersFromRsi(&rsi);
/* RDB loading succeeded if we reach this point. */
if (g_pserver->repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
@ -2934,7 +2935,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi,
mi->staleKeyMap->clear();
else
mi->staleKeyMap = new (MALLOC_LOCAL) std::map<int, std::vector<robj_sharedptr>>();
rsi.addMaster(*mi);
rsi.mi = mi;
}
if (rdbLoadFile(rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) {
serverLog(LL_WARNING,
@ -2951,6 +2952,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi,
it'll be restarted when sync succeeds or replica promoted. */
return false;
}
if (g_pserver->fActiveReplica) updateActiveReplicaMastersFromRsi(&rsi);
/* Cleanup. */
if (g_pserver->rdb_del_sync_files && allPersistenceDisabled()) {

View File

@ -1141,7 +1141,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"rreplay",replicaReplayCommand,-3,
"read-only fast noprop",
"read-only fast noprop ok-stale",
0,NULL,0,0,0,0,0,0},
{"keydb.cron",cronCommand,-5,
@ -4938,7 +4938,8 @@ int processCommand(client *c, int callFlags) {
if (FBrokenLinkToMaster() &&
g_pserver->repl_serve_stale_data == 0 &&
is_denystale_command &&
!(g_pserver->fActiveReplica && c->cmd->proc == syncCommand))
!(g_pserver->fActiveReplica && c->cmd->proc == syncCommand)
&& !FInReplicaReplay())
{
rejectCommand(c, shared.masterdownerr);
return C_OK;
@ -6962,32 +6963,16 @@ void loadDataFromDisk(void) {
g_pserver->master_repl_offset = rsi.repl_offset;
if (g_pserver->repl_batch_offStart >= 0)
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
while ((ln = listNext(&li)))
{
redisMaster *mi = (redisMaster*)listNodeValue(ln);
if (g_pserver->fActiveReplica) {
for (size_t i = 0; i < rsi.numMasters(); i++) {
if (!strcmp(mi->masterhost, rsi.masters[i].masterhost) && mi->masterport == rsi.masters[i].masterport) {
memcpy(mi->master_replid, rsi.masters[i].master_replid, sizeof(mi->master_replid));
mi->master_initial_offset = rsi.masters[i].master_initial_offset;
replicationCacheMasterUsingMaster(mi);
serverLog(LL_NOTICE, "Cached master recovered from RDB for %s:%d", mi->masterhost, mi->masterport);
}
}
}
else {
updateActiveReplicaMastersFromRsi(&rsi);
if (!g_pserver->fActiveReplica && listLength(g_pserver->masters)) {
redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters));
/* If we are a replica, create a cached master from this
* information, in order to allow partial resynchronizations
* with masters. */
replicationCacheMasterUsingMyself(mi);
selectDb(mi->cached_master,rsi.repl_stream_db);
}
}
}
} else if (errno != ENOENT) {
serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
exit(1);
@ -7293,6 +7278,7 @@ void *workerThreadMain(void *parg)
serverAssert(!GlobalLocksAcquired());
aeDeleteEventLoop(el);
tlsCleanupThread();
return NULL;
}

View File

@ -1887,6 +1887,40 @@ struct redisMaster {
int ielReplTransfer = -1;
};
struct MasterSaveInfo {
MasterSaveInfo() = default;
MasterSaveInfo(const redisMaster &mi) {
memcpy(master_replid, mi.master_replid, sizeof(mi.master_replid));
if (mi.master) {
master_initial_offset = mi.master->reploff;
selected_db = mi.master->db->id;
} else if (mi.cached_master) {
master_initial_offset = mi.cached_master->reploff;
selected_db = mi.cached_master->db->id;
} else {
master_initial_offset = -1;
selected_db = 0;
}
masterport = mi.masterport;
masterhost = sdsstring(sdsdup(mi.masterhost));
masterport = mi.masterport;
}
MasterSaveInfo &operator=(const MasterSaveInfo &other) {
masterhost = other.masterhost;
masterport = other.masterport;
memcpy(master_replid, other.master_replid, sizeof(master_replid));
master_initial_offset = other.master_initial_offset;
return *this;
}
sdsstring masterhost;
int masterport;
char master_replid[CONFIG_RUN_ID_SIZE+1];
long long master_initial_offset;
int selected_db;
};
/* This structure can be optionally passed to RDB save/load functions in
* order to implement additional functionalities, by storing and loading
* metadata to the RDB file.
@ -1904,8 +1938,6 @@ public:
repl_offset = -1;
fForceSetKey = TRUE;
mvccMinThreshold = 0;
masters = nullptr;
masterCount = 0;
}
rdbSaveInfo(const rdbSaveInfo &other) {
repl_stream_db = other.repl_stream_db;
@ -1914,45 +1946,31 @@ public:
repl_offset = other.repl_offset;
fForceSetKey = other.fForceSetKey;
mvccMinThreshold = other.mvccMinThreshold;
masters = (struct redisMaster*)malloc(sizeof(struct redisMaster) * other.masterCount);
memcpy(masters, other.masters, sizeof(struct redisMaster) * other.masterCount);
masterCount = other.masterCount;
vecmastersaveinfo = other.vecmastersaveinfo;
master_repl_offset = other.master_repl_offset;
mi = other.mi;
}
rdbSaveInfo(rdbSaveInfo &&other) : rdbSaveInfo() {
swap(*this, other);
}
rdbSaveInfo &operator=(rdbSaveInfo other) {
swap(*this, other);
rdbSaveInfo &operator=(const rdbSaveInfo &other) {
repl_stream_db = other.repl_stream_db;
repl_id_is_set = other.repl_id_is_set;
memcpy(repl_id, other.repl_id, sizeof(repl_id));
repl_offset = other.repl_offset;
fForceSetKey = other.fForceSetKey;
mvccMinThreshold = other.mvccMinThreshold;
vecmastersaveinfo = other.vecmastersaveinfo;
master_repl_offset = other.master_repl_offset;
mi = other.mi;
return *this;
}
~rdbSaveInfo() {
free(masters);
}
friend void swap(rdbSaveInfo &first, rdbSaveInfo &second) {
std::swap(first.repl_stream_db, second.repl_stream_db);
std::swap(first.repl_id_is_set, second.repl_id_is_set);
std::swap(first.repl_id, second.repl_id);
std::swap(first.repl_offset, second.repl_offset);
std::swap(first.fForceSetKey, second.fForceSetKey);
std::swap(first.mvccMinThreshold, second.mvccMinThreshold);
std::swap(first.masters, second.masters);
std::swap(first.masterCount, second.masterCount);
}
void addMaster(const struct redisMaster &mi) {
masterCount++;
if (masters == nullptr) {
masters = (struct redisMaster*)malloc(sizeof(struct redisMaster));
}
else {
masters = (struct redisMaster*)realloc(masters, sizeof(struct redisMaster) * masterCount);
}
memcpy(masters + masterCount - 1, &mi, sizeof(struct redisMaster));
void addMaster(const MasterSaveInfo &si) {
vecmastersaveinfo.push_back(si);
}
size_t numMasters() {
return masterCount;
return vecmastersaveinfo.size();
}
/* Used saving and loading. */
@ -1968,10 +1986,8 @@ public:
long long master_repl_offset;
uint64_t mvccMinThreshold;
struct redisMaster *masters;
private:
size_t masterCount;
std::vector<MasterSaveInfo> vecmastersaveinfo;
struct redisMaster *mi = nullptr;
};
struct malloc_stats {
@ -3853,6 +3869,8 @@ void lfenceCommand(client *c);
int FBrokenLinkToMaster(int *pconnectMasters = nullptr);
int FActiveMaster(client *c);
struct redisMaster *MasterInfoFromClient(client *c);
bool FInReplicaReplay();
void updateActiveReplicaMastersFromRsi(rdbSaveInfo *rsi);
/* MVCC */
uint64_t getMvccTstamp();
@ -3950,6 +3968,7 @@ void makeThreadKillable(void);
/* TLS stuff */
void tlsInit(void);
void tlsInitThread();
void tlsCleanupThread();
void tlsCleanup(void);
int tlsConfigure(redisTLSContextConfig *ctx_config);
void tlsReload(void);

View File

@ -56,7 +56,6 @@
void streamFreeCG(streamCG *cg);
void streamFreeNACK(streamNACK *na);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
bool FInReplicaReplay();
int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);

View File

@ -180,6 +180,12 @@ void tlsInitThread(void)
pending_list = listCreate();
}
void tlsCleanupThread(void)
{
if (pending_list)
listRelease(pending_list);
}
void tlsCleanup(void) {
if (redis_tls_ctx) {
SSL_CTX_free(redis_tls_ctx);
@ -1260,6 +1266,7 @@ int tlsProcessPendingData() {
}
void tlsInitThread() {}
void tlsCleanupThread(void) {}
sds connTLSGetPeerCert(connection *conn_) {
(void) conn_;

View File

@ -0,0 +1,102 @@
# Issue 3899 regression test.
# We create a chain of three instances: master -> replica -> replica2
# and continuously break the link while traffic is generated by
# keydb-benchmark. At the end we check that the data is the same
# everywhere.
start_server {tags {"psync2"} overrides {active-replica {yes} multi-master {yes} client-output-buffer-limit {replica 200mb 10mb 999999} } } {
start_server {overrides {active-replica {yes} multi-master {yes} client-output-buffer-limit {replica 200mb 10mb 999999} } } {
start_server {overrides {active-replica {yes} multi-master {yes} client-output-buffer-limit {replica 200mb 10mb 999999} } } {
# Config
set debug_msg 0 ; # Enable additional debug messages
set no_exit 0 ; # Do not exit at end of the test
set duration 20 ; # Total test seconds
for {set j 0} {$j < 3} {incr j} {
set R($j) [srv [expr 0-$j] client]
set R_host($j) [srv [expr 0-$j] host]
set R_port($j) [srv [expr 0-$j] port]
set R_unixsocket($j) [srv [expr 0-$j] unixsocket]
if {$debug_msg} {puts "Log file: [srv [expr 0-$j] stdout]"}
}
# Setup the replication and backlog parameters
test "PSYNC2 #3899 regression: setup" {
$R(0) replicaof $R_host(1) $R_port(1)
$R(0) replicaof $R_host(2) $R_port(2)
$R(1) replicaof $R_host(0) $R_port(0)
$R(1) replicaof $R_host(2) $R_port(2)
$R(2) replicaof $R_host(0) $R_port(0)
$R(2) replicaof $R_host(1) $R_port(1)
$R(0) set foo bar
wait_for_condition 50 1000 {
[status $R(1) master_link_status] == "up" &&
[status $R(2) master_link_status] == "up" &&
[$R(1) dbsize] == 1 &&
[$R(2) dbsize] == 1
} else {
fail "Replicas not replicating from master"
}
$R(0) config set repl-backlog-size 200mb
$R(1) config set repl-backlog-size 200mb
$R(2) config set repl-backlog-size 200mb
}
set cycle_start_time [clock milliseconds]
set bench_pid [exec src/keydb-benchmark -s $R_unixsocket(0) -n 10000000 -r 1000 incr __rand_int__ > /dev/null &]
while 1 {
set elapsed [expr {[clock milliseconds]-$cycle_start_time}]
if {$elapsed > $duration*1000} break
if {rand() < .05} {
test "PSYNC2 #3899 regression: kill first replica" {
$R(1) client kill type master
}
}
if {rand() < .05} {
test "PSYNC2 #3899 regression: kill chained replica" {
$R(2) client kill type master
}
}
after 100
}
exec kill -9 $bench_pid
if {$debug_msg} {
for {set j 0} {$j < 100} {incr j} {
if {
[$R(0) debug digest] == [$R(1) debug digest] &&
[$R(1) debug digest] == [$R(2) debug digest]
} break
puts [$R(0) debug digest]
puts [$R(1) debug digest]
puts [$R(2) debug digest]
after 1000
}
}
test "PSYNC2 #3899 regression: verify consistency" {
wait_for_condition 50 1000 {
([$R(0) debug digest] eq [$R(1) debug digest]) &&
([$R(1) debug digest] eq [$R(2) debug digest])
} else {
set csv3 [csvdump {r -2}]
set csv2 [csvdump {r -1}]
set csv1 [csvdump r]
set fd [open /tmp/repldump1.txt w]
puts -nonewline $fd $csv1
close $fd
set fd [open /tmp/repldump2.txt w]
puts -nonewline $fd $csv2
close $fd
set fd [open /tmp/repldump3.txt w]
puts -nonewline $fd $csv3
close $fd
fail [format "The three instances have different data sets:\n\tnode 1: %s\n\tnode 2: %s\n\tnode 3: %s\nRun diff -u against /tmp/repldump*.txt for more info" [$R(0) debug digest] [$R(1) debug digest] [$R(2) debug digest] ]
}
}
}}}

View File

@ -241,10 +241,18 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
$slave replicaof $master_host $master_port
after 1000
$master replicaof $slave_host $slave_port
after 1000
assert_equal {bar} [$slave get testkey] {replica is correct}
assert_equal {bar} [$master get testkey] {master is correct}
wait_for_condition 50 100 {
[string match bar [$slave get testkey]]
} else {
fail "Replica is not correct"
}
wait_for_condition 50 100 {
[string match bar [$master get testkey]]
} else {
fail "Master is not correct"
}
}
test {Active replica merge works with client blocked} {

View File

@ -0,0 +1,150 @@
# Creates a master-replica pair and breaks the link continuously to force
# partial resyncs attempts, all this while flooding the master with
# write queries.
#
# You can specify backlog size, ttl, delay before reconnection, test duration
# in seconds, and an additional condition to verify at the end.
#
# If reconnect is > 0, the test actually try to break the connection and
# reconnect with the master, otherwise just the initial synchronization is
# checked for consistency.
proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} {
start_server [list tags [list "repl"] overrides [list active-replica yes client-output-buffer-limit [list replica $backlog_size $backlog_size 9999999] ] ] {
start_server [list overrides [list client-output-buffer-limit [list replica $backlog_size $backlog_size 9999999] active-replica yes ] ] {
set master [srv -1 client]
set master_host [srv -1 host]
set master_port [srv -1 port]
set replica [srv 0 client]
set replica_host [srv 0 host]
set replica_port [srv 0 port]
$master config set repl-backlog-size $backlog_size
$master config set repl-backlog-ttl $backlog_ttl
$master config set repl-diskless-sync $mdl
$master config set repl-diskless-sync-delay 1
$replica config set repl-diskless-load $sdl
$replica config set repl-backlog-size $backlog_size
$replica config set repl-backlog-ttl $backlog_ttl
$replica config set repl-diskless-sync $mdl
$replica config set repl-diskless-sync-delay 1
$master config set repl-diskless-load $sdl
test {Replica should be able to synchronize with the master} {
$replica replicaof $master_host $master_port
}
after 1000
test {Master should be able to synchronize with the replica} {
$master replicaof $replica_host $replica_port
}
set load_handle0 [start_climbing_load $master_host $master_port 9 100000]
set load_handle1 [start_climbing_load $master_host $master_port 11 100000]
set load_handle2 [start_climbing_load $master_host $master_port 12 100000]
# Check that the background clients are actually writing.
test {Detect write load to master} {
wait_for_condition 50 1000 {
[$master dbsize] > 100
} else {
fail "Can't detect write load from background clients."
}
}
test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect)" {
# Now while the clients are writing data, break the maste-replica
# link multiple times.
if ($reconnect) {
for {set j 0} {$j < $duration*10} {incr j} {
after 100
# catch {puts "MASTER [$master dbsize] keys, REPLICA [$replica dbsize] keys"}
if {($j % 20) == 0} {
catch {
if {$delay} {
$replica multi
$replica client kill $master_host:$master_port
$replica debug sleep $delay
$replica exec
} else {
$replica client kill $master_host:$master_port
}
}
}
}
}
stop_bg_complex_data $load_handle0
stop_bg_complex_data $load_handle1
stop_bg_complex_data $load_handle2
# Wait for the replica to reach the "online"
# state from the POV of the master.
set retry 5000
while {$retry} {
set info [$master info]
if {[string match {*slave0:*state=online*} $info]} {
break
} else {
incr retry -1
after 100
}
}
if {$retry == 0} {
error "assertion:replica not correctly synchronized"
}
# Wait that replica acknowledge it is online so
# we are sure that DBSIZE and DEBUG DIGEST will not
# fail because of timing issues. (-LOADING error)
wait_for_condition 5000 100 {
[lindex [$replica role] 3] eq {connected}
} else {
fail "replica still not connected after some time"
}
wait_for_condition 100 100 {
[$master debug digest] == [$replica debug digest]
} else {
set csv1 [csvdump r]
set csv2 [csvdump {r -1}]
set fd [open /tmp/repldump1.txt w]
puts -nonewline $fd $csv1
close $fd
set fd [open /tmp/repldump2.txt w]
puts -nonewline $fd $csv2
close $fd
fail "Master - Replica inconsistency, Run diff -u against /tmp/repldump*.txt for more info"
}
assert {[$master dbsize] > 0}
eval $cond
}
}
}
}
foreach mdl {no yes} {
foreach sdl {disabled swapdb} {
test_psync {no reconnection, just sync} 6 1000000 3600 0 {
} $mdl $sdl 0
test_psync {ok psync} 6 100000000 3600 0 {
assert {[s -1 sync_partial_ok] > 0}
} $mdl $sdl 1
test_psync {no backlog} 6 100 3600 0.5 {
assert {[s -1 sync_partial_err] > 0}
} $mdl $sdl 1
test_psync {ok after delay} 3 100000000 3600 3 {
assert {[s -1 sync_partial_ok] > 0}
} $mdl $sdl 1
test_psync {backlog expired} 3 100000000 1 3 {
assert {[s -1 sync_partial_err] > 0}
} $mdl $sdl 1
}
}