Fix subkey expires not replicating correctly, and AOF issues

Former-commit-id: bd183cdee13081a02efef5df75edf2292b872a16
This commit is contained in:
John Sully 2020-04-04 21:52:27 -04:00
parent 12f76b2b95
commit 2684a266c8
11 changed files with 417 additions and 58 deletions

View File

@ -597,21 +597,59 @@ sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, r
return buf;
}
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
robj *tmpargv[3];
/* The DB this command was targeting is not the same as the last command
* we appended. To issue a SELECT command is needed. */
if (dictid != g_pserver->aof_selected_db) {
char seldb[64];
snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
g_pserver->aof_selected_db = dictid;
sds catAppendOnlyExpireMemberAtCommand(sds buf, struct redisCommand *cmd, robj **argv, const size_t argc) {
long long when = 0;
int unit = UNIT_SECONDS;
bool fAbsolute = false;
if (cmd->proc == expireMemberCommand) {
if (getLongLongFromObject(argv[3], &when) != C_OK)
serverPanic("propogating invalid EXPIREMEMBER command");
if (argc == 5) {
unit = parseUnitString(szFromObj(argv[4]));
}
} else if (cmd->proc == expireMemberAtCommand) {
if (getLongLongFromObject(argv[3], &when) != C_OK)
serverPanic("propogating invalid EXPIREMEMBERAT command");
fAbsolute = true;
} else if (cmd->proc == pexpireMemberAtCommand) {
if (getLongLongFromObject(argv[3], &when) != C_OK)
serverPanic("propogating invalid PEXPIREMEMBERAT command");
fAbsolute = true;
unit = UNIT_MILLISECONDS;
} else {
serverPanic("Unknown expiremember command");
}
switch (unit)
{
case UNIT_SECONDS:
when *= 1000;
break;
case UNIT_MILLISECONDS:
break;
}
if (!fAbsolute)
when += mstime();
robj *argvNew[4];
argvNew[0] = createStringObject("PEXPIREMEMBERAT",15);
argvNew[1] = argv[1];
argvNew[2] = argv[2];
argvNew[3] = createStringObjectFromLongLong(when);
buf = catAppendOnlyGenericCommand(buf, 4, argvNew);
decrRefCount(argvNew[0]);
decrRefCount(argvNew[3]);
return buf;
}
sds catCommandForAofAndActiveReplication(sds buf, struct redisCommand *cmd, robj **argv, int argc)
{
robj *tmpargv[3];
if (cmd->proc == expireCommand || cmd->proc == pexpireCommand ||
cmd->proc == expireatCommand) {
/* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */
@ -640,6 +678,10 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a
if (pxarg)
buf = catAppendOnlyExpireAtCommand(buf,cserver.pexpireCommand,argv[1],
pxarg);
} else if (cmd->proc == expireMemberCommand || cmd->proc == expireMemberAtCommand ||
cmd->proc == pexpireMemberAtCommand) {
/* Translate subkey expire commands to PEXPIREMEMBERAT */
buf = catAppendOnlyExpireMemberAtCommand(buf, cmd, argv, argc);
} else {
/* All the other commands don't need translation or need the
* same translation already operated in the command vector
@ -647,6 +689,25 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a
buf = catAppendOnlyGenericCommand(buf,argc,argv);
}
return buf;
}
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
sds buf = sdsempty();
/* The DB this command was targeting is not the same as the last command
* we appended. To issue a SELECT command is needed. */
if (dictid != g_pserver->aof_selected_db) {
char seldb[64];
snprintf(seldb,sizeof(seldb),"%d",dictid);
buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n",
(unsigned long)strlen(seldb),seldb);
g_pserver->aof_selected_db = dictid;
}
buf = catCommandForAofAndActiveReplication(buf, cmd, argv, argc);
/* Append to the AOF buffer. This will be flushed on disk just before
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
@ -1378,7 +1439,7 @@ int rewriteAppendOnlyFileRio(rio *aof) {
}
else
{
char cmd[]="*4\r\n$12\r\nEXPIREMEMBER\r\n";
char cmd[]="*4\r\n$12\r\nPEXPIREMEMBERAT\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWrite(aof,subExpire.subkey(),sdslen(subExpire.subkey())) == 0) goto werr;

View File

@ -70,6 +70,7 @@ void cronCommand(client *c)
decrRefCount(o);
// use an expire to trigger execution. Note: We use a subkey expire here so legacy clients don't delete it.
setExpire(c, c->db, c->argv[ARG_NAME], c->argv[ARG_NAME], base + interval);
++g_pserver->dirty;
addReply(c, shared.ok);
}
@ -86,6 +87,7 @@ void executeCronJobExpireHook(const char *key, robj *o)
serverAssert(cFake->argc == 0);
// Setup the args for the EVAL command
cFake->cmd = lookupCommandByCString("EVAL");
cFake->argc = 3 + job->veckeys.size() + job->vecargs.size();
cFake->argv = (robj**)zmalloc(sizeof(robj*) * cFake->argc, MALLOC_LOCAL);
cFake->argv[0] = createStringObject("EVAL", 4);
@ -96,7 +98,17 @@ void executeCronJobExpireHook(const char *key, robj *o)
for (size_t i = 0; i < job->vecargs.size(); ++i)
cFake->argv[3+job->veckeys.size()+i] = createStringObject(job->vecargs[i].get(), job->vecargs[i].size());
int lua_replicate_backup = g_pserver->lua_always_replicate_commands;
g_pserver->lua_always_replicate_commands = 0;
evalCommand(cFake);
g_pserver->lua_always_replicate_commands = lua_replicate_backup;
if (g_pserver->aof_state != AOF_OFF)
feedAppendOnlyFile(cFake->cmd,cFake->db->id,cFake->argv,cFake->argc);
// Active replicas do their own expiries, do not propogate
if (!g_pserver->fActiveReplica)
replicationFeedSlaves(g_pserver->slaves,cFake->db->id,cFake->argv,cFake->argc);
resetClient(cFake);
robj *keyobj = createStringObject(key,sdslen(key));

View File

@ -1372,6 +1372,7 @@ expireEntry *getExpire(redisDb *db, robj_roptr key) {
* keys. */
void propagateExpire(redisDb *db, robj *key, int lazy) {
serverAssert(GlobalLocksAcquired());
robj *argv[2];
argv[0] = lazy ? shared.unlink : shared.del;
@ -1389,6 +1390,48 @@ void propagateExpire(redisDb *db, robj *key, int lazy) {
decrRefCount(argv[1]);
}
void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey)
{
robj *argv[3];
robj objT;
redisCommand *cmd = nullptr;
switch (type)
{
case OBJ_SET:
argv[0] = shared.srem;
argv[1] = key;
argv[2] = subkey;
cmd = cserver.sremCommand;
break;
case OBJ_HASH:
argv[0] = shared.hdel;
argv[1] = key;
argv[2] = subkey;
cmd = cserver.hdelCommand;
break;
case OBJ_ZSET:
argv[0] = shared.zrem;
argv[1] = key;
argv[2] = subkey;
cmd = cserver.zremCommand;
break;
case OBJ_CRON:
return; // CRON jobs replicate in their own handler
default:
serverPanic("Unknown subkey type");
}
if (g_pserver->aof_state != AOF_OFF)
feedAppendOnlyFile(cmd,db->id,argv,3);
// Active replicas do their own expiries, do not propogate
if (!g_pserver->fActiveReplica)
replicationFeedSlaves(g_pserver->slaves,db->id,argv,3);
}
/* Check if the key is expired. Note, this does not check subexpires */
int keyIsExpired(redisDb *db, robj *key) {
expireEntry *pexpire = getExpire(db,key);

View File

@ -31,6 +31,7 @@
#include "server.h"
#include "sha1.h" /* SHA1 is used for DEBUG DIGEST */
#include "crc64.h"
#include "cron.h"
#include <arpa/inet.h>
#include <signal.h>
@ -251,6 +252,10 @@ void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj
mt->digest(&md,mv->value);
xorDigest(digest,md.x,sizeof(md.x));
}
} else if (o->type == OBJ_CRON) {
cronjob *job = (cronjob*)ptrFromObj(o);
mixDigest(digest, &job->interval, sizeof(job->interval));
mixDigest(digest, job->script.get(), job->script.size());
} else {
serverPanic("Unknown object type");
}

View File

@ -78,6 +78,10 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
dictEntry *de = dictFind(db->pdict, e.key());
robj *val = (robj*)dictGetVal(de);
int deleted = 0;
robj objKey;
initStaticStringObject(objKey, (char*)e.key());
while (!pfat->FEmpty())
{
if (pfat->nextExpireEntry().when > now)
@ -130,6 +134,11 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
default:
serverAssert(false);
}
robj objSubkey;
initStaticStringObject(objSubkey, (char*)pfat->nextExpireEntry().spsubkey.get());
propagateSubkeyExpire(db, val->type, &objKey, &objSubkey);
pfat->popfrontExpireEntry();
}
@ -144,22 +153,18 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
db->setexpire->insert(eT);
}
robj objT;
switch (val->type)
{
case OBJ_SET:
initStaticStringObject(objT, (char*)e.key());
signalModifiedKey(db,&objT);
notifyKeyspaceEvent(NOTIFY_SET,"srem",&objT,db->id);
signalModifiedKey(db,&objKey);
notifyKeyspaceEvent(NOTIFY_SET,"srem",&objKey,db->id);
break;
}
}
if (pfat->FEmpty())
{
robj *keyobj = createStringObject(e.key(),sdslen(e.key()));
removeExpire(db, keyobj);
decrRefCount(keyobj);
removeExpire(db, &objKey);
}
}
@ -224,7 +229,8 @@ void expireMemberCore(client *c, robj *key, robj *subkey, long long basetime, lo
}
setExpire(c, c->db, key, subkey, when);
signalModifiedKey(c->db, key);
g_pserver->dirty++;
addReply(c, shared.cone);
}
@ -256,6 +262,14 @@ void expireMemberAtCommand(client *c)
expireMemberCore(c, c->argv[1], c->argv[2], 0, when, UNIT_SECONDS);
}
void pexpireMemberAtCommand(client *c)
{
long long when;
if (getLongLongFromObjectOrReply(c, c->argv[3], &when, NULL) != C_OK)
return;
expireMemberCore(c, c->argv[1], c->argv[2], 0, when, UNIT_MILLISECONDS);
}
/* Try to expire a few timed out keys. The algorithm used is adaptive and
* will use few CPU cycles if there are few expiring keys, otherwise

View File

@ -251,6 +251,8 @@ void feedReplicationBacklogWithObject(robj *o) {
feedReplicationBacklog(p,len);
}
sds catCommandForAofAndActiveReplication(sds buf, struct redisCommand *cmd, robj **argv, int argc);
void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bool fSendRaw)
{
char llstr[LONG_STR_SIZE];
@ -289,13 +291,23 @@ void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bo
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
/* Add the multi bulk length. */
addReplyArrayLenAsync(replica,argc);
if (fSendRaw)
{
/* Add the multi bulk length. */
addReplyArrayLenAsync(replica,argc);
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
for (int j = 0; j < argc; j++)
addReplyBulkAsync(replica,argv[j]);
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
for (int j = 0; j < argc; j++)
addReplyBulkAsync(replica,argv[j]);
}
else
{
struct redisCommand *cmd = lookupCommand(szFromObj(argv[0]));
sds buf = catCommandForAofAndActiveReplication(sdsempty(), cmd, argv, argc);
addReplyProtoAsync(replica, buf, sdslen(buf));
sdsfree(buf);
}
}
/* Propagate write commands to slaves, and populate the replication backlog

View File

@ -634,6 +634,10 @@ struct redisCommand redisCommandTable[] = {
{"expirememberat", expireMemberAtCommand, 4,
"write fast @keyspace",
0,NULL,1,1,1,0,0,0},
{"pexpirememberat", pexpireMemberAtCommand, 4,
"write fast @keyspace",
0,NULL,1,1,1,0,0,0},
{"pexpire",pexpireCommand,3,
"write fast @keyspace",
@ -2289,6 +2293,9 @@ void createSharedObjects(void) {
shared.rpoplpush = makeObjectShared(createStringObject("RPOPLPUSH",9));
shared.zpopmin = makeObjectShared(createStringObject("ZPOPMIN",7));
shared.zpopmax = makeObjectShared(createStringObject("ZPOPMAX",7));
shared.hdel = makeObjectShared(createStringObject("HDEL", 4));
shared.zrem = makeObjectShared(createStringObject("ZREM", 4));
shared.srem = makeObjectShared(createStringObject("SREM", 4));
for (j = 0; j < OBJ_SHARED_INTEGERS; j++) {
shared.integers[j] =
makeObjectShared(createObject(OBJ_STRING,(void*)(long)j));
@ -2514,6 +2521,8 @@ void initServerConfig(void) {
cserver.xclaimCommand = lookupCommandByCString("xclaim");
cserver.xgroupCommand = lookupCommandByCString("xgroup");
cserver.rreplayCommand = lookupCommandByCString("rreplay");
cserver.hdelCommand = lookupCommandByCString("hdel");
cserver.zremCommand = lookupCommandByCString("zrem");
/* Slow log */
g_pserver->slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN;

View File

@ -1360,7 +1360,7 @@ struct sharedObjectsStruct {
*masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr,
*busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk,
*unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink,
*rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan,
*rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan, *srem, *hdel, *zrem,
*select[PROTO_SHARED_SELECT_CMDS],
*integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
@ -1581,7 +1581,7 @@ struct redisServerConst {
*lpopCommand, *rpopCommand, *zpopminCommand,
*zpopmaxCommand, *sremCommand, *execCommand,
*expireCommand, *pexpireCommand, *xclaimCommand,
*xgroupCommand, *rreplayCommand;
*xgroupCommand, *rreplayCommand, *hdelCommand, *zremCommand;
/* Configuration */
char *default_masteruser; /* AUTH with this user and masterauth with master */
@ -2536,6 +2536,7 @@ int removeExpire(redisDb *db, robj *key);
int removeExpireCore(redisDb *db, robj *key, dictEntry *de);
int removeSubkeyExpire(redisDb *db, robj *key, robj *subkey);
void propagateExpire(redisDb *db, robj *key, int lazy);
void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey);
int expireIfNeeded(redisDb *db, robj *key);
expireEntry *getExpire(redisDb *db, robj_roptr key);
void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when);
@ -2659,6 +2660,8 @@ extern "C" char *redisGitSHA1(void);
extern "C" char *redisGitDirty(void);
extern "C" uint64_t redisBuildId(void);
int parseUnitString(const char *sz);
/* Commands prototypes */
void authCommand(client *c);
void pingCommand(client *c);
@ -2736,6 +2739,7 @@ void expireCommand(client *c);
void expireatCommand(client *c);
void expireMemberCommand(client *c);
void expireMemberAtCommand(client *c);
void pexpireMemberAtCommand(client *c);
void pexpireCommand(client *c);
void pexpireatCommand(client *c);
void getsetCommand(client *c);

View File

@ -257,4 +257,26 @@ tags {"aof"} {
r expire x -1
}
}
## Test that PEXPIREMEMBERAT is loaded correctly
create_aof {
append_to_aof [formatCommand sadd testkey a b c d]
append_to_aof [formatCommand pexpirememberat testkey a 1000]
}
start_server_aof [list dir $server_path aof-load-truncated no] {
test "AOF+EXPIREMEMBER: Server shuold have been started" {
assert_equal 1 [is_alive $srv]
}
test "AOF+PEXPIREMEMBERAT: set should have 3 values" {
set client [redis [dict get $srv host] [dict get $srv port]]
wait_for_condition 50 100 {
[catch {$client ping} e] == 0
} else {
fail "Loading DB is taking too much time."
}
assert_equal 3 [$client scard testkey]
}
}
}

View File

@ -151,5 +151,93 @@ start_server {tags {"repl"}} {
fail "SPOP replication inconsistency"
}
}
test {Replication of EXPIREMEMBER (set) command} {
$master sadd testkey a b c d
wait_for_condition 50 100 {
[$master debug digest] eq [$slave debug digest]
} else {
fail "Failed to replicate set"
}
$master expiremember testkey a 1
after 1000
wait_for_condition 50 100 {
[$master scard testkey] eq 3
} else {
fail "expiremember failed to work on master"
}
wait_for_condition 50 100 {
[$slave scard testkey] eq 3
} else {
assert_equal [$slave scard testkey] 3
}
$master del testkey
}
test {Replication of EXPIREMEMBER (hash) command} {
$master hset testkey a value
$master hset testkey b value
wait_for_condition 50 100 {
[$master debug digest] eq [$slave debug digest]
} else {
fail "Failed to replicate set"
}
$master expiremember testkey a 1
after 1000
wait_for_condition 50 100 {
[$master hlen testkey] eq 1
} else {
fail "expiremember failed to work on master"
}
wait_for_condition 50 100 {
[$slave hlen testkey] eq 1
} else {
assert_equal [$slave hlen testkey] 1
}
$master del testkey
}
test {Replication of EXPIREMEMBER (zset) command} {
$master zadd testkey 1 a
$master zadd testkey 2 b
wait_for_condition 50 100 {
[$master debug digest] eq [$slave debug digest]
} else {
fail "Failed to replicate set"
}
$master expiremember testkey a 1
after 1000
wait_for_condition 50 100 {
[$master zcard testkey] eq 1
} else {
fail "expiremember failed to work on master"
}
wait_for_condition 50 100 {
[$slave zcard testkey] eq 1
} else {
assert_equal [$slave zcard testkey] 1
}
}
test {keydb.cron replicates} {
$master del testkey
$master keydb.cron testjob repeat 0 1000000 {redis.call("incr", "testkey")} 1 testkey
after 300
assert_equal 1 [$master get testkey]
assert_equal 1 [$master exists testjob]
wait_for_condition 50 100 {
[$master debug digest] eq [$slave debug digest]
} else {
fail "KEYDB.CRON failed to replicate"
}
$master del testjob
$master del testkey
wait_for_condition 50 1000 {
[$master debug digest] eq [$slave debug digest]
} else {
fail "cron delete failed to propogate"
}
}
}
}

View File

@ -77,6 +77,94 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
$master flushall
}
test {Replication of EXPIREMEMBER (set) command (Active)} {
$master sadd testkey a b c d
wait_for_condition 50 100 {
[$master debug digest] eq [$slave debug digest]
} else {
fail "Failed to replicate set"
}
$master expiremember testkey a 1
after 1000
wait_for_condition 50 100 {
[$master scard testkey] eq 3
} else {
fail "expiremember failed to work on master"
}
wait_for_condition 50 100 {
[$slave scard testkey] eq 3
} else {
assert_equal [$slave scard testkey] 3
}
$master del testkey
}
test {Replication of EXPIREMEMBER (hash) command (Active)} {
$master hset testkey a value
$master hset testkey b value
wait_for_condition 50 100 {
[$master debug digest] eq [$slave debug digest]
} else {
fail "Failed to replicate set"
}
$master expiremember testkey a 1
after 1000
wait_for_condition 50 100 {
[$master hlen testkey] eq 1
} else {
fail "expiremember failed to work on master"
}
wait_for_condition 50 100 {
[$slave hlen testkey] eq 1
} else {
assert_equal [$slave hlen testkey] 1
}
$master del testkey
}
test {Replication of EXPIREMEMBER (zset) command (Active)} {
$master zadd testkey 1 a
$master zadd testkey 2 b
wait_for_condition 50 100 {
[$master debug digest] eq [$slave debug digest]
} else {
fail "Failed to replicate set"
}
$master expiremember testkey a 1
after 1000
wait_for_condition 50 100 {
[$master zcard testkey] eq 1
} else {
fail "expiremember failed to work on master"
}
wait_for_condition 50 100 {
[$slave zcard testkey] eq 1
} else {
assert_equal [$slave zcard testkey] 1
}
}
test {keydb.cron replicates (Active) } {
$master del testkey
$master keydb.cron testjob repeat 0 1000000 {redis.call("incr", "testkey")} 1 testkey
after 300
assert_equal 1 [$master get testkey]
assert_equal 1 [$master exists testjob]
wait_for_condition 50 100 {
[$master debug digest] eq [$slave debug digest]
} else {
fail "KEYDB.CRON failed to replicate"
}
$master del testjob
$master del testkey
wait_for_condition 50 1000 {
[$master debug digest] eq [$slave debug digest]
} else {
fail "cron delete failed to propogate"
}
}
test {Active replicas WAIT} {
# Test that wait succeeds since replicas should be syncronized
$master set testkey foo
@ -113,37 +201,38 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
assert_equal {0} [$slave del testkey1]
}
test {Active replica expire propogates when source is down} {
$slave flushall
$slave set testkey2 foo
$slave set testkey1 foo
wait_for_condition 50 1000 {
[string match *foo* [$master get testkey1]]
} else {
fail "Replication failed to propogate"
}
$slave expire testkey1 2
assert_equal {1} [$slave wait 1 500] { "value should propogate
within 0.5 seconds" }
exec kill -SIGSTOP $slave_pid
test {Active replica expire propogates when source is down} {
$slave flushall
$slave set testkey2 foo
$slave set testkey1 foo
wait_for_condition 50 1000 {
[string match *foo* [$master get testkey1]]
} else {
fail "Replication failed to propogate"
}
$slave expire testkey1 2
assert_equal {1} [$slave wait 1 500] { "value should propogate
within 0.5 seconds" }
exec kill -SIGSTOP $slave_pid
after 3000
# Ensure testkey1 is gone. Note, we can't do this directly as the normal commands lie to us
# about what is actually in the dict. The only way to know is with a count from info
# Ensure testkey1 is gone. Note, we can't do this directly as the normal commands lie to us
# about what is actually in the dict. The only way to know is with a count from info
assert_equal {1} [expr [string first {keys=1} [$master info keyspace]] >= 0] {"slave expired"}
}
exec kill -SIGCONT $slave_pid
}
test {Active replica different databases} {
$master select 3
$master set testkey abcd
$master select 2
$master del testkey
$slave select 3
wait_for_condition 50 1000 {
[string match abcd [$slave get testkey]]
} else {
fail "Replication failed to propogate DB 3"
}
exec kill -SIGCONT $slave_pid
test {Active replica different databases} {
$master select 3
$master set testkey abcd
$master select 2
$master del testkey
$slave select 3
wait_for_condition 50 1000 {
[string match abcd [$slave get testkey]]
} else {
fail "Replication failed to propogate DB 3"
}
}
}
}