From 82a3e942bf974d35a696b13e44d6d5bb736b581c Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 5 Jul 2019 23:49:09 -0400 Subject: [PATCH 1/7] New expire datastructure and algorithm. Allows us to expire in sublinear time Former-commit-id: ea3bd614b8b88b8de0b114f917fbd0de93557c72 --- src/compactvector.h | 20 ++++++++++++++++++++ src/object.cpp | 2 +- 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/src/compactvector.h b/src/compactvector.h index 65a40f114..8f9e8e74e 100644 --- a/src/compactvector.h +++ b/src/compactvector.h @@ -12,10 +12,17 @@ * *************************************************/ +<<<<<<< HEAD template class compactvector { static_assert(MEMMOVE_SAFE || std::is_trivially_copyable::value, "compactvector requires trivially copyable types"); +======= +template +class compactvector +{ + static_assert(std::is_trivially_copyable::value, "compactvector requires trivially copyable types"); +>>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time T *m_data = nullptr; unsigned m_celem = 0; unsigned m_max = 0; @@ -26,7 +33,10 @@ public: compactvector() noexcept = default; ~compactvector() noexcept { +<<<<<<< HEAD clear(); // call dtors +======= +>>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time zfree(m_data); } @@ -78,7 +88,11 @@ public: assert(idx < m_max); where = m_data + idx; memmove(m_data + idx + 1, m_data + idx, (m_celem - idx)*sizeof(T)); +<<<<<<< HEAD new(m_data + idx) T(std::move(val)); +======= + m_data[idx] = val; +>>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time ++m_celem; return where; } @@ -102,7 +116,10 @@ public: assert(where >= m_data); size_t idx = where - m_data; assert(idx < m_celem); +<<<<<<< HEAD where->~T(); +======= +>>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time memmove(where, where+1, ((m_celem - idx - 1)*sizeof(T))); --m_celem; @@ -129,8 +146,11 @@ public: void clear() { +<<<<<<< HEAD for (size_t idx = 0; idx < m_celem; ++idx) m_data[idx].~T(); +======= +>>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time zfree(m_data); m_data = nullptr; m_celem = 0; diff --git a/src/object.cpp b/src/object.cpp index 900a9058c..ce6265ad1 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -1490,4 +1490,4 @@ void redisObject::setrefcount(unsigned ref) { serverAssert(!FExpires()); refcount.store(ref, std::memory_order_relaxed); -} \ No newline at end of file +} From 94645b33ddd6bf8273632a952a716a8c80efd3c3 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 13 Jul 2019 20:11:49 -0400 Subject: [PATCH 2/7] Initial prototype of EXPIREMEMBER command Former-commit-id: 0b3d74ea67d616a6869cbd66198c8dd7ffa72eb7 --- src/bio.cpp | 4 +- src/cluster.cpp | 2 +- src/compactvector.h | 21 ++++-- src/db.cpp | 28 +++++--- src/defrag.cpp | 7 +- src/evict.cpp | 2 +- src/expire.cpp | 127 ++++++++++++++++++++++++++++----- src/help.h | 3 + src/lazyfree.cpp | 4 +- src/module.cpp | 2 +- src/rdb.cpp | 2 +- src/semiorderedset.h | 8 +-- src/server.cpp | 6 +- src/server.h | 164 ++++++++++++++++++++++++++++++++++++------- src/t_string.cpp | 2 +- 15 files changed, 307 insertions(+), 75 deletions(-) diff --git a/src/bio.cpp b/src/bio.cpp index 844464e77..97fa7cf18 100644 --- a/src/bio.cpp +++ b/src/bio.cpp @@ -85,7 +85,7 @@ struct bio_job { void *bioProcessBackgroundJobs(void *arg); void lazyfreeFreeObjectFromBioThread(robj *o); -void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset *set); +void lazyfreeFreeDatabaseFromBioThread(dict *ht1, expireset *set); void lazyfreeFreeSlotsMapFromBioThread(rax *rt); /* Make sure we have enough stack to perform all the things we do in the @@ -196,7 +196,7 @@ void *bioProcessBackgroundJobs(void *arg) { if (job->arg1) lazyfreeFreeObjectFromBioThread((robj*)job->arg1); else if (job->arg2 && job->arg3) - lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(semiorderedset*)job->arg3); + lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(expireset*)job->arg3); else if (job->arg3) lazyfreeFreeSlotsMapFromBioThread((rax*)job->arg3); } else { diff --git a/src/cluster.cpp b/src/cluster.cpp index 79cb0972d..8978f184b 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -4949,7 +4949,7 @@ void restoreCommand(client *c) { dbAdd(c->db,c->argv[1],obj); if (ttl) { if (!absttl) ttl+=mstime(); - setExpire(c,c->db,c->argv[1],ttl); + setExpire(c,c->db,c->argv[1],nullptr,ttl); } objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock); signalModifiedKey(c->db,c->argv[1]); diff --git a/src/compactvector.h b/src/compactvector.h index 8f9e8e74e..ee10a135b 100644 --- a/src/compactvector.h +++ b/src/compactvector.h @@ -12,6 +12,7 @@ * *************************************************/ +<<<<<<< HEAD <<<<<<< HEAD template class compactvector @@ -23,6 +24,12 @@ class compactvector { static_assert(std::is_trivially_copyable::value, "compactvector requires trivially copyable types"); >>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time +======= +template +class compactvector +{ + static_assert(MEMMOVE_SAFE || std::is_trivially_copyable::value, "compactvector requires trivially copyable types"); +>>>>>>> Initial prototype of EXPIREMEMBER command T *m_data = nullptr; unsigned m_celem = 0; unsigned m_max = 0; @@ -33,10 +40,14 @@ public: compactvector() noexcept = default; ~compactvector() noexcept { +<<<<<<< HEAD <<<<<<< HEAD clear(); // call dtors ======= >>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time +======= + clear(); // call dtors +>>>>>>> Initial prototype of EXPIREMEMBER command zfree(m_data); } @@ -88,11 +99,15 @@ public: assert(idx < m_max); where = m_data + idx; memmove(m_data + idx + 1, m_data + idx, (m_celem - idx)*sizeof(T)); +<<<<<<< HEAD <<<<<<< HEAD new(m_data + idx) T(std::move(val)); ======= m_data[idx] = val; >>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time +======= + new(m_data + idx) T(std::move(val)); +>>>>>>> Initial prototype of EXPIREMEMBER command ++m_celem; return where; } @@ -116,10 +131,7 @@ public: assert(where >= m_data); size_t idx = where - m_data; assert(idx < m_celem); -<<<<<<< HEAD where->~T(); -======= ->>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time memmove(where, where+1, ((m_celem - idx - 1)*sizeof(T))); --m_celem; @@ -146,11 +158,8 @@ public: void clear() { -<<<<<<< HEAD for (size_t idx = 0; idx < m_celem; ++idx) m_data[idx].~T(); -======= ->>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time zfree(m_data); m_data = nullptr; m_celem = 0; diff --git a/src/db.cpp b/src/db.cpp index dd75c28e9..9a47b415d 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -457,7 +457,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { } else { dictEmpty(g_pserver->db[j].pdict,callback); delete g_pserver->db[j].setexpire; - g_pserver->db[j].setexpire = new (MALLOC_LOCAL) semiorderedset(); + g_pserver->db[j].setexpire = new (MALLOC_LOCAL) expireset(); g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); } } @@ -1006,7 +1006,7 @@ void renameGenericCommand(client *c, int nx) { dbDelete(c->db,c->argv[1]); dbAdd(c->db,c->argv[2],o); if (expire != -1) - setExpire(c,c->db,c->argv[2],expire); + setExpire(c,c->db,c->argv[2],nullptr,expire); signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[2]); notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", @@ -1077,7 +1077,7 @@ void moveCommand(client *c) { return; } dbAdd(dst,c->argv[1],o); - if (expire != -1) setExpire(c,dst,c->argv[1],expire); + if (expire != -1) setExpire(c,dst,c->argv[1],nullptr,expire); addReply(c,shared.cone); } @@ -1201,7 +1201,7 @@ int removeExpireCore(redisDb *db, robj *key, dictEntry *de) { * of an user calling a command 'c' is the client, otherwise 'c' is set * to NULL. The 'when' parameter is the absolute unix time in milliseconds * after which the key will no longer be considered valid. */ -void setExpire(client *c, redisDb *db, robj *key, long long when) { +void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) { dictEntry *kde; serverAssert(GlobalLocksAcquired()); @@ -1216,12 +1216,6 @@ void setExpire(client *c, redisDb *db, robj *key, long long when) { dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde))); } - if (((robj*)dictGetVal(kde))->FExpires()) - removeExpire(db, key); // should we optimize for when this is called with an already set expiry? - - expireEntry e((sds)dictGetKey(kde), when); - ((robj*)dictGetVal(kde))->SetFExpires(true); - /* Update TTL stats (exponential moving average) */ /* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */ long long now = g_pserver->mstime; @@ -1235,7 +1229,19 @@ void setExpire(client *c, redisDb *db, robj *key, long long when) { db->avg_ttl += (double)(when-now) / (db->setexpire->size()+1); // add the new entry db->last_expire_set = now; - db->setexpire->insert(e); + /* Update the expire set */ + const char *szSubKey = (subkey != nullptr) ? szFromObj(subkey) : nullptr; + if (((robj*)dictGetVal(kde))->FExpires()) { + auto itr = db->setexpire->find((sds)dictGetKey(kde)); + serverAssert(itr != db->setexpire->end()); + itr->update(szSubKey, when); + } + else + { + expireEntry e((sds)dictGetKey(kde), szSubKey, when); + ((robj*)dictGetVal(kde))->SetFExpires(true); + db->setexpire->insert(e); + } int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; if (c && writable_slave && !(c->flags & CLIENT_MASTER)) diff --git a/src/defrag.cpp b/src/defrag.cpp index b11564c4b..a6acb8e72 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -48,7 +48,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util); /* forward declarations*/ void defragDictBucketCallback(void *privdata, dictEntry **bucketref); dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); -void replaceSateliteOSetKeyPtr(semiorderedset &set, sds oldkey, sds newkey); +void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey); /* Defrag helper for generic allocations. * @@ -407,11 +407,12 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd return NULL; } -void replaceSateliteOSetKeyPtr(semiorderedset &set, sds oldkey, sds newkey) { +void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) { auto itr = set.find(oldkey); + serverAssert(false); if (itr != set.end()) { - expireEntry eNew(newkey, itr->when()); + expireEntry eNew(newkey, nullptr, itr->when()); set.erase(itr); set.insert(eNew); } diff --git a/src/evict.cpp b/src/evict.cpp index 4acdb5ad0..8cf24dd5e 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -252,7 +252,7 @@ struct visitFunctor return count < g_pserver->maxmemory_samples; } }; -void evictionPoolPopulate(int dbid, dict *dbdict, semiorderedset *setexpire, struct evictionPoolEntry *pool) +void evictionPoolPopulate(int dbid, dict *dbdict, expireset *setexpire, struct evictionPoolEntry *pool) { if (setexpire != nullptr) { diff --git a/src/expire.cpp b/src/expire.cpp index 38a65cf44..b6833db04 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -32,6 +32,21 @@ #include "server.h" +void activeExpireCycleExpireFullKey(redisDb *db, const char *key) { + robj *keyobj = createStringObject(key,sdslen(key)); + + propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire); + if (g_pserver->lazyfree_lazy_expire) + dbAsyncDelete(db,keyobj); + else + dbSyncDelete(db,keyobj); + notifyKeyspaceEvent(NOTIFY_EXPIRED, + "expired",keyobj,db->id); + if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj); + decrRefCount(keyobj); + g_pserver->stat_expiredkeys++; +} + /*----------------------------------------------------------------------------- * Incremental collection of expired keys. * @@ -51,19 +66,99 @@ * * The parameter 'now' is the current time in milliseconds as is passed * to the function to avoid too many gettimeofday() syscalls. */ -void activeExpireCycleExpire(redisDb *db, const char *key) { - robj *keyobj = createStringObject(key,sdslen(key)); +void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { + if (!e.FFat()) + activeExpireCycleExpireFullKey(db, e.key()); - propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire); - if (g_pserver->lazyfree_lazy_expire) - dbAsyncDelete(db,keyobj); - else - dbSyncDelete(db,keyobj); - notifyKeyspaceEvent(NOTIFY_EXPIRED, - "expired",keyobj,db->id); - if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj); - decrRefCount(keyobj); - g_pserver->stat_expiredkeys++; + expireEntryFat *pfat = e.pfatentry(); + dictEntry *de = dictFind(db->pdict, e.key()); + robj *val = (robj*)dictGetVal(de); + int deleted = 0; + while (!pfat->FEmpty()) + { + if (pfat->nextExpireEntry().when > now) + break; + + // Is it the full key expiration? + if (pfat->nextExpireEntry().spsubkey == nullptr) + { + activeExpireCycleExpireFullKey(db, e.key()); + return; + } + + switch (val->type) + { + case OBJ_SET: + if (setTypeRemove(val,pfat->nextExpireEntry().spsubkey.get())) { + deleted++; + if (setTypeSize(val) == 0) { + activeExpireCycleExpireFullKey(db, e.key()); + return; + } + } + break; + case OBJ_LIST: + case OBJ_ZSET: + case OBJ_HASH: + default: + serverAssert(false); + } + pfat->popfrontExpireEntry(); + } + + if (deleted) + { + robj objT; + switch (val->type) + { + case OBJ_SET: + initStaticStringObject(objT, (char*)e.key()); + signalModifiedKey(db,&objT); + notifyKeyspaceEvent(NOTIFY_SET,"srem",&objT,db->id); + break; + } + } + + if (pfat->FEmpty()) + { + robj *keyobj = createStringObject(e.key(),sdslen(e.key())); + removeExpire(db, keyobj); + decrRefCount(keyobj); + } +} + +void expireMemberCommand(client *c) +{ + long long when; + if (getLongLongFromObjectOrReply(c, c->argv[3], &when, NULL) != C_OK) + return; + + when *= 1000; + when += mstime(); + + /* No key, return zero. */ + dictEntry *de = dictFind(c->db->pdict, szFromObj(c->argv[1])); + if (de == NULL) { + addReply(c,shared.czero); + return; + } + + robj *val = (robj*)dictGetVal(de); + + switch (val->type) + { + case OBJ_SET: + // these types are safe + break; + + default: + addReplyError(c, "object type is unsupported"); + return; + } + + setExpire(c, c->db, c->argv[1], c->argv[2], when); + + addReply(c, shared.ok); } /* Try to expire a few timed out keys. The algorithm used is adaptive and @@ -162,10 +257,10 @@ void activeExpireCycle(int type) { size_t expired = 0; size_t tried = 0; - db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](const expireEntry &e) __attribute__((always_inline)) { + db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](expireEntry &e) __attribute__((always_inline)) { if (e.when() < now) { - activeExpireCycleExpire(db, e.key()); + activeExpireCycleExpire(db, e, now); ++expired; } ++tried; @@ -270,7 +365,7 @@ void expireSlaveKeys(void) { if (itr != db->setexpire->end()) { if (itr->when() < start) { - activeExpireCycleExpire(g_pserver->db+dbid,itr->key()); + activeExpireCycleExpire(g_pserver->db+dbid,*itr,start); expired = 1; } } @@ -406,7 +501,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) { addReply(c, shared.cone); return; } else { - setExpire(c,c->db,key,when); + setExpire(c,c->db,key,nullptr,when); addReply(c,shared.cone); signalModifiedKey(c->db,key); notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id); diff --git a/src/help.h b/src/help.h index 184d76724..01b856b9d 100644 --- a/src/help.h +++ b/src/help.h @@ -343,6 +343,9 @@ struct commandHelp { "Set the expiration for a key as a UNIX timestamp", 0, "1.2.0" }, + { "EXPIREMEMBER", + "key subkey seconds", + "set a subkey's time to live in seconds"}, { "FLUSHALL", "[ASYNC]", "Remove all keys from all databases", diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index 0dbfd57d1..91577cb85 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -110,7 +110,7 @@ void freeObjAsync(robj *o) { void emptyDbAsync(redisDb *db) { dict *oldht1 = db->pdict; auto *set = db->setexpire; - db->setexpire = new (MALLOC_LOCAL) semiorderedset(); + db->setexpire = new (MALLOC_LOCAL) expireset(); db->expireitr = db->setexpire->end(); db->pdict = dictCreate(&dbDictType,NULL); atomicIncr(lazyfree_objects,dictSize(oldht1)); @@ -141,7 +141,7 @@ void lazyfreeFreeObjectFromBioThread(robj *o) { * when the database was logically deleted. 'sl' is a skiplist used by * Redis Cluster in order to take the hash slots -> keys mapping. This * may be NULL if Redis Cluster is disabled. */ -void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset *set) { +void lazyfreeFreeDatabaseFromBioThread(dict *ht1, expireset *set) { size_t numkeys = dictSize(ht1); dictRelease(ht1); delete set; diff --git a/src/module.cpp b/src/module.cpp index 7863ca4cf..3ef824a7f 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -1664,7 +1664,7 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) { return REDISMODULE_ERR; if (expire != REDISMODULE_NO_EXPIRE) { expire += mstime(); - setExpire(key->ctx->client,key->db,key->key,expire); + setExpire(key->ctx->client,key->db,key->key,nullptr,expire); } else { removeExpire(key->db,key->key); } diff --git a/src/rdb.cpp b/src/rdb.cpp index 5443ca064..c1b15e2ca 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2096,7 +2096,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { { /* Set the expire time if needed */ if (expiretime != -1) - setExpire(NULL,db,key,expiretime); + setExpire(NULL,db,key,nullptr,expiretime); /* Set usage information (for eviction). */ objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); diff --git a/src/semiorderedset.h b/src/semiorderedset.h index 7713d5533..450910c49 100644 --- a/src/semiorderedset.h +++ b/src/semiorderedset.h @@ -15,11 +15,11 @@ extern uint64_t dictGenHashFunction(const void *key, int len); -template +template class semiorderedset { friend struct setiter; - std::vector> m_data; + std::vector> m_data; size_t celem = 0; static const size_t bits_min = 8; size_t bits = bits_min; @@ -109,7 +109,7 @@ public: if (!fRehash) ++celem; - typename compactvector::iterator itrInsert; + typename compactvector::iterator itrInsert; if (!m_data[idx].empty() && !(e < m_data[idx].back())) itrInsert = m_data[idx].end(); else @@ -292,7 +292,7 @@ private: int steps = 0; for (; idxRehash < (m_data.size()/2); ++idxRehash) { - compactvector vecT; + compactvector vecT; std::swap(m_data[idxRehash], vecT); for (auto &v : vecT) diff --git a/src/server.cpp b/src/server.cpp index e6e86f6ea..2d9627c0f 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -618,6 +618,10 @@ struct redisCommand redisCommandTable[] = { "write fast @keyspace", 0,NULL,1,1,1,0,0,0}, + {"expiremember", expireMemberCommand, 4, + "write fast @keyspace", + 0,NULL,1,1,1,0,0,0}, + {"pexpire",pexpireCommand,3, "write fast @keyspace", 0,NULL,1,1,1,0,0,0}, @@ -2919,7 +2923,7 @@ void initServer(void) { /* Create the Redis databases, and initialize other internal state. */ for (int j = 0; j < cserver.dbnum; j++) { g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL); - g_pserver->db[j].setexpire = new(MALLOC_LOCAL) semiorderedset; + g_pserver->db[j].setexpire = new(MALLOC_LOCAL) expireset(); g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL); g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); diff --git a/src/server.h b/src/server.h index 8bc30a0f5..99104cf48 100644 --- a/src/server.h +++ b/src/server.h @@ -53,6 +53,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { #include @@ -767,38 +768,150 @@ __attribute__((always_inline)) inline char *szFromObj(const robj *o) return (char*)ptrFromObj(o); } -class expireEntry { - sds m_key; - long long m_when; +class expireEntryFat +{ +public: + struct subexpireEntry + { + long long when; + std::unique_ptr spsubkey; + + subexpireEntry(long long when, const char *subkey) + : when(when), spsubkey(subkey, sdsfree) + {} + + bool operator<(long long when) const noexcept { return this->when < when; } + bool operator<(const subexpireEntry &se) { return this->when < se.when; } + }; + +private: + sds m_keyPrimary; + std::vector m_vecexpireEntries; // Note a NULL for the sds portion means the expire is for the primary key public: - expireEntry(sds key, long long when) + expireEntryFat(sds keyPrimary) + : m_keyPrimary(keyPrimary) + {} + long long when() const noexcept { return m_vecexpireEntries.front().when; } + const char *key() const noexcept { return m_keyPrimary; } + + bool operator<(long long when) const noexcept { return this->when() < when; } + + void expireSubKey(const char *szSubkey, long long when) { - m_key = key; - m_when = when; + auto itrInsert = std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), when); + m_vecexpireEntries.emplace(itrInsert, when, sdsdup(szSubkey)); } - bool operator!=(const expireEntry &e) const noexcept - { - return m_when != e.m_when || m_key != e.m_key; - } - bool operator==(const expireEntry &e) const noexcept - { - return m_when == e.m_when && m_key == e.m_key; - } - bool operator==(const char *key) const noexcept { return m_key == key; } + bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); } + const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); } + void popfrontExpireEntry() { m_vecexpireEntries.erase(m_vecexpireEntries.begin()); } - bool operator<(const expireEntry &e) const noexcept { return m_when < e.m_when; } - bool operator<(const char *key) const noexcept { return m_key < key; } - bool operator<(long long when) const noexcept { return m_when < when; } +}; - const char *key() const noexcept { return m_key; } - long long when() const noexcept { return m_when; } +class expireEntry { + union + { + sds m_key; + expireEntryFat *m_pfatentry; + } u; + long long m_when; // LLONG_MIN means this is a fat entry and we should use the pointer + +public: + expireEntry(sds key, const char *subkey, long long when) + { + if (subkey != nullptr) + { + m_when = LLONG_MIN; + u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(key); + u.m_pfatentry->expireSubKey(subkey, when); + } + else + { + u.m_key = key; + m_when = when; + } + } + + expireEntry(expireEntryFat *pfatentry) + { + u.m_pfatentry = pfatentry; + m_when = LLONG_MIN; + } + + expireEntry(expireEntry &&e) + { + u.m_key = e.u.m_key; + m_when = e.m_when; + e.u.m_key = nullptr; + e.m_when = 0; + } + + ~expireEntry() + { + if (FFat()) + delete u.m_pfatentry; + } + + inline bool FFat() const noexcept { return m_when == LLONG_MIN; } + expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; } + + + bool operator==(const char *key) const noexcept + { + return this->key() == key; + } + + bool operator<(const expireEntry &e) const noexcept + { + return when() < e.when(); + } + bool operator<(long long when) const noexcept + { + return this->when() < when; + } + + const char *key() const noexcept + { + if (FFat()) + return u.m_pfatentry->key(); + return u.m_key; + } + long long when() const noexcept + { + if (FFat()) + return u.m_pfatentry->when(); + return m_when; + } + + void update(const char *subkey, long long when) + { + if (!FFat()) + { + if (subkey == nullptr) + { + m_when = when; + return; + } + else + { + // we have to upgrade to a fat entry + long long whenT = m_when; + sds keyPrimary = u.m_key; + m_when = LLONG_MIN; + u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(keyPrimary); + u.m_pfatentry->expireSubKey(nullptr, whenT); + // at this point we're fat so fall through + } + } + u.m_pfatentry->expireSubKey(subkey, when); + } - explicit operator const char*() const noexcept { return m_key; } - explicit operator long long() const noexcept { return m_when; } + explicit operator const char*() const noexcept { return key(); } + explicit operator long long() const noexcept { return when(); } }; +typedef semiorderedset expireset; /* The a string name for an object's type as listed above * Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines, @@ -837,8 +950,8 @@ typedef struct clientReplyBlock { * database. The database number is the 'id' field in the structure. */ typedef struct redisDb { dict *pdict; /* The keyspace for this DB */ - semiorderedset *setexpire; - semiorderedset::setiter expireitr; + expireset *setexpire; + expireset::setiter expireitr; dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *ready_keys; /* Blocked keys that received a PUSH */ @@ -2225,7 +2338,7 @@ int removeExpireCore(redisDb *db, robj *key, dictEntry *de); void propagateExpire(redisDb *db, robj *key, int lazy); int expireIfNeeded(redisDb *db, robj *key); long long getExpire(redisDb *db, robj_roptr key); -void setExpire(client *c, redisDb *db, robj *key, long long when); +void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when); robj_roptr lookupKeyRead(redisDb *db, robj *key); robj *lookupKeyWrite(redisDb *db, robj *key); robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply); @@ -2420,6 +2533,7 @@ void mgetCommand(client *c); void monitorCommand(client *c); void expireCommand(client *c); void expireatCommand(client *c); +void expireMemberCommand(client *c); void pexpireCommand(client *c); void pexpireatCommand(client *c); void getsetCommand(client *c); diff --git a/src/t_string.cpp b/src/t_string.cpp index a254f4f53..8b79097c0 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -85,7 +85,7 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, } setKey(c->db,key,val); g_pserver->dirty++; - if (expire) setExpire(c,c->db,key,mstime()+milliseconds); + if (expire) setExpire(c,c->db,key,nullptr,mstime()+milliseconds); notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, "expire",key,c->db->id); From 95371d60fe6ecc471a12caba5aadef0f77292378 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 13 Jul 2019 20:35:44 -0400 Subject: [PATCH 3/7] Fix crash with traditional expiration Former-commit-id: 0ba5b2c3d66d3a1a520f223ad2c288c22601bd5a --- src/db.cpp | 5 ++++- src/expire.cpp | 3 +++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/db.cpp b/src/db.cpp index 9a47b415d..40e44a7c1 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1234,7 +1234,10 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) if (((robj*)dictGetVal(kde))->FExpires()) { auto itr = db->setexpire->find((sds)dictGetKey(kde)); serverAssert(itr != db->setexpire->end()); - itr->update(szSubKey, when); + expireEntry eNew(std::move(*itr)); + eNew.update(szSubKey, when); + db->setexpire->erase(itr); + db->setexpire->insert(eNew); } else { diff --git a/src/expire.cpp b/src/expire.cpp index b6833db04..c10047d2c 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -68,7 +68,10 @@ void activeExpireCycleExpireFullKey(redisDb *db, const char *key) { * to the function to avoid too many gettimeofday() syscalls. */ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { if (!e.FFat()) + { activeExpireCycleExpireFullKey(db, e.key()); + return; + } expireEntryFat *pfat = e.pfatentry(); dictEntry *de = dictFind(db->pdict, e.key()); From e06c38f1d304273ba3a09ce1dc54e79f1394ba59 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 14 Jul 2019 00:23:31 -0400 Subject: [PATCH 4/7] Plumb support for sub expires to all expire related code Former-commit-id: 184abac6942a9a6aa8783741b50b23210afddcc5 --- src/aof.cpp | 25 ++++++++++---- src/cluster.cpp | 5 ++- src/db.cpp | 87 ++++++++++++++++++++++++++++++++++++++++--------- src/debug.cpp | 6 +++- src/defrag.cpp | 4 +-- src/expire.cpp | 7 ++-- src/module.cpp | 6 +++- src/rdb.cpp | 28 +++++++++++----- src/server.h | 71 ++++++++++++++++++++++++++++++++++++++-- 9 files changed, 198 insertions(+), 41 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index c7160489b..5c6385c84 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -1321,13 +1321,12 @@ int rewriteAppendOnlyFileRio(rio *aof) { while((de = dictNext(di)) != NULL) { sds keystr; robj key, *o; - long long expiretime; keystr = (sds)dictGetKey(de); o = (robj*)dictGetVal(de); initStaticStringObject(key,keystr); - expiretime = getExpire(db,&key); + expireEntry *pexpire = getExpire(db,&key); /* Save the key and associated value */ if (o->type == OBJ_STRING) { @@ -1353,11 +1352,23 @@ int rewriteAppendOnlyFileRio(rio *aof) { serverPanic("Unknown object type"); } /* Save the expire time */ - if (expiretime != -1) { - char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; - if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(aof,&key) == 0) goto werr; - if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr; + if (pexpire != nullptr) { + for (auto &subExpire : *pexpire) { + if (subExpire.subkey() == nullptr) + { + char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; + if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(aof,&key) == 0) goto werr; + } + else + { + char cmd[]="*4\r\n$12\r\nEXPIREMEMBER\r\n"; + if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWriteBulkObject(aof,&key) == 0) goto werr; + if (rioWrite(aof,subExpire.subkey(),sdslen(subExpire.subkey())) == 0) goto werr; + } + if (rioWriteBulkLongLong(aof,subExpire.when()) == 0) goto werr; // common + } } /* Read some diff from the parent process from time to time. */ if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) { diff --git a/src/cluster.cpp b/src/cluster.cpp index 8978f184b..619ce3b3a 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -5194,7 +5194,10 @@ try_again: /* Create RESTORE payload and generate the protocol to call the command. */ for (j = 0; j < num_keys; j++) { long long ttl = 0; - long long expireat = getExpire(c->db,kv[j]); + expireEntry *pexpire = getExpire(c->db,kv[j]); + long long expireat = -1; + if (pexpire != nullptr) + pexpire->FGetPrimaryExpire(&expireat); if (expireat != -1) { ttl = expireat-mstime(); diff --git a/src/db.cpp b/src/db.cpp index 40e44a7c1..a3dec518a 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -976,7 +976,6 @@ void shutdownCommand(client *c) { void renameGenericCommand(client *c, int nx) { robj *o; - long long expire; int samekey = 0; /* When source and dest key is the same, no operation is performed, @@ -992,7 +991,15 @@ void renameGenericCommand(client *c, int nx) { } incrRefCount(o); - expire = getExpire(c->db,c->argv[1]); + + std::unique_ptr spexpire; + + { // scope pexpireOld since it will be invalid soon + expireEntry *pexpireOld = getExpire(c->db,c->argv[1]); + if (pexpireOld != nullptr) + spexpire = std::make_unique(std::move(*pexpireOld)); + } + if (lookupKeyWrite(c->db,c->argv[2]) != NULL) { if (nx) { decrRefCount(o); @@ -1005,8 +1012,8 @@ void renameGenericCommand(client *c, int nx) { } dbDelete(c->db,c->argv[1]); dbAdd(c->db,c->argv[2],o); - if (expire != -1) - setExpire(c,c->db,c->argv[2],nullptr,expire); + if (spexpire != nullptr) + setExpire(c,c->db,c->argv[2],std::move(*spexpire)); signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[2]); notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", @@ -1029,7 +1036,7 @@ void moveCommand(client *c) { robj *o; redisDb *src, *dst; int srcid; - long long dbid, expire; + long long dbid; if (g_pserver->cluster_enabled) { addReplyError(c,"MOVE is not allowed in cluster mode"); @@ -1063,7 +1070,13 @@ void moveCommand(client *c) { addReply(c,shared.czero); return; } - expire = getExpire(c->db,c->argv[1]); + + std::unique_ptr spexpire; + { // scope pexpireOld + expireEntry *pexpireOld = getExpire(c->db,c->argv[1]); + if (pexpireOld != nullptr) + spexpire = std::make_unique(std::move(*pexpireOld)); + } if (o->FExpires()) removeExpire(c->db,c->argv[1]); serverAssert(!o->FExpires()); @@ -1077,7 +1090,7 @@ void moveCommand(client *c) { return; } dbAdd(dst,c->argv[1],o); - if (expire != -1) setExpire(c,dst,c->argv[1],nullptr,expire); + if (spexpire != nullptr) setExpire(c,dst,c->argv[1],std::move(*spexpire)); addReply(c,shared.cone); } @@ -1251,24 +1264,53 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) rememberSlaveKeyWithExpire(db,key); } -/* Return the expire time of the specified key, or -1 if no expire +void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) +{ + dictEntry *kde; + + serverAssert(GlobalLocksAcquired()); + + /* Reuse the sds from the main dict in the expire dict */ + kde = dictFind(db->pdict,ptrFromObj(key)); + serverAssertWithInfo(NULL,key,kde != NULL); + + if (((robj*)dictGetVal(kde))->refcount == OBJ_SHARED_REFCOUNT) + { + // shared objects cannot have the expire bit set, create a real object + dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde))); + } + + if (((robj*)dictGetVal(kde))->FExpires()) + removeExpire(db, key); + + e.setKeyUnsafe((sds)dictGetKey(kde)); + db->setexpire->insert(e); + ((robj*)dictGetVal(kde))->SetFExpires(true); + + + int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; + if (c && writable_slave && !(c->flags & CLIENT_MASTER)) + rememberSlaveKeyWithExpire(db,key); +} + +/* Return the expire time of the specified key, or null if no expire * is associated with this key (i.e. the key is non volatile) */ -long long getExpire(redisDb *db, robj_roptr key) { +expireEntry *getExpire(redisDb *db, robj_roptr key) { dictEntry *de; /* No expire? return ASAP */ if (db->setexpire->size() == 0) - return -1; + return nullptr; de = dictFind(db->pdict, ptrFromObj(key)); if (de == NULL) - return -1; + return nullptr; robj *obj = (robj*)dictGetVal(de); if (!obj->FExpires()) - return -1; + return nullptr; auto itr = db->setexpire->find((sds)dictGetKey(de)); - return itr->when(); + return itr.operator->(); } /* Propagate expires into slaves and the AOF file. @@ -1296,15 +1338,28 @@ void propagateExpire(redisDb *db, robj *key, int lazy) { decrRefCount(argv[1]); } -/* Check if the key is expired. */ +/* Check if the key is expired. Note, this does not check subexpires */ int keyIsExpired(redisDb *db, robj *key) { - mstime_t when = getExpire(db,key); + expireEntry *pexpire = getExpire(db,key); - if (when < 0) return 0; /* No expire for this key */ + if (pexpire == nullptr) return 0; /* No expire for this key */ /* Don't expire anything while loading. It will be done later. */ if (g_pserver->loading) return 0; + long long when = -1; + for (auto &exp : *pexpire) + { + if (exp.subkey() == nullptr) + { + when = exp.when(); + break; + } + } + + if (when == -1) + return 0; + /* If we are in the context of a Lua script, we pretend that time is * blocked to when the Lua script started. This way a key can expire * only the first time it is accessed and not in the middle of the diff --git a/src/debug.cpp b/src/debug.cpp index 4d2f4bbca..41c73b55c 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -124,9 +124,13 @@ void mixStringObjectDigest(unsigned char *digest, robj_roptr o) { void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj_roptr o) { uint32_t aux = htonl(o->type); mixDigest(digest,&aux,sizeof(aux)); - long long expiretime = getExpire(db,keyobj); + expireEntry *pexpire = getExpire(db,keyobj); + long long expiretime = -1; char buf[128]; + if (pexpire != nullptr) + pexpire->FGetPrimaryExpire(&expiretime); + /* Save the key and associated value */ if (o->type == OBJ_STRING) { mixStringObjectDigest(digest,o); diff --git a/src/defrag.cpp b/src/defrag.cpp index a6acb8e72..c49cd2665 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -409,10 +409,10 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) { auto itr = set.find(oldkey); - serverAssert(false); if (itr != set.end()) { - expireEntry eNew(newkey, nullptr, itr->when()); + expireEntry eNew(std::move(*itr)); + eNew.setKeyUnsafe(newkey); set.erase(itr); set.insert(eNew); } diff --git a/src/expire.cpp b/src/expire.cpp index c10047d2c..5d257428d 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -535,7 +535,7 @@ void pexpireatCommand(client *c) { /* Implements TTL and PTTL */ void ttlGenericCommand(client *c, int output_ms) { - long long expire, ttl = -1; + long long expire = -1, ttl = -1; /* If the key does not exist at all, return -2 */ if (lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH) == nullptr) { @@ -544,7 +544,10 @@ void ttlGenericCommand(client *c, int output_ms) { } /* The key exists. Return -1 if it has no expire, or the actual * TTL value otherwise. */ - expire = getExpire(c->db,c->argv[1]); + expireEntry *pexpire = getExpire(c->db,c->argv[1]); + if (pexpire != nullptr) + pexpire->FGetPrimaryExpire(&expire); + if (expire != -1) { ttl = expire-mstime(); if (ttl < 0) ttl = 0; diff --git a/src/module.cpp b/src/module.cpp index 3ef824a7f..052c8744a 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -1644,7 +1644,11 @@ int RM_UnlinkKey(RedisModuleKey *key) { * If no TTL is associated with the key or if the key is empty, * REDISMODULE_NO_EXPIRE is returned. */ mstime_t RM_GetExpire(RedisModuleKey *key) { - mstime_t expire = getExpire(key->db,key->key); + expireEntry *pexpire = getExpire(key->db,key->key); + mstime_t expire = -1; + if (pexpire != nullptr) + pexpire->FGetPrimaryExpire(&expire); + if (expire == -1 || key->value == NULL) return -1; expire -= mstime(); return expire >= 0 ? expire : 0; diff --git a/src/rdb.cpp b/src/rdb.cpp index c1b15e2ca..bed797305 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1031,12 +1031,13 @@ size_t rdbSavedObjectLen(robj *o) { * On error -1 is returned. * On success if the key was actually saved 1 is returned, otherwise 0 * is returned (the key was already expired). */ -int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { +int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, expireEntry *pexpire) { int savelru = g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LRU; int savelfu = g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU; /* Save the expire time */ - if (expiretime != -1) { + long long expiretime = -1; + if (pexpire != nullptr && pexpire->FGetPrimaryExpire(&expiretime)) { if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1; if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1; } @@ -1061,9 +1062,21 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { if (rdbWriteRaw(rdb,buf,1) == -1) return -1; } - char szMvcc[32]; - snprintf(szMvcc, 32, "%" PRIu64, val->mvcc_tstamp); - if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szMvcc) == -1) return -1; + char szT[32]; + snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); + if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; + + if (pexpire != nullptr) + { + for (auto itr : *pexpire) + { + if (itr.subkey() == nullptr) + continue; // already saved + snprintf(szT, 32, "%lld", itr.when()); + rdbSaveAuxFieldStrStr(rdb,"keydb-subexpire-key",itr.subkey()); + rdbSaveAuxFieldStrStr(rdb,"keydb-subexpire-when",szT); + } + } /* Save type, key, value */ if (rdbSaveObjectType(rdb,val) == -1) return -1; @@ -1099,12 +1112,11 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *keystr, robj *o) { robj key; - long long expire; initStaticStringObject(key,(char*)keystr); - expire = getExpire(db, &key); + expireEntry *pexpire = getExpire(db, &key); - if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) + if (rdbSaveKeyValuePair(rdb,&key,o,pexpire) == -1) return 0; /* When this RDB is produced as part of an AOF rewrite, move diff --git a/src/server.h b/src/server.h index 99104cf48..f350410d0 100644 --- a/src/server.h +++ b/src/server.h @@ -770,6 +770,7 @@ __attribute__((always_inline)) inline char *szFromObj(const robj *o) class expireEntryFat { + friend class expireEntry; public: struct subexpireEntry { @@ -806,7 +807,8 @@ public: bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); } const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); } void popfrontExpireEntry() { m_vecexpireEntries.erase(m_vecexpireEntries.begin()); } - + const subexpireEntry &operator[](size_t idx) { return m_vecexpireEntries[idx]; } + size_t size() const noexcept { return m_vecexpireEntries.size(); } }; class expireEntry { @@ -818,6 +820,39 @@ class expireEntry { long long m_when; // LLONG_MIN means this is a fat entry and we should use the pointer public: + class iter + { + expireEntry *m_pentry = nullptr; + size_t m_idx = 0; + + public: + iter(expireEntry *pentry, size_t idx) + : m_pentry(pentry), m_idx(idx) + {} + + iter &operator++() { ++m_idx; return *this; } + + const char *subkey() const + { + if (m_pentry->FFat()) + return (*m_pentry->pfatentry())[m_idx].spsubkey.get(); + return nullptr; + } + long long when() const + { + if (m_pentry->FFat()) + return (*m_pentry->pfatentry())[m_idx].when; + return m_pentry->when(); + } + + bool operator!=(const iter &other) + { + return m_idx != other.m_idx; + } + + const iter &operator*() const { return *this; } + }; + expireEntry(sds key, const char *subkey, long long when) { if (subkey != nullptr) @@ -843,7 +878,7 @@ public: { u.m_key = e.u.m_key; m_when = e.m_when; - e.u.m_key = nullptr; + e.u.m_key = (char*)key(); // we do this so it can still be found in the set e.m_when = 0; } @@ -853,6 +888,14 @@ public: delete u.m_pfatentry; } + void setKeyUnsafe(sds key) + { + if (FFat()) + u.m_pfatentry->m_keyPrimary = key; + else + u.m_key = key; + } + inline bool FFat() const noexcept { return m_when == LLONG_MIN; } expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; } @@ -907,6 +950,27 @@ public: u.m_pfatentry->expireSubKey(subkey, when); } + iter begin() { return iter(this, 0); } + iter end() + { + if (FFat()) + return iter(this, u.m_pfatentry->size()); + return iter(this, 1); + } + + bool FGetPrimaryExpire(long long *pwhen) + { + *pwhen = -1; + for (auto itr : *this) + { + if (itr.subkey() == nullptr) + { + *pwhen = itr.when(); + return true; + } + } + return false; + } explicit operator const char*() const noexcept { return key(); } explicit operator long long() const noexcept { return when(); } @@ -2337,8 +2401,9 @@ int removeExpire(redisDb *db, robj *key); int removeExpireCore(redisDb *db, robj *key, dictEntry *de); void propagateExpire(redisDb *db, robj *key, int lazy); int expireIfNeeded(redisDb *db, robj *key); -long long getExpire(redisDb *db, robj_roptr key); +expireEntry *getExpire(redisDb *db, robj_roptr key); void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when); +void setExpire(client *c, redisDb *db, robj *key, expireEntry &&entry); robj_roptr lookupKeyRead(redisDb *db, robj *key); robj *lookupKeyWrite(redisDb *db, robj *key); robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply); From 4ec97fdb460e3825e33b000b533a5cdf23351ac2 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 18 Jul 2019 21:52:19 -0400 Subject: [PATCH 5/7] Fix merge conflict Former-commit-id: 0b43b51a2e3a6af11532146fbb7929f3ecf3b036 --- src/db.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db.cpp b/src/db.cpp index a3dec518a..f6607b40e 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1274,7 +1274,7 @@ void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) kde = dictFind(db->pdict,ptrFromObj(key)); serverAssertWithInfo(NULL,key,kde != NULL); - if (((robj*)dictGetVal(kde))->refcount == OBJ_SHARED_REFCOUNT) + if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) { // shared objects cannot have the expire bit set, create a real object dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde))); From 9f42bb5d910c97ff6862b3a14dc55be46e076095 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 18 Jul 2019 21:57:17 -0400 Subject: [PATCH 6/7] Fix buggy rebase Former-commit-id: 6037d1f326116e5aae56be9a73a8f9ca68a45bbe --- src/compactvector.h | 29 ----------------------------- 1 file changed, 29 deletions(-) diff --git a/src/compactvector.h b/src/compactvector.h index ee10a135b..65a40f114 100644 --- a/src/compactvector.h +++ b/src/compactvector.h @@ -12,24 +12,10 @@ * *************************************************/ -<<<<<<< HEAD -<<<<<<< HEAD template class compactvector { static_assert(MEMMOVE_SAFE || std::is_trivially_copyable::value, "compactvector requires trivially copyable types"); -======= -template -class compactvector -{ - static_assert(std::is_trivially_copyable::value, "compactvector requires trivially copyable types"); ->>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time -======= -template -class compactvector -{ - static_assert(MEMMOVE_SAFE || std::is_trivially_copyable::value, "compactvector requires trivially copyable types"); ->>>>>>> Initial prototype of EXPIREMEMBER command T *m_data = nullptr; unsigned m_celem = 0; unsigned m_max = 0; @@ -40,14 +26,7 @@ public: compactvector() noexcept = default; ~compactvector() noexcept { -<<<<<<< HEAD -<<<<<<< HEAD clear(); // call dtors -======= ->>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time -======= - clear(); // call dtors ->>>>>>> Initial prototype of EXPIREMEMBER command zfree(m_data); } @@ -99,15 +78,7 @@ public: assert(idx < m_max); where = m_data + idx; memmove(m_data + idx + 1, m_data + idx, (m_celem - idx)*sizeof(T)); -<<<<<<< HEAD -<<<<<<< HEAD new(m_data + idx) T(std::move(val)); -======= - m_data[idx] = val; ->>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time -======= - new(m_data + idx) T(std::move(val)); ->>>>>>> Initial prototype of EXPIREMEMBER command ++m_celem; return where; } From 9ba5270bdac9ec87ca42b46d12c0e18c9f98909b Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 18 Jul 2019 22:31:20 -0400 Subject: [PATCH 7/7] Subexpire entries should load/save Former-commit-id: a55d98043655473ecdd53db2927381635eefc0b8 --- src/rdb.cpp | 47 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index bed797305..97ade6d1f 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1066,6 +1066,13 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, expireEntry *pexpire) { snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; + /* Save type, key, value */ + if (rdbSaveObjectType(rdb,val) == -1) return -1; + if (rdbSaveStringObject(rdb,key) == -1) return -1; + if (rdbSaveObject(rdb,val,key) == -1) return -1; + + /* Save expire entry after as it will apply to the previously loaded key */ + /* This is because we update the expire datastructure directly without buffering */ if (pexpire != nullptr) { for (auto itr : *pexpire) @@ -1078,10 +1085,6 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, expireEntry *pexpire) { } } - /* Save type, key, value */ - if (rdbSaveObjectType(rdb,val) == -1) return -1; - if (rdbSaveStringObject(rdb,key) == -1) return -1; - if (rdbSaveObject(rdb,val,key) == -1) return -1; return 1; } @@ -1919,6 +1922,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); long long lru_clock = 0; uint64_t mvcc_tstamp = OBJ_MVCC_INVALID; + robj *subexpireKey = nullptr; + robj *key = nullptr; rdb->update_cksum = rdbLoadProgressCallback; rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes; @@ -1940,7 +1945,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { lru_clock = LRU_CLOCK(); while(1) { - robj *key, *val; + robj *val; /* Read type. */ if ((type = rdbLoadType(rdb)) == -1) goto eoferr; @@ -2048,6 +2053,18 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { } else if (!strcasecmp(szFromObj(auxkey),"mvcc-tstamp")) { static_assert(sizeof(unsigned long long) == sizeof(uint64_t), "Ensure long long is 64-bits"); mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); + } else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-key")) { + subexpireKey = auxval; + incrRefCount(subexpireKey); + } else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-when")) { + if (key == nullptr || subexpireKey == nullptr) { + serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping."); + } + else { + setExpire(NULL, db, key, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10)); + decrRefCount(subexpireKey); + subexpireKey = nullptr; + } } else { /* We ignore fields we don't understand, as by AUX field * contract. */ @@ -2089,6 +2106,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { } /* Read key */ + if (key != nullptr) + decrRefCount(key); + if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr; /* Read value */ if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) goto eoferr; @@ -2102,7 +2122,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { decrRefCount(val); } else { /* Add the new object in the hash table */ - int fInserted = dbMerge(db, key, val, rsi->fForceSetKey); + int fInserted = dbMerge(db, key, val, rsi->fForceSetKey); // Note: dbMerge will incrRef if (fInserted) { @@ -2112,14 +2132,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { /* Set usage information (for eviction). */ objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); - - /* Decrement the key refcount since dbMerge() will take its - * own reference. */ - decrRefCount(key); } else { - decrRefCount(key); decrRefCount(val); } } @@ -2130,6 +2145,16 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { lfu_freq = -1; lru_idle = -1; } + + if (key != nullptr) + decrRefCount(key); + + if (subexpireKey != nullptr) + { + serverLog(LL_WARNING, "Corrupt subexpire entry in RDB."); + decrRefCount(subexpireKey); + subexpireKey = nullptr; + } /* Verify the checksum if RDB version is >= 5 */ if (rdbver >= 5) {