diff --git a/src/aof.cpp b/src/aof.cpp index de8a8260e..4a7eb26ed 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -597,21 +597,59 @@ sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, r return buf; } -void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { - sds buf = sdsempty(); - robj *tmpargv[3]; - - /* The DB this command was targeting is not the same as the last command - * we appended. To issue a SELECT command is needed. */ - if (dictid != g_pserver->aof_selected_db) { - char seldb[64]; - - snprintf(seldb,sizeof(seldb),"%d",dictid); - buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", - (unsigned long)strlen(seldb),seldb); - g_pserver->aof_selected_db = dictid; +sds catAppendOnlyExpireMemberAtCommand(sds buf, struct redisCommand *cmd, robj **argv, const size_t argc) { + long long when = 0; + int unit = UNIT_SECONDS; + bool fAbsolute = false; + + if (cmd->proc == expireMemberCommand) { + if (getLongLongFromObject(argv[3], &when) != C_OK) + serverPanic("propogating invalid EXPIREMEMBER command"); + + if (argc == 5) { + unit = parseUnitString(szFromObj(argv[4])); + } + } else if (cmd->proc == expireMemberAtCommand) { + if (getLongLongFromObject(argv[3], &when) != C_OK) + serverPanic("propogating invalid EXPIREMEMBERAT command"); + fAbsolute = true; + } else if (cmd->proc == pexpireMemberAtCommand) { + if (getLongLongFromObject(argv[3], &when) != C_OK) + serverPanic("propogating invalid PEXPIREMEMBERAT command"); + fAbsolute = true; + unit = UNIT_MILLISECONDS; + } else { + serverPanic("Unknown expiremember command"); } + switch (unit) + { + case UNIT_SECONDS: + when *= 1000; + break; + + case UNIT_MILLISECONDS: + break; + } + + if (!fAbsolute) + when += mstime(); + + robj *argvNew[4]; + argvNew[0] = createStringObject("PEXPIREMEMBERAT",15); + argvNew[1] = argv[1]; + argvNew[2] = argv[2]; + argvNew[3] = createStringObjectFromLongLong(when); + buf = catAppendOnlyGenericCommand(buf, 4, argvNew); + decrRefCount(argvNew[0]); + decrRefCount(argvNew[3]); + return buf; +} + +sds catCommandForAofAndActiveReplication(sds buf, struct redisCommand *cmd, robj **argv, int argc) +{ + robj *tmpargv[3]; + if (cmd->proc == expireCommand || cmd->proc == pexpireCommand || cmd->proc == expireatCommand) { /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */ @@ -640,6 +678,10 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a if (pxarg) buf = catAppendOnlyExpireAtCommand(buf,cserver.pexpireCommand,argv[1], pxarg); + } else if (cmd->proc == expireMemberCommand || cmd->proc == expireMemberAtCommand || + cmd->proc == pexpireMemberAtCommand) { + /* Translate subkey expire commands to PEXPIREMEMBERAT */ + buf = catAppendOnlyExpireMemberAtCommand(buf, cmd, argv, argc); } else { /* All the other commands don't need translation or need the * same translation already operated in the command vector @@ -647,6 +689,25 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a buf = catAppendOnlyGenericCommand(buf,argc,argv); } + return buf; +} + +void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { + sds buf = sdsempty(); + + /* The DB this command was targeting is not the same as the last command + * we appended. To issue a SELECT command is needed. */ + if (dictid != g_pserver->aof_selected_db) { + char seldb[64]; + + snprintf(seldb,sizeof(seldb),"%d",dictid); + buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", + (unsigned long)strlen(seldb),seldb); + g_pserver->aof_selected_db = dictid; + } + + buf = catCommandForAofAndActiveReplication(buf, cmd, argv, argc); + /* Append to the AOF buffer. This will be flushed on disk just before * of re-entering the event loop, so before the client will get a * positive reply about the operation performed. */ @@ -1378,7 +1439,7 @@ int rewriteAppendOnlyFileRio(rio *aof) { } else { - char cmd[]="*4\r\n$12\r\nEXPIREMEMBER\r\n"; + char cmd[]="*4\r\n$12\r\nPEXPIREMEMBERAT\r\n"; if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; if (rioWriteBulkObject(aof,&key) == 0) goto werr; if (rioWrite(aof,subExpire.subkey(),sdslen(subExpire.subkey())) == 0) goto werr; diff --git a/src/cron.cpp b/src/cron.cpp index 230cf4ed4..9ef8bbfb4 100644 --- a/src/cron.cpp +++ b/src/cron.cpp @@ -70,6 +70,7 @@ void cronCommand(client *c) decrRefCount(o); // use an expire to trigger execution. Note: We use a subkey expire here so legacy clients don't delete it. setExpire(c, c->db, c->argv[ARG_NAME], c->argv[ARG_NAME], base + interval); + ++g_pserver->dirty; addReply(c, shared.ok); } @@ -86,6 +87,7 @@ void executeCronJobExpireHook(const char *key, robj *o) serverAssert(cFake->argc == 0); // Setup the args for the EVAL command + cFake->cmd = lookupCommandByCString("EVAL"); cFake->argc = 3 + job->veckeys.size() + job->vecargs.size(); cFake->argv = (robj**)zmalloc(sizeof(robj*) * cFake->argc, MALLOC_LOCAL); cFake->argv[0] = createStringObject("EVAL", 4); @@ -96,7 +98,17 @@ void executeCronJobExpireHook(const char *key, robj *o) for (size_t i = 0; i < job->vecargs.size(); ++i) cFake->argv[3+job->veckeys.size()+i] = createStringObject(job->vecargs[i].get(), job->vecargs[i].size()); + int lua_replicate_backup = g_pserver->lua_always_replicate_commands; + g_pserver->lua_always_replicate_commands = 0; evalCommand(cFake); + g_pserver->lua_always_replicate_commands = lua_replicate_backup; + + if (g_pserver->aof_state != AOF_OFF) + feedAppendOnlyFile(cFake->cmd,cFake->db->id,cFake->argv,cFake->argc); + // Active replicas do their own expiries, do not propogate + if (!g_pserver->fActiveReplica) + replicationFeedSlaves(g_pserver->slaves,cFake->db->id,cFake->argv,cFake->argc); + resetClient(cFake); robj *keyobj = createStringObject(key,sdslen(key)); diff --git a/src/db.cpp b/src/db.cpp index 562485941..5ecd3253c 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1372,6 +1372,7 @@ expireEntry *getExpire(redisDb *db, robj_roptr key) { * keys. */ void propagateExpire(redisDb *db, robj *key, int lazy) { serverAssert(GlobalLocksAcquired()); + robj *argv[2]; argv[0] = lazy ? shared.unlink : shared.del; @@ -1389,6 +1390,48 @@ void propagateExpire(redisDb *db, robj *key, int lazy) { decrRefCount(argv[1]); } +void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey) +{ + robj *argv[3]; + robj objT; + redisCommand *cmd = nullptr; + switch (type) + { + case OBJ_SET: + argv[0] = shared.srem; + argv[1] = key; + argv[2] = subkey; + cmd = cserver.sremCommand; + break; + + case OBJ_HASH: + argv[0] = shared.hdel; + argv[1] = key; + argv[2] = subkey; + cmd = cserver.hdelCommand; + break; + + case OBJ_ZSET: + argv[0] = shared.zrem; + argv[1] = key; + argv[2] = subkey; + cmd = cserver.zremCommand; + break; + + case OBJ_CRON: + return; // CRON jobs replicate in their own handler + + default: + serverPanic("Unknown subkey type"); + } + + if (g_pserver->aof_state != AOF_OFF) + feedAppendOnlyFile(cmd,db->id,argv,3); + // Active replicas do their own expiries, do not propogate + if (!g_pserver->fActiveReplica) + replicationFeedSlaves(g_pserver->slaves,db->id,argv,3); +} + /* Check if the key is expired. Note, this does not check subexpires */ int keyIsExpired(redisDb *db, robj *key) { expireEntry *pexpire = getExpire(db,key); diff --git a/src/debug.cpp b/src/debug.cpp index 1916dc79b..ca4da2d0a 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -31,6 +31,7 @@ #include "server.h" #include "sha1.h" /* SHA1 is used for DEBUG DIGEST */ #include "crc64.h" +#include "cron.h" #include #include @@ -251,6 +252,10 @@ void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj mt->digest(&md,mv->value); xorDigest(digest,md.x,sizeof(md.x)); } + } else if (o->type == OBJ_CRON) { + cronjob *job = (cronjob*)ptrFromObj(o); + mixDigest(digest, &job->interval, sizeof(job->interval)); + mixDigest(digest, job->script.get(), job->script.size()); } else { serverPanic("Unknown object type"); } diff --git a/src/expire.cpp b/src/expire.cpp index 1ac6ab415..ae9c75e8a 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -78,6 +78,10 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { dictEntry *de = dictFind(db->pdict, e.key()); robj *val = (robj*)dictGetVal(de); int deleted = 0; + + robj objKey; + initStaticStringObject(objKey, (char*)e.key()); + while (!pfat->FEmpty()) { if (pfat->nextExpireEntry().when > now) @@ -130,6 +134,11 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { default: serverAssert(false); } + + robj objSubkey; + initStaticStringObject(objSubkey, (char*)pfat->nextExpireEntry().spsubkey.get()); + propagateSubkeyExpire(db, val->type, &objKey, &objSubkey); + pfat->popfrontExpireEntry(); } @@ -144,22 +153,18 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { db->setexpire->insert(eT); } - robj objT; switch (val->type) { case OBJ_SET: - initStaticStringObject(objT, (char*)e.key()); - signalModifiedKey(db,&objT); - notifyKeyspaceEvent(NOTIFY_SET,"srem",&objT,db->id); + signalModifiedKey(db,&objKey); + notifyKeyspaceEvent(NOTIFY_SET,"srem",&objKey,db->id); break; } } if (pfat->FEmpty()) { - robj *keyobj = createStringObject(e.key(),sdslen(e.key())); - removeExpire(db, keyobj); - decrRefCount(keyobj); + removeExpire(db, &objKey); } } @@ -224,7 +229,8 @@ void expireMemberCore(client *c, robj *key, robj *subkey, long long basetime, lo } setExpire(c, c->db, key, subkey, when); - + signalModifiedKey(c->db, key); + g_pserver->dirty++; addReply(c, shared.cone); } @@ -256,6 +262,14 @@ void expireMemberAtCommand(client *c) expireMemberCore(c, c->argv[1], c->argv[2], 0, when, UNIT_SECONDS); } +void pexpireMemberAtCommand(client *c) +{ + long long when; + if (getLongLongFromObjectOrReply(c, c->argv[3], &when, NULL) != C_OK) + return; + + expireMemberCore(c, c->argv[1], c->argv[2], 0, when, UNIT_MILLISECONDS); +} /* Try to expire a few timed out keys. The algorithm used is adaptive and * will use few CPU cycles if there are few expiring keys, otherwise diff --git a/src/replication.cpp b/src/replication.cpp index dcd6a915b..a0a69da26 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -251,6 +251,8 @@ void feedReplicationBacklogWithObject(robj *o) { feedReplicationBacklog(p,len); } +sds catCommandForAofAndActiveReplication(sds buf, struct redisCommand *cmd, robj **argv, int argc); + void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bool fSendRaw) { char llstr[LONG_STR_SIZE]; @@ -289,13 +291,23 @@ void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bo * are queued in the output buffer until the initial SYNC completes), * or are already in sync with the master. */ - /* Add the multi bulk length. */ - addReplyArrayLenAsync(replica,argc); + if (fSendRaw) + { + /* Add the multi bulk length. */ + addReplyArrayLenAsync(replica,argc); - /* Finally any additional argument that was not stored inside the - * static buffer if any (from j to argc). */ - for (int j = 0; j < argc; j++) - addReplyBulkAsync(replica,argv[j]); + /* Finally any additional argument that was not stored inside the + * static buffer if any (from j to argc). */ + for (int j = 0; j < argc; j++) + addReplyBulkAsync(replica,argv[j]); + } + else + { + struct redisCommand *cmd = lookupCommand(szFromObj(argv[0])); + sds buf = catCommandForAofAndActiveReplication(sdsempty(), cmd, argv, argc); + addReplyProtoAsync(replica, buf, sdslen(buf)); + sdsfree(buf); + } } /* Propagate write commands to slaves, and populate the replication backlog diff --git a/src/server.cpp b/src/server.cpp index 88f0b539b..4efe103ab 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -634,6 +634,10 @@ struct redisCommand redisCommandTable[] = { {"expirememberat", expireMemberAtCommand, 4, "write fast @keyspace", 0,NULL,1,1,1,0,0,0}, + + {"pexpirememberat", pexpireMemberAtCommand, 4, + "write fast @keyspace", + 0,NULL,1,1,1,0,0,0}, {"pexpire",pexpireCommand,3, "write fast @keyspace", @@ -2289,6 +2293,9 @@ void createSharedObjects(void) { shared.rpoplpush = makeObjectShared(createStringObject("RPOPLPUSH",9)); shared.zpopmin = makeObjectShared(createStringObject("ZPOPMIN",7)); shared.zpopmax = makeObjectShared(createStringObject("ZPOPMAX",7)); + shared.hdel = makeObjectShared(createStringObject("HDEL", 4)); + shared.zrem = makeObjectShared(createStringObject("ZREM", 4)); + shared.srem = makeObjectShared(createStringObject("SREM", 4)); for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { shared.integers[j] = makeObjectShared(createObject(OBJ_STRING,(void*)(long)j)); @@ -2514,6 +2521,8 @@ void initServerConfig(void) { cserver.xclaimCommand = lookupCommandByCString("xclaim"); cserver.xgroupCommand = lookupCommandByCString("xgroup"); cserver.rreplayCommand = lookupCommandByCString("rreplay"); + cserver.hdelCommand = lookupCommandByCString("hdel"); + cserver.zremCommand = lookupCommandByCString("zrem"); /* Slow log */ g_pserver->slowlog_log_slower_than = CONFIG_DEFAULT_SLOWLOG_LOG_SLOWER_THAN; diff --git a/src/server.h b/src/server.h index 218982bda..307321bab 100644 --- a/src/server.h +++ b/src/server.h @@ -1360,7 +1360,7 @@ struct sharedObjectsStruct { *masterdownerr, *roslaveerr, *execaborterr, *noautherr, *noreplicaserr, *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, - *rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan, + *rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan, *srem, *hdel, *zrem, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ @@ -1581,7 +1581,7 @@ struct redisServerConst { *lpopCommand, *rpopCommand, *zpopminCommand, *zpopmaxCommand, *sremCommand, *execCommand, *expireCommand, *pexpireCommand, *xclaimCommand, - *xgroupCommand, *rreplayCommand; + *xgroupCommand, *rreplayCommand, *hdelCommand, *zremCommand; /* Configuration */ char *default_masteruser; /* AUTH with this user and masterauth with master */ @@ -2536,6 +2536,7 @@ int removeExpire(redisDb *db, robj *key); int removeExpireCore(redisDb *db, robj *key, dictEntry *de); int removeSubkeyExpire(redisDb *db, robj *key, robj *subkey); void propagateExpire(redisDb *db, robj *key, int lazy); +void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey); int expireIfNeeded(redisDb *db, robj *key); expireEntry *getExpire(redisDb *db, robj_roptr key); void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when); @@ -2659,6 +2660,8 @@ extern "C" char *redisGitSHA1(void); extern "C" char *redisGitDirty(void); extern "C" uint64_t redisBuildId(void); +int parseUnitString(const char *sz); + /* Commands prototypes */ void authCommand(client *c); void pingCommand(client *c); @@ -2736,6 +2739,7 @@ void expireCommand(client *c); void expireatCommand(client *c); void expireMemberCommand(client *c); void expireMemberAtCommand(client *c); +void pexpireMemberAtCommand(client *c); void pexpireCommand(client *c); void pexpireatCommand(client *c); void getsetCommand(client *c); diff --git a/tests/integration/aof.tcl b/tests/integration/aof.tcl index 277b0d3df..0a1a21fdd 100644 --- a/tests/integration/aof.tcl +++ b/tests/integration/aof.tcl @@ -257,4 +257,26 @@ tags {"aof"} { r expire x -1 } } + + ## Test that PEXPIREMEMBERAT is loaded correctly + create_aof { + append_to_aof [formatCommand sadd testkey a b c d] + append_to_aof [formatCommand pexpirememberat testkey a 1000] + } + + start_server_aof [list dir $server_path aof-load-truncated no] { + test "AOF+EXPIREMEMBER: Server shuold have been started" { + assert_equal 1 [is_alive $srv] + } + + test "AOF+PEXPIREMEMBERAT: set should have 3 values" { + set client [redis [dict get $srv host] [dict get $srv port]] + wait_for_condition 50 100 { + [catch {$client ping} e] == 0 + } else { + fail "Loading DB is taking too much time." + } + assert_equal 3 [$client scard testkey] + } + } } diff --git a/tests/integration/replication-4.tcl b/tests/integration/replication-4.tcl index 3c6df52a8..4065de786 100644 --- a/tests/integration/replication-4.tcl +++ b/tests/integration/replication-4.tcl @@ -151,5 +151,93 @@ start_server {tags {"repl"}} { fail "SPOP replication inconsistency" } } + + test {Replication of EXPIREMEMBER (set) command} { + $master sadd testkey a b c d + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master scard testkey] eq 3 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave scard testkey] eq 3 + } else { + assert_equal [$slave scard testkey] 3 + } + $master del testkey + } + + test {Replication of EXPIREMEMBER (hash) command} { + $master hset testkey a value + $master hset testkey b value + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master hlen testkey] eq 1 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave hlen testkey] eq 1 + } else { + assert_equal [$slave hlen testkey] 1 + } + $master del testkey + } + + test {Replication of EXPIREMEMBER (zset) command} { + $master zadd testkey 1 a + $master zadd testkey 2 b + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master zcard testkey] eq 1 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave zcard testkey] eq 1 + } else { + assert_equal [$slave zcard testkey] 1 + } + } + + test {keydb.cron replicates} { + $master del testkey + $master keydb.cron testjob repeat 0 1000000 {redis.call("incr", "testkey")} 1 testkey + after 300 + assert_equal 1 [$master get testkey] + assert_equal 1 [$master exists testjob] + + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "KEYDB.CRON failed to replicate" + } + $master del testjob + $master del testkey + wait_for_condition 50 1000 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "cron delete failed to propogate" + } + } } } diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl index 0c28eb85d..b1d1c5217 100644 --- a/tests/integration/replication-active.tcl +++ b/tests/integration/replication-active.tcl @@ -77,6 +77,94 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { $master flushall } + test {Replication of EXPIREMEMBER (set) command (Active)} { + $master sadd testkey a b c d + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master scard testkey] eq 3 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave scard testkey] eq 3 + } else { + assert_equal [$slave scard testkey] 3 + } + $master del testkey + } + + test {Replication of EXPIREMEMBER (hash) command (Active)} { + $master hset testkey a value + $master hset testkey b value + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master hlen testkey] eq 1 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave hlen testkey] eq 1 + } else { + assert_equal [$slave hlen testkey] 1 + } + $master del testkey + } + + test {Replication of EXPIREMEMBER (zset) command (Active)} { + $master zadd testkey 1 a + $master zadd testkey 2 b + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master zcard testkey] eq 1 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave zcard testkey] eq 1 + } else { + assert_equal [$slave zcard testkey] 1 + } + } + + test {keydb.cron replicates (Active) } { + $master del testkey + $master keydb.cron testjob repeat 0 1000000 {redis.call("incr", "testkey")} 1 testkey + after 300 + assert_equal 1 [$master get testkey] + assert_equal 1 [$master exists testjob] + + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "KEYDB.CRON failed to replicate" + } + $master del testjob + $master del testkey + wait_for_condition 50 1000 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "cron delete failed to propogate" + } + } + test {Active replicas WAIT} { # Test that wait succeeds since replicas should be syncronized $master set testkey foo @@ -113,37 +201,38 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { assert_equal {0} [$slave del testkey1] } - test {Active replica expire propogates when source is down} { - $slave flushall - $slave set testkey2 foo - $slave set testkey1 foo - wait_for_condition 50 1000 { - [string match *foo* [$master get testkey1]] - } else { - fail "Replication failed to propogate" - } - $slave expire testkey1 2 - assert_equal {1} [$slave wait 1 500] { "value should propogate - within 0.5 seconds" } - exec kill -SIGSTOP $slave_pid + test {Active replica expire propogates when source is down} { + $slave flushall + $slave set testkey2 foo + $slave set testkey1 foo + wait_for_condition 50 1000 { + [string match *foo* [$master get testkey1]] + } else { + fail "Replication failed to propogate" + } + $slave expire testkey1 2 + assert_equal {1} [$slave wait 1 500] { "value should propogate + within 0.5 seconds" } + exec kill -SIGSTOP $slave_pid after 3000 - # Ensure testkey1 is gone. Note, we can't do this directly as the normal commands lie to us - # about what is actually in the dict. The only way to know is with a count from info + # Ensure testkey1 is gone. Note, we can't do this directly as the normal commands lie to us + # about what is actually in the dict. The only way to know is with a count from info assert_equal {1} [expr [string first {keys=1} [$master info keyspace]] >= 0] {"slave expired"} - } - exec kill -SIGCONT $slave_pid + } - test {Active replica different databases} { - $master select 3 - $master set testkey abcd - $master select 2 - $master del testkey - $slave select 3 - wait_for_condition 50 1000 { - [string match abcd [$slave get testkey]] - } else { - fail "Replication failed to propogate DB 3" - } + exec kill -SIGCONT $slave_pid + + test {Active replica different databases} { + $master select 3 + $master set testkey abcd + $master select 2 + $master del testkey + $slave select 3 + wait_for_condition 50 1000 { + [string match abcd [$slave get testkey]] + } else { + fail "Replication failed to propogate DB 3" } } } +}