From 2a2225d150a7af32d91d898d6f7b397b7af98501 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 13 Jul 2019 20:11:49 -0400 Subject: [PATCH] Initial prototype of EXPIREMEMBER command Former-commit-id: 0b3d74ea67d616a6869cbd66198c8dd7ffa72eb7 --- src/bio.cpp | 4 +- src/cluster.cpp | 2 +- src/compactvector.h | 21 ++++-- src/db.cpp | 28 +++++--- src/defrag.cpp | 7 +- src/evict.cpp | 2 +- src/expire.cpp | 127 ++++++++++++++++++++++++++++----- src/help.h | 3 + src/lazyfree.cpp | 4 +- src/module.cpp | 2 +- src/rdb.cpp | 2 +- src/semiorderedset.h | 8 +-- src/server.cpp | 6 +- src/server.h | 164 ++++++++++++++++++++++++++++++++++++------- src/t_string.cpp | 2 +- 15 files changed, 307 insertions(+), 75 deletions(-) diff --git a/src/bio.cpp b/src/bio.cpp index 844464e77..97fa7cf18 100644 --- a/src/bio.cpp +++ b/src/bio.cpp @@ -85,7 +85,7 @@ struct bio_job { void *bioProcessBackgroundJobs(void *arg); void lazyfreeFreeObjectFromBioThread(robj *o); -void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset *set); +void lazyfreeFreeDatabaseFromBioThread(dict *ht1, expireset *set); void lazyfreeFreeSlotsMapFromBioThread(rax *rt); /* Make sure we have enough stack to perform all the things we do in the @@ -196,7 +196,7 @@ void *bioProcessBackgroundJobs(void *arg) { if (job->arg1) lazyfreeFreeObjectFromBioThread((robj*)job->arg1); else if (job->arg2 && job->arg3) - lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(semiorderedset*)job->arg3); + lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(expireset*)job->arg3); else if (job->arg3) lazyfreeFreeSlotsMapFromBioThread((rax*)job->arg3); } else { diff --git a/src/cluster.cpp b/src/cluster.cpp index 79cb0972d..8978f184b 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -4949,7 +4949,7 @@ void restoreCommand(client *c) { dbAdd(c->db,c->argv[1],obj); if (ttl) { if (!absttl) ttl+=mstime(); - setExpire(c,c->db,c->argv[1],ttl); + setExpire(c,c->db,c->argv[1],nullptr,ttl); } objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock); signalModifiedKey(c->db,c->argv[1]); diff --git a/src/compactvector.h b/src/compactvector.h index 8f9e8e74e..ee10a135b 100644 --- a/src/compactvector.h +++ b/src/compactvector.h @@ -12,6 +12,7 @@ * *************************************************/ +<<<<<<< HEAD <<<<<<< HEAD template class compactvector @@ -23,6 +24,12 @@ class compactvector { static_assert(std::is_trivially_copyable::value, "compactvector requires trivially copyable types"); >>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time +======= +template +class compactvector +{ + static_assert(MEMMOVE_SAFE || std::is_trivially_copyable::value, "compactvector requires trivially copyable types"); +>>>>>>> Initial prototype of EXPIREMEMBER command T *m_data = nullptr; unsigned m_celem = 0; unsigned m_max = 0; @@ -33,10 +40,14 @@ public: compactvector() noexcept = default; ~compactvector() noexcept { +<<<<<<< HEAD <<<<<<< HEAD clear(); // call dtors ======= >>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time +======= + clear(); // call dtors +>>>>>>> Initial prototype of EXPIREMEMBER command zfree(m_data); } @@ -88,11 +99,15 @@ public: assert(idx < m_max); where = m_data + idx; memmove(m_data + idx + 1, m_data + idx, (m_celem - idx)*sizeof(T)); +<<<<<<< HEAD <<<<<<< HEAD new(m_data + idx) T(std::move(val)); ======= m_data[idx] = val; >>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time +======= + new(m_data + idx) T(std::move(val)); +>>>>>>> Initial prototype of EXPIREMEMBER command ++m_celem; return where; } @@ -116,10 +131,7 @@ public: assert(where >= m_data); size_t idx = where - m_data; assert(idx < m_celem); -<<<<<<< HEAD where->~T(); -======= ->>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time memmove(where, where+1, ((m_celem - idx - 1)*sizeof(T))); --m_celem; @@ -146,11 +158,8 @@ public: void clear() { -<<<<<<< HEAD for (size_t idx = 0; idx < m_celem; ++idx) m_data[idx].~T(); -======= ->>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time zfree(m_data); m_data = nullptr; m_celem = 0; diff --git a/src/db.cpp b/src/db.cpp index dd75c28e9..9a47b415d 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -457,7 +457,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { } else { dictEmpty(g_pserver->db[j].pdict,callback); delete g_pserver->db[j].setexpire; - g_pserver->db[j].setexpire = new (MALLOC_LOCAL) semiorderedset(); + g_pserver->db[j].setexpire = new (MALLOC_LOCAL) expireset(); g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); } } @@ -1006,7 +1006,7 @@ void renameGenericCommand(client *c, int nx) { dbDelete(c->db,c->argv[1]); dbAdd(c->db,c->argv[2],o); if (expire != -1) - setExpire(c,c->db,c->argv[2],expire); + setExpire(c,c->db,c->argv[2],nullptr,expire); signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[2]); notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", @@ -1077,7 +1077,7 @@ void moveCommand(client *c) { return; } dbAdd(dst,c->argv[1],o); - if (expire != -1) setExpire(c,dst,c->argv[1],expire); + if (expire != -1) setExpire(c,dst,c->argv[1],nullptr,expire); addReply(c,shared.cone); } @@ -1201,7 +1201,7 @@ int removeExpireCore(redisDb *db, robj *key, dictEntry *de) { * of an user calling a command 'c' is the client, otherwise 'c' is set * to NULL. The 'when' parameter is the absolute unix time in milliseconds * after which the key will no longer be considered valid. */ -void setExpire(client *c, redisDb *db, robj *key, long long when) { +void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) { dictEntry *kde; serverAssert(GlobalLocksAcquired()); @@ -1216,12 +1216,6 @@ void setExpire(client *c, redisDb *db, robj *key, long long when) { dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde))); } - if (((robj*)dictGetVal(kde))->FExpires()) - removeExpire(db, key); // should we optimize for when this is called with an already set expiry? - - expireEntry e((sds)dictGetKey(kde), when); - ((robj*)dictGetVal(kde))->SetFExpires(true); - /* Update TTL stats (exponential moving average) */ /* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */ long long now = g_pserver->mstime; @@ -1235,7 +1229,19 @@ void setExpire(client *c, redisDb *db, robj *key, long long when) { db->avg_ttl += (double)(when-now) / (db->setexpire->size()+1); // add the new entry db->last_expire_set = now; - db->setexpire->insert(e); + /* Update the expire set */ + const char *szSubKey = (subkey != nullptr) ? szFromObj(subkey) : nullptr; + if (((robj*)dictGetVal(kde))->FExpires()) { + auto itr = db->setexpire->find((sds)dictGetKey(kde)); + serverAssert(itr != db->setexpire->end()); + itr->update(szSubKey, when); + } + else + { + expireEntry e((sds)dictGetKey(kde), szSubKey, when); + ((robj*)dictGetVal(kde))->SetFExpires(true); + db->setexpire->insert(e); + } int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; if (c && writable_slave && !(c->flags & CLIENT_MASTER)) diff --git a/src/defrag.cpp b/src/defrag.cpp index b11564c4b..a6acb8e72 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -48,7 +48,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util); /* forward declarations*/ void defragDictBucketCallback(void *privdata, dictEntry **bucketref); dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); -void replaceSateliteOSetKeyPtr(semiorderedset &set, sds oldkey, sds newkey); +void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey); /* Defrag helper for generic allocations. * @@ -407,11 +407,12 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd return NULL; } -void replaceSateliteOSetKeyPtr(semiorderedset &set, sds oldkey, sds newkey) { +void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) { auto itr = set.find(oldkey); + serverAssert(false); if (itr != set.end()) { - expireEntry eNew(newkey, itr->when()); + expireEntry eNew(newkey, nullptr, itr->when()); set.erase(itr); set.insert(eNew); } diff --git a/src/evict.cpp b/src/evict.cpp index 4acdb5ad0..8cf24dd5e 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -252,7 +252,7 @@ struct visitFunctor return count < g_pserver->maxmemory_samples; } }; -void evictionPoolPopulate(int dbid, dict *dbdict, semiorderedset *setexpire, struct evictionPoolEntry *pool) +void evictionPoolPopulate(int dbid, dict *dbdict, expireset *setexpire, struct evictionPoolEntry *pool) { if (setexpire != nullptr) { diff --git a/src/expire.cpp b/src/expire.cpp index 38a65cf44..b6833db04 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -32,6 +32,21 @@ #include "server.h" +void activeExpireCycleExpireFullKey(redisDb *db, const char *key) { + robj *keyobj = createStringObject(key,sdslen(key)); + + propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire); + if (g_pserver->lazyfree_lazy_expire) + dbAsyncDelete(db,keyobj); + else + dbSyncDelete(db,keyobj); + notifyKeyspaceEvent(NOTIFY_EXPIRED, + "expired",keyobj,db->id); + if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj); + decrRefCount(keyobj); + g_pserver->stat_expiredkeys++; +} + /*----------------------------------------------------------------------------- * Incremental collection of expired keys. * @@ -51,19 +66,99 @@ * * The parameter 'now' is the current time in milliseconds as is passed * to the function to avoid too many gettimeofday() syscalls. */ -void activeExpireCycleExpire(redisDb *db, const char *key) { - robj *keyobj = createStringObject(key,sdslen(key)); +void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { + if (!e.FFat()) + activeExpireCycleExpireFullKey(db, e.key()); - propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire); - if (g_pserver->lazyfree_lazy_expire) - dbAsyncDelete(db,keyobj); - else - dbSyncDelete(db,keyobj); - notifyKeyspaceEvent(NOTIFY_EXPIRED, - "expired",keyobj,db->id); - if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj); - decrRefCount(keyobj); - g_pserver->stat_expiredkeys++; + expireEntryFat *pfat = e.pfatentry(); + dictEntry *de = dictFind(db->pdict, e.key()); + robj *val = (robj*)dictGetVal(de); + int deleted = 0; + while (!pfat->FEmpty()) + { + if (pfat->nextExpireEntry().when > now) + break; + + // Is it the full key expiration? + if (pfat->nextExpireEntry().spsubkey == nullptr) + { + activeExpireCycleExpireFullKey(db, e.key()); + return; + } + + switch (val->type) + { + case OBJ_SET: + if (setTypeRemove(val,pfat->nextExpireEntry().spsubkey.get())) { + deleted++; + if (setTypeSize(val) == 0) { + activeExpireCycleExpireFullKey(db, e.key()); + return; + } + } + break; + case OBJ_LIST: + case OBJ_ZSET: + case OBJ_HASH: + default: + serverAssert(false); + } + pfat->popfrontExpireEntry(); + } + + if (deleted) + { + robj objT; + switch (val->type) + { + case OBJ_SET: + initStaticStringObject(objT, (char*)e.key()); + signalModifiedKey(db,&objT); + notifyKeyspaceEvent(NOTIFY_SET,"srem",&objT,db->id); + break; + } + } + + if (pfat->FEmpty()) + { + robj *keyobj = createStringObject(e.key(),sdslen(e.key())); + removeExpire(db, keyobj); + decrRefCount(keyobj); + } +} + +void expireMemberCommand(client *c) +{ + long long when; + if (getLongLongFromObjectOrReply(c, c->argv[3], &when, NULL) != C_OK) + return; + + when *= 1000; + when += mstime(); + + /* No key, return zero. */ + dictEntry *de = dictFind(c->db->pdict, szFromObj(c->argv[1])); + if (de == NULL) { + addReply(c,shared.czero); + return; + } + + robj *val = (robj*)dictGetVal(de); + + switch (val->type) + { + case OBJ_SET: + // these types are safe + break; + + default: + addReplyError(c, "object type is unsupported"); + return; + } + + setExpire(c, c->db, c->argv[1], c->argv[2], when); + + addReply(c, shared.ok); } /* Try to expire a few timed out keys. The algorithm used is adaptive and @@ -162,10 +257,10 @@ void activeExpireCycle(int type) { size_t expired = 0; size_t tried = 0; - db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](const expireEntry &e) __attribute__((always_inline)) { + db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](expireEntry &e) __attribute__((always_inline)) { if (e.when() < now) { - activeExpireCycleExpire(db, e.key()); + activeExpireCycleExpire(db, e, now); ++expired; } ++tried; @@ -270,7 +365,7 @@ void expireSlaveKeys(void) { if (itr != db->setexpire->end()) { if (itr->when() < start) { - activeExpireCycleExpire(g_pserver->db+dbid,itr->key()); + activeExpireCycleExpire(g_pserver->db+dbid,*itr,start); expired = 1; } } @@ -406,7 +501,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) { addReply(c, shared.cone); return; } else { - setExpire(c,c->db,key,when); + setExpire(c,c->db,key,nullptr,when); addReply(c,shared.cone); signalModifiedKey(c->db,key); notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id); diff --git a/src/help.h b/src/help.h index 184d76724..01b856b9d 100644 --- a/src/help.h +++ b/src/help.h @@ -343,6 +343,9 @@ struct commandHelp { "Set the expiration for a key as a UNIX timestamp", 0, "1.2.0" }, + { "EXPIREMEMBER", + "key subkey seconds", + "set a subkey's time to live in seconds"}, { "FLUSHALL", "[ASYNC]", "Remove all keys from all databases", diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index 0dbfd57d1..91577cb85 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -110,7 +110,7 @@ void freeObjAsync(robj *o) { void emptyDbAsync(redisDb *db) { dict *oldht1 = db->pdict; auto *set = db->setexpire; - db->setexpire = new (MALLOC_LOCAL) semiorderedset(); + db->setexpire = new (MALLOC_LOCAL) expireset(); db->expireitr = db->setexpire->end(); db->pdict = dictCreate(&dbDictType,NULL); atomicIncr(lazyfree_objects,dictSize(oldht1)); @@ -141,7 +141,7 @@ void lazyfreeFreeObjectFromBioThread(robj *o) { * when the database was logically deleted. 'sl' is a skiplist used by * Redis Cluster in order to take the hash slots -> keys mapping. This * may be NULL if Redis Cluster is disabled. */ -void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset *set) { +void lazyfreeFreeDatabaseFromBioThread(dict *ht1, expireset *set) { size_t numkeys = dictSize(ht1); dictRelease(ht1); delete set; diff --git a/src/module.cpp b/src/module.cpp index 7863ca4cf..3ef824a7f 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -1664,7 +1664,7 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) { return REDISMODULE_ERR; if (expire != REDISMODULE_NO_EXPIRE) { expire += mstime(); - setExpire(key->ctx->client,key->db,key->key,expire); + setExpire(key->ctx->client,key->db,key->key,nullptr,expire); } else { removeExpire(key->db,key->key); } diff --git a/src/rdb.cpp b/src/rdb.cpp index 5443ca064..c1b15e2ca 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2096,7 +2096,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { { /* Set the expire time if needed */ if (expiretime != -1) - setExpire(NULL,db,key,expiretime); + setExpire(NULL,db,key,nullptr,expiretime); /* Set usage information (for eviction). */ objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); diff --git a/src/semiorderedset.h b/src/semiorderedset.h index 7713d5533..450910c49 100644 --- a/src/semiorderedset.h +++ b/src/semiorderedset.h @@ -15,11 +15,11 @@ extern uint64_t dictGenHashFunction(const void *key, int len); -template +template class semiorderedset { friend struct setiter; - std::vector> m_data; + std::vector> m_data; size_t celem = 0; static const size_t bits_min = 8; size_t bits = bits_min; @@ -109,7 +109,7 @@ public: if (!fRehash) ++celem; - typename compactvector::iterator itrInsert; + typename compactvector::iterator itrInsert; if (!m_data[idx].empty() && !(e < m_data[idx].back())) itrInsert = m_data[idx].end(); else @@ -292,7 +292,7 @@ private: int steps = 0; for (; idxRehash < (m_data.size()/2); ++idxRehash) { - compactvector vecT; + compactvector vecT; std::swap(m_data[idxRehash], vecT); for (auto &v : vecT) diff --git a/src/server.cpp b/src/server.cpp index e6e86f6ea..2d9627c0f 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -618,6 +618,10 @@ struct redisCommand redisCommandTable[] = { "write fast @keyspace", 0,NULL,1,1,1,0,0,0}, + {"expiremember", expireMemberCommand, 4, + "write fast @keyspace", + 0,NULL,1,1,1,0,0,0}, + {"pexpire",pexpireCommand,3, "write fast @keyspace", 0,NULL,1,1,1,0,0,0}, @@ -2919,7 +2923,7 @@ void initServer(void) { /* Create the Redis databases, and initialize other internal state. */ for (int j = 0; j < cserver.dbnum; j++) { g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL); - g_pserver->db[j].setexpire = new(MALLOC_LOCAL) semiorderedset; + g_pserver->db[j].setexpire = new(MALLOC_LOCAL) expireset(); g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL); g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); diff --git a/src/server.h b/src/server.h index 8bc30a0f5..99104cf48 100644 --- a/src/server.h +++ b/src/server.h @@ -53,6 +53,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { #include @@ -767,38 +768,150 @@ __attribute__((always_inline)) inline char *szFromObj(const robj *o) return (char*)ptrFromObj(o); } -class expireEntry { - sds m_key; - long long m_when; +class expireEntryFat +{ +public: + struct subexpireEntry + { + long long when; + std::unique_ptr spsubkey; + + subexpireEntry(long long when, const char *subkey) + : when(when), spsubkey(subkey, sdsfree) + {} + + bool operator<(long long when) const noexcept { return this->when < when; } + bool operator<(const subexpireEntry &se) { return this->when < se.when; } + }; + +private: + sds m_keyPrimary; + std::vector m_vecexpireEntries; // Note a NULL for the sds portion means the expire is for the primary key public: - expireEntry(sds key, long long when) + expireEntryFat(sds keyPrimary) + : m_keyPrimary(keyPrimary) + {} + long long when() const noexcept { return m_vecexpireEntries.front().when; } + const char *key() const noexcept { return m_keyPrimary; } + + bool operator<(long long when) const noexcept { return this->when() < when; } + + void expireSubKey(const char *szSubkey, long long when) { - m_key = key; - m_when = when; + auto itrInsert = std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), when); + m_vecexpireEntries.emplace(itrInsert, when, sdsdup(szSubkey)); } - bool operator!=(const expireEntry &e) const noexcept - { - return m_when != e.m_when || m_key != e.m_key; - } - bool operator==(const expireEntry &e) const noexcept - { - return m_when == e.m_when && m_key == e.m_key; - } - bool operator==(const char *key) const noexcept { return m_key == key; } + bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); } + const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); } + void popfrontExpireEntry() { m_vecexpireEntries.erase(m_vecexpireEntries.begin()); } - bool operator<(const expireEntry &e) const noexcept { return m_when < e.m_when; } - bool operator<(const char *key) const noexcept { return m_key < key; } - bool operator<(long long when) const noexcept { return m_when < when; } +}; - const char *key() const noexcept { return m_key; } - long long when() const noexcept { return m_when; } +class expireEntry { + union + { + sds m_key; + expireEntryFat *m_pfatentry; + } u; + long long m_when; // LLONG_MIN means this is a fat entry and we should use the pointer + +public: + expireEntry(sds key, const char *subkey, long long when) + { + if (subkey != nullptr) + { + m_when = LLONG_MIN; + u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(key); + u.m_pfatentry->expireSubKey(subkey, when); + } + else + { + u.m_key = key; + m_when = when; + } + } + + expireEntry(expireEntryFat *pfatentry) + { + u.m_pfatentry = pfatentry; + m_when = LLONG_MIN; + } + + expireEntry(expireEntry &&e) + { + u.m_key = e.u.m_key; + m_when = e.m_when; + e.u.m_key = nullptr; + e.m_when = 0; + } + + ~expireEntry() + { + if (FFat()) + delete u.m_pfatentry; + } + + inline bool FFat() const noexcept { return m_when == LLONG_MIN; } + expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; } + + + bool operator==(const char *key) const noexcept + { + return this->key() == key; + } + + bool operator<(const expireEntry &e) const noexcept + { + return when() < e.when(); + } + bool operator<(long long when) const noexcept + { + return this->when() < when; + } + + const char *key() const noexcept + { + if (FFat()) + return u.m_pfatentry->key(); + return u.m_key; + } + long long when() const noexcept + { + if (FFat()) + return u.m_pfatentry->when(); + return m_when; + } + + void update(const char *subkey, long long when) + { + if (!FFat()) + { + if (subkey == nullptr) + { + m_when = when; + return; + } + else + { + // we have to upgrade to a fat entry + long long whenT = m_when; + sds keyPrimary = u.m_key; + m_when = LLONG_MIN; + u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(keyPrimary); + u.m_pfatentry->expireSubKey(nullptr, whenT); + // at this point we're fat so fall through + } + } + u.m_pfatentry->expireSubKey(subkey, when); + } - explicit operator const char*() const noexcept { return m_key; } - explicit operator long long() const noexcept { return m_when; } + explicit operator const char*() const noexcept { return key(); } + explicit operator long long() const noexcept { return when(); } }; +typedef semiorderedset expireset; /* The a string name for an object's type as listed above * Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines, @@ -837,8 +950,8 @@ typedef struct clientReplyBlock { * database. The database number is the 'id' field in the structure. */ typedef struct redisDb { dict *pdict; /* The keyspace for this DB */ - semiorderedset *setexpire; - semiorderedset::setiter expireitr; + expireset *setexpire; + expireset::setiter expireitr; dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *ready_keys; /* Blocked keys that received a PUSH */ @@ -2225,7 +2338,7 @@ int removeExpireCore(redisDb *db, robj *key, dictEntry *de); void propagateExpire(redisDb *db, robj *key, int lazy); int expireIfNeeded(redisDb *db, robj *key); long long getExpire(redisDb *db, robj_roptr key); -void setExpire(client *c, redisDb *db, robj *key, long long when); +void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when); robj_roptr lookupKeyRead(redisDb *db, robj *key); robj *lookupKeyWrite(redisDb *db, robj *key); robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply); @@ -2420,6 +2533,7 @@ void mgetCommand(client *c); void monitorCommand(client *c); void expireCommand(client *c); void expireatCommand(client *c); +void expireMemberCommand(client *c); void pexpireCommand(client *c); void pexpireatCommand(client *c); void getsetCommand(client *c); diff --git a/src/t_string.cpp b/src/t_string.cpp index a254f4f53..8b79097c0 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -85,7 +85,7 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, } setKey(c->db,key,val); g_pserver->dirty++; - if (expire) setExpire(c,c->db,key,mstime()+milliseconds); + if (expire) setExpire(c,c->db,key,nullptr,mstime()+milliseconds); notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, "expire",key,c->db->id);