diff --git a/src/aof.cpp b/src/aof.cpp index 7837311c0..292d52549 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -599,21 +599,59 @@ sds catAppendOnlyExpireAtCommand(sds buf, struct redisCommand *cmd, robj *key, r return buf; } -void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { - sds buf = sdsempty(); - robj *tmpargv[3]; - - /* The DB this command was targeting is not the same as the last command - * we appended. To issue a SELECT command is needed. */ - if (dictid != g_pserver->aof_selected_db) { - char seldb[64]; - - snprintf(seldb,sizeof(seldb),"%d",dictid); - buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", - (unsigned long)strlen(seldb),seldb); - g_pserver->aof_selected_db = dictid; +sds catAppendOnlyExpireMemberAtCommand(sds buf, struct redisCommand *cmd, robj **argv, const size_t argc) { + long long when = 0; + int unit = UNIT_SECONDS; + bool fAbsolute = false; + + if (cmd->proc == expireMemberCommand) { + if (getLongLongFromObject(argv[3], &when) != C_OK) + serverPanic("propogating invalid EXPIREMEMBER command"); + + if (argc == 5) { + unit = parseUnitString(szFromObj(argv[4])); + } + } else if (cmd->proc == expireMemberAtCommand) { + if (getLongLongFromObject(argv[3], &when) != C_OK) + serverPanic("propogating invalid EXPIREMEMBERAT command"); + fAbsolute = true; + } else if (cmd->proc == pexpireMemberAtCommand) { + if (getLongLongFromObject(argv[3], &when) != C_OK) + serverPanic("propogating invalid PEXPIREMEMBERAT command"); + fAbsolute = true; + unit = UNIT_MILLISECONDS; + } else { + serverPanic("Unknown expiremember command"); } + switch (unit) + { + case UNIT_SECONDS: + when *= 1000; + break; + + case UNIT_MILLISECONDS: + break; + } + + if (!fAbsolute) + when += mstime(); + + robj *argvNew[4]; + argvNew[0] = createStringObject("PEXPIREMEMBERAT",15); + argvNew[1] = argv[1]; + argvNew[2] = argv[2]; + argvNew[3] = createStringObjectFromLongLong(when); + buf = catAppendOnlyGenericCommand(buf, 4, argvNew); + decrRefCount(argvNew[0]); + decrRefCount(argvNew[3]); + return buf; +} + +sds catCommandForAofAndActiveReplication(sds buf, struct redisCommand *cmd, robj **argv, int argc) +{ + robj *tmpargv[3]; + if (cmd->proc == expireCommand || cmd->proc == pexpireCommand || cmd->proc == expireatCommand) { /* Translate EXPIRE/PEXPIRE/EXPIREAT into PEXPIREAT */ @@ -642,6 +680,10 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a if (pxarg) buf = catAppendOnlyExpireAtCommand(buf,cserver.pexpireCommand,argv[1], pxarg); + } else if (cmd->proc == expireMemberCommand || cmd->proc == expireMemberAtCommand || + cmd->proc == pexpireMemberAtCommand) { + /* Translate subkey expire commands to PEXPIREMEMBERAT */ + buf = catAppendOnlyExpireMemberAtCommand(buf, cmd, argv, argc); } else { /* All the other commands don't need translation or need the * same translation already operated in the command vector @@ -649,6 +691,25 @@ void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int a buf = catAppendOnlyGenericCommand(buf,argc,argv); } + return buf; +} + +void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { + sds buf = sdsempty(); + + /* The DB this command was targeting is not the same as the last command + * we appended. To issue a SELECT command is needed. */ + if (dictid != g_pserver->aof_selected_db) { + char seldb[64]; + + snprintf(seldb,sizeof(seldb),"%d",dictid); + buf = sdscatprintf(buf,"*2\r\n$6\r\nSELECT\r\n$%lu\r\n%s\r\n", + (unsigned long)strlen(seldb),seldb); + g_pserver->aof_selected_db = dictid; + } + + buf = catCommandForAofAndActiveReplication(buf, cmd, argv, argc); + /* Append to the AOF buffer. This will be flushed on disk just before * of re-entering the event loop, so before the client will get a * positive reply about the operation performed. */ @@ -1398,7 +1459,7 @@ int rewriteAppendOnlyFileRio(rio *aof) { } else { - char cmd[]="*4\r\n$12\r\nEXPIREMEMBER\r\n"; + char cmd[]="*4\r\n$12\r\nPEXPIREMEMBERAT\r\n"; if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; if (rioWriteBulkObject(aof,&key) == 0) goto werr; if (rioWrite(aof,subExpire.subkey(),sdslen(subExpire.subkey())) == 0) goto werr; diff --git a/src/config.cpp b/src/config.cpp index 983b06220..4b1d56235 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -530,8 +530,14 @@ void loadServerConfigFromString(char *config) { } else if (strcasecmp(argv[1], "false") == 0) { cserver.fThreadAffinity = FALSE; } else { - err = "Unknown argument: server-thread-affinity expects either true or false"; - goto loaderr; + int offset = atoi(argv[1]); + if (offset > 0) { + cserver.fThreadAffinity = TRUE; + cserver.threadAffinityOffset = offset-1; + } else { + err = "Unknown argument: server-thread-affinity expects either true or false"; + goto loaderr; + } } } else if (!strcasecmp(argv[0], "active-replica") && argc == 2) { g_pserver->fActiveReplica = yesnotoi(argv[1]); diff --git a/src/cron.cpp b/src/cron.cpp index 58f709a2d..3e40dd78a 100644 --- a/src/cron.cpp +++ b/src/cron.cpp @@ -70,6 +70,7 @@ void cronCommand(client *c) decrRefCount(o); // use an expire to trigger execution. Note: We use a subkey expire here so legacy clients don't delete it. setExpire(c, c->db, c->argv[ARG_NAME], c->argv[ARG_NAME], base + interval); + ++g_pserver->dirty; addReply(c, shared.ok); } @@ -86,6 +87,7 @@ void executeCronJobExpireHook(const char *key, robj *o) serverAssert(cFake->argc == 0); // Setup the args for the EVAL command + cFake->cmd = lookupCommandByCString("EVAL"); cFake->argc = 3 + job->veckeys.size() + job->vecargs.size(); cFake->argv = (robj**)zmalloc(sizeof(robj*) * cFake->argc, MALLOC_LOCAL); cFake->argv[0] = createStringObject("EVAL", 4); @@ -96,7 +98,17 @@ void executeCronJobExpireHook(const char *key, robj *o) for (size_t i = 0; i < job->vecargs.size(); ++i) cFake->argv[3+job->veckeys.size()+i] = createStringObject(job->vecargs[i].get(), job->vecargs[i].size()); + int lua_replicate_backup = g_pserver->lua_always_replicate_commands; + g_pserver->lua_always_replicate_commands = 0; evalCommand(cFake); + g_pserver->lua_always_replicate_commands = lua_replicate_backup; + + if (g_pserver->aof_state != AOF_OFF) + feedAppendOnlyFile(cFake->cmd,cFake->db->id,cFake->argv,cFake->argc); + // Active replicas do their own expiries, do not propogate + if (!g_pserver->fActiveReplica) + replicationFeedSlaves(g_pserver->slaves,cFake->db->id,cFake->argv,cFake->argc); + resetClient(cFake); robj *keyobj = createStringObject(key,sdslen(key)); diff --git a/src/db.cpp b/src/db.cpp index 666812982..01836d713 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1446,6 +1446,7 @@ expireEntry *getExpire(redisDb *db, robj_roptr key) { * keys. */ void propagateExpire(redisDb *db, robj *key, int lazy) { serverAssert(GlobalLocksAcquired()); + robj *argv[2]; argv[0] = lazy ? shared.unlink : shared.del; @@ -1463,6 +1464,48 @@ void propagateExpire(redisDb *db, robj *key, int lazy) { decrRefCount(argv[1]); } +void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey) +{ + robj *argv[3]; + robj objT; + redisCommand *cmd = nullptr; + switch (type) + { + case OBJ_SET: + argv[0] = shared.srem; + argv[1] = key; + argv[2] = subkey; + cmd = cserver.sremCommand; + break; + + case OBJ_HASH: + argv[0] = shared.hdel; + argv[1] = key; + argv[2] = subkey; + cmd = cserver.hdelCommand; + break; + + case OBJ_ZSET: + argv[0] = shared.zrem; + argv[1] = key; + argv[2] = subkey; + cmd = cserver.zremCommand; + break; + + case OBJ_CRON: + return; // CRON jobs replicate in their own handler + + default: + serverPanic("Unknown subkey type"); + } + + if (g_pserver->aof_state != AOF_OFF) + feedAppendOnlyFile(cmd,db->id,argv,3); + // Active replicas do their own expiries, do not propogate + if (!g_pserver->fActiveReplica) + replicationFeedSlaves(g_pserver->slaves,db->id,argv,3); +} + /* Check if the key is expired. Note, this does not check subexpires */ int keyIsExpired(redisDb *db, robj *key) { expireEntry *pexpire = getExpire(db,key); diff --git a/src/debug.cpp b/src/debug.cpp index 79beba2a2..0ed278477 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -31,6 +31,7 @@ #include "server.h" #include "sha1.h" /* SHA1 is used for DEBUG DIGEST */ #include "crc64.h" +#include "cron.h" #include #include @@ -251,6 +252,10 @@ void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj mt->digest(&md,mv->value); xorDigest(digest,md.x,sizeof(md.x)); } + } else if (o->type == OBJ_CRON) { + cronjob *job = (cronjob*)ptrFromObj(o); + mixDigest(digest, &job->interval, sizeof(job->interval)); + mixDigest(digest, job->script.get(), job->script.size()); } else { serverPanic("Unknown object type"); } diff --git a/src/evict.cpp b/src/evict.cpp index 7b666523b..eb847825b 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -463,7 +463,7 @@ int freeMemoryIfNeeded(void) { int keys_freed = 0; /* By default replicas should ignore maxmemory * and just be masters exact copies. */ - if (listLength(g_pserver->masters) && g_pserver->repl_slave_ignore_maxmemory) return C_OK; + if (listLength(g_pserver->masters) && g_pserver->repl_slave_ignore_maxmemory && !g_pserver->fActiveReplica) return C_OK; size_t mem_reported, mem_tofree, mem_freed; mstime_t latency, eviction_latency; diff --git a/src/expire.cpp b/src/expire.cpp index 1ac6ab415..ae9c75e8a 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -78,6 +78,10 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { dictEntry *de = dictFind(db->pdict, e.key()); robj *val = (robj*)dictGetVal(de); int deleted = 0; + + robj objKey; + initStaticStringObject(objKey, (char*)e.key()); + while (!pfat->FEmpty()) { if (pfat->nextExpireEntry().when > now) @@ -130,6 +134,11 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { default: serverAssert(false); } + + robj objSubkey; + initStaticStringObject(objSubkey, (char*)pfat->nextExpireEntry().spsubkey.get()); + propagateSubkeyExpire(db, val->type, &objKey, &objSubkey); + pfat->popfrontExpireEntry(); } @@ -144,22 +153,18 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { db->setexpire->insert(eT); } - robj objT; switch (val->type) { case OBJ_SET: - initStaticStringObject(objT, (char*)e.key()); - signalModifiedKey(db,&objT); - notifyKeyspaceEvent(NOTIFY_SET,"srem",&objT,db->id); + signalModifiedKey(db,&objKey); + notifyKeyspaceEvent(NOTIFY_SET,"srem",&objKey,db->id); break; } } if (pfat->FEmpty()) { - robj *keyobj = createStringObject(e.key(),sdslen(e.key())); - removeExpire(db, keyobj); - decrRefCount(keyobj); + removeExpire(db, &objKey); } } @@ -224,7 +229,8 @@ void expireMemberCore(client *c, robj *key, robj *subkey, long long basetime, lo } setExpire(c, c->db, key, subkey, when); - + signalModifiedKey(c->db, key); + g_pserver->dirty++; addReply(c, shared.cone); } @@ -256,6 +262,14 @@ void expireMemberAtCommand(client *c) expireMemberCore(c, c->argv[1], c->argv[2], 0, when, UNIT_SECONDS); } +void pexpireMemberAtCommand(client *c) +{ + long long when; + if (getLongLongFromObjectOrReply(c, c->argv[3], &when, NULL) != C_OK) + return; + + expireMemberCore(c, c->argv[1], c->argv[2], 0, when, UNIT_MILLISECONDS); +} /* Try to expire a few timed out keys. The algorithm used is adaptive and * will use few CPU cycles if there are few expiring keys, otherwise diff --git a/src/replication.cpp b/src/replication.cpp index ded3fa01f..1e6db4b10 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -286,6 +286,8 @@ void feedReplicationBacklogWithObject(robj *o) { feedReplicationBacklog(p,len); } +sds catCommandForAofAndActiveReplication(sds buf, struct redisCommand *cmd, robj **argv, int argc); + void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bool fSendRaw) { char llstr[LONG_STR_SIZE]; @@ -324,13 +326,43 @@ void replicationFeedSlave(client *replica, int dictid, robj **argv, int argc, bo * are queued in the output buffer until the initial SYNC completes), * or are already in sync with the master. */ - /* Add the multi bulk length. */ - addReplyArrayLenAsync(replica,argc); + if (fSendRaw) + { + /* Add the multi bulk length. */ + addReplyArrayLenAsync(replica,argc); - /* Finally any additional argument that was not stored inside the - * static buffer if any (from j to argc). */ - for (int j = 0; j < argc; j++) - addReplyBulkAsync(replica,argv[j]); + /* Finally any additional argument that was not stored inside the + * static buffer if any (from j to argc). */ + for (int j = 0; j < argc; j++) + addReplyBulkAsync(replica,argv[j]); + } + else + { + struct redisCommand *cmd = lookupCommand(szFromObj(argv[0])); + sds buf = catCommandForAofAndActiveReplication(sdsempty(), cmd, argv, argc); + addReplyProtoAsync(replica, buf, sdslen(buf)); + sdsfree(buf); + } +} + +static int writeProtoNum(char *dst, const size_t cchdst, long long num) +{ + if (cchdst < 1) + return 0; + dst[0] = '$'; + int cch = 1; + cch += ll2string(dst + cch, cchdst - cch, digits10(num)); + int chCpyT = std::min(cchdst - cch, 2); + memcpy(dst + cch, "\r\n", chCpyT); + cch += chCpyT; + cch += ll2string(dst + cch, cchdst-cch, num); + chCpyT = std::min(cchdst - cch, 3); + memcpy(dst + cch, "\r\n", chCpyT); + if (chCpyT == 3) + cch += 2; + else + cch += chCpyT; + return cch; } /* Propagate write commands to slaves, and populate the replication backlog @@ -343,6 +375,8 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listIter li, liReply; int j, len; serverAssert(GlobalLocksAcquired()); + static client *fake = nullptr; + if (dictid < 0) dictid = 0; // this can happen if we send a PING before any real operation @@ -360,8 +394,12 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { /* We can't have slaves attached and no backlog. */ serverAssert(!(listLength(slaves) != 0 && g_pserver->repl_backlog == NULL)); - client *fake = createClient(nullptr, serverTL - g_pserver->rgthreadvar); - fake->flags |= CLIENT_FORCE_REPLY; + if (fake == nullptr) + { + fake = createClient(nullptr, serverTL - g_pserver->rgthreadvar); + fake->flags |= CLIENT_FORCE_REPLY; + } + bool fSendRaw = !g_pserver->fActiveReplica; replicationFeedSlave(fake, dictid, argv, argc, fSendRaw); // Note: updates the repl log, keep above the repl update code below @@ -377,24 +415,37 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { serverAssert(argc > 0); serverAssert(cchbuf > 0); - char uuid[40] = {'\0'}; - uuid_unparse(cserver.uuid, uuid); + // The code below used to be: snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); + // but that was much too slow + static const char *protoRREPLAY = "*5\r\n$7\r\nRREPLAY\r\n$36\r\n00000000-0000-0000-0000-000000000000\r\n$"; char proto[1024]; - int cchProto = snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); - cchProto = std::min((int)sizeof(proto), cchProto); + int cchProto = 0; + if (!fSendRaw) + { + char uuid[37]; + uuid_unparse(cserver.uuid, uuid); + + cchProto = strlen(protoRREPLAY); + memcpy(proto, protoRREPLAY, strlen(protoRREPLAY)); + memcpy(proto + 22, uuid, 36); // Note UUID_STR_LEN includes the \0 trailing byte which we don't want + cchProto += ll2string(proto + cchProto, sizeof(proto)-cchProto, cchbuf); + memcpy(proto + cchProto, "\r\n", 3); + cchProto += 2; + } + long long master_repl_offset_start = g_pserver->master_repl_offset; char szDbNum[128]; - int cchDictIdNum = snprintf(szDbNum, sizeof(szDbNum), "%d", dictid); - int cchDbNum = snprintf(szDbNum, sizeof(szDbNum), "$%d\r\n%d\r\n", cchDictIdNum, dictid); - cchDbNum = std::min(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that + int cchDbNum = 0; + if (!fSendRaw) + cchDbNum = writeProtoNum(szDbNum, sizeof(szDbNum), dictid); + char szMvcc[128]; - incrementMvccTstamp(); - uint64_t mvccTstamp = getMvccTstamp(); - int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%" PRIu64, mvccTstamp); - int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%" PRIu64 "\r\n", cchMvccNum, mvccTstamp); - cchMvcc = std::min(cchMvcc, sizeof(szMvcc)); // tricky snprintf + int cchMvcc = 0; + incrementMvccTstamp(); // Always increment MVCC tstamp so we're consistent with active and normal replication + if (!fSendRaw) + cchMvcc = writeProtoNum(szMvcc, sizeof(szMvcc), getMvccTstamp()); /* Write the command to the replication backlog if any. */ if (g_pserver->repl_backlog) @@ -483,7 +534,11 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { } } - freeClient(fake); + // Cleanup cached fake client output buffers + fake->bufpos = 0; + fake->sentlen = 0; + fake->reply_bytes = 0; + listEmpty(fake->reply); } /* This function is used in order to proxy what we receive from our master @@ -3107,6 +3162,8 @@ void roleCommand(client *c) { listNode *ln; listRewind(g_pserver->masters, &li); + if (listLength(g_pserver->masters) > 1) + addReplyArrayLen(c,listLength(g_pserver->masters)); while ((ln = listNext(&li))) { redisMaster *mi = (redisMaster*)listNodeValue(ln); diff --git a/src/server.cpp b/src/server.cpp index 1ebf71b1e..0212547cf 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -640,6 +640,10 @@ struct redisCommand redisCommandTable[] = { {"expirememberat", expireMemberAtCommand, 4, "write fast @keyspace", 0,NULL,1,1,1,0,0,0}, + + {"pexpirememberat", pexpireMemberAtCommand, 4, + "write fast @keyspace", + 0,NULL,1,1,1,0,0,0}, {"pexpire",pexpireCommand,3, "write fast @keyspace", @@ -2343,6 +2347,9 @@ void createSharedObjects(void) { shared.zpopmax = makeObjectShared("ZPOPMAX",7); shared.multi = makeObjectShared("MULTI",5); shared.exec = makeObjectShared("EXEC",4); + shared.hdel = makeObjectShared(createStringObject("HDEL", 4)); + shared.zrem = makeObjectShared(createStringObject("ZREM", 4)); + shared.srem = makeObjectShared(createStringObject("SREM", 4)); for (j = 0; j < OBJ_SHARED_INTEGERS; j++) { shared.integers[j] = makeObjectShared(createObject(OBJ_STRING,(void*)(long)j)); @@ -2496,6 +2503,8 @@ void initServerConfig(void) { cserver.xgroupCommand = lookupCommandByCString("xgroup"); cserver.rreplayCommand = lookupCommandByCString("rreplay"); cserver.rpoplpushCommand = lookupCommandByCString("rpoplpush"); + cserver.hdelCommand = lookupCommandByCString("hdel"); + cserver.zremCommand = lookupCommandByCString("zrem"); /* Debugging */ g_pserver->assert_failed = ""; @@ -2513,6 +2522,7 @@ void initServerConfig(void) { /* Multithreading */ cserver.cthreads = CONFIG_DEFAULT_THREADS; cserver.fThreadAffinity = CONFIG_DEFAULT_THREAD_AFFINITY; + cserver.threadAffinityOffset = 0; initConfigValues(); } @@ -5515,10 +5525,10 @@ int main(int argc, char **argv) { #ifdef __linux__ cpu_set_t cpuset; CPU_ZERO(&cpuset); - CPU_SET(iel, &cpuset); + CPU_SET(iel + cserver.threadAffinityOffset, &cpuset); if (pthread_setaffinity_np(rgthread[iel], sizeof(cpu_set_t), &cpuset) == 0) { - serverLog(LOG_INFO, "Binding thread %d to cpu %d", iel, iel); + serverLog(LOG_INFO, "Binding thread %d to cpu %d", iel, iel + cserver.threadAffinityOffset + 1); } #else serverLog(LL_WARNING, "CPU pinning not available on this platform"); diff --git a/src/server.h b/src/server.h index 3a08b4d6a..8870a6a07 100644 --- a/src/server.h +++ b/src/server.h @@ -1343,7 +1343,7 @@ struct sharedObjectsStruct { *busykeyerr, *oomerr, *plus, *messagebulk, *pmessagebulk, *subscribebulk, *unsubscribebulk, *psubscribebulk, *punsubscribebulk, *del, *unlink, *rpop, *lpop, *lpush, *rpoplpush, *zpopmin, *zpopmax, *emptyscan, - *multi, *exec, + *multi, *exec, *srem, *hdel, *zrem, *select[PROTO_SHARED_SELECT_CMDS], *integers[OBJ_SHARED_INTEGERS], *mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*\r\n" */ @@ -1577,6 +1577,7 @@ struct redisServerConst { int cthreads; /* Number of main worker threads */ int fThreadAffinity; /* Should we pin threads to cores? */ + int threadAffinityOffset = 0; /* Where should we start pinning them? */ char *pidfile; /* PID file path */ /* Fast pointers to often looked up command */ @@ -1584,7 +1585,8 @@ struct redisServerConst { *lpopCommand, *rpopCommand, *zpopminCommand, *zpopmaxCommand, *sremCommand, *execCommand, *expireCommand, *pexpireCommand, *xclaimCommand, - *xgroupCommand, *rreplayCommand, *rpoplpushCommand; + *xgroupCommand, *rreplayCommand, *rpoplpushCommand, + *hdelCommand, *zremCommand; /* Configuration */ char *default_masteruser; /* AUTH with this user and masterauth with master */ @@ -2622,6 +2624,7 @@ int removeExpire(redisDb *db, robj *key); int removeExpireCore(redisDb *db, robj *key, dictEntry *de); int removeSubkeyExpire(redisDb *db, robj *key, robj *subkey); void propagateExpire(redisDb *db, robj *key, int lazy); +void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey); int expireIfNeeded(redisDb *db, robj *key); expireEntry *getExpire(redisDb *db, robj_roptr key); void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when); @@ -2759,6 +2762,8 @@ extern "C" char *redisGitDirty(void); extern "C" uint64_t redisBuildId(void); extern "C" char *redisBuildIdString(void); +int parseUnitString(const char *sz); + /* Commands prototypes */ void authCommand(client *c); void pingCommand(client *c); @@ -2837,6 +2842,7 @@ void expireCommand(client *c); void expireatCommand(client *c); void expireMemberCommand(client *c); void expireMemberAtCommand(client *c); +void pexpireMemberAtCommand(client *c); void pexpireCommand(client *c); void pexpireatCommand(client *c); void getsetCommand(client *c); diff --git a/tests/integration/aof.tcl b/tests/integration/aof.tcl index 9b605c266..2dddbd7e3 100644 --- a/tests/integration/aof.tcl +++ b/tests/integration/aof.tcl @@ -290,4 +290,26 @@ tags {"aof"} { } } } + + ## Test that PEXPIREMEMBERAT is loaded correctly + create_aof { + append_to_aof [formatCommand sadd testkey a b c d] + append_to_aof [formatCommand pexpirememberat testkey a 1000] + } + + start_server_aof [list dir $server_path aof-load-truncated no] { + test "AOF+EXPIREMEMBER: Server shuold have been started" { + assert_equal 1 [is_alive $srv] + } + + test "AOF+PEXPIREMEMBERAT: set should have 3 values" { + set client [redis [dict get $srv host] [dict get $srv port]] + wait_for_condition 50 100 { + [catch {$client ping} e] == 0 + } else { + fail "Loading DB is taking too much time." + } + assert_equal 3 [$client scard testkey] + } + } } diff --git a/tests/integration/replication-4.tcl b/tests/integration/replication-4.tcl index 54891151b..5db67025c 100644 --- a/tests/integration/replication-4.tcl +++ b/tests/integration/replication-4.tcl @@ -142,5 +142,93 @@ start_server {tags {"repl"}} { fail "SPOP replication inconsistency" } } + + test {Replication of EXPIREMEMBER (set) command} { + $master sadd testkey a b c d + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master scard testkey] eq 3 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave scard testkey] eq 3 + } else { + assert_equal [$slave scard testkey] 3 + } + $master del testkey + } + + test {Replication of EXPIREMEMBER (hash) command} { + $master hset testkey a value + $master hset testkey b value + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master hlen testkey] eq 1 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave hlen testkey] eq 1 + } else { + assert_equal [$slave hlen testkey] 1 + } + $master del testkey + } + + test {Replication of EXPIREMEMBER (zset) command} { + $master zadd testkey 1 a + $master zadd testkey 2 b + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master zcard testkey] eq 1 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave zcard testkey] eq 1 + } else { + assert_equal [$slave zcard testkey] 1 + } + } + + test {keydb.cron replicates} { + $master del testkey + $master keydb.cron testjob repeat 0 1000000 {redis.call("incr", "testkey")} 1 testkey + after 300 + assert_equal 1 [$master get testkey] + assert_equal 1 [$master exists testjob] + + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "KEYDB.CRON failed to replicate" + } + $master del testjob + $master del testkey + wait_for_condition 50 1000 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "cron delete failed to propogate" + } + } } } diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl index 0c28eb85d..b1d1c5217 100644 --- a/tests/integration/replication-active.tcl +++ b/tests/integration/replication-active.tcl @@ -77,6 +77,94 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { $master flushall } + test {Replication of EXPIREMEMBER (set) command (Active)} { + $master sadd testkey a b c d + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master scard testkey] eq 3 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave scard testkey] eq 3 + } else { + assert_equal [$slave scard testkey] 3 + } + $master del testkey + } + + test {Replication of EXPIREMEMBER (hash) command (Active)} { + $master hset testkey a value + $master hset testkey b value + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master hlen testkey] eq 1 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave hlen testkey] eq 1 + } else { + assert_equal [$slave hlen testkey] 1 + } + $master del testkey + } + + test {Replication of EXPIREMEMBER (zset) command (Active)} { + $master zadd testkey 1 a + $master zadd testkey 2 b + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "Failed to replicate set" + } + $master expiremember testkey a 1 + after 1000 + wait_for_condition 50 100 { + [$master zcard testkey] eq 1 + } else { + fail "expiremember failed to work on master" + } + wait_for_condition 50 100 { + [$slave zcard testkey] eq 1 + } else { + assert_equal [$slave zcard testkey] 1 + } + } + + test {keydb.cron replicates (Active) } { + $master del testkey + $master keydb.cron testjob repeat 0 1000000 {redis.call("incr", "testkey")} 1 testkey + after 300 + assert_equal 1 [$master get testkey] + assert_equal 1 [$master exists testjob] + + wait_for_condition 50 100 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "KEYDB.CRON failed to replicate" + } + $master del testjob + $master del testkey + wait_for_condition 50 1000 { + [$master debug digest] eq [$slave debug digest] + } else { + fail "cron delete failed to propogate" + } + } + test {Active replicas WAIT} { # Test that wait succeeds since replicas should be syncronized $master set testkey foo @@ -113,37 +201,38 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { assert_equal {0} [$slave del testkey1] } - test {Active replica expire propogates when source is down} { - $slave flushall - $slave set testkey2 foo - $slave set testkey1 foo - wait_for_condition 50 1000 { - [string match *foo* [$master get testkey1]] - } else { - fail "Replication failed to propogate" - } - $slave expire testkey1 2 - assert_equal {1} [$slave wait 1 500] { "value should propogate - within 0.5 seconds" } - exec kill -SIGSTOP $slave_pid + test {Active replica expire propogates when source is down} { + $slave flushall + $slave set testkey2 foo + $slave set testkey1 foo + wait_for_condition 50 1000 { + [string match *foo* [$master get testkey1]] + } else { + fail "Replication failed to propogate" + } + $slave expire testkey1 2 + assert_equal {1} [$slave wait 1 500] { "value should propogate + within 0.5 seconds" } + exec kill -SIGSTOP $slave_pid after 3000 - # Ensure testkey1 is gone. Note, we can't do this directly as the normal commands lie to us - # about what is actually in the dict. The only way to know is with a count from info + # Ensure testkey1 is gone. Note, we can't do this directly as the normal commands lie to us + # about what is actually in the dict. The only way to know is with a count from info assert_equal {1} [expr [string first {keys=1} [$master info keyspace]] >= 0] {"slave expired"} - } - exec kill -SIGCONT $slave_pid + } - test {Active replica different databases} { - $master select 3 - $master set testkey abcd - $master select 2 - $master del testkey - $slave select 3 - wait_for_condition 50 1000 { - [string match abcd [$slave get testkey]] - } else { - fail "Replication failed to propogate DB 3" - } + exec kill -SIGCONT $slave_pid + + test {Active replica different databases} { + $master select 3 + $master set testkey abcd + $master select 2 + $master del testkey + $slave select 3 + wait_for_condition 50 1000 { + [string match abcd [$slave get testkey]] + } else { + fail "Replication failed to propogate DB 3" } } } +}