Initial prototype of EXPIREMEMBER command

Former-commit-id: 0b3d74ea67d616a6869cbd66198c8dd7ffa72eb7
This commit is contained in:
John Sully 2019-07-13 20:11:49 -04:00
parent 4568ecf9cd
commit 2a2225d150
15 changed files with 307 additions and 75 deletions

View File

@ -85,7 +85,7 @@ struct bio_job {
void *bioProcessBackgroundJobs(void *arg); void *bioProcessBackgroundJobs(void *arg);
void lazyfreeFreeObjectFromBioThread(robj *o); void lazyfreeFreeObjectFromBioThread(robj *o);
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset<expireEntry, const char *> *set); void lazyfreeFreeDatabaseFromBioThread(dict *ht1, expireset *set);
void lazyfreeFreeSlotsMapFromBioThread(rax *rt); void lazyfreeFreeSlotsMapFromBioThread(rax *rt);
/* Make sure we have enough stack to perform all the things we do in the /* Make sure we have enough stack to perform all the things we do in the
@ -196,7 +196,7 @@ void *bioProcessBackgroundJobs(void *arg) {
if (job->arg1) if (job->arg1)
lazyfreeFreeObjectFromBioThread((robj*)job->arg1); lazyfreeFreeObjectFromBioThread((robj*)job->arg1);
else if (job->arg2 && job->arg3) else if (job->arg2 && job->arg3)
lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(semiorderedset<expireEntry, const char *>*)job->arg3); lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(expireset*)job->arg3);
else if (job->arg3) else if (job->arg3)
lazyfreeFreeSlotsMapFromBioThread((rax*)job->arg3); lazyfreeFreeSlotsMapFromBioThread((rax*)job->arg3);
} else { } else {

View File

@ -4949,7 +4949,7 @@ void restoreCommand(client *c) {
dbAdd(c->db,c->argv[1],obj); dbAdd(c->db,c->argv[1],obj);
if (ttl) { if (ttl) {
if (!absttl) ttl+=mstime(); if (!absttl) ttl+=mstime();
setExpire(c,c->db,c->argv[1],ttl); setExpire(c,c->db,c->argv[1],nullptr,ttl);
} }
objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock); objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);

View File

@ -12,6 +12,7 @@
* *
*************************************************/ *************************************************/
<<<<<<< HEAD
<<<<<<< HEAD <<<<<<< HEAD
template<typename T, bool MEMMOVE_SAFE=false> template<typename T, bool MEMMOVE_SAFE=false>
class compactvector class compactvector
@ -23,6 +24,12 @@ class compactvector
{ {
static_assert(std::is_trivially_copyable<T>::value, "compactvector requires trivially copyable types"); static_assert(std::is_trivially_copyable<T>::value, "compactvector requires trivially copyable types");
>>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time >>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time
=======
template<typename T, bool MEMMOVE_SAFE=false>
class compactvector
{
static_assert(MEMMOVE_SAFE || std::is_trivially_copyable<T>::value, "compactvector requires trivially copyable types");
>>>>>>> Initial prototype of EXPIREMEMBER command
T *m_data = nullptr; T *m_data = nullptr;
unsigned m_celem = 0; unsigned m_celem = 0;
unsigned m_max = 0; unsigned m_max = 0;
@ -33,10 +40,14 @@ public:
compactvector() noexcept = default; compactvector() noexcept = default;
~compactvector() noexcept ~compactvector() noexcept
{ {
<<<<<<< HEAD
<<<<<<< HEAD <<<<<<< HEAD
clear(); // call dtors clear(); // call dtors
======= =======
>>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time >>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time
=======
clear(); // call dtors
>>>>>>> Initial prototype of EXPIREMEMBER command
zfree(m_data); zfree(m_data);
} }
@ -88,11 +99,15 @@ public:
assert(idx < m_max); assert(idx < m_max);
where = m_data + idx; where = m_data + idx;
memmove(m_data + idx + 1, m_data + idx, (m_celem - idx)*sizeof(T)); memmove(m_data + idx + 1, m_data + idx, (m_celem - idx)*sizeof(T));
<<<<<<< HEAD
<<<<<<< HEAD <<<<<<< HEAD
new(m_data + idx) T(std::move(val)); new(m_data + idx) T(std::move(val));
======= =======
m_data[idx] = val; m_data[idx] = val;
>>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time >>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time
=======
new(m_data + idx) T(std::move(val));
>>>>>>> Initial prototype of EXPIREMEMBER command
++m_celem; ++m_celem;
return where; return where;
} }
@ -116,10 +131,7 @@ public:
assert(where >= m_data); assert(where >= m_data);
size_t idx = where - m_data; size_t idx = where - m_data;
assert(idx < m_celem); assert(idx < m_celem);
<<<<<<< HEAD
where->~T(); where->~T();
=======
>>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time
memmove(where, where+1, ((m_celem - idx - 1)*sizeof(T))); memmove(where, where+1, ((m_celem - idx - 1)*sizeof(T)));
--m_celem; --m_celem;
@ -146,11 +158,8 @@ public:
void clear() void clear()
{ {
<<<<<<< HEAD
for (size_t idx = 0; idx < m_celem; ++idx) for (size_t idx = 0; idx < m_celem; ++idx)
m_data[idx].~T(); m_data[idx].~T();
=======
>>>>>>> New expire datastructure and algorithm. Allows us to expire in sublinear time
zfree(m_data); zfree(m_data);
m_data = nullptr; m_data = nullptr;
m_celem = 0; m_celem = 0;

View File

@ -457,7 +457,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
} else { } else {
dictEmpty(g_pserver->db[j].pdict,callback); dictEmpty(g_pserver->db[j].pdict,callback);
delete g_pserver->db[j].setexpire; delete g_pserver->db[j].setexpire;
g_pserver->db[j].setexpire = new (MALLOC_LOCAL) semiorderedset<expireEntry, const char*>(); g_pserver->db[j].setexpire = new (MALLOC_LOCAL) expireset();
g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end();
} }
} }
@ -1006,7 +1006,7 @@ void renameGenericCommand(client *c, int nx) {
dbDelete(c->db,c->argv[1]); dbDelete(c->db,c->argv[1]);
dbAdd(c->db,c->argv[2],o); dbAdd(c->db,c->argv[2],o);
if (expire != -1) if (expire != -1)
setExpire(c,c->db,c->argv[2],expire); setExpire(c,c->db,c->argv[2],nullptr,expire);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[2]); signalModifiedKey(c->db,c->argv[2]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
@ -1077,7 +1077,7 @@ void moveCommand(client *c) {
return; return;
} }
dbAdd(dst,c->argv[1],o); dbAdd(dst,c->argv[1],o);
if (expire != -1) setExpire(c,dst,c->argv[1],expire); if (expire != -1) setExpire(c,dst,c->argv[1],nullptr,expire);
addReply(c,shared.cone); addReply(c,shared.cone);
} }
@ -1201,7 +1201,7 @@ int removeExpireCore(redisDb *db, robj *key, dictEntry *de) {
* of an user calling a command 'c' is the client, otherwise 'c' is set * of an user calling a command 'c' is the client, otherwise 'c' is set
* to NULL. The 'when' parameter is the absolute unix time in milliseconds * to NULL. The 'when' parameter is the absolute unix time in milliseconds
* after which the key will no longer be considered valid. */ * after which the key will no longer be considered valid. */
void setExpire(client *c, redisDb *db, robj *key, long long when) { void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) {
dictEntry *kde; dictEntry *kde;
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
@ -1216,12 +1216,6 @@ void setExpire(client *c, redisDb *db, robj *key, long long when) {
dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde))); dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde)));
} }
if (((robj*)dictGetVal(kde))->FExpires())
removeExpire(db, key); // should we optimize for when this is called with an already set expiry?
expireEntry e((sds)dictGetKey(kde), when);
((robj*)dictGetVal(kde))->SetFExpires(true);
/* Update TTL stats (exponential moving average) */ /* Update TTL stats (exponential moving average) */
/* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */ /* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */
long long now = g_pserver->mstime; long long now = g_pserver->mstime;
@ -1235,7 +1229,19 @@ void setExpire(client *c, redisDb *db, robj *key, long long when) {
db->avg_ttl += (double)(when-now) / (db->setexpire->size()+1); // add the new entry db->avg_ttl += (double)(when-now) / (db->setexpire->size()+1); // add the new entry
db->last_expire_set = now; db->last_expire_set = now;
/* Update the expire set */
const char *szSubKey = (subkey != nullptr) ? szFromObj(subkey) : nullptr;
if (((robj*)dictGetVal(kde))->FExpires()) {
auto itr = db->setexpire->find((sds)dictGetKey(kde));
serverAssert(itr != db->setexpire->end());
itr->update(szSubKey, when);
}
else
{
expireEntry e((sds)dictGetKey(kde), szSubKey, when);
((robj*)dictGetVal(kde))->SetFExpires(true);
db->setexpire->insert(e); db->setexpire->insert(e);
}
int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0;
if (c && writable_slave && !(c->flags & CLIENT_MASTER)) if (c && writable_slave && !(c->flags & CLIENT_MASTER))

View File

@ -48,7 +48,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util);
/* forward declarations*/ /* forward declarations*/
void defragDictBucketCallback(void *privdata, dictEntry **bucketref); void defragDictBucketCallback(void *privdata, dictEntry **bucketref);
dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged);
void replaceSateliteOSetKeyPtr(semiorderedset<expireEntry, const char*> &set, sds oldkey, sds newkey); void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey);
/* Defrag helper for generic allocations. /* Defrag helper for generic allocations.
* *
@ -407,11 +407,12 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd
return NULL; return NULL;
} }
void replaceSateliteOSetKeyPtr(semiorderedset<expireEntry, const char*> &set, sds oldkey, sds newkey) { void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) {
auto itr = set.find(oldkey); auto itr = set.find(oldkey);
serverAssert(false);
if (itr != set.end()) if (itr != set.end())
{ {
expireEntry eNew(newkey, itr->when()); expireEntry eNew(newkey, nullptr, itr->when());
set.erase(itr); set.erase(itr);
set.insert(eNew); set.insert(eNew);
} }

View File

@ -252,7 +252,7 @@ struct visitFunctor
return count < g_pserver->maxmemory_samples; return count < g_pserver->maxmemory_samples;
} }
}; };
void evictionPoolPopulate(int dbid, dict *dbdict, semiorderedset<expireEntry,const char*> *setexpire, struct evictionPoolEntry *pool) void evictionPoolPopulate(int dbid, dict *dbdict, expireset *setexpire, struct evictionPoolEntry *pool)
{ {
if (setexpire != nullptr) if (setexpire != nullptr)
{ {

View File

@ -32,6 +32,21 @@
#include "server.h" #include "server.h"
/* Expire a whole key from 'db'.
 *
 * Builds a temporary string object for 'key', hands it to propagateExpire(),
 * deletes the key (dbAsyncDelete when g_pserver->lazyfree_lazy_expire is set,
 * dbSyncDelete otherwise), emits the "expired" keyspace notification, calls
 * trackingInvalidateKey() when tracking clients are present, and finally
 * releases the temporary object and increments stat_expiredkeys.
 *
 * 'key' is passed to sdslen(), so it is presumably an sds string -- confirm
 * against callers. */
void activeExpireCycleExpireFullKey(redisDb *db, const char *key) {
    robj *okey = createStringObject(key,sdslen(key));

    propagateExpire(db,okey,g_pserver->lazyfree_lazy_expire);

    /* Pick the deletion strategy from the lazy-expire setting. */
    if (!g_pserver->lazyfree_lazy_expire) {
        dbSyncDelete(db,okey);
    } else {
        dbAsyncDelete(db,okey);
    }

    notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",okey,db->id);

    if (g_pserver->tracking_clients) {
        trackingInvalidateKey(okey);
    }

    /* Drop our reference to the temporary key object. */
    decrRefCount(okey);
    g_pserver->stat_expiredkeys++;
}
/*----------------------------------------------------------------------------- /*-----------------------------------------------------------------------------
* Incremental collection of expired keys. * Incremental collection of expired keys.
* *
@ -51,19 +66,99 @@
* *
* The parameter 'now' is the current time in milliseconds as is passed * The parameter 'now' is the current time in milliseconds as is passed
* to the function to avoid too many gettimeofday() syscalls. */ * to the function to avoid too many gettimeofday() syscalls. */
void activeExpireCycleExpire(redisDb *db, const char *key) { void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
robj *keyobj = createStringObject(key,sdslen(key)); if (!e.FFat())
activeExpireCycleExpireFullKey(db, e.key());
propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire); expireEntryFat *pfat = e.pfatentry();
if (g_pserver->lazyfree_lazy_expire) dictEntry *de = dictFind(db->pdict, e.key());
dbAsyncDelete(db,keyobj); robj *val = (robj*)dictGetVal(de);
else int deleted = 0;
dbSyncDelete(db,keyobj); while (!pfat->FEmpty())
notifyKeyspaceEvent(NOTIFY_EXPIRED, {
"expired",keyobj,db->id); if (pfat->nextExpireEntry().when > now)
if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj); break;
// Is it the full key expiration?
if (pfat->nextExpireEntry().spsubkey == nullptr)
{
activeExpireCycleExpireFullKey(db, e.key());
return;
}
switch (val->type)
{
case OBJ_SET:
if (setTypeRemove(val,pfat->nextExpireEntry().spsubkey.get())) {
deleted++;
if (setTypeSize(val) == 0) {
activeExpireCycleExpireFullKey(db, e.key());
return;
}
}
break;
case OBJ_LIST:
case OBJ_ZSET:
case OBJ_HASH:
default:
serverAssert(false);
}
pfat->popfrontExpireEntry();
}
if (deleted)
{
robj objT;
switch (val->type)
{
case OBJ_SET:
initStaticStringObject(objT, (char*)e.key());
signalModifiedKey(db,&objT);
notifyKeyspaceEvent(NOTIFY_SET,"srem",&objT,db->id);
break;
}
}
if (pfat->FEmpty())
{
robj *keyobj = createStringObject(e.key(),sdslen(e.key()));
removeExpire(db, keyobj);
decrRefCount(keyobj); decrRefCount(keyobj);
g_pserver->stat_expiredkeys++; }
}
void expireMemberCommand(client *c)
{
long long when;
if (getLongLongFromObjectOrReply(c, c->argv[3], &when, NULL) != C_OK)
return;
when *= 1000;
when += mstime();
/* No key, return zero. */
dictEntry *de = dictFind(c->db->pdict, szFromObj(c->argv[1]));
if (de == NULL) {
addReply(c,shared.czero);
return;
}
robj *val = (robj*)dictGetVal(de);
switch (val->type)
{
case OBJ_SET:
// these types are safe
break;
default:
addReplyError(c, "object type is unsupported");
return;
}
setExpire(c, c->db, c->argv[1], c->argv[2], when);
addReply(c, shared.ok);
} }
/* Try to expire a few timed out keys. The algorithm used is adaptive and /* Try to expire a few timed out keys. The algorithm used is adaptive and
@ -162,10 +257,10 @@ void activeExpireCycle(int type) {
size_t expired = 0; size_t expired = 0;
size_t tried = 0; size_t tried = 0;
db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](const expireEntry &e) __attribute__((always_inline)) { db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](expireEntry &e) __attribute__((always_inline)) {
if (e.when() < now) if (e.when() < now)
{ {
activeExpireCycleExpire(db, e.key()); activeExpireCycleExpire(db, e, now);
++expired; ++expired;
} }
++tried; ++tried;
@ -270,7 +365,7 @@ void expireSlaveKeys(void) {
if (itr != db->setexpire->end()) if (itr != db->setexpire->end())
{ {
if (itr->when() < start) { if (itr->when() < start) {
activeExpireCycleExpire(g_pserver->db+dbid,itr->key()); activeExpireCycleExpire(g_pserver->db+dbid,*itr,start);
expired = 1; expired = 1;
} }
} }
@ -406,7 +501,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
addReply(c, shared.cone); addReply(c, shared.cone);
return; return;
} else { } else {
setExpire(c,c->db,key,when); setExpire(c,c->db,key,nullptr,when);
addReply(c,shared.cone); addReply(c,shared.cone);
signalModifiedKey(c->db,key); signalModifiedKey(c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);

View File

@ -343,6 +343,9 @@ struct commandHelp {
"Set the expiration for a key as a UNIX timestamp", "Set the expiration for a key as a UNIX timestamp",
0, 0,
"1.2.0" }, "1.2.0" },
{ "EXPIREMEMBER",
"key subkey seconds",
"set a subkey's time to live in seconds"},
{ "FLUSHALL", { "FLUSHALL",
"[ASYNC]", "[ASYNC]",
"Remove all keys from all databases", "Remove all keys from all databases",

View File

@ -110,7 +110,7 @@ void freeObjAsync(robj *o) {
void emptyDbAsync(redisDb *db) { void emptyDbAsync(redisDb *db) {
dict *oldht1 = db->pdict; dict *oldht1 = db->pdict;
auto *set = db->setexpire; auto *set = db->setexpire;
db->setexpire = new (MALLOC_LOCAL) semiorderedset<expireEntry, const char*>(); db->setexpire = new (MALLOC_LOCAL) expireset();
db->expireitr = db->setexpire->end(); db->expireitr = db->setexpire->end();
db->pdict = dictCreate(&dbDictType,NULL); db->pdict = dictCreate(&dbDictType,NULL);
atomicIncr(lazyfree_objects,dictSize(oldht1)); atomicIncr(lazyfree_objects,dictSize(oldht1));
@ -141,7 +141,7 @@ void lazyfreeFreeObjectFromBioThread(robj *o) {
* when the database was logically deleted. 'sl' is a skiplist used by * when the database was logically deleted. 'sl' is a skiplist used by
* Redis Cluster in order to take the hash slots -> keys mapping. This * Redis Cluster in order to take the hash slots -> keys mapping. This
* may be NULL if Redis Cluster is disabled. */ * may be NULL if Redis Cluster is disabled. */
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset<expireEntry, const char *> *set) { void lazyfreeFreeDatabaseFromBioThread(dict *ht1, expireset *set) {
size_t numkeys = dictSize(ht1); size_t numkeys = dictSize(ht1);
dictRelease(ht1); dictRelease(ht1);
delete set; delete set;

View File

@ -1664,7 +1664,7 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) {
return REDISMODULE_ERR; return REDISMODULE_ERR;
if (expire != REDISMODULE_NO_EXPIRE) { if (expire != REDISMODULE_NO_EXPIRE) {
expire += mstime(); expire += mstime();
setExpire(key->ctx->client,key->db,key->key,expire); setExpire(key->ctx->client,key->db,key->key,nullptr,expire);
} else { } else {
removeExpire(key->db,key->key); removeExpire(key->db,key->key);
} }

View File

@ -2096,7 +2096,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
{ {
/* Set the expire time if needed */ /* Set the expire time if needed */
if (expiretime != -1) if (expiretime != -1)
setExpire(NULL,db,key,expiretime); setExpire(NULL,db,key,nullptr,expiretime);
/* Set usage information (for eviction). */ /* Set usage information (for eviction). */
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock);

View File

@ -15,11 +15,11 @@
extern uint64_t dictGenHashFunction(const void *key, int len); extern uint64_t dictGenHashFunction(const void *key, int len);
template<typename T, typename T_KEY = T> template<typename T, typename T_KEY = T, bool MEMMOVE_SAFE = false>
class semiorderedset class semiorderedset
{ {
friend struct setiter; friend struct setiter;
std::vector<compactvector<T>> m_data; std::vector<compactvector<T, MEMMOVE_SAFE>> m_data;
size_t celem = 0; size_t celem = 0;
static const size_t bits_min = 8; static const size_t bits_min = 8;
size_t bits = bits_min; size_t bits = bits_min;
@ -109,7 +109,7 @@ public:
if (!fRehash) if (!fRehash)
++celem; ++celem;
typename compactvector<T>::iterator itrInsert; typename compactvector<T, MEMMOVE_SAFE>::iterator itrInsert;
if (!m_data[idx].empty() && !(e < m_data[idx].back())) if (!m_data[idx].empty() && !(e < m_data[idx].back()))
itrInsert = m_data[idx].end(); itrInsert = m_data[idx].end();
else else
@ -292,7 +292,7 @@ private:
int steps = 0; int steps = 0;
for (; idxRehash < (m_data.size()/2); ++idxRehash) for (; idxRehash < (m_data.size()/2); ++idxRehash)
{ {
compactvector<T> vecT; compactvector<T, MEMMOVE_SAFE> vecT;
std::swap(m_data[idxRehash], vecT); std::swap(m_data[idxRehash], vecT);
for (auto &v : vecT) for (auto &v : vecT)

View File

@ -618,6 +618,10 @@ struct redisCommand redisCommandTable[] = {
"write fast @keyspace", "write fast @keyspace",
0,NULL,1,1,1,0,0,0}, 0,NULL,1,1,1,0,0,0},
{"expiremember", expireMemberCommand, 4,
"write fast @keyspace",
0,NULL,1,1,1,0,0,0},
{"pexpire",pexpireCommand,3, {"pexpire",pexpireCommand,3,
"write fast @keyspace", "write fast @keyspace",
0,NULL,1,1,1,0,0,0}, 0,NULL,1,1,1,0,0,0},
@ -2919,7 +2923,7 @@ void initServer(void) {
/* Create the Redis databases, and initialize other internal state. */ /* Create the Redis databases, and initialize other internal state. */
for (int j = 0; j < cserver.dbnum; j++) { for (int j = 0; j < cserver.dbnum; j++) {
g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL); g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL);
g_pserver->db[j].setexpire = new(MALLOC_LOCAL) semiorderedset<expireEntry, const char*>; g_pserver->db[j].setexpire = new(MALLOC_LOCAL) expireset();
g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end();
g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL); g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);

View File

@ -53,6 +53,7 @@
#include <atomic> #include <atomic>
#include <vector> #include <vector>
#include <algorithm> #include <algorithm>
#include <memory>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#include <lua.h> #include <lua.h>
@ -767,38 +768,150 @@ __attribute__((always_inline)) inline char *szFromObj(const robj *o)
return (char*)ptrFromObj(o); return (char*)ptrFromObj(o);
} }
class expireEntry { class expireEntryFat
sds m_key; {
long long m_when; public:
struct subexpireEntry
{
long long when;
std::unique_ptr<const char, void(*)(const char*)> spsubkey;
subexpireEntry(long long when, const char *subkey)
: when(when), spsubkey(subkey, sdsfree)
{}
bool operator<(long long when) const noexcept { return this->when < when; }
bool operator<(const subexpireEntry &se) { return this->when < se.when; }
};
private:
sds m_keyPrimary;
std::vector<subexpireEntry> m_vecexpireEntries; // Note a NULL for the sds portion means the expire is for the primary key
public: public:
expireEntry(sds key, long long when) expireEntryFat(sds keyPrimary)
: m_keyPrimary(keyPrimary)
{}
long long when() const noexcept { return m_vecexpireEntries.front().when; }
const char *key() const noexcept { return m_keyPrimary; }
bool operator<(long long when) const noexcept { return this->when() < when; }
void expireSubKey(const char *szSubkey, long long when)
{ {
m_key = key; auto itrInsert = std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), when);
m_vecexpireEntries.emplace(itrInsert, when, sdsdup(szSubkey));
}
bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); }
const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); }
void popfrontExpireEntry() { m_vecexpireEntries.erase(m_vecexpireEntries.begin()); }
};
class expireEntry {
union
{
sds m_key;
expireEntryFat *m_pfatentry;
} u;
long long m_when; // LLONG_MIN means this is a fat entry and we should use the pointer
public:
expireEntry(sds key, const char *subkey, long long when)
{
if (subkey != nullptr)
{
m_when = LLONG_MIN;
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(key);
u.m_pfatentry->expireSubKey(subkey, when);
}
else
{
u.m_key = key;
m_when = when; m_when = when;
} }
bool operator!=(const expireEntry &e) const noexcept
{
return m_when != e.m_when || m_key != e.m_key;
} }
bool operator==(const expireEntry &e) const noexcept
expireEntry(expireEntryFat *pfatentry)
{ {
return m_when == e.m_when && m_key == e.m_key; u.m_pfatentry = pfatentry;
m_when = LLONG_MIN;
} }
bool operator==(const char *key) const noexcept { return m_key == key; }
bool operator<(const expireEntry &e) const noexcept { return m_when < e.m_when; } expireEntry(expireEntry &&e)
bool operator<(const char *key) const noexcept { return m_key < key; } {
bool operator<(long long when) const noexcept { return m_when < when; } u.m_key = e.u.m_key;
m_when = e.m_when;
e.u.m_key = nullptr;
e.m_when = 0;
}
const char *key() const noexcept { return m_key; } ~expireEntry()
long long when() const noexcept { return m_when; } {
if (FFat())
delete u.m_pfatentry;
}
inline bool FFat() const noexcept { return m_when == LLONG_MIN; }
expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; }
explicit operator const char*() const noexcept { return m_key; } bool operator==(const char *key) const noexcept
explicit operator long long() const noexcept { return m_when; } {
return this->key() == key;
}
bool operator<(const expireEntry &e) const noexcept
{
return when() < e.when();
}
bool operator<(long long when) const noexcept
{
return this->when() < when;
}
const char *key() const noexcept
{
if (FFat())
return u.m_pfatentry->key();
return u.m_key;
}
long long when() const noexcept
{
if (FFat())
return u.m_pfatentry->when();
return m_when;
}
void update(const char *subkey, long long when)
{
if (!FFat())
{
if (subkey == nullptr)
{
m_when = when;
return;
}
else
{
// we have to upgrade to a fat entry
long long whenT = m_when;
sds keyPrimary = u.m_key;
m_when = LLONG_MIN;
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(keyPrimary);
u.m_pfatentry->expireSubKey(nullptr, whenT);
// at this point we're fat so fall through
}
}
u.m_pfatentry->expireSubKey(subkey, when);
}
explicit operator const char*() const noexcept { return key(); }
explicit operator long long() const noexcept { return when(); }
}; };
typedef semiorderedset<expireEntry, const char *, true /*expireEntry can be memmoved*/> expireset;
/* The a string name for an object's type as listed above /* The a string name for an object's type as listed above
* Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines, * Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines,
@ -837,8 +950,8 @@ typedef struct clientReplyBlock {
* database. The database number is the 'id' field in the structure. */ * database. The database number is the 'id' field in the structure. */
typedef struct redisDb { typedef struct redisDb {
dict *pdict; /* The keyspace for this DB */ dict *pdict; /* The keyspace for this DB */
semiorderedset<expireEntry, const char*> *setexpire; expireset *setexpire;
semiorderedset<expireEntry, const char*>::setiter expireitr; expireset::setiter expireitr;
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */ dict *ready_keys; /* Blocked keys that received a PUSH */
@ -2225,7 +2338,7 @@ int removeExpireCore(redisDb *db, robj *key, dictEntry *de);
void propagateExpire(redisDb *db, robj *key, int lazy); void propagateExpire(redisDb *db, robj *key, int lazy);
int expireIfNeeded(redisDb *db, robj *key); int expireIfNeeded(redisDb *db, robj *key);
long long getExpire(redisDb *db, robj_roptr key); long long getExpire(redisDb *db, robj_roptr key);
void setExpire(client *c, redisDb *db, robj *key, long long when); void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when);
robj_roptr lookupKeyRead(redisDb *db, robj *key); robj_roptr lookupKeyRead(redisDb *db, robj *key);
robj *lookupKeyWrite(redisDb *db, robj *key); robj *lookupKeyWrite(redisDb *db, robj *key);
robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply); robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply);
@ -2420,6 +2533,7 @@ void mgetCommand(client *c);
void monitorCommand(client *c); void monitorCommand(client *c);
void expireCommand(client *c); void expireCommand(client *c);
void expireatCommand(client *c); void expireatCommand(client *c);
void expireMemberCommand(client *c);
void pexpireCommand(client *c); void pexpireCommand(client *c);
void pexpireatCommand(client *c); void pexpireatCommand(client *c);
void getsetCommand(client *c); void getsetCommand(client *c);

View File

@ -85,7 +85,7 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire,
} }
setKey(c->db,key,val); setKey(c->db,key,val);
g_pserver->dirty++; g_pserver->dirty++;
if (expire) setExpire(c,c->db,key,mstime()+milliseconds); if (expire) setExpire(c,c->db,key,nullptr,mstime()+milliseconds);
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
"expire",key,c->db->id); "expire",key,c->db->id);