Merge pull request #65 from JohnSully/subexpire

EXPIREMEMBER Command Implementation

Former-commit-id: d6b45a56e782238fc3467a7fb43ff167f0891e07
This commit is contained in:
John Sully 2019-07-23 19:06:57 -04:00 committed by GitHub
commit 8c46a1fba2
17 changed files with 536 additions and 125 deletions

View File

@ -1321,13 +1321,12 @@ int rewriteAppendOnlyFileRio(rio *aof) {
while((de = dictNext(di)) != NULL) { while((de = dictNext(di)) != NULL) {
sds keystr; sds keystr;
robj key, *o; robj key, *o;
long long expiretime;
keystr = (sds)dictGetKey(de); keystr = (sds)dictGetKey(de);
o = (robj*)dictGetVal(de); o = (robj*)dictGetVal(de);
initStaticStringObject(key,keystr); initStaticStringObject(key,keystr);
expiretime = getExpire(db,&key); expireEntry *pexpire = getExpire(db,&key);
/* Save the key and associated value */ /* Save the key and associated value */
if (o->type == OBJ_STRING) { if (o->type == OBJ_STRING) {
@ -1353,11 +1352,23 @@ int rewriteAppendOnlyFileRio(rio *aof) {
serverPanic("Unknown object type"); serverPanic("Unknown object type");
} }
/* Save the expire time */ /* Save the expire time */
if (expiretime != -1) { if (pexpire != nullptr) {
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; for (auto &subExpire : *pexpire) {
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; if (subExpire.subkey() == nullptr)
if (rioWriteBulkObject(aof,&key) == 0) goto werr; {
if (rioWriteBulkLongLong(aof,expiretime) == 0) goto werr; char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
}
else
{
char cmd[]="*4\r\n$12\r\nEXPIREMEMBER\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWrite(aof,subExpire.subkey(),sdslen(subExpire.subkey())) == 0) goto werr;
}
if (rioWriteBulkLongLong(aof,subExpire.when()) == 0) goto werr; // common
}
} }
/* Read some diff from the parent process from time to time. */ /* Read some diff from the parent process from time to time. */
if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) { if (aof->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) {

View File

@ -85,7 +85,7 @@ struct bio_job {
void *bioProcessBackgroundJobs(void *arg); void *bioProcessBackgroundJobs(void *arg);
void lazyfreeFreeObjectFromBioThread(robj *o); void lazyfreeFreeObjectFromBioThread(robj *o);
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset<expireEntry, const char *> *set); void lazyfreeFreeDatabaseFromBioThread(dict *ht1, expireset *set);
void lazyfreeFreeSlotsMapFromBioThread(rax *rt); void lazyfreeFreeSlotsMapFromBioThread(rax *rt);
/* Make sure we have enough stack to perform all the things we do in the /* Make sure we have enough stack to perform all the things we do in the
@ -196,7 +196,7 @@ void *bioProcessBackgroundJobs(void *arg) {
if (job->arg1) if (job->arg1)
lazyfreeFreeObjectFromBioThread((robj*)job->arg1); lazyfreeFreeObjectFromBioThread((robj*)job->arg1);
else if (job->arg2 && job->arg3) else if (job->arg2 && job->arg3)
lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(semiorderedset<expireEntry, const char *>*)job->arg3); lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(expireset*)job->arg3);
else if (job->arg3) else if (job->arg3)
lazyfreeFreeSlotsMapFromBioThread((rax*)job->arg3); lazyfreeFreeSlotsMapFromBioThread((rax*)job->arg3);
} else { } else {

View File

@ -4949,7 +4949,7 @@ void restoreCommand(client *c) {
dbAdd(c->db,c->argv[1],obj); dbAdd(c->db,c->argv[1],obj);
if (ttl) { if (ttl) {
if (!absttl) ttl+=mstime(); if (!absttl) ttl+=mstime();
setExpire(c,c->db,c->argv[1],ttl); setExpire(c,c->db,c->argv[1],nullptr,ttl);
} }
objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock); objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock);
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
@ -5194,7 +5194,10 @@ try_again:
/* Create RESTORE payload and generate the protocol to call the command. */ /* Create RESTORE payload and generate the protocol to call the command. */
for (j = 0; j < num_keys; j++) { for (j = 0; j < num_keys; j++) {
long long ttl = 0; long long ttl = 0;
long long expireat = getExpire(c->db,kv[j]); expireEntry *pexpire = getExpire(c->db,kv[j]);
long long expireat = -1;
if (pexpire != nullptr)
pexpire->FGetPrimaryExpire(&expireat);
if (expireat != -1) { if (expireat != -1) {
ttl = expireat-mstime(); ttl = expireat-mstime();

View File

@ -457,7 +457,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
} else { } else {
dictEmpty(g_pserver->db[j].pdict,callback); dictEmpty(g_pserver->db[j].pdict,callback);
delete g_pserver->db[j].setexpire; delete g_pserver->db[j].setexpire;
g_pserver->db[j].setexpire = new (MALLOC_LOCAL) semiorderedset<expireEntry, const char*>(); g_pserver->db[j].setexpire = new (MALLOC_LOCAL) expireset();
g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end();
} }
} }
@ -976,7 +976,6 @@ void shutdownCommand(client *c) {
void renameGenericCommand(client *c, int nx) { void renameGenericCommand(client *c, int nx) {
robj *o; robj *o;
long long expire;
int samekey = 0; int samekey = 0;
/* When source and dest key is the same, no operation is performed, /* When source and dest key is the same, no operation is performed,
@ -992,7 +991,15 @@ void renameGenericCommand(client *c, int nx) {
} }
incrRefCount(o); incrRefCount(o);
expire = getExpire(c->db,c->argv[1]);
std::unique_ptr<expireEntry> spexpire;
{ // scope pexpireOld since it will be invalid soon
expireEntry *pexpireOld = getExpire(c->db,c->argv[1]);
if (pexpireOld != nullptr)
spexpire = std::make_unique<expireEntry>(std::move(*pexpireOld));
}
if (lookupKeyWrite(c->db,c->argv[2]) != NULL) { if (lookupKeyWrite(c->db,c->argv[2]) != NULL) {
if (nx) { if (nx) {
decrRefCount(o); decrRefCount(o);
@ -1005,8 +1012,8 @@ void renameGenericCommand(client *c, int nx) {
} }
dbDelete(c->db,c->argv[1]); dbDelete(c->db,c->argv[1]);
dbAdd(c->db,c->argv[2],o); dbAdd(c->db,c->argv[2],o);
if (expire != -1) if (spexpire != nullptr)
setExpire(c,c->db,c->argv[2],expire); setExpire(c,c->db,c->argv[2],std::move(*spexpire));
signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[2]); signalModifiedKey(c->db,c->argv[2]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
@ -1029,7 +1036,7 @@ void moveCommand(client *c) {
robj *o; robj *o;
redisDb *src, *dst; redisDb *src, *dst;
int srcid; int srcid;
long long dbid, expire; long long dbid;
if (g_pserver->cluster_enabled) { if (g_pserver->cluster_enabled) {
addReplyError(c,"MOVE is not allowed in cluster mode"); addReplyError(c,"MOVE is not allowed in cluster mode");
@ -1063,7 +1070,13 @@ void moveCommand(client *c) {
addReply(c,shared.czero); addReply(c,shared.czero);
return; return;
} }
expire = getExpire(c->db,c->argv[1]);
std::unique_ptr<expireEntry> spexpire;
{ // scope pexpireOld
expireEntry *pexpireOld = getExpire(c->db,c->argv[1]);
if (pexpireOld != nullptr)
spexpire = std::make_unique<expireEntry>(std::move(*pexpireOld));
}
if (o->FExpires()) if (o->FExpires())
removeExpire(c->db,c->argv[1]); removeExpire(c->db,c->argv[1]);
serverAssert(!o->FExpires()); serverAssert(!o->FExpires());
@ -1077,7 +1090,7 @@ void moveCommand(client *c) {
return; return;
} }
dbAdd(dst,c->argv[1],o); dbAdd(dst,c->argv[1],o);
if (expire != -1) setExpire(c,dst,c->argv[1],expire); if (spexpire != nullptr) setExpire(c,dst,c->argv[1],std::move(*spexpire));
addReply(c,shared.cone); addReply(c,shared.cone);
} }
@ -1201,7 +1214,58 @@ int removeExpireCore(redisDb *db, robj *key, dictEntry *de) {
* of an user calling a command 'c' is the client, otherwise 'c' is set * of an user calling a command 'c' is the client, otherwise 'c' is set
* to NULL. The 'when' parameter is the absolute unix time in milliseconds * to NULL. The 'when' parameter is the absolute unix time in milliseconds
* after which the key will no longer be considered valid. */ * after which the key will no longer be considered valid. */
void setExpire(client *c, redisDb *db, robj *key, long long when) { void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) {
dictEntry *kde;
serverAssert(GlobalLocksAcquired());
/* Reuse the sds from the main dict in the expire dict */
kde = dictFind(db->pdict,ptrFromObj(key));
serverAssertWithInfo(NULL,key,kde != NULL);
if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
{
// shared objects cannot have the expire bit set, create a real object
dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde)));
}
/* Update TTL stats (exponential moving average) */
/* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */
long long now = g_pserver->mstime;
db->avg_ttl -= (now - db->last_expire_set); // reduce the TTL by the time that has elapsed
if (db->setexpire->empty())
db->avg_ttl = 0;
else
db->avg_ttl -= db->avg_ttl / db->setexpire->size(); // slide one entry out the window
if (db->avg_ttl < 0)
db->avg_ttl = 0; // TTLs are never negative
db->avg_ttl += (double)(when-now) / (db->setexpire->size()+1); // add the new entry
db->last_expire_set = now;
/* Update the expire set */
const char *szSubKey = (subkey != nullptr) ? szFromObj(subkey) : nullptr;
if (((robj*)dictGetVal(kde))->FExpires()) {
auto itr = db->setexpire->find((sds)dictGetKey(kde));
serverAssert(itr != db->setexpire->end());
expireEntry eNew(std::move(*itr));
eNew.update(szSubKey, when);
db->setexpire->erase(itr);
db->setexpire->insert(eNew);
}
else
{
expireEntry e((sds)dictGetKey(kde), szSubKey, when);
((robj*)dictGetVal(kde))->SetFExpires(true);
db->setexpire->insert(e);
}
int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0;
if (c && writable_slave && !(c->flags & CLIENT_MASTER))
rememberSlaveKeyWithExpire(db,key);
}
void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e)
{
dictEntry *kde; dictEntry *kde;
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
@ -1217,49 +1281,36 @@ void setExpire(client *c, redisDb *db, robj *key, long long when) {
} }
if (((robj*)dictGetVal(kde))->FExpires()) if (((robj*)dictGetVal(kde))->FExpires())
removeExpire(db, key); // should we optimize for when this is called with an already set expiry? removeExpire(db, key);
expireEntry e((sds)dictGetKey(kde), when); e.setKeyUnsafe((sds)dictGetKey(kde));
db->setexpire->insert(e);
((robj*)dictGetVal(kde))->SetFExpires(true); ((robj*)dictGetVal(kde))->SetFExpires(true);
/* Update TTL stats (exponential moving average) */
/* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */
long long now = g_pserver->mstime;
db->avg_ttl -= (now - db->last_expire_set); // reduce the TTL by the time that has elapsed
if (db->setexpire->empty())
db->avg_ttl = 0;
else
db->avg_ttl -= db->avg_ttl / db->setexpire->size(); // slide one entry out the window
if (db->avg_ttl < 0)
db->avg_ttl = 0; // TTLs are never negative
db->avg_ttl += (double)(when-now) / (db->setexpire->size()+1); // add the new entry
db->last_expire_set = now;
db->setexpire->insert(e);
int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0;
if (c && writable_slave && !(c->flags & CLIENT_MASTER)) if (c && writable_slave && !(c->flags & CLIENT_MASTER))
rememberSlaveKeyWithExpire(db,key); rememberSlaveKeyWithExpire(db,key);
} }
/* Return the expire time of the specified key, or -1 if no expire /* Return the expire time of the specified key, or null if no expire
* is associated with this key (i.e. the key is non volatile) */ * is associated with this key (i.e. the key is non volatile) */
long long getExpire(redisDb *db, robj_roptr key) { expireEntry *getExpire(redisDb *db, robj_roptr key) {
dictEntry *de; dictEntry *de;
/* No expire? return ASAP */ /* No expire? return ASAP */
if (db->setexpire->size() == 0) if (db->setexpire->size() == 0)
return -1; return nullptr;
de = dictFind(db->pdict, ptrFromObj(key)); de = dictFind(db->pdict, ptrFromObj(key));
if (de == NULL) if (de == NULL)
return -1; return nullptr;
robj *obj = (robj*)dictGetVal(de); robj *obj = (robj*)dictGetVal(de);
if (!obj->FExpires()) if (!obj->FExpires())
return -1; return nullptr;
auto itr = db->setexpire->find((sds)dictGetKey(de)); auto itr = db->setexpire->find((sds)dictGetKey(de));
return itr->when(); return itr.operator->();
} }
/* Propagate expires into slaves and the AOF file. /* Propagate expires into slaves and the AOF file.
@ -1287,15 +1338,28 @@ void propagateExpire(redisDb *db, robj *key, int lazy) {
decrRefCount(argv[1]); decrRefCount(argv[1]);
} }
/* Check if the key is expired. */ /* Check if the key is expired. Note, this does not check subexpires */
int keyIsExpired(redisDb *db, robj *key) { int keyIsExpired(redisDb *db, robj *key) {
mstime_t when = getExpire(db,key); expireEntry *pexpire = getExpire(db,key);
if (when < 0) return 0; /* No expire for this key */ if (pexpire == nullptr) return 0; /* No expire for this key */
/* Don't expire anything while loading. It will be done later. */ /* Don't expire anything while loading. It will be done later. */
if (g_pserver->loading) return 0; if (g_pserver->loading) return 0;
long long when = -1;
for (auto &exp : *pexpire)
{
if (exp.subkey() == nullptr)
{
when = exp.when();
break;
}
}
if (when == -1)
return 0;
/* If we are in the context of a Lua script, we pretend that time is /* If we are in the context of a Lua script, we pretend that time is
* blocked to when the Lua script started. This way a key can expire * blocked to when the Lua script started. This way a key can expire
* only the first time it is accessed and not in the middle of the * only the first time it is accessed and not in the middle of the

View File

@ -124,9 +124,13 @@ void mixStringObjectDigest(unsigned char *digest, robj_roptr o) {
void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj_roptr o) { void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj_roptr o) {
uint32_t aux = htonl(o->type); uint32_t aux = htonl(o->type);
mixDigest(digest,&aux,sizeof(aux)); mixDigest(digest,&aux,sizeof(aux));
long long expiretime = getExpire(db,keyobj); expireEntry *pexpire = getExpire(db,keyobj);
long long expiretime = -1;
char buf[128]; char buf[128];
if (pexpire != nullptr)
pexpire->FGetPrimaryExpire(&expiretime);
/* Save the key and associated value */ /* Save the key and associated value */
if (o->type == OBJ_STRING) { if (o->type == OBJ_STRING) {
mixStringObjectDigest(digest,o); mixStringObjectDigest(digest,o);

View File

@ -48,7 +48,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util);
/* forward declarations*/ /* forward declarations*/
void defragDictBucketCallback(void *privdata, dictEntry **bucketref); void defragDictBucketCallback(void *privdata, dictEntry **bucketref);
dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged);
void replaceSateliteOSetKeyPtr(semiorderedset<expireEntry, const char*> &set, sds oldkey, sds newkey); void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey);
/* Defrag helper for generic allocations. /* Defrag helper for generic allocations.
* *
@ -407,11 +407,12 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd
return NULL; return NULL;
} }
void replaceSateliteOSetKeyPtr(semiorderedset<expireEntry, const char*> &set, sds oldkey, sds newkey) { void replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey) {
auto itr = set.find(oldkey); auto itr = set.find(oldkey);
if (itr != set.end()) if (itr != set.end())
{ {
expireEntry eNew(newkey, itr->when()); expireEntry eNew(std::move(*itr));
eNew.setKeyUnsafe(newkey);
set.erase(itr); set.erase(itr);
set.insert(eNew); set.insert(eNew);
} }

View File

@ -252,7 +252,7 @@ struct visitFunctor
return count < g_pserver->maxmemory_samples; return count < g_pserver->maxmemory_samples;
} }
}; };
void evictionPoolPopulate(int dbid, dict *dbdict, semiorderedset<expireEntry,const char*> *setexpire, struct evictionPoolEntry *pool) void evictionPoolPopulate(int dbid, dict *dbdict, expireset *setexpire, struct evictionPoolEntry *pool)
{ {
if (setexpire != nullptr) if (setexpire != nullptr)
{ {

View File

@ -32,6 +32,21 @@
#include "server.h" #include "server.h"
void activeExpireCycleExpireFullKey(redisDb *db, const char *key) {
robj *keyobj = createStringObject(key,sdslen(key));
propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire);
if (g_pserver->lazyfree_lazy_expire)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",keyobj,db->id);
if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj);
decrRefCount(keyobj);
g_pserver->stat_expiredkeys++;
}
/*----------------------------------------------------------------------------- /*-----------------------------------------------------------------------------
* Incremental collection of expired keys. * Incremental collection of expired keys.
* *
@ -51,19 +66,102 @@
* *
* The parameter 'now' is the current time in milliseconds as is passed * The parameter 'now' is the current time in milliseconds as is passed
* to the function to avoid too many gettimeofday() syscalls. */ * to the function to avoid too many gettimeofday() syscalls. */
void activeExpireCycleExpire(redisDb *db, const char *key) { void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
robj *keyobj = createStringObject(key,sdslen(key)); if (!e.FFat())
{
activeExpireCycleExpireFullKey(db, e.key());
return;
}
propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire); expireEntryFat *pfat = e.pfatentry();
if (g_pserver->lazyfree_lazy_expire) dictEntry *de = dictFind(db->pdict, e.key());
dbAsyncDelete(db,keyobj); robj *val = (robj*)dictGetVal(de);
else int deleted = 0;
dbSyncDelete(db,keyobj); while (!pfat->FEmpty())
notifyKeyspaceEvent(NOTIFY_EXPIRED, {
"expired",keyobj,db->id); if (pfat->nextExpireEntry().when > now)
if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj); break;
decrRefCount(keyobj);
g_pserver->stat_expiredkeys++; // Is it the full key expiration?
if (pfat->nextExpireEntry().spsubkey == nullptr)
{
activeExpireCycleExpireFullKey(db, e.key());
return;
}
switch (val->type)
{
case OBJ_SET:
if (setTypeRemove(val,pfat->nextExpireEntry().spsubkey.get())) {
deleted++;
if (setTypeSize(val) == 0) {
activeExpireCycleExpireFullKey(db, e.key());
return;
}
}
break;
case OBJ_LIST:
case OBJ_ZSET:
case OBJ_HASH:
default:
serverAssert(false);
}
pfat->popfrontExpireEntry();
}
if (deleted)
{
robj objT;
switch (val->type)
{
case OBJ_SET:
initStaticStringObject(objT, (char*)e.key());
signalModifiedKey(db,&objT);
notifyKeyspaceEvent(NOTIFY_SET,"srem",&objT,db->id);
break;
}
}
if (pfat->FEmpty())
{
robj *keyobj = createStringObject(e.key(),sdslen(e.key()));
removeExpire(db, keyobj);
decrRefCount(keyobj);
}
}
void expireMemberCommand(client *c)
{
long long when;
if (getLongLongFromObjectOrReply(c, c->argv[3], &when, NULL) != C_OK)
return;
when *= 1000;
when += mstime();
/* No key, return zero. */
dictEntry *de = dictFind(c->db->pdict, szFromObj(c->argv[1]));
if (de == NULL) {
addReply(c,shared.czero);
return;
}
robj *val = (robj*)dictGetVal(de);
switch (val->type)
{
case OBJ_SET:
// these types are safe
break;
default:
addReplyError(c, "object type is unsupported");
return;
}
setExpire(c, c->db, c->argv[1], c->argv[2], when);
addReply(c, shared.ok);
} }
/* Try to expire a few timed out keys. The algorithm used is adaptive and /* Try to expire a few timed out keys. The algorithm used is adaptive and
@ -162,10 +260,10 @@ void activeExpireCycle(int type) {
size_t expired = 0; size_t expired = 0;
size_t tried = 0; size_t tried = 0;
db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](const expireEntry &e) __attribute__((always_inline)) { db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](expireEntry &e) __attribute__((always_inline)) {
if (e.when() < now) if (e.when() < now)
{ {
activeExpireCycleExpire(db, e.key()); activeExpireCycleExpire(db, e, now);
++expired; ++expired;
} }
++tried; ++tried;
@ -270,7 +368,7 @@ void expireSlaveKeys(void) {
if (itr != db->setexpire->end()) if (itr != db->setexpire->end())
{ {
if (itr->when() < start) { if (itr->when() < start) {
activeExpireCycleExpire(g_pserver->db+dbid,itr->key()); activeExpireCycleExpire(g_pserver->db+dbid,*itr,start);
expired = 1; expired = 1;
} }
} }
@ -406,7 +504,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
addReply(c, shared.cone); addReply(c, shared.cone);
return; return;
} else { } else {
setExpire(c,c->db,key,when); setExpire(c,c->db,key,nullptr,when);
addReply(c,shared.cone); addReply(c,shared.cone);
signalModifiedKey(c->db,key); signalModifiedKey(c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id); notifyKeyspaceEvent(NOTIFY_GENERIC,"expire",key,c->db->id);
@ -437,7 +535,7 @@ void pexpireatCommand(client *c) {
/* Implements TTL and PTTL */ /* Implements TTL and PTTL */
void ttlGenericCommand(client *c, int output_ms) { void ttlGenericCommand(client *c, int output_ms) {
long long expire, ttl = -1; long long expire = -1, ttl = -1;
/* If the key does not exist at all, return -2 */ /* If the key does not exist at all, return -2 */
if (lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH) == nullptr) { if (lookupKeyReadWithFlags(c->db,c->argv[1],LOOKUP_NOTOUCH) == nullptr) {
@ -446,7 +544,10 @@ void ttlGenericCommand(client *c, int output_ms) {
} }
/* The key exists. Return -1 if it has no expire, or the actual /* The key exists. Return -1 if it has no expire, or the actual
* TTL value otherwise. */ * TTL value otherwise. */
expire = getExpire(c->db,c->argv[1]); expireEntry *pexpire = getExpire(c->db,c->argv[1]);
if (pexpire != nullptr)
pexpire->FGetPrimaryExpire(&expire);
if (expire != -1) { if (expire != -1) {
ttl = expire-mstime(); ttl = expire-mstime();
if (ttl < 0) ttl = 0; if (ttl < 0) ttl = 0;

View File

@ -343,6 +343,9 @@ struct commandHelp {
"Set the expiration for a key as a UNIX timestamp", "Set the expiration for a key as a UNIX timestamp",
0, 0,
"1.2.0" }, "1.2.0" },
{ "EXPIREMEMBER",
"key subkey seconds",
"set a subkey's time to live in seconds"},
{ "FLUSHALL", { "FLUSHALL",
"[ASYNC]", "[ASYNC]",
"Remove all keys from all databases", "Remove all keys from all databases",

View File

@ -110,7 +110,7 @@ void freeObjAsync(robj *o) {
void emptyDbAsync(redisDb *db) { void emptyDbAsync(redisDb *db) {
dict *oldht1 = db->pdict; dict *oldht1 = db->pdict;
auto *set = db->setexpire; auto *set = db->setexpire;
db->setexpire = new (MALLOC_LOCAL) semiorderedset<expireEntry, const char*>(); db->setexpire = new (MALLOC_LOCAL) expireset();
db->expireitr = db->setexpire->end(); db->expireitr = db->setexpire->end();
db->pdict = dictCreate(&dbDictType,NULL); db->pdict = dictCreate(&dbDictType,NULL);
atomicIncr(lazyfree_objects,dictSize(oldht1)); atomicIncr(lazyfree_objects,dictSize(oldht1));
@ -141,7 +141,7 @@ void lazyfreeFreeObjectFromBioThread(robj *o) {
* when the database was logically deleted. 'sl' is a skiplist used by * when the database was logically deleted. 'sl' is a skiplist used by
* Redis Cluster in order to take the hash slots -> keys mapping. This * Redis Cluster in order to take the hash slots -> keys mapping. This
* may be NULL if Redis Cluster is disabled. */ * may be NULL if Redis Cluster is disabled. */
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset<expireEntry, const char *> *set) { void lazyfreeFreeDatabaseFromBioThread(dict *ht1, expireset *set) {
size_t numkeys = dictSize(ht1); size_t numkeys = dictSize(ht1);
dictRelease(ht1); dictRelease(ht1);
delete set; delete set;

View File

@ -1644,7 +1644,11 @@ int RM_UnlinkKey(RedisModuleKey *key) {
* If no TTL is associated with the key or if the key is empty, * If no TTL is associated with the key or if the key is empty,
* REDISMODULE_NO_EXPIRE is returned. */ * REDISMODULE_NO_EXPIRE is returned. */
mstime_t RM_GetExpire(RedisModuleKey *key) { mstime_t RM_GetExpire(RedisModuleKey *key) {
mstime_t expire = getExpire(key->db,key->key); expireEntry *pexpire = getExpire(key->db,key->key);
mstime_t expire = -1;
if (pexpire != nullptr)
pexpire->FGetPrimaryExpire(&expire);
if (expire == -1 || key->value == NULL) return -1; if (expire == -1 || key->value == NULL) return -1;
expire -= mstime(); expire -= mstime();
return expire >= 0 ? expire : 0; return expire >= 0 ? expire : 0;
@ -1664,7 +1668,7 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) {
return REDISMODULE_ERR; return REDISMODULE_ERR;
if (expire != REDISMODULE_NO_EXPIRE) { if (expire != REDISMODULE_NO_EXPIRE) {
expire += mstime(); expire += mstime();
setExpire(key->ctx->client,key->db,key->key,expire); setExpire(key->ctx->client,key->db,key->key,nullptr,expire);
} else { } else {
removeExpire(key->db,key->key); removeExpire(key->db,key->key);
} }

View File

@ -1490,4 +1490,4 @@ void redisObject::setrefcount(unsigned ref)
{ {
serverAssert(!FExpires()); serverAssert(!FExpires());
refcount.store(ref, std::memory_order_relaxed); refcount.store(ref, std::memory_order_relaxed);
} }

View File

@ -1031,12 +1031,13 @@ size_t rdbSavedObjectLen(robj *o) {
* On error -1 is returned. * On error -1 is returned.
* On success if the key was actually saved 1 is returned, otherwise 0 * On success if the key was actually saved 1 is returned, otherwise 0
* is returned (the key was already expired). */ * is returned (the key was already expired). */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, expireEntry *pexpire) {
int savelru = g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LRU; int savelru = g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LRU;
int savelfu = g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU; int savelfu = g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU;
/* Save the expire time */ /* Save the expire time */
if (expiretime != -1) { long long expiretime = -1;
if (pexpire != nullptr && pexpire->FGetPrimaryExpire(&expiretime)) {
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1; if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1; if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
} }
@ -1061,14 +1062,29 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
if (rdbWriteRaw(rdb,buf,1) == -1) return -1; if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
} }
char szMvcc[32]; char szT[32];
snprintf(szMvcc, 32, "%" PRIu64, val->mvcc_tstamp); snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp);
if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szMvcc) == -1) return -1; if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1;
/* Save type, key, value */ /* Save type, key, value */
if (rdbSaveObjectType(rdb,val) == -1) return -1; if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1; if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val,key) == -1) return -1; if (rdbSaveObject(rdb,val,key) == -1) return -1;
/* Save expire entry after as it will apply to the previously loaded key */
/* This is because we update the expire datastructure directly without buffering */
if (pexpire != nullptr)
{
for (auto itr : *pexpire)
{
if (itr.subkey() == nullptr)
continue; // already saved
snprintf(szT, 32, "%lld", itr.when());
rdbSaveAuxFieldStrStr(rdb,"keydb-subexpire-key",itr.subkey());
rdbSaveAuxFieldStrStr(rdb,"keydb-subexpire-when",szT);
}
}
return 1; return 1;
} }
@ -1099,12 +1115,11 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *keystr, robj *o) int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *keystr, robj *o)
{ {
robj key; robj key;
long long expire;
initStaticStringObject(key,(char*)keystr); initStaticStringObject(key,(char*)keystr);
expire = getExpire(db, &key); expireEntry *pexpire = getExpire(db, &key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) if (rdbSaveKeyValuePair(rdb,&key,o,pexpire) == -1)
return 0; return 0;
/* When this RDB is produced as part of an AOF rewrite, move /* When this RDB is produced as part of an AOF rewrite, move
@ -1907,6 +1922,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
long long lru_clock = 0; long long lru_clock = 0;
uint64_t mvcc_tstamp = OBJ_MVCC_INVALID; uint64_t mvcc_tstamp = OBJ_MVCC_INVALID;
robj *subexpireKey = nullptr;
robj *key = nullptr;
rdb->update_cksum = rdbLoadProgressCallback; rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes; rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes;
@ -1928,7 +1945,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
lru_clock = LRU_CLOCK(); lru_clock = LRU_CLOCK();
while(1) { while(1) {
robj *key, *val; robj *val;
/* Read type. */ /* Read type. */
if ((type = rdbLoadType(rdb)) == -1) goto eoferr; if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
@ -2036,6 +2053,18 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
} else if (!strcasecmp(szFromObj(auxkey),"mvcc-tstamp")) { } else if (!strcasecmp(szFromObj(auxkey),"mvcc-tstamp")) {
static_assert(sizeof(unsigned long long) == sizeof(uint64_t), "Ensure long long is 64-bits"); static_assert(sizeof(unsigned long long) == sizeof(uint64_t), "Ensure long long is 64-bits");
mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10);
} else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-key")) {
subexpireKey = auxval;
incrRefCount(subexpireKey);
} else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-when")) {
if (key == nullptr || subexpireKey == nullptr) {
serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping.");
}
else {
setExpire(NULL, db, key, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10));
decrRefCount(subexpireKey);
subexpireKey = nullptr;
}
} else { } else {
/* We ignore fields we don't understand, as by AUX field /* We ignore fields we don't understand, as by AUX field
* contract. */ * contract. */
@ -2077,6 +2106,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
} }
/* Read key */ /* Read key */
if (key != nullptr)
decrRefCount(key);
if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr; if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
/* Read value */ /* Read value */
if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) goto eoferr; if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) goto eoferr;
@ -2090,24 +2122,19 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
decrRefCount(val); decrRefCount(val);
} else { } else {
/* Add the new object in the hash table */ /* Add the new object in the hash table */
int fInserted = dbMerge(db, key, val, rsi->fForceSetKey); int fInserted = dbMerge(db, key, val, rsi->fForceSetKey); // Note: dbMerge will incrRef
if (fInserted) if (fInserted)
{ {
/* Set the expire time if needed */ /* Set the expire time if needed */
if (expiretime != -1) if (expiretime != -1)
setExpire(NULL,db,key,expiretime); setExpire(NULL,db,key,nullptr,expiretime);
/* Set usage information (for eviction). */ /* Set usage information (for eviction). */
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock);
/* Decrement the key refcount since dbMerge() will take its
* own reference. */
decrRefCount(key);
} }
else else
{ {
decrRefCount(key);
decrRefCount(val); decrRefCount(val);
} }
} }
@ -2118,6 +2145,16 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
lfu_freq = -1; lfu_freq = -1;
lru_idle = -1; lru_idle = -1;
} }
if (key != nullptr)
decrRefCount(key);
if (subexpireKey != nullptr)
{
serverLog(LL_WARNING, "Corrupt subexpire entry in RDB.");
decrRefCount(subexpireKey);
subexpireKey = nullptr;
}
/* Verify the checksum if RDB version is >= 5 */ /* Verify the checksum if RDB version is >= 5 */
if (rdbver >= 5) { if (rdbver >= 5) {

View File

@ -15,11 +15,11 @@
extern uint64_t dictGenHashFunction(const void *key, int len); extern uint64_t dictGenHashFunction(const void *key, int len);
template<typename T, typename T_KEY = T> template<typename T, typename T_KEY = T, bool MEMMOVE_SAFE = false>
class semiorderedset class semiorderedset
{ {
friend struct setiter; friend struct setiter;
std::vector<compactvector<T>> m_data; std::vector<compactvector<T, MEMMOVE_SAFE>> m_data;
size_t celem = 0; size_t celem = 0;
static const size_t bits_min = 8; static const size_t bits_min = 8;
size_t bits = bits_min; size_t bits = bits_min;
@ -109,7 +109,7 @@ public:
if (!fRehash) if (!fRehash)
++celem; ++celem;
typename compactvector<T>::iterator itrInsert; typename compactvector<T, MEMMOVE_SAFE>::iterator itrInsert;
if (!m_data[idx].empty() && !(e < m_data[idx].back())) if (!m_data[idx].empty() && !(e < m_data[idx].back()))
itrInsert = m_data[idx].end(); itrInsert = m_data[idx].end();
else else
@ -292,7 +292,7 @@ private:
int steps = 0; int steps = 0;
for (; idxRehash < (m_data.size()/2); ++idxRehash) for (; idxRehash < (m_data.size()/2); ++idxRehash)
{ {
compactvector<T> vecT; compactvector<T, MEMMOVE_SAFE> vecT;
std::swap(m_data[idxRehash], vecT); std::swap(m_data[idxRehash], vecT);
for (auto &v : vecT) for (auto &v : vecT)

View File

@ -618,6 +618,10 @@ struct redisCommand redisCommandTable[] = {
"write fast @keyspace", "write fast @keyspace",
0,NULL,1,1,1,0,0,0}, 0,NULL,1,1,1,0,0,0},
{"expiremember", expireMemberCommand, 4,
"write fast @keyspace",
0,NULL,1,1,1,0,0,0},
{"pexpire",pexpireCommand,3, {"pexpire",pexpireCommand,3,
"write fast @keyspace", "write fast @keyspace",
0,NULL,1,1,1,0,0,0}, 0,NULL,1,1,1,0,0,0},
@ -2919,7 +2923,7 @@ void initServer(void) {
/* Create the Redis databases, and initialize other internal state. */ /* Create the Redis databases, and initialize other internal state. */
for (int j = 0; j < cserver.dbnum; j++) { for (int j = 0; j < cserver.dbnum; j++) {
g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL); g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL);
g_pserver->db[j].setexpire = new(MALLOC_LOCAL) semiorderedset<expireEntry, const char*>; g_pserver->db[j].setexpire = new(MALLOC_LOCAL) expireset();
g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end();
g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL); g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);

View File

@ -53,6 +53,7 @@
#include <atomic> #include <atomic>
#include <vector> #include <vector>
#include <algorithm> #include <algorithm>
#include <memory>
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#include <lua.h> #include <lua.h>
@ -767,39 +768,215 @@ __attribute__((always_inline)) inline char *szFromObj(const robj *o)
return (char*)ptrFromObj(o); return (char*)ptrFromObj(o);
} }
class expireEntry { class expireEntryFat
sds m_key; {
long long m_when; friend class expireEntry;
public:
struct subexpireEntry
{
long long when;
std::unique_ptr<const char, void(*)(const char*)> spsubkey;
subexpireEntry(long long when, const char *subkey)
: when(when), spsubkey(subkey, sdsfree)
{}
bool operator<(long long when) const noexcept { return this->when < when; }
bool operator<(const subexpireEntry &se) { return this->when < se.when; }
};
private:
sds m_keyPrimary;
std::vector<subexpireEntry> m_vecexpireEntries; // Note a NULL for the sds portion means the expire is for the primary key
public: public:
expireEntry(sds key, long long when) expireEntryFat(sds keyPrimary)
: m_keyPrimary(keyPrimary)
{}
long long when() const noexcept { return m_vecexpireEntries.front().when; }
const char *key() const noexcept { return m_keyPrimary; }
bool operator<(long long when) const noexcept { return this->when() < when; }
void expireSubKey(const char *szSubkey, long long when)
{ {
m_key = key; auto itrInsert = std::lower_bound(m_vecexpireEntries.begin(), m_vecexpireEntries.end(), when);
m_when = when; m_vecexpireEntries.emplace(itrInsert, when, sdsdup(szSubkey));
} }
bool operator!=(const expireEntry &e) const noexcept bool FEmpty() const noexcept { return m_vecexpireEntries.empty(); }
{ const subexpireEntry &nextExpireEntry() const noexcept { return m_vecexpireEntries.front(); }
return m_when != e.m_when || m_key != e.m_key; void popfrontExpireEntry() { m_vecexpireEntries.erase(m_vecexpireEntries.begin()); }
} const subexpireEntry &operator[](size_t idx) { return m_vecexpireEntries[idx]; }
bool operator==(const expireEntry &e) const noexcept size_t size() const noexcept { return m_vecexpireEntries.size(); }
{
return m_when == e.m_when && m_key == e.m_key;
}
bool operator==(const char *key) const noexcept { return m_key == key; }
bool operator<(const expireEntry &e) const noexcept { return m_when < e.m_when; }
bool operator<(const char *key) const noexcept { return m_key < key; }
bool operator<(long long when) const noexcept { return m_when < when; }
const char *key() const noexcept { return m_key; }
long long when() const noexcept { return m_when; }
explicit operator const char*() const noexcept { return m_key; }
explicit operator long long() const noexcept { return m_when; }
}; };
class expireEntry {
union
{
sds m_key;
expireEntryFat *m_pfatentry;
} u;
long long m_when; // LLONG_MIN means this is a fat entry and we should use the pointer
public:
class iter
{
expireEntry *m_pentry = nullptr;
size_t m_idx = 0;
public:
iter(expireEntry *pentry, size_t idx)
: m_pentry(pentry), m_idx(idx)
{}
iter &operator++() { ++m_idx; return *this; }
const char *subkey() const
{
if (m_pentry->FFat())
return (*m_pentry->pfatentry())[m_idx].spsubkey.get();
return nullptr;
}
long long when() const
{
if (m_pentry->FFat())
return (*m_pentry->pfatentry())[m_idx].when;
return m_pentry->when();
}
bool operator!=(const iter &other)
{
return m_idx != other.m_idx;
}
const iter &operator*() const { return *this; }
};
expireEntry(sds key, const char *subkey, long long when)
{
if (subkey != nullptr)
{
m_when = LLONG_MIN;
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(key);
u.m_pfatentry->expireSubKey(subkey, when);
}
else
{
u.m_key = key;
m_when = when;
}
}
expireEntry(expireEntryFat *pfatentry)
{
u.m_pfatentry = pfatentry;
m_when = LLONG_MIN;
}
expireEntry(expireEntry &&e)
{
u.m_key = e.u.m_key;
m_when = e.m_when;
e.u.m_key = (char*)key(); // we do this so it can still be found in the set
e.m_when = 0;
}
~expireEntry()
{
if (FFat())
delete u.m_pfatentry;
}
void setKeyUnsafe(sds key)
{
if (FFat())
u.m_pfatentry->m_keyPrimary = key;
else
u.m_key = key;
}
inline bool FFat() const noexcept { return m_when == LLONG_MIN; }
expireEntryFat *pfatentry() { assert(FFat()); return u.m_pfatentry; }
bool operator==(const char *key) const noexcept
{
return this->key() == key;
}
bool operator<(const expireEntry &e) const noexcept
{
return when() < e.when();
}
bool operator<(long long when) const noexcept
{
return this->when() < when;
}
const char *key() const noexcept
{
if (FFat())
return u.m_pfatentry->key();
return u.m_key;
}
long long when() const noexcept
{
if (FFat())
return u.m_pfatentry->when();
return m_when;
}
void update(const char *subkey, long long when)
{
if (!FFat())
{
if (subkey == nullptr)
{
m_when = when;
return;
}
else
{
// we have to upgrade to a fat entry
long long whenT = m_when;
sds keyPrimary = u.m_key;
m_when = LLONG_MIN;
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(keyPrimary);
u.m_pfatentry->expireSubKey(nullptr, whenT);
// at this point we're fat so fall through
}
}
u.m_pfatentry->expireSubKey(subkey, when);
}
iter begin() { return iter(this, 0); }
iter end()
{
if (FFat())
return iter(this, u.m_pfatentry->size());
return iter(this, 1);
}
bool FGetPrimaryExpire(long long *pwhen)
{
*pwhen = -1;
for (auto itr : *this)
{
if (itr.subkey() == nullptr)
{
*pwhen = itr.when();
return true;
}
}
return false;
}
explicit operator const char*() const noexcept { return key(); }
explicit operator long long() const noexcept { return when(); }
};
typedef semiorderedset<expireEntry, const char *, true /*expireEntry can be memmoved*/> expireset;
/* The a string name for an object's type as listed above /* The a string name for an object's type as listed above
* Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines, * Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines,
* and Module types have their registered name returned. */ * and Module types have their registered name returned. */
@ -837,8 +1014,8 @@ typedef struct clientReplyBlock {
* database. The database number is the 'id' field in the structure. */ * database. The database number is the 'id' field in the structure. */
typedef struct redisDb { typedef struct redisDb {
dict *pdict; /* The keyspace for this DB */ dict *pdict; /* The keyspace for this DB */
semiorderedset<expireEntry, const char*> *setexpire; expireset *setexpire;
semiorderedset<expireEntry, const char*>::setiter expireitr; expireset::setiter expireitr;
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */ dict *ready_keys; /* Blocked keys that received a PUSH */
@ -2224,8 +2401,9 @@ int removeExpire(redisDb *db, robj *key);
int removeExpireCore(redisDb *db, robj *key, dictEntry *de); int removeExpireCore(redisDb *db, robj *key, dictEntry *de);
void propagateExpire(redisDb *db, robj *key, int lazy); void propagateExpire(redisDb *db, robj *key, int lazy);
int expireIfNeeded(redisDb *db, robj *key); int expireIfNeeded(redisDb *db, robj *key);
long long getExpire(redisDb *db, robj_roptr key); expireEntry *getExpire(redisDb *db, robj_roptr key);
void setExpire(client *c, redisDb *db, robj *key, long long when); void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when);
void setExpire(client *c, redisDb *db, robj *key, expireEntry &&entry);
robj_roptr lookupKeyRead(redisDb *db, robj *key); robj_roptr lookupKeyRead(redisDb *db, robj *key);
robj *lookupKeyWrite(redisDb *db, robj *key); robj *lookupKeyWrite(redisDb *db, robj *key);
robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply); robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply);
@ -2420,6 +2598,7 @@ void mgetCommand(client *c);
void monitorCommand(client *c); void monitorCommand(client *c);
void expireCommand(client *c); void expireCommand(client *c);
void expireatCommand(client *c); void expireatCommand(client *c);
void expireMemberCommand(client *c);
void pexpireCommand(client *c); void pexpireCommand(client *c);
void pexpireatCommand(client *c); void pexpireatCommand(client *c);
void getsetCommand(client *c); void getsetCommand(client *c);

View File

@ -85,7 +85,7 @@ void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire,
} }
setKey(c->db,key,val); setKey(c->db,key,val);
g_pserver->dirty++; g_pserver->dirty++;
if (expire) setExpire(c,c->db,key,mstime()+milliseconds); if (expire) setExpire(c,c->db,key,nullptr,mstime()+milliseconds);
notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id); notifyKeyspaceEvent(NOTIFY_STRING,"set",key,c->db->id);
if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC, if (expire) notifyKeyspaceEvent(NOTIFY_GENERIC,
"expire",key,c->db->id); "expire",key,c->db->id);