New expire datastructure and algorithm. Allows us to expire in sublinear time

Former-commit-id: 3880d2616c882e19169180dc10268564347b0279
This commit is contained in:
John Sully 2019-07-05 23:49:09 -04:00
parent f094402c33
commit a060bc7942
16 changed files with 479 additions and 325 deletions

View File

@ -51,6 +51,7 @@
"tuple": "cpp",
"type_traits": "cpp",
"typeinfo": "cpp",
"utility": "cpp"
"utility": "cpp",
"set": "cpp"
}
}

View File

@ -85,7 +85,7 @@ struct bio_job {
void *bioProcessBackgroundJobs(void *arg);
void lazyfreeFreeObjectFromBioThread(robj *o);
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2);
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset<expireEntry, const char *> *set);
void lazyfreeFreeSlotsMapFromBioThread(rax *rt);
/* Make sure we have enough stack to perform all the things we do in the
@ -196,7 +196,7 @@ void *bioProcessBackgroundJobs(void *arg) {
if (job->arg1)
lazyfreeFreeObjectFromBioThread((robj*)job->arg1);
else if (job->arg2 && job->arg3)
lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(dict*)job->arg3);
lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(semiorderedset<expireEntry, const char *>*)job->arg3);
else if (job->arg3)
lazyfreeFreeSlotsMapFromBioThread((rax*)job->arg3);
} else {

View File

@ -39,6 +39,8 @@
*----------------------------------------------------------------------------*/
int keyIsExpired(redisDb *db, robj *key);
int expireIfNeeded(redisDb *db, robj *key, robj *o);
void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire);
/* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached.
@ -49,6 +51,20 @@ void updateLFU(robj *val) {
val->lru = (LFUGetTimeInMinutes()<<8) | counter;
}
void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew)
{
serverAssert(valOld->FExpires());
serverAssert(!valNew->FExpires());
auto itr = db->setexpire->find(key);
serverAssert(itr != db->setexpire->end());
valNew->SetFExpires(true);
valOld->SetFExpires(false);
return;
}
/* Low level key lookup API, not actually called directly from commands
* implementations that should instead rely on lookupKeyRead(),
* lookupKeyWrite() and lookupKeyReadWithFlags(). */
@ -160,8 +176,10 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key) {
* Returns the linked value object if the key exists or NULL if the key
* does not exist in the specified DB. */
robj *lookupKeyWrite(redisDb *db, robj *key) {
expireIfNeeded(db,key);
return lookupKey(db,key,LOOKUP_UPDATEMVCC);
robj *o = lookupKey(db,key,LOOKUP_UPDATEMVCC);
if (expireIfNeeded(db,key))
o = NULL;
return o;
}
robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
@ -177,6 +195,7 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
}
int dbAddCore(redisDb *db, robj *key, robj *val) {
serverAssert(!val->FExpires());
sds copy = sdsdup(szFromObj(key));
int retval = dictAdd(db->pdict, copy, val);
val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp();
@ -206,10 +225,18 @@ void dbAdd(redisDb *db, robj *key, robj *val)
serverAssertWithInfo(NULL,key,retval == DICT_OK);
}
void dbOverwriteCore(redisDb *db, dictEntry *de, robj *val, bool fUpdateMvcc)
void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire)
{
dictEntry auxentry = *de;
robj *old = (robj*)dictGetVal(de);
if (old->FExpires()) {
if (fRemoveExpire)
removeExpire(db, key);
else
updateExpire(db, (sds)dictGetKey(de), old, val);
}
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
val->lru = old->lru;
}
@ -235,7 +262,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
dictEntry *de = dictFind(db->pdict,ptrFromObj(key));
serverAssertWithInfo(NULL,key,de != NULL);
dbOverwriteCore(db, de, val, true);
dbOverwriteCore(db, de, key, val, true, false);
}
/* Insert a key, handling duplicate keys according to fReplace */
@ -250,7 +277,7 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
robj *old = (robj*)dictGetVal(de);
if (old->mvcc_tstamp <= val->mvcc_tstamp)
{
dbOverwriteCore(db, de, val, false);
dbOverwriteCore(db, de, key, val, false, true);
return true;
}
@ -271,13 +298,13 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
*
* All the new keys in the database should be created via this interface. */
void setKey(redisDb *db, robj *key, robj *val) {
if (lookupKeyWrite(db,key) == NULL) {
dictEntry *de = dictFind(db->pdict, ptrFromObj(key));
if (de == NULL) {
dbAdd(db,key,val);
} else {
dbOverwrite(db,key,val);
dbOverwriteCore(db,de,key,val,true,true);
}
incrRefCount(val);
removeExpire(db,key);
signalModifiedKey(db,key);
}
@ -292,7 +319,7 @@ int dbExists(redisDb *db, robj *key) {
robj *dbRandomKey(redisDb *db) {
dictEntry *de;
int maxtries = 100;
int allvolatile = dictSize(db->pdict) == dictSize(db->expires);
int allvolatile = dictSize(db->pdict) == db->setexpire->size();
while(1) {
sds key;
@ -303,7 +330,9 @@ robj *dbRandomKey(redisDb *db) {
key = (sds)dictGetKey(de);
keyobj = createStringObject(key,sdslen(key));
if (dictFind(db->expires,key)) {
if (((robj*)dictGetVal(de))->FExpires())
{
if (allvolatile && listLength(g_pserver->masters) && --maxtries == 0) {
/* If the DB is composed only of keys with an expire set,
* it could happen that all the keys are already logically
@ -315,11 +344,16 @@ robj *dbRandomKey(redisDb *db) {
* return a key name that may be already expired. */
return keyobj;
}
}
if (((robj*)dictGetVal(de))->FExpires())
{
if (expireIfNeeded(db,keyobj)) {
decrRefCount(keyobj);
continue; /* search for another key. This expired. */
}
}
return keyobj;
}
}
@ -328,7 +362,10 @@ robj *dbRandomKey(redisDb *db) {
int dbSyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,ptrFromObj(key));
dictEntry *de = dictFind(db->pdict, szFromObj(key));
if (de != nullptr && ((robj*)dictGetVal(de))->FExpires())
removeExpireCore(db, key, de);
if (dictDelete(db->pdict,ptrFromObj(key)) == DICT_OK) {
if (g_pserver->cluster_enabled) slotToKeyDel(key);
return 1;
@ -373,7 +410,7 @@ int dbDelete(redisDb *db, robj *key) {
*/
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
serverAssert(o->type == OBJ_STRING);
if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
if (o->getrefcount(std::memory_order_relaxed) != 1 || o->encoding != OBJ_ENCODING_RAW) {
robj *decoded = getDecodedObject(o);
o = createRawStringObject(szFromObj(decoded), sdslen(szFromObj(decoded)));
decrRefCount(decoded);
@ -419,7 +456,8 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
emptyDbAsync(&g_pserver->db[j]);
} else {
dictEmpty(g_pserver->db[j].pdict,callback);
dictEmpty(g_pserver->db[j].expires,callback);
delete g_pserver->db[j].setexpire;
g_pserver->db[j].setexpire = new (MALLOC_LOCAL) semiorderedset<expireEntry, const char*>();
}
}
if (g_pserver->cluster_enabled) {
@ -964,9 +1002,10 @@ void renameGenericCommand(client *c, int nx) {
* with the same name. */
dbDelete(c->db,c->argv[2]);
}
dbAdd(c->db,c->argv[2],o);
if (expire != -1) setExpire(c,c->db,c->argv[2],expire);
dbDelete(c->db,c->argv[1]);
dbAdd(c->db,c->argv[2],o);
if (expire != -1)
setExpire(c,c->db,c->argv[2],expire);
signalModifiedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[2]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
@ -1024,6 +1063,12 @@ void moveCommand(client *c) {
return;
}
expire = getExpire(c->db,c->argv[1]);
if (o->FExpires())
removeExpire(c->db,c->argv[1]);
serverAssert(!o->FExpires());
incrRefCount(o);
dbDelete(src,c->argv[1]);
g_pserver->dirty++;
/* Return zero if the key already exists in the target DB */
if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
@ -1032,11 +1077,7 @@ void moveCommand(client *c) {
}
dbAdd(dst,c->argv[1],o);
if (expire != -1) setExpire(c,dst,c->argv[1],expire);
incrRefCount(o);
/* OK! key moved, free the entry in the source DB */
dbDelete(src,c->argv[1]);
g_pserver->dirty++;
addReply(c,shared.cone);
}
@ -1077,11 +1118,11 @@ int dbSwapDatabases(int id1, int id2) {
* ready_keys and watched_keys, since we want clients to
* remain in the same DB they were. */
db1->pdict = db2->pdict;
db1->expires = db2->expires;
db1->setexpire = db2->setexpire;
db1->avg_ttl = db2->avg_ttl;
db2->pdict = aux.pdict;
db2->expires = aux.expires;
db2->setexpire = aux.setexpire;
db2->avg_ttl = aux.avg_ttl;
/* Now we need to handle clients blocked on lists: as an effect
@ -1130,12 +1171,25 @@ void swapdbCommand(client *c) {
/*-----------------------------------------------------------------------------
* Expires API
*----------------------------------------------------------------------------*/
int removeExpire(redisDb *db, robj *key) {
dictEntry *de = dictFind(db->pdict,ptrFromObj(key));
return removeExpireCore(db, key, de);
}
int removeExpireCore(redisDb *db, robj *key, dictEntry *de) {
/* An expire may only be removed if there is a corresponding entry in the
* main dict. Otherwise, the key will never be freed. */
serverAssertWithInfo(NULL,key,dictFind(db->pdict,ptrFromObj(key)) != NULL);
return dictDelete(db->expires,ptrFromObj(key)) == DICT_OK;
serverAssertWithInfo(NULL,key,de != NULL);
robj *val = (robj*)dictGetVal(de);
if (!val->FExpires())
return 0;
auto itr = db->setexpire->find((sds)dictGetKey(de));
serverAssert(itr != db->setexpire->end());
serverAssert(itr->key() == (sds)dictGetKey(de));
db->setexpire->erase(itr);
val->SetFExpires(false);
return 1;
}
/* Set an expire to the specified key. If the expire is set in the context
@ -1143,14 +1197,27 @@ int removeExpire(redisDb *db, robj *key) {
* to NULL. The 'when' parameter is the absolute unix time in milliseconds
* after which the key will no longer be considered valid. */
void setExpire(client *c, redisDb *db, robj *key, long long when) {
dictEntry *kde, *de;
dictEntry *kde;
serverAssert(GlobalLocksAcquired());
/* Reuse the sds from the main dict in the expire dict */
kde = dictFind(db->pdict,ptrFromObj(key));
serverAssertWithInfo(NULL,key,kde != NULL);
de = dictAddOrFind(db->expires,dictGetKey(kde));
dictSetSignedIntegerVal(de,when);
if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
{
// shared objects cannot have the expire bit set, create a real object
dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde)));
}
if (((robj*)dictGetVal(kde))->FExpires())
removeExpire(db, key); // should we optimize for when this is called with an already set expiry?
expireEntry e((sds)dictGetKey(kde), when);
((robj*)dictGetVal(kde))->SetFExpires(true);
db->setexpire->insert(e);
int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0;
if (c && writable_slave && !(c->flags & CLIENT_MASTER))
@ -1163,13 +1230,18 @@ long long getExpire(redisDb *db, robj_roptr key) {
dictEntry *de;
/* No expire? return ASAP */
if (dictSize(db->expires) == 0 ||
(de = dictFind(db->expires,ptrFromObj(key))) == NULL) return -1;
if (db->setexpire->size() == 0)
return -1;
/* The entry was found in the expire dict, this means it should also
* be present in the main dict (safety check). */
serverAssertWithInfo(NULL,key,dictFind(db->pdict,ptrFromObj(key)) != NULL);
return dictGetSignedIntegerVal(de);
de = dictFind(db->pdict, ptrFromObj(key));
if (de == NULL)
return -1;
robj *obj = (robj*)dictGetVal(de);
if (!obj->FExpires())
return -1;
auto itr = db->setexpire->find((sds)dictGetKey(de));
return itr->when();
}
/* Propagate expires into slaves and the AOF file.

View File

@ -436,7 +436,7 @@ NULL
"Value at:%p refcount:%d "
"encoding:%s serializedlength:%zu "
"lru:%d lru_seconds_idle:%llu%s",
(void*)val, static_cast<int>(val->refcount),
(void*)val, static_cast<int>(val->getrefcount(std::memory_order_relaxed)),
strenc, rdbSavedObjectLen(val),
val->lru, estimateObjectIdleTime(val)/1000, extra);
} else if (!strcasecmp(szFromObj(c->argv[1]),"sdslen") && c->argc == 3) {
@ -639,8 +639,9 @@ NULL
stats = sdscat(stats,buf);
stats = sdscatprintf(stats,"[Expires HT]\n");
dictGetStats(buf,sizeof(buf),g_pserver->db[dbid].expires);
stats = sdscat(stats,buf);
// TODO!
//dictGetStats(buf,sizeof(buf),server.db[dbid].expires);
//stats = sdscat(stats,buf);
addReplyBulkSds(c,stats);
} else if (!strcasecmp(szFromObj(c->argv[1]),"htstats-key") && c->argc == 3) {
@ -721,14 +722,14 @@ void _serverAssertPrintClientInfo(const client *c) {
arg = buf;
}
serverLog(LL_WARNING,"client->argv[%d] = \"%s\" (refcount: %d)",
j, arg, static_cast<int>(c->argv[j]->refcount));
j, arg, static_cast<int>(c->argv[j]->getrefcount(std::memory_order_relaxed)));
}
}
void serverLogObjectDebugInfo(robj_roptr o) {
serverLog(LL_WARNING,"Object type: %d", o->type);
serverLog(LL_WARNING,"Object encoding: %d", o->encoding);
serverLog(LL_WARNING,"Object refcount: %d", static_cast<int>(o->refcount));
serverLog(LL_WARNING,"Object refcount: %d", static_cast<int>(o->getrefcount(std::memory_order_relaxed)));
if (o->type == OBJ_STRING && sdsEncodedObject(o)) {
serverLog(LL_WARNING,"Object raw string len: %zu", sdslen(szFromObj(o)));
if (sdslen(szFromObj(o)) < 4096) {

View File

@ -48,6 +48,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util);
/* forward declarations*/
void defragDictBucketCallback(void *privdata, dictEntry **bucketref);
dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged);
void replaceSateliteOSetKeyPtr(semiorderedset<expireEntry, const char*> &set, sds oldkey, sds newkey);
/* Defrag helper for generic allocations.
*
@ -102,7 +103,7 @@ sds activeDefragSds(sds sdsptr) {
* and should NOT be accessed. */
robj *activeDefragStringOb(robj* ob, long *defragged) {
robj *ret = NULL;
if (ob->refcount!=1)
if (ob->getrefcount(std::memory_order_relaxed)!=1)
return NULL;
/* try to defrag robj (only if not an EMBSTR type (handled below). */
@ -406,6 +407,16 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd
return NULL;
}
void replaceSateliteOSetKeyPtr(semiorderedset<expireEntry, const char*> &set, sds oldkey, sds newkey) {
auto itr = set.find(oldkey);
if (itr != set.end())
{
expireEntry eNew(newkey, itr->when());
set.erase(itr);
set.insert(eNew);
}
}
long activeDefragQuickListNodes(quicklist *ql) {
quicklistNode *node = ql->head, *newnode;
long defragged = 0;
@ -769,12 +780,8 @@ long defragKey(redisDb *db, dictEntry *de) {
newsds = activeDefragSds(keysds);
if (newsds)
defragged++, de->key = newsds;
if (dictSize(db->expires)) {
/* Dirty code:
* I can't search in db->expires for that key after i already released
* the pointer it holds it won't be able to do the string compare */
uint64_t hash = dictGetHash(db->pdict, de->key);
replaceSateliteDictKeyPtrAndOrDefragDictEntry(db->expires, keysds, newsds, hash, &defragged);
if (!db->setexpire->empty()) {
replaceSateliteOSetKeyPtr(*db->setexpire, keysds, newsds);
}
/* Try to defrag robj and / or string value. */

View File

@ -150,36 +150,9 @@ void evictionPoolAlloc(void) {
EvictionPoolLRU = ep;
}
/* This is an helper function for freeMemoryIfNeeded(), it is used in order
* to populate the evictionPool with a few entries every time we want to
* expire a key. Keys with idle time smaller than one of the current
* keys are added. Keys are always added if there are free entries.
*
* We insert keys on place in ascending order, so keys with the smaller
* idle time are on the left, and keys with the higher idle time on the
* right. */
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
int j, k, count;
dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*));
count = dictGetSomeKeys(sampledict,samples,g_pserver->maxmemory_samples);
for (j = 0; j < count; j++) {
void processEvictionCandidate(int dbid, sds key, robj *o, const expireEntry *e, struct evictionPoolEntry *pool)
{
unsigned long long idle;
sds key;
robj *o = nullptr;
dictEntry *de;
de = samples[j];
key = (sds)dictGetKey(de);
/* If the dictionary we are sampling from is not the main
* dictionary (but the expires one) we need to lookup the key
* again in the key dictionary to obtain the value object. */
if (g_pserver->maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
if (sampledict != keydict) de = dictFind(keydict, key);
o = (robj*)dictGetVal(de);
}
/* Calculate the idle time according to the policy. This is called
* idle just because the code initially handled LRU, but is in fact
@ -197,7 +170,7 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic
idle = 255-LFUDecrAndReturn(o);
} else if (g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
/* In this case the sooner the expire the better. */
idle = ULLONG_MAX - (long)dictGetVal(de);
idle = ULLONG_MAX - e->when();
} else {
serverPanic("Unknown eviction policy in evictionPoolPopulate()");
}
@ -205,14 +178,14 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic
/* Insert the element inside the pool.
* First, find the first empty bucket or the first populated
* bucket that has an idle time smaller than our idle time. */
k = 0;
int k = 0;
while (k < EVPOOL_SIZE &&
pool[k].key &&
pool[k].idle < idle) k++;
if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
/* Can't insert if the element is < the worst element we have
* and there are no empty buckets. */
continue;
return;
} else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
/* Inserting into empty position. No setup needed before insert. */
} else {
@ -253,6 +226,47 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic
}
pool[k].idle = idle;
pool[k].dbid = dbid;
}
/* This is an helper function for freeMemoryIfNeeded(), it is used in order
* to populate the evictionPool with a few entries every time we want to
* expire a key. Keys with idle time smaller than one of the current
* keys are added. Keys are always added if there are free entries.
*
* We insert keys on place in ascending order, so keys with the smaller
* idle time are on the left, and keys with the higher idle time on the
* right. */
struct visitFunctor
{
int dbid;
dict *dbdict;
struct evictionPoolEntry *pool;
int count;
bool operator()(const expireEntry &e)
{
dictEntry *de = dictFind(dbdict, e.key());
processEvictionCandidate(dbid, (sds)dictGetKey(de), (robj*)dictGetVal(de), &e, pool);
++count;
return count < g_pserver->maxmemory_samples;
}
};
void evictionPoolPopulate(int dbid, dict *dbdict, semiorderedset<expireEntry,const char*> *setexpire, struct evictionPoolEntry *pool)
{
if (setexpire != nullptr)
{
visitFunctor visitor { dbid, dbdict, pool, 0 };
setexpire->random_visit(visitor);
}
else
{
dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*));
int count = dictGetSomeKeys(dbdict,samples,g_pserver->maxmemory_samples);
for (int j = 0; j < count; j++) {
robj *o = (robj*)dictGetVal(samples[j]);
processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool);
}
}
}
@ -474,8 +488,6 @@ int freeMemoryIfNeeded(void) {
sds bestkey = NULL;
int bestdbid;
redisDb *db;
dict *dict;
dictEntry *de;
if (g_pserver->maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
@ -490,10 +502,18 @@ int freeMemoryIfNeeded(void) {
* every DB. */
for (i = 0; i < cserver.dbnum; i++) {
db = g_pserver->db+i;
dict = (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
db->pdict : db->expires;
if ((keys = dictSize(dict)) != 0) {
evictionPoolPopulate(i, dict, db->pdict, pool);
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS)
{
if ((keys = dictSize(db->pdict)) != 0) {
evictionPoolPopulate(i, db->pdict, nullptr, pool);
total_keys += keys;
}
}
else
{
keys = db->setexpire->size();
if (keys != 0)
evictionPoolPopulate(i, db->pdict, db->setexpire, pool);
total_keys += keys;
}
}
@ -503,14 +523,11 @@ int freeMemoryIfNeeded(void) {
for (k = EVPOOL_SIZE-1; k >= 0; k--) {
if (pool[k].key == NULL) continue;
bestdbid = pool[k].dbid;
sds key = nullptr;
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
de = dictFind(g_pserver->db[pool[k].dbid].pdict,
pool[k].key);
} else {
de = dictFind(g_pserver->db[pool[k].dbid].expires,
pool[k].key);
}
dictEntry *de = dictFind(g_pserver->db[pool[k].dbid].pdict,pool[k].key);
if (de != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || ((robj*)dictGetVal(de))->FExpires()))
key = (sds)dictGetKey(de);
/* Remove the entry from the pool. */
if (pool[k].key != pool[k].cached)
@ -520,8 +537,8 @@ int freeMemoryIfNeeded(void) {
/* If the key exists, is our pick. Otherwise it is
* a ghost and we need to try the next element. */
if (de) {
bestkey = (sds)dictGetKey(de);
if (key) {
bestkey = key;
break;
} else {
/* Ghost... Iterate again. */
@ -540,15 +557,25 @@ int freeMemoryIfNeeded(void) {
for (i = 0; i < cserver.dbnum; i++) {
j = (++next_db) % cserver.dbnum;
db = g_pserver->db+j;
dict = (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
db->pdict : db->expires;
if (dictSize(dict) != 0) {
de = dictGetRandomKey(dict);
if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM)
{
if (dictSize(db->pdict) != 0) {
dictEntry *de = dictGetRandomKey(db->pdict);
bestkey = (sds)dictGetKey(de);
bestdbid = j;
break;
}
}
else
{
if (!db->setexpire->empty())
{
bestkey = (sds)db->setexpire->random_value().key();
bestdbid = j;
break;
}
}
}
}
/* Finally remove the selected key. */

View File

@ -51,10 +51,7 @@
*
* The parameter 'now' is the current time in milliseconds as is passed
* to the function to avoid too many gettimeofday() syscalls. */
int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
long long t = dictGetSignedIntegerVal(de);
if (now > t) {
sds key = (sds)dictGetKey(de);
void activeExpireCycleExpire(redisDb *db, const char *key) {
robj *keyobj = createStringObject(key,sdslen(key));
propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire);
@ -67,10 +64,6 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj);
decrRefCount(keyobj);
g_pserver->stat_expiredkeys++;
return 1;
} else {
return 0;
}
}
/* Try to expire a few timed out keys. The algorithm used is adaptive and
@ -148,7 +141,6 @@ void activeExpireCycle(int type) {
long total_expired = 0;
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
int expired;
redisDb *db = g_pserver->db+(current_db % cserver.dbnum);
/* Increment the DB now so we are sure if we run out of time
@ -156,78 +148,44 @@ void activeExpireCycle(int type) {
* distribute the time evenly across DBs. */
current_db++;
/* Continue to expire if at the end of the cycle more than 25%
* of the keys were expired. */
do {
unsigned long num, slots;
long long now, ttl_sum;
int ttl_samples;
long long now;
iteration++;
/* If there is nothing to expire try next DB ASAP. */
if ((num = dictSize(db->expires)) == 0) {
db->avg_ttl = 0;
break;
}
slots = dictSlots(db->expires);
now = mstime();
/* When there are less than 1% filled slots getting random
* keys is expensive, so stop here waiting for better times...
* The dictionary will be resized asap. */
if (num && slots > DICT_HT_INITIAL_SIZE &&
(num*100/slots < 1)) break;
/* The main collection cycle. Sample random keys among keys
* with an expire set, checking for expired ones. */
expired = 0;
ttl_sum = 0;
ttl_samples = 0;
if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
while (num--) {
dictEntry *de;
long long ttl;
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
ttl = dictGetSignedIntegerVal(de)-now;
if (activeExpireCycleTryExpire(db,de,now)) expired++;
if (ttl > 0) {
/* We want the average TTL of keys yet not expired. */
ttl_sum += ttl;
ttl_samples++;
}
total_sampled++;
}
total_expired += expired;
/* Update the average TTL stats for this database. */
if (ttl_samples) {
long long avg_ttl = ttl_sum/ttl_samples;
/* Do a simple running average with a few samples.
* We just use the current estimate with a weight of 2%
* and the previous estimate with a weight of 98%. */
if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
/* If there is nothing to expire try next DB ASAP. */
if (db->setexpire->empty())
{
// TODO: Compute db->avg_ttl somewhere... but probably not here
db->avg_ttl = 0;
continue;
}
size_t expired = 0;
size_t tried = 0;
db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](const expireEntry &e) __attribute__((always_inline)) {
if (e.when() < now)
{
activeExpireCycleExpire(db, e.key());
++expired;
}
++tried;
if ((tried % ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) == 0)
{
/* We can't block forever here even if there are many keys to
* expire. So after a given amount of milliseconds return to the
* caller waiting for the other active expire cycle. */
if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
elapsed = ustime()-start;
if (elapsed > timelimit) {
timelimit_exit = 1;
g_pserver->stat_expired_time_cap_reached_count++;
break;
return false;
}
}
/* We don't repeat the cycle if there are less than 25% of keys
* found expired in the current DB. */
} while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
return true;
});
total_expired += expired;
}
elapsed = ustime()-start;
@ -301,20 +259,27 @@ void expireSlaveKeys(void) {
while(dbids && dbid < cserver.dbnum) {
if ((dbids & 1) != 0) {
redisDb *db = g_pserver->db+dbid;
dictEntry *expire = dictFind(db->expires,keyname);
// the expire is hashed based on the key pointer, so we need the point in the main db
dictEntry *deMain = dictFind(db->pdict, keyname);
auto itr = db->setexpire->end();
if (deMain != nullptr)
itr = db->setexpire->find((sds)dictGetKey(deMain));
int expired = 0;
if (expire &&
activeExpireCycleTryExpire(g_pserver->db+dbid,expire,start))
if (itr != db->setexpire->end())
{
if (itr->when() < start) {
activeExpireCycleExpire(g_pserver->db+dbid,itr->key());
expired = 1;
}
}
/* If the key was not expired in this DB, we need to set the
* corresponding bit in the new bitmap we set as value.
* At the end of the loop if the bitmap is zero, it means we
* no longer need to keep track of this key. */
if (expire && !expired) {
if (itr != db->setexpire->end() && !expired) {
noexpire++;
new_dbids |= (uint64_t)1 << dbid;
}

View File

@ -52,16 +52,19 @@ size_t lazyfreeGetFreeEffort(robj *obj) {
* will be reclaimed in a different bio.c thread. */
#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,ptrFromObj(key));
/* If the value is composed of a few allocations, to free in a lazy way
* is actually just slower... So under a certain limit we just free
* the object synchronously. */
dictEntry *de = dictUnlink(db->pdict,ptrFromObj(key));
if (de) {
robj *val = (robj*)dictGetVal(de);
if (val->FExpires())
{
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
removeExpireCore(db,key,de);
}
size_t free_effort = lazyfreeGetFreeEffort(val);
/* If releasing the object is too much work, do it in the background
@ -72,7 +75,7 @@ int dbAsyncDelete(redisDb *db, robj *key) {
* objects, and then call dbDelete(). In this case we'll fall
* through and reach the dictFreeUnlinkedEntry() call, that will be
* equivalent to just calling decrRefCount(). */
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
if (free_effort > LAZYFREE_THRESHOLD && val->getrefcount(std::memory_order_relaxed) == 1) {
atomicIncr(lazyfree_objects,1);
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
dictSetVal(db->pdict,de,NULL);
@ -93,7 +96,7 @@ int dbAsyncDelete(redisDb *db, robj *key) {
/* Free an object, if the object is huge enough, free it in async way. */
void freeObjAsync(robj *o) {
size_t free_effort = lazyfreeGetFreeEffort(o);
if (free_effort > LAZYFREE_THRESHOLD && o->refcount == 1) {
if (free_effort > LAZYFREE_THRESHOLD && o->getrefcount(std::memory_order_relaxed) == 1) {
atomicIncr(lazyfree_objects,1);
bioCreateBackgroundJob(BIO_LAZY_FREE,o,NULL,NULL);
} else {
@ -105,11 +108,13 @@ void freeObjAsync(robj *o) {
* create a new empty set of hash tables and scheduling the old ones for
* lazy freeing. */
void emptyDbAsync(redisDb *db) {
dict *oldht1 = db->pdict, *oldht2 = db->expires;
dict *oldht1 = db->pdict;
auto *set = db->setexpire;
db->setexpire = new (MALLOC_LOCAL) semiorderedset<expireEntry, const char*>();
db->expireitr = db->setexpire->end();
db->pdict = dictCreate(&dbDictType,NULL);
db->expires = dictCreate(&keyptrDictType,NULL);
atomicIncr(lazyfree_objects,dictSize(oldht1));
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,set);
}
/* Empty the slots-keys map of Redis CLuster by creating a new empty one
@ -136,10 +141,10 @@ void lazyfreeFreeObjectFromBioThread(robj *o) {
* when the database was logically deleted. 'sl' is a skiplist used by
* Redis Cluster in order to take the hash slots -> keys mapping. This
* may be NULL if Redis Cluster is disabled. */
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset<expireEntry, const char *> *set) {
size_t numkeys = dictSize(ht1);
dictRelease(ht1);
dictRelease(ht2);
delete set;
atomicDecr(lazyfree_objects,numkeys);
}

View File

@ -566,7 +566,7 @@ void RedisModuleCommandDispatcher(client *c) {
for (int i = 0; i < c->argc; i++) {
/* Only do the work if the module took ownership of the object:
* in that case the refcount is no longer 1. */
if (c->argv[i]->refcount > 1)
if (c->argv[i]->getrefcount(std::memory_order_relaxed) > 1)
trimStringObjectIfNeeded(c->argv[i]);
}
}
@ -1037,7 +1037,7 @@ int RM_StringCompare(RedisModuleString *a, RedisModuleString *b) {
/* Return the (possibly modified in encoding) input 'str' object if
* the string is unshared, otherwise NULL is returned. */
RedisModuleString *moduleAssertUnsharedString(RedisModuleString *str) {
if (str->refcount != 1) {
if (str->getrefcount(std::memory_order_relaxed) != 1) {
serverLog(LL_WARNING,
"Module attempted to use an in-place string modify operation "
"with a string referenced multiple times. Please check the code "

View File

@ -39,11 +39,11 @@
/* ===================== Creation and parsing of objects ==================== */
robj *createObject(int type, void *ptr) {
robj *o = (robj*)zmalloc(sizeof(*o), MALLOC_SHARED);
robj *o = (robj*)zcalloc(sizeof(*o), MALLOC_SHARED);
o->type = type;
o->encoding = OBJ_ENCODING_RAW;
o->m_ptr = ptr;
o->refcount.store(1, std::memory_order_relaxed);
o->setrefcount(1);
o->mvcc_tstamp = OBJ_MVCC_INVALID;
/* Set the LRU to the current lruclock (minutes resolution), or
@ -68,8 +68,9 @@ robj *createObject(int type, void *ptr) {
*
*/
robj *makeObjectShared(robj *o) {
serverAssert(o->refcount == 1);
o->refcount.store(OBJ_SHARED_REFCOUNT, std::memory_order_relaxed);
serverAssert(o->getrefcount(std::memory_order_relaxed) == 1);
serverAssert(!o->FExpires());
o->setrefcount(OBJ_SHARED_REFCOUNT);
return o;
}
@ -86,12 +87,12 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
size_t allocsize = sizeof(struct sdshdr8)+len+1;
if (allocsize < sizeof(void*))
allocsize = sizeof(void*);
robj *o = (robj*)zmalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED);
robj *o = (robj*)zcalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED);
struct sdshdr8 *sh = (sdshdr8*)(&o->m_ptr);
o->type = OBJ_STRING;
o->encoding = OBJ_ENCODING_EMBSTR;
o->refcount.store(1, std::memory_order_relaxed);
o->setrefcount(1);
o->mvcc_tstamp = OBJ_MVCC_INVALID;
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
@ -352,11 +353,14 @@ void freeStreamObject(robj_roptr o) {
}
void incrRefCount(robj_roptr o) {
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount.fetch_add(1, std::memory_order_acquire);
if (o->getrefcount(std::memory_order_relaxed) != OBJ_SHARED_REFCOUNT) o->addref();
}
void decrRefCount(robj_roptr o) {
if (o->refcount.load(std::memory_order_acquire) == 1) {
if (o->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
return;
unsigned prev = o->release();
if (prev == 1) {
switch(o->type) {
case OBJ_STRING: freeStringObject(o); break;
case OBJ_LIST: freeListObject(o); break;
@ -369,8 +373,7 @@ void decrRefCount(robj_roptr o) {
}
zfree(o.unsafe_robjcast());
} else {
if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount.fetch_sub(1, std::memory_order_acquire);
if (prev <= 0) serverPanic("decrRefCount against refcount <= 0");
}
}
@ -394,7 +397,7 @@ void decrRefCountVoid(const void *o) {
* decrRefCount(obj);
*/
robj *resetRefCount(robj *obj) {
obj->refcount = 0;
obj->setrefcount(0);
return obj;
}
@ -452,7 +455,7 @@ robj *tryObjectEncoding(robj *o) {
/* It's not safe to encode shared objects: shared objects can be shared
* everywhere in the "object space" of Redis and may end in places where
* they are not handled. We handle them only as values in the keyspace. */
if (o->refcount > 1) return o;
if (o->getrefcount(std::memory_order_relaxed) > 1) return o;
/* Check if we can represent this string as a long integer.
* Note that we are sure that a string larger than 20 chars is not
@ -1064,8 +1067,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
mh->db[mh->num_dbs].overhead_ht_main = mem;
mem_total+=mem;
mem = dictSize(db->expires) * sizeof(dictEntry) +
dictSlots(db->expires) * sizeof(dictEntry*);
mem = db->setexpire->bytes_used();
mh->db[mh->num_dbs].overhead_ht_expires = mem;
mem_total+=mem;
@ -1275,7 +1277,7 @@ NULL
} else if (!strcasecmp(szFromObj(c->argv[1]),"refcount") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
addReplyLongLong(c,o->refcount);
addReplyLongLong(c,o->getrefcount(std::memory_order_relaxed));
} else if (!strcasecmp(szFromObj(c->argv[1]),"encoding") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
@ -1474,3 +1476,18 @@ NULL
addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try MEMORY HELP", (char*)ptrFromObj(c->argv[1]));
}
}
void redisObject::SetFExpires(bool fExpire)
{
serverAssert(this->refcount != OBJ_SHARED_REFCOUNT);
if (fExpire)
this->refcount.fetch_or(1U << 31, std::memory_order_relaxed);
else
this->refcount.fetch_and(~(1U << 31), std::memory_order_relaxed);
}
void redisObject::setrefcount(unsigned ref)
{
serverAssert(!FExpires());
refcount.store(ref, std::memory_order_relaxed);
}

View File

@ -1096,6 +1096,29 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
return 1;
}
int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *keystr, robj *o)
{
robj key;
long long expire;
initStaticStringObject(key,(char*)keystr);
expire = getExpire(db, &key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1)
return 0;
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
if (flags & RDB_SAVE_AOF_PREAMBLE &&
rdb->processed_bytes > *processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
*processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
return 1;
}
/* Produces a dump of the database in RDB format sending it to the specified
* Redis I/O channel. On success C_OK is returned, otherwise C_ERR
* is returned and part of the output, or all the output, can be
@ -1134,31 +1157,24 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
* these sizes are just hints to resize the hash tables. */
uint64_t db_size, expires_size;
db_size = dictSize(db->pdict);
expires_size = dictSize(db->expires);
expires_size = db->setexpire->size();
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
/* Iterate this DB writing every entry */
size_t ckeysExpired = 0;
while((de = dictNext(di)) != NULL) {
sds keystr = (sds)dictGetKey(de);
robj key, *o = (robj*)dictGetVal(de);
long long expire;
robj *o = (robj*)dictGetVal(de);
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
if (o->FExpires())
++ckeysExpired;
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
if (flags & RDB_SAVE_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
if (!saveKey(rdb, db, flags, &processed, keystr, o))
goto werr;
}
serverAssert(ckeysExpired == db->setexpire->size());
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
@ -1822,6 +1838,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) {
}
o->mvcc_tstamp = mvcc_tstamp;
serverAssert(!o->FExpires());
return o;
}
@ -1965,7 +1982,6 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
dictExpand(db->pdict,db_size);
dictExpand(db->expires,expires_size);
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_AUX) {
/* AUX: generic string-string fields. Use to add state to RDB
@ -2079,7 +2095,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
if (fInserted)
{
/* Set the expire time if needed */
if (expiretime != -1) setExpire(NULL,db,key,expiretime);
if (expiretime != -1)
setExpire(NULL,db,key,expiretime);
/* Set usage information (for eviction). */
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock);
@ -2101,6 +2118,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
lfu_freq = -1;
lru_idle = -1;
}
/* Verify the checksum if RDB version is >= 5 */
if (rdbver >= 5) {
uint64_t cksum, expected = rdb->cksum;

View File

@ -665,7 +665,7 @@ cleanup:
* The object must be small, SDS-encoded, and with refcount = 1
* (we must be the only owner) for us to cache it. */
if (j < LUA_CMD_OBJCACHE_SIZE &&
o->refcount == 1 &&
o->getrefcount(std::memory_order_relaxed) == 1 &&
(o->encoding == OBJ_ENCODING_RAW ||
o->encoding == OBJ_ENCODING_EMBSTR) &&
sdslen((sds)ptrFromObj(o)) <= LUA_CMD_OBJCACHE_MAX_LEN)

View File

@ -1428,8 +1428,6 @@ int htNeedsResize(dict *dict) {
void tryResizeHashTables(int dbid) {
if (htNeedsResize(g_pserver->db[dbid].pdict))
dictResize(g_pserver->db[dbid].pdict);
if (htNeedsResize(g_pserver->db[dbid].expires))
dictResize(g_pserver->db[dbid].expires);
}
/* Our hash table implementation performs rehashing incrementally while
@ -1445,11 +1443,6 @@ int incrementallyRehash(int dbid) {
dictRehashMilliseconds(g_pserver->db[dbid].pdict,1);
return 1; /* already used our millisecond for this loop... */
}
/* Expires */
if (dictIsRehashing(g_pserver->db[dbid].expires)) {
dictRehashMilliseconds(g_pserver->db[dbid].expires,1);
return 1; /* already used our millisecond for this loop... */
}
return 0;
}
@ -1889,7 +1882,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
size = dictSlots(g_pserver->db[j].pdict);
used = dictSize(g_pserver->db[j].pdict);
vkeys = dictSize(g_pserver->db[j].expires);
vkeys = g_pserver->db[j].setexpire->size();
if (used || vkeys) {
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
/* dictPrintStats(g_pserver->dict); */
@ -2926,7 +2919,8 @@ void initServer(void) {
/* Create the Redis databases, and initialize other internal state. */
for (int j = 0; j < cserver.dbnum; j++) {
g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL);
g_pserver->db[j].expires = dictCreate(&keyptrDictType,NULL);
g_pserver->db[j].setexpire = new(MALLOC_LOCAL) semiorderedset<expireEntry, const char*>;
g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end();
g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
g_pserver->db[j].watched_keys = dictCreate(&keylistDictType,NULL);
@ -4571,7 +4565,7 @@ sds genRedisInfoString(const char *section) {
long long keys, vkeys;
keys = dictSize(g_pserver->db[j].pdict);
vkeys = dictSize(g_pserver->db[j].expires);
vkeys = g_pserver->db[j].setexpire->size();
if (keys || vkeys) {
info = sdscatprintf(info,
"db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",

View File

@ -81,6 +81,7 @@ typedef long long mstime_t; /* millisecond time type. */
N-elements flat arrays */
#include "rax.h" /* Radix tree */
#include "uuid.h"
#include "semiorderedset.h"
/* Following includes allow test functions to be called from Redis main() */
#include "zipmap.h"
@ -243,7 +244,7 @@ public:
#define CONFIG_DEFAULT_ACTIVE_REPLICA 0
#define CONFIG_DEFAULT_ENABLE_MULTIMASTER 0
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 64 /* Loopkups per loop. */
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* CPU max % for keys collection */
#define ACTIVE_EXPIRE_CYCLE_SLOW 0
@ -717,7 +718,7 @@ typedef struct RedisModuleDigest {
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */
#define OBJ_SHARED_REFCOUNT INT_MAX
#define OBJ_SHARED_REFCOUNT (0x7FFFFFFF)
#define OBJ_MVCC_INVALID (0xFFFFFFFFFFFFFFFFULL)
typedef struct redisObject {
@ -726,10 +727,21 @@ typedef struct redisObject {
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
mutable std::atomic<int> refcount;
private:
mutable std::atomic<unsigned> refcount;
public:
uint64_t mvcc_tstamp;
void *m_ptr;
inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; }
void SetFExpires(bool fExpires);
void setrefcount(unsigned ref);
unsigned getrefcount(std::memory_order order) const { return (refcount.load(order) & ~(1U << 31)); }
void addref() const { refcount.fetch_add(1, std::memory_order_acq_rel); }
unsigned release() const { return refcount.fetch_sub(1, std::memory_order_acq_rel) & ~(1U << 31); }
} robj;
static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase");
__attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o)
{
@ -755,6 +767,38 @@ __attribute__((always_inline)) inline char *szFromObj(const robj *o)
return (char*)ptrFromObj(o);
}
class expireEntry {
sds m_key;
long long m_when;
public:
expireEntry(sds key, long long when)
{
m_key = key;
m_when = when;
}
bool operator!=(const expireEntry &e) const noexcept
{
return m_when != e.m_when || m_key != e.m_key;
}
bool operator==(const expireEntry &e) const noexcept
{
return m_when == e.m_when && m_key == e.m_key;
}
bool operator==(const char *key) const noexcept { return m_key == key; }
bool operator<(const expireEntry &e) const noexcept { return m_when < e.m_when; }
bool operator<(const char *key) const noexcept { return m_key < key; }
bool operator<(long long when) const noexcept { return m_when < when; }
const char *key() const noexcept { return m_key; }
long long when() const noexcept { return m_when; }
explicit operator const char*() const noexcept { return m_key; }
explicit operator long long() const noexcept { return m_when; }
};
/* The a string name for an object's type as listed above
* Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines,
@ -766,7 +810,7 @@ const char *getObjectTypeName(robj_roptr o);
* we'll update it when the structure is changed, to avoid bugs like
* bug #85 introduced exactly in this way. */
#define initStaticStringObject(_var,_ptr) do { \
_var.refcount = 1; \
_var.setrefcount(1); \
_var.type = OBJ_STRING; \
_var.encoding = OBJ_ENCODING_RAW; \
_var.m_ptr = _ptr; \
@ -793,7 +837,9 @@ typedef struct clientReplyBlock {
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
dict *pdict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
semiorderedset<expireEntry, const char*> *setexpire;
semiorderedset<expireEntry, const char*>::setiter expireitr;
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
@ -2174,6 +2220,7 @@ int rewriteConfig(char *path);
/* db.c -- Keyspace access API */
int removeExpire(redisDb *db, robj *key);
int removeExpireCore(redisDb *db, robj *key, dictEntry *de);
void propagateExpire(redisDb *db, robj *key, int lazy);
int expireIfNeeded(redisDb *db, robj *key);
long long getExpire(redisDb *db, robj_roptr key);

View File

@ -72,7 +72,7 @@ slowlogEntry *slowlogCreateEntry(client *c, robj **argv, int argc, long long dur
(unsigned long)
sdslen(szFromObj(argv[j])) - SLOWLOG_ENTRY_MAX_STRING);
se->argv[j] = createObject(OBJ_STRING,s);
} else if (argv[j]->refcount == OBJ_SHARED_REFCOUNT) {
} else if (argv[j]->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) {
se->argv[j] = argv[j];
} else {
/* Here we need to dupliacate the string objects composing the

View File

@ -353,7 +353,7 @@ void incrDecrCommand(client *c, long long incr) {
}
value += incr;
if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT &&
if (o && o->getrefcount(std::memory_order_relaxed) == 1 && o->encoding == OBJ_ENCODING_INT &&
(value < 0 || value >= OBJ_SHARED_INTEGERS) &&
value >= LONG_MIN && value <= LONG_MAX)
{