diff --git a/src/aof.cpp b/src/aof.cpp index c7160489b..5c6385c84 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -1321,13 +1321,12 @@ int rewriteAppendOnlyFileRio(rio *aof) { while((de = dictNext(di)) != NULL) { sds keystr; robj key, *o; - long long expiretime; keystr = (sds)dictGetKey(de); o = (robj*)dictGetVal(de); initStaticStringObject(key,keystr); - expiretime = getExpire(db,&key); + expireEntry *pexpire = getExpire(db,&key); /* Save the key and associated value */ if (o->type == OBJ_STRING) { @@ -1353,11 +1352,23 @@ int rewriteAppendOnlyFileRio(rio *aof) { serverPanic("Unknown object type"); } /* Save the expire time */ - if (expiretime != -1) { - char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; - if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(aof,&key) == 0) goto werr; - if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr; + if (pexpire != nullptr) { + for (auto &subExpire : *pexpire) { + if (subExpire.subkey() == nullptr) + { + char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; + if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(aof,&key) == 0) goto werr; + } + else + { + char cmd[]="*4\r\n$12\r\nEXPIREMEMBER\r\n"; + if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(aof,&key) == 0) goto werr; + if (rioWrite(aof,subExpire.subkey(),sdslen(subExpire.subkey())) == 0) goto werr; + } + if (rioWriteBulkLongLong(aof,subExpire.when()) == 0) goto werr; // common + } } /* Read some diff from the parent process from time to time. */ if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) { diff --git a/src/bio.cpp b/src/bio.cpp index 844464e77..97fa7cf18 100644 --- a/src/bio.cpp +++ b/src/bio.cpp @@ -85,7 +85,7 @@ struct bio_job { void *bioProcessBackgroundJobs(void *arg); void lazyfreeFreeObjectFromBioThread(robj *o); -void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset *set); +void lazyfreeFreeDatabaseFromBioThread(dict *ht1, expireset *set); void lazyfreeFreeSlotsMapFromBioThread(rax *rt); /* Make sure we have enough stack to perform all the things we do in the @@ -196,7 +196,7 @@ void *bioProcessBackgroundJobs(void *arg) { if (job->arg1) lazyfreeFreeObjectFromBioThread((robj*)job->arg1); else if (job->arg2 && job->arg3) - lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(semiorderedset*)job->arg3); + lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(expireset*)job->arg3); else if (job->arg3) lazyfreeFreeSlotsMapFromBioThread((rax*)job->arg3); } else { diff --git a/src/cluster.cpp b/src/cluster.cpp index 79cb0972d..619ce3b3a 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -4949,7 +4949,7 @@ void restoreCommand(client *c) { dbAdd(c->db,c->argv[1],obj); if (ttl) { if (!absttl) ttl+=mstime(); - setExpire(c,c->db,c->argv[1],ttl); + setExpire(c,c->db,c->argv[1],nullptr,ttl); } objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock); signalModifiedKey(c->db,c->argv[1]); @@ -5194,7 +5194,10 @@ try_again: /* Create RESTORE payload and generate the protocol to call the command. */ for (j = 0; j < num_keys; j++) { long long ttl = 0; - long long expireat = getExpire(c->db,kv[j]); + expireEntry *pexpire = getExpire(c->db,kv[j]); + long long expireat = -1; + if (pexpire != nullptr) + pexpire->FGetPrimaryExpire(&expireat); if (expireat != -1) { ttl = expireat-mstime(); diff --git a/src/db.cpp b/src/db.cpp index dd75c28e9..f6607b40e 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -457,7 +457,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { } else { dictEmpty(g_pserver->db[j].pdict,callback); delete g_pserver->db[j].setexpire; - g_pserver->db[j].setexpire = new (MALLOC_LOCAL) semiorderedset(); + g_pserver->db[j].setexpire = new (MALLOC_LOCAL) expireset(); g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); } } @@ -976,7 +976,6 @@ void shutdownCommand(client *c) { void renameGenericCommand(client *c, int nx) { robj *o; - long long expire; int samekey = 0; /* When source and dest key is the same, no operation is performed, @@ -992,7 +991,15 @@ void renameGenericCommand(client *c, int nx) { } incrRefCount(o); - expire = getExpire(c->db,c->argv[1]); + + std::unique_ptr spexpire; + + { // scope pexpireOld since it will be invalid soon + expireEntry *pexpireOld = getExpire(c->db,c->argv[1]); + if (pexpireOld != nullptr) + spexpire = std::make_unique(std::move(*pexpireOld)); + } + if (lookupKeyWrite(c->db,c->argv[2]) != NULL) { if (nx) { decrRefCount(o); @@ -1005,8 +1012,8 @@ void renameGenericCommand(client *c, int nx) { } dbDelete(c->db,c->argv[1]); dbAdd(c->db,c->argv[2],o); - if (expire != -1) - setExpire(c,c->db,c->argv[2],expire); + if (spexpire != nullptr) + setExpire(c,c->db,c->argv[2],std::move(*spexpire)); signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[2]); notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", @@ -1029,7 +1036,7 @@ void moveCommand(client *c) { robj *o; redisDb *src, *dst; int srcid; - long long dbid, expire; + long long dbid; if (g_pserver->cluster_enabled) { addReplyError(c,"MOVE is not allowed in cluster mode"); @@ -1063,7 +1070,13 @@ void moveCommand(client *c) { addReply(c,shared.czero); return; } - expire = getExpire(c->db,c->argv[1]); + + std::unique_ptr spexpire; + { // scope pexpireOld + expireEntry *pexpireOld = getExpire(c->db,c->argv[1]); + if (pexpireOld != nullptr) + spexpire = std::make_unique(std::move(*pexpireOld)); + } if (o->FExpires()) removeExpire(c->db,c->argv[1]); serverAssert(!o->FExpires()); @@ -1077,7 +1090,7 @@ void moveCommand(client *c) { return; } dbAdd(dst,c->argv[1],o); - if (expire != -1) setExpire(c,dst,c->argv[1],expire); + if (spexpire != nullptr) setExpire(c,dst,c->argv[1],std::move(*spexpire)); addReply(c,shared.cone); } @@ -1201,7 +1214,58 @@ int removeExpireCore(redisDb *db, robj *key, dictEntry *de) { * of an user calling a command 'c' is the client, otherwise 'c' is set * to NULL. The 'when' parameter is the absolute unix time in milliseconds * after which the key will no longer be considered valid. */ -void setExpire(client *c, redisDb *db, robj *key, long long when) { +void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) { + dictEntry *kde; + + serverAssert(GlobalLocksAcquired()); + + /* Reuse the sds from the main dict in the expire dict */ + kde = dictFind(db->pdict,ptrFromObj(key)); + serverAssertWithInfo(NULL,key,kde != NULL); + + if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) + { + // shared objects cannot have the expire bit set, create a real object + dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde))); + } + + /* Update TTL stats (exponential moving average) */ + /* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */ + long long now = g_pserver->mstime; + db->avg_ttl -= (now - db->last_expire_set); // reduce the TTL by the time that has elapsed + if (db->setexpire->empty()) + db->avg_ttl = 0; + else + db->avg_ttl -= db->avg_ttl / db->setexpire->size(); // slide one entry out the window + if (db->avg_ttl < 0) + db->avg_ttl = 0; // TTLs are never negative + db->avg_ttl += (double)(when-now) / (db->setexpire->size()+1); // add the new entry + db->last_expire_set = now; + + /* Update the expire set */ + const char *szSubKey = (subkey != nullptr) ? szFromObj(subkey) : nullptr; + if (((robj*)dictGetVal(kde))->FExpires()) { + auto itr = db->setexpire->find((sds)dictGetKey(kde)); + serverAssert(itr != db->setexpire->end()); + expireEntry eNew(std::move(*itr)); + eNew.update(szSubKey, when); + db->setexpire->erase(itr); + db->setexpire->insert(eNew); + } + else + { + expireEntry e((sds)dictGetKey(kde), szSubKey, when); + ((robj*)dictGetVal(kde))->SetFExpires(true); + db->setexpire->insert(e); + } + + int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; + if (c && writable_slave && !(c->flags & CLIENT_MASTER)) + rememberSlaveKeyWithExpire(db,key); +} + +void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) +{ dictEntry *kde; serverAssert(GlobalLocksAcquired()); @@ -1217,49 +1281,36 @@ void setExpire(client *c, redisDb *db, robj *key, long long when) { } if (((robj*)dictGetVal(kde))->FExpires()) - removeExpire(db, key); // should we optimize for when this is called with an already set expiry? + removeExpire(db, key); - expireEntry e((sds)dictGetKey(kde), when); + e.setKeyUnsafe((sds)dictGetKey(kde)); + db->setexpire->insert(e); ((robj*)dictGetVal(kde))->SetFExpires(true); - /* Update TTL stats (exponential moving average) */ - /* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */ - long long now = g_pserver->mstime; - db->avg_ttl -= (now - db->last_expire_set); // reduce the TTL by the time that has elapsed - if (db->setexpire->empty()) - db->avg_ttl = 0; - else - db->avg_ttl -= db->avg_ttl / db->setexpire->size(); // slide one entry out the window - if (db->avg_ttl < 0) - db->avg_ttl = 0; // TTLs are never negative - db->avg_ttl += (double)(when-now) / (db->setexpire->size()+1); // add the new entry - db->last_expire_set = now; - - db->setexpire->insert(e); int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; if (c && writable_slave && !(c->flags & CLIENT_MASTER)) rememberSlaveKeyWithExpire(db,key); } -/* Return the expire time of the specified key, or -1 if no expire +/* Return the expire time of the specified key, or null if no expire * is associated with this key (i.e. the key is non volatile) */ -long long getExpire(redisDb *db, robj_roptr key) { +expireEntry *getExpire(redisDb *db, robj_roptr key) { dictEntry *de; /* No expire? return ASAP */ if (db->setexpire->size() == 0) - return -1; + return nullptr; de = dictFind(db->pdict, ptrFromObj(key)); if (de == NULL) - return -1; + return nullptr; robj *obj = (robj*)dictGetVal(de); if (!obj->FExpires()) - return -1; + return nullptr; auto itr = db->setexpire->find((sds)dictGetKey(de)); - return itr->when(); + return itr.operator->(); } /* Propagate expires into slaves and the AOF file. @@ -1287,15 +1338,28 @@ void propagateExpire(redisDb *db, robj *key, int lazy) { decrRefCount(argv[1]); } -/* Check if the key is expired. */ +/* Check if the key is expired. Note, this does not check subexpires */ int keyIsExpired(redisDb *db, robj *key) { - mstime_t when = getExpire(db,key); + expireEntry *pexpire = getExpire(db,key); - if (when < 0) return 0; /* No expire for this key */ + if (pexpire == nullptr) return 0; /* No expire for this key */ /* Don't expire anything while loading. It will be done later. */ if (g_pserver->loading) return 0; + long long when = -1; + for (auto &exp : *pexpire) + { + if (exp.subkey() == nullptr) + { + when = exp.when(); + break; + } + } + + if (when == -1) + return 0; + /* If we are in the context of a Lua script, we pretend that time is * blocked to when the Lua script started. This way a key can expire * only the first time it is accessed and not in the middle of the diff --git a/src/debug.cpp b/src/debug.cpp index 4d2f4bbca..41c73b55c 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -124,9 +124,13 @@ void mixStringObjectDigest(unsigned char *digest, robj_roptr o) { void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj_roptr o) { uint32_t aux = htonl(o->type); mixDigest(digest,&aux,sizeof(aux)); - long long expiretime = getExpire(db,keyobj); + expireEntry *pexpire = getExpire(db,keyobj); + long long expiretime = -1; char buf[128]; + if (pexpire != nullptr) + pexpire->FGetPrimaryExpire(&expiretime); + /* Save the key and associated value */ if (o->type == OBJ_STRING) { mixStringObjectDigest(digest,o); diff --git a/src/defrag.cpp b/src/defrag.cpp index b11564c4b..c49cd2665 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -48,7 +48,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util); /* forward declarations*/ void defragDictBucketCallback(void *privdata, dictEntry **bucketref); dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); -void replaceSateliteOSetKeyPtr(semiorderedset &set, sds oldkey, sds newkey); +void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey); /* Defrag helper for generic allocations. * @@ -407,11 +407,12 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd return NULL; } -void replaceSateliteOSetKeyPtr(semiorderedset &set, sds oldkey, sds newkey) { +void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) { auto itr = set.find(oldkey); if (itr != set.end()) { - expireEntry eNew(newkey, itr->when()); + expireEntry eNew(std::move(*itr)); + eNew.setKeyUnsafe(newkey); set.erase(itr); set.insert(eNew); } diff --git a/src/evict.cpp b/src/evict.cpp index 4acdb5ad0..8cf24dd5e 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -252,7 +252,7 @@ struct visitFunctor return count < g_pserver->maxmemory_samples; } }; -void evictionPoolPopulate(int dbid, dict *dbdict, semiorderedset *setexpire, struct evictionPoolEntry *pool) +void evictionPoolPopulate(int dbid, dict *dbdict, expireset *setexpire, struct evictionPoolEntry *pool) { if (setexpire != nullptr) { diff --git a/src/expire.cpp b/src/expire.cpp index 38a65cf44..5d257428d 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -32,6 +32,21 @@ #include "server.h" +void activeExpireCycleExpireFullKey(redisDb *db, const char *key) { + robj *keyobj = createStringObject(key,sdslen(key)); + + propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire); + if (g_pserver->lazyfree_lazy_expire) + dbAsyncDelete(db,keyobj); + else + dbSyncDelete(db,keyobj); + notifyKeyspaceEvent(NOTIFY_EXPIRED, + "expired",keyobj,db->id); + if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj); + decrRefCount(keyobj); + g_pserver->stat_expiredkeys++; +} + /*----------------------------------------------------------------------------- * Incremental collection of expired keys. * @@ -51,19 +66,102 @@ * * The parameter 'now' is the current time in milliseconds as is passed * to the function to avoid too many gettimeofday() syscalls. */ -void activeExpireCycleExpire(redisDb *db, const char *key) { - robj *keyobj = createStringObject(key,sdslen(key)); +void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { + if (!e.FFat()) + { + activeExpireCycleExpireFullKey(db, e.key()); + return; + } - propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire); - if (g_pserver->lazyfree_lazy_expire) - dbAsyncDelete(db,keyobj); - else - dbSyncDelete(db,keyobj); - notifyKeyspaceEvent(NOTIFY_EXPIRED, - "expired",keyobj,db->id); - if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj); - decrRefCount(keyobj); - g_pserver->stat_expiredkeys++; + expireEntryFat *pfat = e.pfatentry(); + dictEntry *de = dictFind(db->pdict, e.key()); + robj *val = (robj*)dictGetVal(de); + int deleted = 0; + while (!pfat->FEmpty()) + { + if (pfat->nextExpireEntry().when > now) + break; + + // Is it the full key expiration? + if (pfat->nextExpireEntry().spsubkey == nullptr) + { + activeExpireCycleExpireFullKey(db, e.key()); + return; + } + + switch (val->type) + { + case OBJ_SET: + if (setTypeRemove(val,pfat->nextExpireEntry().spsubkey.get())) { + deleted++; + if (setTypeSize(val) == 0) { + activeExpireCycleExpireFullKey(db, e.key()); + return; + } + } + break; + case OBJ_LIST: + case OBJ_ZSET: + case OBJ_HASH: + default: + serverAssert(false); + } + pfat->popfrontExpireEntry(); + } + + if (deleted) + { + robj objT; + switch (val->type) + { + case OBJ_SET: + initStaticStringObject(objT, (char*)e.key()); + signalModifiedKey(db,&objT); + notifyKeyspaceEvent(NOTIFY_SET,"srem",&objT,db->id); + break; + } + } + + if (pfat->FEmpty()) + { + robj *keyobj = createStringObject(e.key(),sdslen(e.key())); + removeExpire(db, keyobj); + decrRefCount(keyobj); + } +} + +void expireMemberCommand(client *c) +{ + long long when; + if (getLongLongFromObjectOrReply(c, c->argv[3], &when, NULL) != C_OK) + return; + + when *= 1000; + when += mstime(); + + /* No key, return zero. */ + dictEntry *de = dictFind(c->db->pdict, szFromObj(c->argv[1])); + if (de == NULL) { + addReply(c,shared.czero); + return; + } + + robj *val = (robj*)dictGetVal(de); + + switch (val->type) + { + case OBJ_SET: + // these types are safe + break; + + default: + addReplyError(c, "object type is unsupported"); + return; + } + + setExpire(c, c->db, c->argv[1], c->argv[2], when); + + addReply(c, shared.ok); } /* Try to expire a few timed out keys. The algorithm used is adaptive and @@ -162,10 +260,10 @@ void activeExpireCycle(int type) { size_t expired = 0; size_t tried = 0; - db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](const expireEntry &e) __attribute__((always_inline)) { + db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](expireEntry &e) __attribute__((always_inline)) { if (e.when() < now) { - activeExpireCycleExpire(db, e.key()); + activeExpireCycleExpire(db, e, now); ++expired; } ++tried; @@ -270,7 +368,7 @@ void expireSlaveKeys(void) { if (itr != db->setexpire->end()) { if (itr->when() < start) { - activeExpireCycleExpire(g_pserver->db+dbid,itr->key()); + activeExpireCycleExpire(g_pserver->db+dbid,*itr,start); expired = 1; } } @@ -406,7 +504,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) { addReply(c, shared.cone); return; } else { - setExpire(c,c->db,key,when); + setExpire(c,c->db,key,nullptr,when); addReply(c,shared.cone); signalModifiedKey(c->db,key); notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id); @@ -437,7 +535,7 @@ void pexpireatCommand(client *c) { /* Implements TTL and PTTL */ void ttlGenericCommand(client *c, int output_ms) { - long long expire, ttl = -1; + long long expire = -1, ttl = -1; /* If the key does not exist at all, return -2 */ if (lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH) == nullptr) { @@ -446,7 +544,10 @@ void ttlGenericCommand(client *c, int output_ms) { } /* The key exists. Return -1 if it has no expire, or the actual * TTL value otherwise. */ - expire = getExpire(c->db,c->argv[1]); + expireEntry *pexpire = getExpire(c->db,c->argv[1]); + if (pexpire != nullptr) + pexpire->FGetPrimaryExpire(&expire); + if (expire != -1) { ttl = expire-mstime(); if (ttl < 0) ttl = 0; diff --git a/src/help.h b/src/help.h index 184d76724..01b856b9d 100644 --- a/src/help.h +++ b/src/help.h @@ -343,6 +343,9 @@ struct commandHelp { "Set the expiration for a key as a UNIX timestamp", 0, "1.2.0" }, + { "EXPIREMEMBER", + "key subkey seconds", + "set a subkey's time to live in seconds"}, { "FLUSHALL", "[ASYNC]", "Remove all keys from all databases", diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index 0dbfd57d1..91577cb85 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -110,7 +110,7 @@ void freeObjAsync(robj *o) { void emptyDbAsync(redisDb *db) { dict *oldht1 = db->pdict; auto *set = db->setexpire; - db->setexpire = new (MALLOC_LOCAL) semiorderedset(); + db->setexpire = new (MALLOC_LOCAL) expireset(); db->expireitr = db->setexpire->end(); db->pdict = dictCreate(&dbDictType,NULL); atomicIncr(lazyfree_objects,dictSize(oldht1)); @@ -141,7 +141,7 @@ void lazyfreeFreeObjectFromBioThread(robj *o) { * when the database was logically deleted. 'sl' is a skiplist used by * Redis Cluster in order to take the hash slots -> keys mapping. This * may be NULL if Redis Cluster is disabled. */ -void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset *set) { +void lazyfreeFreeDatabaseFromBioThread(dict *ht1, expireset *set) { size_t numkeys = dictSize(ht1); dictRelease(ht1); delete set; diff --git a/src/module.cpp b/src/module.cpp index 7863ca4cf..052c8744a 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -1644,7 +1644,11 @@ int RM_UnlinkKey(RedisModuleKey *key) { * If no TTL is associated with the key or if the key is empty, * REDISMODULE_NO_EXPIRE is returned. */ mstime_t RM_GetExpire(RedisModuleKey *key) { - mstime_t expire = getExpire(key->db,key->key); + expireEntry *pexpire = getExpire(key->db,key->key); + mstime_t expire = -1; + if (pexpire != nullptr) + pexpire->FGetPrimaryExpire(&expire); + if (expire == -1 || key->value == NULL) return -1; expire -= mstime(); return expire >= 0 ? expire : 0; @@ -1664,7 +1668,7 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) { return REDISMODULE_ERR; if (expire != REDISMODULE_NO_EXPIRE) { expire += mstime(); - setExpire(key->ctx->client,key->db,key->key,expire); + setExpire(key->ctx->client,key->db,key->key,nullptr,expire); } else { removeExpire(key->db,key->key); } diff --git a/src/object.cpp b/src/object.cpp index 900a9058c..ce6265ad1 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -1490,4 +1490,4 @@ void redisObject::setrefcount(unsigned ref) { serverAssert(!FExpires()); refcount.store(ref, std::memory_order_relaxed); -} \ No newline at end of file +} diff --git a/src/rdb.cpp b/src/rdb.cpp index 5443ca064..97ade6d1f 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1031,12 +1031,13 @@ size_t rdbSavedObjectLen(robj *o) { * On error -1 is returned. * On success if the key was actually saved 1 is returned, otherwise 0 * is returned (the key was already expired). */ -int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { +int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, expireEntry *pexpire) { int savelru = g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LRU; int savelfu = g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU; /* Save the expire time */ - if (expiretime != -1) { + long long expiretime = -1; + if (pexpire != nullptr && pexpire->FGetPrimaryExpire(&expiretime)) { if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1; if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1; } @@ -1061,14 +1062,29 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { if (rdbWriteRaw(rdb,buf,1) == -1) return -1; } - char szMvcc[32]; - snprintf(szMvcc, 32, "%" PRIu64, val->mvcc_tstamp); - if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szMvcc) == -1) return -1; + char szT[32]; + snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); + if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; /* Save type, key, value */ if (rdbSaveObjectType(rdb,val) == -1) return -1; if (rdbSaveStringObject(rdb,key) == -1) return -1; if (rdbSaveObject(rdb,val,key) == -1) return -1; + + /* Save expire entry after as it will apply to the previously loaded key */ + /* This is because we update the expire datastructure directly without buffering */ + if (pexpire != nullptr) + { + for (auto itr : *pexpire) + { + if (itr.subkey() == nullptr) + continue; // already saved + snprintf(szT, 32, "%lld", itr.when()); + rdbSaveAuxFieldStrStr(rdb,"keydb-subexpire-key",itr.subkey()); + rdbSaveAuxFieldStrStr(rdb,"keydb-subexpire-when",szT); + } + } + return 1; } @@ -1099,12 +1115,11 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *keystr, robj *o) { robj key; - long long expire; initStaticStringObject(key,(char*)keystr); - expire = getExpire(db, &key); + expireEntry *pexpire = getExpire(db, &key); - if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) + if (rdbSaveKeyValuePair(rdb,&key,o,pexpire) == -1) return 0; /* When this RDB is produced as part of an AOF rewrite, move @@ -1907,6 +1922,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); long long lru_clock = 0; uint64_t mvcc_tstamp = OBJ_MVCC_INVALID; + robj *subexpireKey = nullptr; + robj *key = nullptr; rdb->update_cksum = rdbLoadProgressCallback; rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes; @@ -1928,7 +1945,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { lru_clock = LRU_CLOCK(); while(1) { - robj *key, *val; + robj *val; /* Read type. */ if ((type = rdbLoadType(rdb)) == -1) goto eoferr; @@ -2036,6 +2053,18 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { } else if (!strcasecmp(szFromObj(auxkey),"mvcc-tstamp")) { static_assert(sizeof(unsigned long long) == sizeof(uint64_t), "Ensure long long is 64-bits"); mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); + } else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-key")) { + subexpireKey = auxval; + incrRefCount(subexpireKey); + } else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-when")) { + if (key == nullptr || subexpireKey == nullptr) { + serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping."); + } + else { + setExpire(NULL, db, key, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10)); + decrRefCount(subexpireKey); + subexpireKey = nullptr; + } } else { /* We ignore fields we don't understand, as by AUX field * contract. */ @@ -2077,6 +2106,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { } /* Read key */ + if (key != nullptr) + decrRefCount(key); + if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr; /* Read value */ if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) goto eoferr; @@ -2090,24 +2122,19 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { decrRefCount(val); } else { /* Add the new object in the hash table */ - int fInserted = dbMerge(db, key, val, rsi->fForceSetKey); + int fInserted = dbMerge(db, key, val, rsi->fForceSetKey); // Note: dbMerge will incrRef if (fInserted) { /* Set the expire time if needed */ if (expiretime != -1) - setExpire(NULL,db,key,expiretime); + setExpire(NULL,db,key,nullptr,expiretime); /* Set usage information (for eviction). */ objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); - - /* Decrement the key refcount since dbMerge() will take its - * own reference. */ - decrRefCount(key); } else { - decrRefCount(key); decrRefCount(val); } } @@ -2118,6 +2145,16 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { lfu_freq = -1; lru_idle = -1; } + + if (key != nullptr) + decrRefCount(key); + + if (subexpireKey != nullptr) + { + serverLog(LL_WARNING, "Corrupt subexpire entry in RDB."); + decrRefCount(subexpireKey); + subexpireKey = nullptr; + } /* Verify the checksum if RDB version is >= 5 */ if (rdbver >= 5) { diff --git a/src/semiorderedset.h b/src/semiorderedset.h index 7713d5533..450910c49 100644 --- a/src/semiorderedset.h +++ b/src/semiorderedset.h @@ -15,11 +15,11 @@ extern uint64_t dictGenHashFunction(const void *key, int len); -template +template class semiorderedset { friend struct setiter; - std::vector> m_data; + std::vector> m_data; size_t celem = 0; static const size_t bits_min = 8; size_t bits = bits_min; @@ -109,7 +109,7 @@ public: if (!fRehash) ++celem; - typename compactvector::iterator itrInsert; + typename compactvector::iterator itrInsert; if (!m_data[idx].empty() && !(e < m_data[idx].back())) itrInsert = m_data[idx].end(); else @@ -292,7 +292,7 @@ private: int steps = 0; for (; idxRehash < (m_data.size()/2); ++idxRehash) { - compactvector vecT; + compactvector vecT; std::swap(m_data[idxRehash], vecT); for (auto &v : vecT) diff --git a/src/server.cpp b/src/server.cpp index e6e86f6ea..2d9627c0f 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -618,6 +618,10 @@ struct redisCommand redisCommandTable[] = { "write fast @keyspace", 0,NULL,1,1,1,0,0,0}, + {"expiremember", expireMemberCommand, 4, + "write fast @keyspace", + 0,NULL,1,1,1,0,0,0}, + {"pexpire",pexpireCommand,3, "write fast @keyspace", 0,NULL,1,1,1,0,0,0}, @@ -2919,7 +2923,7 @@ void initServer(void) { /* Create the Redis databases, and initialize other internal state. */ for (int j = 0; j < cserver.dbnum; j++) { g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL); - g_pserver->db[j].setexpire = new(MALLOC_LOCAL) semiorderedset; + g_pserver->db[j].setexpire = new(MALLOC_LOCAL) expireset(); g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL); g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); diff --git a/src/server.h b/src/server.h index 8bc30a0f5..f350410d0 100644 --- a/src/server.h +++ b/src/server.h @@ -53,6 +53,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { #include @@ -767,39 +768,215 @@ __attribute__((always_inline)) inline char *szFromObj(const robj *o) return (char*)ptrFromObj(o); } -class expireEntry { - sds m_key; - long long m_when; +class expireEntryFat +{ + friend class expireEntry; +public: + struct subexpireEntry + { + long long when; + std::unique_ptr spsubkey; + + subexpireEntry(long long when, const char *subkey) + : when(when), spsubkey(subkey, sdsfree) + {} + + bool operator<(long long when) const noexcept { return this->when < when; } + bool operator<(const subexpireEntry &se) { return this->when < se.when; } + }; + +private: + sds m_keyPrimary; + std::vector m_vecexpireEntries; // Note a NULL for the sds portion means the expire is for the primary key public: - expireEntry(sds key, long long when) + expireEntryFat(sds keyPrimary) + : m_keyPrimary(keyPrimary) + {} + long long when() const noexcept { return m_vecexpireEntries.front().when; } + const char *key() const noexcept { return m_keyPrimary; } + + bool operator<(long long when) const noexcept { return this->when() < when; } + + void expireSubKey(const char *szSubkey, long long when) { - m_key = key; - m_when = when; + auto itrInsert = std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), when); + m_vecexpireEntries.emplace(itrInsert, when, sdsdup(szSubkey)); } - bool operator!=(const expireEntry &e) const noexcept - { - return m_when != e.m_when || m_key != e.m_key; - } - bool operator==(const expireEntry &e) const noexcept - { - return m_when == e.m_when && m_key == e.m_key; - } - bool operator==(const char *key) const noexcept { return m_key == key; } - - bool operator<(const expireEntry &e) const noexcept { return m_when < e.m_when; } - bool operator<(const char *key) const noexcept { return m_key < key; } - bool operator<(long long when) const noexcept { return m_when < when; } - - const char *key() const noexcept { return m_key; } - long long when() const noexcept { return m_when; } - - - explicit operator const char*() const noexcept { return m_key; } - explicit operator long long() const noexcept { return m_when; } + bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); } + const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); } + void popfrontExpireEntry() { m_vecexpireEntries.erase(m_vecexpireEntries.begin()); } + const subexpireEntry &operator[](size_t idx) { return m_vecexpireEntries[idx]; } + size_t size() const noexcept { return m_vecexpireEntries.size(); } }; +class expireEntry { + union + { + sds m_key; + expireEntryFat *m_pfatentry; + } u; + long long m_when; // LLONG_MIN means this is a fat entry and we should use the pointer + +public: + class iter + { + expireEntry *m_pentry = nullptr; + size_t m_idx = 0; + + public: + iter(expireEntry *pentry, size_t idx) + : m_pentry(pentry), m_idx(idx) + {} + + iter &operator++() { ++m_idx; return *this; } + + const char *subkey() const + { + if (m_pentry->FFat()) + return (*m_pentry->pfatentry())[m_idx].spsubkey.get(); + return nullptr; + } + long long when() const + { + if (m_pentry->FFat()) + return (*m_pentry->pfatentry())[m_idx].when; + return m_pentry->when(); + } + + bool operator!=(const iter &other) + { + return m_idx != other.m_idx; + } + + const iter &operator*() const { return *this; } + }; + + expireEntry(sds key, const char *subkey, long long when) + { + if (subkey != nullptr) + { + m_when = LLONG_MIN; + u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(key); + u.m_pfatentry->expireSubKey(subkey, when); + } + else + { + u.m_key = key; + m_when = when; + } + } + + expireEntry(expireEntryFat *pfatentry) + { + u.m_pfatentry = pfatentry; + m_when = LLONG_MIN; + } + + expireEntry(expireEntry &&e) + { + u.m_key = e.u.m_key; + m_when = e.m_when; + e.u.m_key = (char*)key(); // we do this so it can still be found in the set + e.m_when = 0; + } + + ~expireEntry() + { + if (FFat()) + delete u.m_pfatentry; + } + + void setKeyUnsafe(sds key) + { + if (FFat()) + u.m_pfatentry->m_keyPrimary = key; + else + u.m_key = key; + } + + inline bool FFat() const noexcept { return m_when == LLONG_MIN; } + expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; } + + + bool operator==(const char *key) const noexcept + { + return this->key() == key; + } + + bool operator<(const expireEntry &e) const noexcept + { + return when() < e.when(); + } + bool operator<(long long when) const noexcept + { + return this->when() < when; + } + + const char *key() const noexcept + { + if (FFat()) + return u.m_pfatentry->key(); + return u.m_key; + } + long long when() const noexcept + { + if (FFat()) + return u.m_pfatentry->when(); + return m_when; + } + + void update(const char *subkey, long long when) + { + if (!FFat()) + { + if (subkey == nullptr) + { + m_when = when; + return; + } + else + { + // we have to upgrade to a fat entry + long long whenT = m_when; + sds keyPrimary = u.m_key; + m_when = LLONG_MIN; + u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(keyPrimary); + u.m_pfatentry->expireSubKey(nullptr, whenT); + // at this point we're fat so fall through + } + } + u.m_pfatentry->expireSubKey(subkey, when); + } + + iter begin() { return iter(this, 0); } + iter end() + { + if (FFat()) + return iter(this, u.m_pfatentry->size()); + return iter(this, 1); + } + + bool FGetPrimaryExpire(long long *pwhen) + { + *pwhen = -1; + for (auto itr : *this) + { + if (itr.subkey() == nullptr) + { + *pwhen = itr.when(); + return true; + } + } + return false; + } + + explicit operator const char*() const noexcept { return key(); } + explicit operator long long() const noexcept { return when(); } +}; +typedef semiorderedset expireset; + /* The a string name for an object's type as listed above * Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines, * and Module types have their registered name returned. */ @@ -837,8 +1014,8 @@ typedef struct clientReplyBlock { * database. The database number is the 'id' field in the structure. */ typedef struct redisDb { dict *pdict; /* The keyspace for this DB */ - semiorderedset *setexpire; - semiorderedset::setiter expireitr; + expireset *setexpire; + expireset::setiter expireitr; dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *ready_keys; /* Blocked keys that received a PUSH */ @@ -2224,8 +2401,9 @@ int removeExpire(redisDb *db, robj *key); int removeExpireCore(redisDb *db, robj *key, dictEntry *de); void propagateExpire(redisDb *db, robj *key, int lazy); int expireIfNeeded(redisDb *db, robj *key); -long long getExpire(redisDb *db, robj_roptr key); -void setExpire(client *c, redisDb *db, robj *key, long long when); +expireEntry *getExpire(redisDb *db, robj_roptr key); +void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when); +void setExpire(client *c, redisDb *db, robj *key, expireEntry &&entry); robj_roptr lookupKeyRead(redisDb *db, robj *key); robj *lookupKeyWrite(redisDb *db, robj *key); robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply); @@ -2420,6 +2598,7 @@ void mgetCommand(client *c); void monitorCommand(client *c); void expireCommand(client *c); void expireatCommand(client *c); +void expireMemberCommand(client *c); void pexpireCommand(client *c); void pexpireatCommand(client *c); void getsetCommand(client *c); diff --git a/src/t_string.cpp b/src/t_string.cpp index a254f4f53..8b79097c0 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -85,7 +85,7 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, } setKey(c->db,key,val); g_pserver->dirty++; - if (expire) setExpire(c,c->db,key,mstime()+milliseconds); + if (expire) setExpire(c,c->db,key,nullptr,mstime()+milliseconds); notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, "expire",key,c->db->id);