From a060bc79427f544d4748222c45beece1c0370f39 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 5 Jul 2019 23:49:09 -0400 Subject: [PATCH] New expire datastructure and algorithm. Allows us to expire in sublinear time Former-commit-id: 3880d2616c882e19169180dc10268564347b0279 --- .vscode/settings.json | 3 +- src/bio.cpp | 4 +- src/db.cpp | 156 +++++++++++++++++++------- src/debug.cpp | 11 +- src/defrag.cpp | 21 ++-- src/evict.cpp | 255 +++++++++++++++++++++++------------------- src/expire.cpp | 141 +++++++++-------------- src/lazyfree.cpp | 27 +++-- src/module.cpp | 4 +- src/object.cpp | 47 +++++--- src/rdb.cpp | 58 ++++++---- src/scripting.cpp | 2 +- src/server.cpp | 14 +-- src/server.h | 57 +++++++++- src/slowlog.cpp | 2 +- src/t_string.cpp | 2 +- 16 files changed, 479 insertions(+), 325 deletions(-) diff --git a/.vscode/settings.json b/.vscode/settings.json index 56bf76d11..e4d7c4c9a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -51,6 +51,7 @@ "tuple": "cpp", "type_traits": "cpp", "typeinfo": "cpp", - "utility": "cpp" + "utility": "cpp", + "set": "cpp" } } diff --git a/src/bio.cpp b/src/bio.cpp index 62f6615a6..844464e77 100644 --- a/src/bio.cpp +++ b/src/bio.cpp @@ -85,7 +85,7 @@ struct bio_job { void *bioProcessBackgroundJobs(void *arg); void lazyfreeFreeObjectFromBioThread(robj *o); -void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2); +void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset *set); void lazyfreeFreeSlotsMapFromBioThread(rax *rt); /* Make sure we have enough stack to perform all the things we do in the @@ -196,7 +196,7 @@ void *bioProcessBackgroundJobs(void *arg) { if (job->arg1) lazyfreeFreeObjectFromBioThread((robj*)job->arg1); else if (job->arg2 && job->arg3) - lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(dict*)job->arg3); + lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(semiorderedset*)job->arg3); else if (job->arg3) lazyfreeFreeSlotsMapFromBioThread((rax*)job->arg3); } else { diff --git a/src/db.cpp b/src/db.cpp index 114b84297..d1a687712 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -39,6 +39,8 @@ *----------------------------------------------------------------------------*/ int keyIsExpired(redisDb *db, robj *key); +int expireIfNeeded(redisDb *db, robj *key, robj *o); +void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire); /* Update LFU when an object is accessed. * Firstly, decrement the counter if the decrement time is reached. @@ -49,6 +51,20 @@ void updateLFU(robj *val) { val->lru = (LFUGetTimeInMinutes()<<8) | counter; } +void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew) +{ + serverAssert(valOld->FExpires()); + serverAssert(!valNew->FExpires()); + + auto itr = db->setexpire->find(key); + serverAssert(itr != db->setexpire->end()); + + valNew->SetFExpires(true); + valOld->SetFExpires(false); + return; +} + + /* Low level key lookup API, not actually called directly from commands * implementations that should instead rely on lookupKeyRead(), * lookupKeyWrite() and lookupKeyReadWithFlags(). */ @@ -160,8 +176,10 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key) { * Returns the linked value object if the key exists or NULL if the key * does not exist in the specified DB. */ robj *lookupKeyWrite(redisDb *db, robj *key) { - expireIfNeeded(db,key); - return lookupKey(db,key,LOOKUP_UPDATEMVCC); + robj *o = lookupKey(db,key,LOOKUP_UPDATEMVCC); + if (expireIfNeeded(db,key)) + o = NULL; + return o; } robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply) { @@ -177,6 +195,7 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { } int dbAddCore(redisDb *db, robj *key, robj *val) { + serverAssert(!val->FExpires()); sds copy = sdsdup(szFromObj(key)); int retval = dictAdd(db->pdict, copy, val); val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp(); @@ -206,10 +225,18 @@ void dbAdd(redisDb *db, robj *key, robj *val) serverAssertWithInfo(NULL,key,retval == DICT_OK); } -void dbOverwriteCore(redisDb *db, dictEntry *de, robj *val, bool fUpdateMvcc) +void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire) { dictEntry auxentry = *de; robj *old = (robj*)dictGetVal(de); + + if (old->FExpires()) { + if (fRemoveExpire) + removeExpire(db, key); + else + updateExpire(db, (sds)dictGetKey(de), old, val); + } + if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { val->lru = old->lru; } @@ -235,7 +262,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { dictEntry *de = dictFind(db->pdict,ptrFromObj(key)); serverAssertWithInfo(NULL,key,de != NULL); - dbOverwriteCore(db, de, val, true); + dbOverwriteCore(db, de, key, val, true, false); } /* Insert a key, handling duplicate keys according to fReplace */ @@ -250,7 +277,7 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) robj *old = (robj*)dictGetVal(de); if (old->mvcc_tstamp <= val->mvcc_tstamp) { - dbOverwriteCore(db, de, val, false); + dbOverwriteCore(db, de, key, val, false, true); return true; } @@ -271,13 +298,13 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) * * All the new keys in the database should be created via this interface. */ void setKey(redisDb *db, robj *key, robj *val) { - if (lookupKeyWrite(db,key) == NULL) { + dictEntry *de = dictFind(db->pdict, ptrFromObj(key)); + if (de == NULL) { dbAdd(db,key,val); } else { - dbOverwrite(db,key,val); + dbOverwriteCore(db,de,key,val,true,true); } incrRefCount(val); - removeExpire(db,key); signalModifiedKey(db,key); } @@ -292,7 +319,7 @@ int dbExists(redisDb *db, robj *key) { robj *dbRandomKey(redisDb *db) { dictEntry *de; int maxtries = 100; - int allvolatile = dictSize(db->pdict) == dictSize(db->expires); + int allvolatile = dictSize(db->pdict) == db->setexpire->size(); while(1) { sds key; @@ -303,23 +330,30 @@ robj *dbRandomKey(redisDb *db) { key = (sds)dictGetKey(de); keyobj = createStringObject(key,sdslen(key)); - if (dictFind(db->expires,key)) { + + if (((robj*)dictGetVal(de))->FExpires()) + { if (allvolatile && listLength(g_pserver->masters) && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, - * it could happen that all the keys are already logically - * expired in the slave, so the function cannot stop because - * expireIfNeeded() is false, nor it can stop because - * dictGetRandomKey() returns NULL (there are keys to return). - * To prevent the infinite loop we do some tries, but if there - * are the conditions for an infinite loop, eventually we - * return a key name that may be already expired. */ + * it could happen that all the keys are already logically + * expired in the slave, so the function cannot stop because + * expireIfNeeded() is false, nor it can stop because + * dictGetRandomKey() returns NULL (there are keys to return). + * To prevent the infinite loop we do some tries, but if there + * are the conditions for an infinite loop, eventually we + * return a key name that may be already expired. */ return keyobj; } - if (expireIfNeeded(db,keyobj)) { + } + + if (((robj*)dictGetVal(de))->FExpires()) + { + if (expireIfNeeded(db,keyobj)) { decrRefCount(keyobj); continue; /* search for another key. This expired. */ - } + } } + return keyobj; } } @@ -328,7 +362,10 @@ robj *dbRandomKey(redisDb *db) { int dbSyncDelete(redisDb *db, robj *key) { /* Deleting an entry from the expires dict will not free the sds of * the key, because it is shared with the main dictionary. */ - if (dictSize(db->expires) > 0) dictDelete(db->expires,ptrFromObj(key)); + + dictEntry *de = dictFind(db->pdict, szFromObj(key)); + if (de != nullptr && ((robj*)dictGetVal(de))->FExpires()) + removeExpireCore(db, key, de); if (dictDelete(db->pdict,ptrFromObj(key)) == DICT_OK) { if (g_pserver->cluster_enabled) slotToKeyDel(key); return 1; @@ -373,7 +410,7 @@ int dbDelete(redisDb *db, robj *key) { */ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { serverAssert(o->type == OBJ_STRING); - if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) { + if (o->getrefcount(std::memory_order_relaxed) != 1 || o->encoding != OBJ_ENCODING_RAW) { robj *decoded = getDecodedObject(o); o = createRawStringObject(szFromObj(decoded), sdslen(szFromObj(decoded))); decrRefCount(decoded); @@ -419,7 +456,8 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { emptyDbAsync(&g_pserver->db[j]); } else { dictEmpty(g_pserver->db[j].pdict,callback); - dictEmpty(g_pserver->db[j].expires,callback); + delete g_pserver->db[j].setexpire; + g_pserver->db[j].setexpire = new (MALLOC_LOCAL) semiorderedset(); } } if (g_pserver->cluster_enabled) { @@ -964,9 +1002,10 @@ void renameGenericCommand(client *c, int nx) { * with the same name. */ dbDelete(c->db,c->argv[2]); } - dbAdd(c->db,c->argv[2],o); - if (expire != -1) setExpire(c,c->db,c->argv[2],expire); dbDelete(c->db,c->argv[1]); + dbAdd(c->db,c->argv[2],o); + if (expire != -1) + setExpire(c,c->db,c->argv[2],expire); signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[2]); notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", @@ -1024,6 +1063,12 @@ void moveCommand(client *c) { return; } expire = getExpire(c->db,c->argv[1]); + if (o->FExpires()) + removeExpire(c->db,c->argv[1]); + serverAssert(!o->FExpires()); + incrRefCount(o); + dbDelete(src,c->argv[1]); + g_pserver->dirty++; /* Return zero if the key already exists in the target DB */ if (lookupKeyWrite(dst,c->argv[1]) != NULL) { @@ -1032,11 +1077,7 @@ void moveCommand(client *c) { } dbAdd(dst,c->argv[1],o); if (expire != -1) setExpire(c,dst,c->argv[1],expire); - incrRefCount(o); - /* OK! key moved, free the entry in the source DB */ - dbDelete(src,c->argv[1]); - g_pserver->dirty++; addReply(c,shared.cone); } @@ -1077,11 +1118,11 @@ int dbSwapDatabases(int id1, int id2) { * ready_keys and watched_keys, since we want clients to * remain in the same DB they were. */ db1->pdict = db2->pdict; - db1->expires = db2->expires; + db1->setexpire = db2->setexpire; db1->avg_ttl = db2->avg_ttl; db2->pdict = aux.pdict; - db2->expires = aux.expires; + db2->setexpire = aux.setexpire; db2->avg_ttl = aux.avg_ttl; /* Now we need to handle clients blocked on lists: as an effect @@ -1130,12 +1171,25 @@ void swapdbCommand(client *c) { /*----------------------------------------------------------------------------- * Expires API *----------------------------------------------------------------------------*/ - int removeExpire(redisDb *db, robj *key) { + dictEntry *de = dictFind(db->pdict,ptrFromObj(key)); + return removeExpireCore(db, key, de); +} +int removeExpireCore(redisDb *db, robj *key, dictEntry *de) { /* An expire may only be removed if there is a corresponding entry in the * main dict. Otherwise, the key will never be freed. */ - serverAssertWithInfo(NULL,key,dictFind(db->pdict,ptrFromObj(key)) != NULL); - return dictDelete(db->expires,ptrFromObj(key)) == DICT_OK; + serverAssertWithInfo(NULL,key,de != NULL); + + robj *val = (robj*)dictGetVal(de); + if (!val->FExpires()) + return 0; + + auto itr = db->setexpire->find((sds)dictGetKey(de)); + serverAssert(itr != db->setexpire->end()); + serverAssert(itr->key() == (sds)dictGetKey(de)); + db->setexpire->erase(itr); + val->SetFExpires(false); + return 1; } /* Set an expire to the specified key. If the expire is set in the context @@ -1143,14 +1197,27 @@ int removeExpire(redisDb *db, robj *key) { * to NULL. The 'when' parameter is the absolute unix time in milliseconds * after which the key will no longer be considered valid. */ void setExpire(client *c, redisDb *db, robj *key, long long when) { - dictEntry *kde, *de; + dictEntry *kde; + serverAssert(GlobalLocksAcquired()); /* Reuse the sds from the main dict in the expire dict */ kde = dictFind(db->pdict,ptrFromObj(key)); serverAssertWithInfo(NULL,key,kde != NULL); - de = dictAddOrFind(db->expires,dictGetKey(kde)); - dictSetSignedIntegerVal(de,when); + + if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) + { + // shared objects cannot have the expire bit set, create a real object + dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde))); + } + + if (((robj*)dictGetVal(kde))->FExpires()) + removeExpire(db, key); // should we optimize for when this is called with an already set expiry? + + expireEntry e((sds)dictGetKey(kde), when); + ((robj*)dictGetVal(kde))->SetFExpires(true); + + db->setexpire->insert(e); int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; if (c && writable_slave && !(c->flags & CLIENT_MASTER)) @@ -1163,13 +1230,18 @@ long long getExpire(redisDb *db, robj_roptr key) { dictEntry *de; /* No expire? return ASAP */ - if (dictSize(db->expires) == 0 || - (de = dictFind(db->expires,ptrFromObj(key))) == NULL) return -1; + if (db->setexpire->size() == 0) + return -1; - /* The entry was found in the expire dict, this means it should also - * be present in the main dict (safety check). */ - serverAssertWithInfo(NULL,key,dictFind(db->pdict,ptrFromObj(key)) != NULL); - return dictGetSignedIntegerVal(de); + de = dictFind(db->pdict, ptrFromObj(key)); + if (de == NULL) + return -1; + robj *obj = (robj*)dictGetVal(de); + if (!obj->FExpires()) + return -1; + + auto itr = db->setexpire->find((sds)dictGetKey(de)); + return itr->when(); } /* Propagate expires into slaves and the AOF file. diff --git a/src/debug.cpp b/src/debug.cpp index 3485df967..c02eba225 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -436,7 +436,7 @@ NULL "Value at:%p refcount:%d " "encoding:%s serializedlength:%zu " "lru:%d lru_seconds_idle:%llu%s", - (void*)val, static_cast(val->refcount), + (void*)val, static_cast(val->getrefcount(std::memory_order_relaxed)), strenc, rdbSavedObjectLen(val), val->lru, estimateObjectIdleTime(val)/1000, extra); } else if (!strcasecmp(szFromObj(c->argv[1]),"sdslen") && c->argc == 3) { @@ -639,8 +639,9 @@ NULL stats = sdscat(stats,buf); stats = sdscatprintf(stats,"[Expires HT]\n"); - dictGetStats(buf,sizeof(buf),g_pserver->db[dbid].expires); - stats = sdscat(stats,buf); + // TODO! + //dictGetStats(buf,sizeof(buf),server.db[dbid].expires); + //stats = sdscat(stats,buf); addReplyBulkSds(c,stats); } else if (!strcasecmp(szFromObj(c->argv[1]),"htstats-key") && c->argc == 3) { @@ -721,14 +722,14 @@ void _serverAssertPrintClientInfo(const client *c) { arg = buf; } serverLog(LL_WARNING,"client->argv[%d] = \"%s\" (refcount: %d)", - j, arg, static_cast(c->argv[j]->refcount)); + j, arg, static_cast(c->argv[j]->getrefcount(std::memory_order_relaxed))); } } void serverLogObjectDebugInfo(robj_roptr o) { serverLog(LL_WARNING,"Object type: %d", o->type); serverLog(LL_WARNING,"Object encoding: %d", o->encoding); - serverLog(LL_WARNING,"Object refcount: %d", static_cast(o->refcount)); + serverLog(LL_WARNING,"Object refcount: %d", static_cast(o->getrefcount(std::memory_order_relaxed))); if (o->type == OBJ_STRING && sdsEncodedObject(o)) { serverLog(LL_WARNING,"Object raw string len: %zu", sdslen(szFromObj(o))); if (sdslen(szFromObj(o)) < 4096) { diff --git a/src/defrag.cpp b/src/defrag.cpp index 2e9abd290..b11564c4b 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -48,6 +48,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util); /* forward declarations*/ void defragDictBucketCallback(void *privdata, dictEntry **bucketref); dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); +void replaceSateliteOSetKeyPtr(semiorderedset &set, sds oldkey, sds newkey); /* Defrag helper for generic allocations. * @@ -102,7 +103,7 @@ sds activeDefragSds(sds sdsptr) { * and should NOT be accessed. */ robj *activeDefragStringOb(robj* ob, long *defragged) { robj *ret = NULL; - if (ob->refcount!=1) + if (ob->getrefcount(std::memory_order_relaxed)!=1) return NULL; /* try to defrag robj (only if not an EMBSTR type (handled below). */ @@ -406,6 +407,16 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd return NULL; } +void replaceSateliteOSetKeyPtr(semiorderedset &set, sds oldkey, sds newkey) { + auto itr = set.find(oldkey); + if (itr != set.end()) + { + expireEntry eNew(newkey, itr->when()); + set.erase(itr); + set.insert(eNew); + } +} + long activeDefragQuickListNodes(quicklist *ql) { quicklistNode *node = ql->head, *newnode; long defragged = 0; @@ -769,12 +780,8 @@ long defragKey(redisDb *db, dictEntry *de) { newsds = activeDefragSds(keysds); if (newsds) defragged++, de->key = newsds; - if (dictSize(db->expires)) { - /* Dirty code: - * I can't search in db->expires for that key after i already released - * the pointer it holds it won't be able to do the string compare */ - uint64_t hash = dictGetHash(db->pdict, de->key); - replaceSateliteDictKeyPtrAndOrDefragDictEntry(db->expires, keysds, newsds, hash, &defragged); + if (!db->setexpire->empty()) { + replaceSateliteOSetKeyPtr(*db->setexpire, keysds, newsds); } /* Try to defrag robj and / or string value. */ diff --git a/src/evict.cpp b/src/evict.cpp index 4be6bf761..4acdb5ad0 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -150,6 +150,84 @@ void evictionPoolAlloc(void) { EvictionPoolLRU = ep; } +void processEvictionCandidate(int dbid, sds key, robj *o, const expireEntry *e, struct evictionPoolEntry *pool) +{ + unsigned long long idle; + + /* Calculate the idle time according to the policy. This is called + * idle just because the code initially handled LRU, but is in fact + * just a score where an higher score means better candidate. */ + if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LRU) { + idle = (o != nullptr) ? estimateObjectIdleTime(o) : 0; + } else if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { + /* When we use an LRU policy, we sort the keys by idle time + * so that we expire keys starting from greater idle time. + * However when the policy is an LFU one, we have a frequency + * estimation, and we want to evict keys with lower frequency + * first. So inside the pool we put objects using the inverted + * frequency subtracting the actual frequency to the maximum + * frequency of 255. */ + idle = 255-LFUDecrAndReturn(o); + } else if (g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { + /* In this case the sooner the expire the better. */ + idle = ULLONG_MAX - e->when(); + } else { + serverPanic("Unknown eviction policy in evictionPoolPopulate()"); + } + + /* Insert the element inside the pool. + * First, find the first empty bucket or the first populated + * bucket that has an idle time smaller than our idle time. */ + int k = 0; + while (k < EVPOOL_SIZE && + pool[k].key && + pool[k].idle < idle) k++; + if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) { + /* Can't insert if the element is < the worst element we have + * and there are no empty buckets. */ + return; + } else if (k < EVPOOL_SIZE && pool[k].key == NULL) { + /* Inserting into empty position. No setup needed before insert. */ + } else { + /* Inserting in the middle. Now k points to the first element + * greater than the element to insert. */ + if (pool[EVPOOL_SIZE-1].key == NULL) { + /* Free space on the right? Insert at k shifting + * all the elements from k to end to the right. */ + + /* Save SDS before overwriting. */ + sds cached = pool[EVPOOL_SIZE-1].cached; + memmove(pool+k+1,pool+k, + sizeof(pool[0])*(EVPOOL_SIZE-k-1)); + pool[k].cached = cached; + } else { + /* No free space on right? Insert at k-1 */ + k--; + /* Shift all elements on the left of k (included) to the + * left, so we discard the element with smaller idle time. */ + sds cached = pool[0].cached; /* Save SDS before overwriting. */ + if (pool[0].key != pool[0].cached) sdsfree(pool[0].key); + memmove(pool,pool+1,sizeof(pool[0])*k); + pool[k].cached = cached; + } + } + + /* Try to reuse the cached SDS string allocated in the pool entry, + * because allocating and deallocating this object is costly + * (according to the profiler, not my fantasy. Remember: + * premature optimizbla bla bla bla. */ + int klen = sdslen(key); + if (klen > EVPOOL_CACHED_SDS_SIZE) { + pool[k].key = sdsdup(key); + } else { + memcpy(pool[k].cached,key,klen+1); + sdssetlen(pool[k].cached,klen); + pool[k].key = pool[k].cached; + } + pool[k].idle = idle; + pool[k].dbid = dbid; +} + /* This is an helper function for freeMemoryIfNeeded(), it is used in order * to populate the evictionPool with a few entries every time we want to * expire a key. Keys with idle time smaller than one of the current @@ -159,100 +237,36 @@ void evictionPoolAlloc(void) { * idle time are on the left, and keys with the higher idle time on the * right. */ -void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) { - int j, k, count; - dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*)); +struct visitFunctor +{ + int dbid; + dict *dbdict; + struct evictionPoolEntry *pool; + int count; - count = dictGetSomeKeys(sampledict,samples,g_pserver->maxmemory_samples); - for (j = 0; j < count; j++) { - unsigned long long idle; - sds key; - robj *o = nullptr; - dictEntry *de; - - de = samples[j]; - key = (sds)dictGetKey(de); - - /* If the dictionary we are sampling from is not the main - * dictionary (but the expires one) we need to lookup the key - * again in the key dictionary to obtain the value object. */ - if (g_pserver->maxmemory_policy != MAXMEMORY_VOLATILE_TTL) { - if (sampledict != keydict) de = dictFind(keydict, key); - o = (robj*)dictGetVal(de); + bool operator()(const expireEntry &e) + { + dictEntry *de = dictFind(dbdict, e.key()); + processEvictionCandidate(dbid, (sds)dictGetKey(de), (robj*)dictGetVal(de), &e, pool); + ++count; + return count < g_pserver->maxmemory_samples; + } +}; +void evictionPoolPopulate(int dbid, dict *dbdict, semiorderedset *setexpire, struct evictionPoolEntry *pool) +{ + if (setexpire != nullptr) + { + visitFunctor visitor { dbid, dbdict, pool, 0 }; + setexpire->random_visit(visitor); + } + else + { + dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*)); + int count = dictGetSomeKeys(dbdict,samples,g_pserver->maxmemory_samples); + for (int j = 0; j < count; j++) { + robj *o = (robj*)dictGetVal(samples[j]); + processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool); } - - /* Calculate the idle time according to the policy. This is called - * idle just because the code initially handled LRU, but is in fact - * just a score where an higher score means better candidate. */ - if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LRU) { - idle = (o != nullptr) ? estimateObjectIdleTime(o) : 0; - } else if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { - /* When we use an LRU policy, we sort the keys by idle time - * so that we expire keys starting from greater idle time. - * However when the policy is an LFU one, we have a frequency - * estimation, and we want to evict keys with lower frequency - * first. So inside the pool we put objects using the inverted - * frequency subtracting the actual frequency to the maximum - * frequency of 255. */ - idle = 255-LFUDecrAndReturn(o); - } else if (g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { - /* In this case the sooner the expire the better. */ - idle = ULLONG_MAX - (long)dictGetVal(de); - } else { - serverPanic("Unknown eviction policy in evictionPoolPopulate()"); - } - - /* Insert the element inside the pool. - * First, find the first empty bucket or the first populated - * bucket that has an idle time smaller than our idle time. */ - k = 0; - while (k < EVPOOL_SIZE && - pool[k].key && - pool[k].idle < idle) k++; - if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) { - /* Can't insert if the element is < the worst element we have - * and there are no empty buckets. */ - continue; - } else if (k < EVPOOL_SIZE && pool[k].key == NULL) { - /* Inserting into empty position. No setup needed before insert. */ - } else { - /* Inserting in the middle. Now k points to the first element - * greater than the element to insert. */ - if (pool[EVPOOL_SIZE-1].key == NULL) { - /* Free space on the right? Insert at k shifting - * all the elements from k to end to the right. */ - - /* Save SDS before overwriting. */ - sds cached = pool[EVPOOL_SIZE-1].cached; - memmove(pool+k+1,pool+k, - sizeof(pool[0])*(EVPOOL_SIZE-k-1)); - pool[k].cached = cached; - } else { - /* No free space on right? Insert at k-1 */ - k--; - /* Shift all elements on the left of k (included) to the - * left, so we discard the element with smaller idle time. */ - sds cached = pool[0].cached; /* Save SDS before overwriting. */ - if (pool[0].key != pool[0].cached) sdsfree(pool[0].key); - memmove(pool,pool+1,sizeof(pool[0])*k); - pool[k].cached = cached; - } - } - - /* Try to reuse the cached SDS string allocated in the pool entry, - * because allocating and deallocating this object is costly - * (according to the profiler, not my fantasy. Remember: - * premature optimizbla bla bla bla. */ - int klen = sdslen(key); - if (klen > EVPOOL_CACHED_SDS_SIZE) { - pool[k].key = sdsdup(key); - } else { - memcpy(pool[k].cached,key,klen+1); - sdssetlen(pool[k].cached,klen); - pool[k].key = pool[k].cached; - } - pool[k].idle = idle; - pool[k].dbid = dbid; } } @@ -474,8 +488,6 @@ int freeMemoryIfNeeded(void) { sds bestkey = NULL; int bestdbid; redisDb *db; - dict *dict; - dictEntry *de; if (g_pserver->maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) || g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) @@ -490,10 +502,18 @@ int freeMemoryIfNeeded(void) { * every DB. */ for (i = 0; i < cserver.dbnum; i++) { db = g_pserver->db+i; - dict = (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ? - db->pdict : db->expires; - if ((keys = dictSize(dict)) != 0) { - evictionPoolPopulate(i, dict, db->pdict, pool); + if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) + { + if ((keys = dictSize(db->pdict)) != 0) { + evictionPoolPopulate(i, db->pdict, nullptr, pool); + total_keys += keys; + } + } + else + { + keys = db->setexpire->size(); + if (keys != 0) + evictionPoolPopulate(i, db->pdict, db->setexpire, pool); total_keys += keys; } } @@ -503,14 +523,11 @@ int freeMemoryIfNeeded(void) { for (k = EVPOOL_SIZE-1; k >= 0; k--) { if (pool[k].key == NULL) continue; bestdbid = pool[k].dbid; + sds key = nullptr; - if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { - de = dictFind(g_pserver->db[pool[k].dbid].pdict, - pool[k].key); - } else { - de = dictFind(g_pserver->db[pool[k].dbid].expires, - pool[k].key); - } + dictEntry *de = dictFind(g_pserver->db[pool[k].dbid].pdict,pool[k].key); + if (de != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || ((robj*)dictGetVal(de))->FExpires())) + key = (sds)dictGetKey(de); /* Remove the entry from the pool. */ if (pool[k].key != pool[k].cached) @@ -520,8 +537,8 @@ int freeMemoryIfNeeded(void) { /* If the key exists, is our pick. Otherwise it is * a ghost and we need to try the next element. */ - if (de) { - bestkey = (sds)dictGetKey(de); + if (key) { + bestkey = key; break; } else { /* Ghost... Iterate again. */ @@ -540,13 +557,23 @@ int freeMemoryIfNeeded(void) { for (i = 0; i < cserver.dbnum; i++) { j = (++next_db) % cserver.dbnum; db = g_pserver->db+j; - dict = (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ? - db->pdict : db->expires; - if (dictSize(dict) != 0) { - de = dictGetRandomKey(dict); - bestkey = (sds)dictGetKey(de); - bestdbid = j; - break; + if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) + { + if (dictSize(db->pdict) != 0) { + dictEntry *de = dictGetRandomKey(db->pdict); + bestkey = (sds)dictGetKey(de); + bestdbid = j; + break; + } + } + else + { + if (!db->setexpire->empty()) + { + bestkey = (sds)db->setexpire->random_value().key(); + bestdbid = j; + break; + } } } } diff --git a/src/expire.cpp b/src/expire.cpp index 5a0abbb06..55ea83411 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -51,26 +51,19 @@ * * The parameter 'now' is the current time in milliseconds as is passed * to the function to avoid too many gettimeofday() syscalls. */ -int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { - long long t = dictGetSignedIntegerVal(de); - if (now > t) { - sds key = (sds)dictGetKey(de); - robj *keyobj = createStringObject(key,sdslen(key)); +void activeExpireCycleExpire(redisDb *db, const char *key) { + robj *keyobj = createStringObject(key,sdslen(key)); - propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire); - if (g_pserver->lazyfree_lazy_expire) - dbAsyncDelete(db,keyobj); - else - dbSyncDelete(db,keyobj); - notifyKeyspaceEvent(NOTIFY_EXPIRED, - "expired",keyobj,db->id); - if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj); - decrRefCount(keyobj); - g_pserver->stat_expiredkeys++; - return 1; - } else { - return 0; - } + propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire); + if (g_pserver->lazyfree_lazy_expire) + dbAsyncDelete(db,keyobj); + else + dbSyncDelete(db,keyobj); + notifyKeyspaceEvent(NOTIFY_EXPIRED, + "expired",keyobj,db->id); + if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj); + decrRefCount(keyobj); + g_pserver->stat_expiredkeys++; } /* Try to expire a few timed out keys. The algorithm used is adaptive and @@ -148,7 +141,6 @@ void activeExpireCycle(int type) { long total_expired = 0; for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) { - int expired; redisDb *db = g_pserver->db+(current_db % cserver.dbnum); /* Increment the DB now so we are sure if we run out of time @@ -156,78 +148,44 @@ void activeExpireCycle(int type) { * distribute the time evenly across DBs. */ current_db++; - /* Continue to expire if at the end of the cycle more than 25% - * of the keys were expired. */ - do { - unsigned long num, slots; - long long now, ttl_sum; - int ttl_samples; - iteration++; + long long now; + iteration++; + now = mstime(); - /* If there is nothing to expire try next DB ASAP. */ - if ((num = dictSize(db->expires)) == 0) { - db->avg_ttl = 0; - break; + /* If there is nothing to expire try next DB ASAP. */ + if (db->setexpire->empty()) + { + // TODO: Compute db->avg_ttl somewhere... but probably not here + db->avg_ttl = 0; + continue; + } + + size_t expired = 0; + size_t tried = 0; + db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](const expireEntry &e) __attribute__((always_inline)) { + if (e.when() < now) + { + activeExpireCycleExpire(db, e.key()); + ++expired; } - slots = dictSlots(db->expires); - now = mstime(); + ++tried; - /* When there are less than 1% filled slots getting random - * keys is expensive, so stop here waiting for better times... - * The dictionary will be resized asap. */ - if (num && slots > DICT_HT_INITIAL_SIZE && - (num*100/slots < 1)) break; - - /* The main collection cycle. Sample random keys among keys - * with an expire set, checking for expired ones. */ - expired = 0; - ttl_sum = 0; - ttl_samples = 0; - - if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) - num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP; - - while (num--) { - dictEntry *de; - long long ttl; - - if ((de = dictGetRandomKey(db->expires)) == NULL) break; - ttl = dictGetSignedIntegerVal(de)-now; - if (activeExpireCycleTryExpire(db,de,now)) expired++; - if (ttl > 0) { - /* We want the average TTL of keys yet not expired. */ - ttl_sum += ttl; - ttl_samples++; - } - total_sampled++; - } - total_expired += expired; - - /* Update the average TTL stats for this database. */ - if (ttl_samples) { - long long avg_ttl = ttl_sum/ttl_samples; - - /* Do a simple running average with a few samples. - * We just use the current estimate with a weight of 2% - * and the previous estimate with a weight of 98%. */ - if (db->avg_ttl == 0) db->avg_ttl = avg_ttl; - db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50); - } - - /* We can't block forever here even if there are many keys to - * expire. So after a given amount of milliseconds return to the - * caller waiting for the other active expire cycle. */ - if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */ + if ((tried % ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) == 0) + { + /* We can't block forever here even if there are many keys to + * expire. So after a given amount of milliseconds return to the + * caller waiting for the other active expire cycle. */ elapsed = ustime()-start; if (elapsed > timelimit) { timelimit_exit = 1; g_pserver->stat_expired_time_cap_reached_count++; - break; + return false; } } - /* We don't repeat the cycle if there are less than 25% of keys - * found expired in the current DB. */ - } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4); + return true; + }); + + total_expired += expired; } elapsed = ustime()-start; @@ -301,20 +259,27 @@ void expireSlaveKeys(void) { while(dbids && dbid < cserver.dbnum) { if ((dbids & 1) != 0) { redisDb *db = g_pserver->db+dbid; - dictEntry *expire = dictFind(db->expires,keyname); + + // the expire is hashed based on the key pointer, so we need the point in the main db + dictEntry *deMain = dictFind(db->pdict, keyname); + auto itr = db->setexpire->end(); + if (deMain != nullptr) + itr = db->setexpire->find((sds)dictGetKey(deMain)); int expired = 0; - if (expire && - activeExpireCycleTryExpire(g_pserver->db+dbid,expire,start)) + if (itr != db->setexpire->end()) { - expired = 1; + if (itr->when() < start) { + activeExpireCycleExpire(g_pserver->db+dbid,itr->key()); + expired = 1; + } } /* If the key was not expired in this DB, we need to set the * corresponding bit in the new bitmap we set as value. * At the end of the loop if the bitmap is zero, it means we * no longer need to keep track of this key. */ - if (expire && !expired) { + if (itr != db->setexpire->end() && !expired) { noexpire++; new_dbids |= (uint64_t)1 << dbid; } diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index 6d56ec86d..0dbfd57d1 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -52,16 +52,19 @@ size_t lazyfreeGetFreeEffort(robj *obj) { * will be reclaimed in a different bio.c thread. */ #define LAZYFREE_THRESHOLD 64 int dbAsyncDelete(redisDb *db, robj *key) { - /* Deleting an entry from the expires dict will not free the sds of - * the key, because it is shared with the main dictionary. */ - if (dictSize(db->expires) > 0) dictDelete(db->expires,ptrFromObj(key)); - /* If the value is composed of a few allocations, to free in a lazy way * is actually just slower... So under a certain limit we just free * the object synchronously. */ dictEntry *de = dictUnlink(db->pdict,ptrFromObj(key)); if (de) { robj *val = (robj*)dictGetVal(de); + if (val->FExpires()) + { + /* Deleting an entry from the expires dict will not free the sds of + * the key, because it is shared with the main dictionary. */ + removeExpireCore(db,key,de); + } + size_t free_effort = lazyfreeGetFreeEffort(val); /* If releasing the object is too much work, do it in the background @@ -72,7 +75,7 @@ int dbAsyncDelete(redisDb *db, robj *key) { * objects, and then call dbDelete(). In this case we'll fall * through and reach the dictFreeUnlinkedEntry() call, that will be * equivalent to just calling decrRefCount(). */ - if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) { + if (free_effort > LAZYFREE_THRESHOLD && val->getrefcount(std::memory_order_relaxed) == 1) { atomicIncr(lazyfree_objects,1); bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL); dictSetVal(db->pdict,de,NULL); @@ -93,7 +96,7 @@ int dbAsyncDelete(redisDb *db, robj *key) { /* Free an object, if the object is huge enough, free it in async way. */ void freeObjAsync(robj *o) { size_t free_effort = lazyfreeGetFreeEffort(o); - if (free_effort > LAZYFREE_THRESHOLD && o->refcount == 1) { + if (free_effort > LAZYFREE_THRESHOLD && o->getrefcount(std::memory_order_relaxed) == 1) { atomicIncr(lazyfree_objects,1); bioCreateBackgroundJob(BIO_LAZY_FREE,o,NULL,NULL); } else { @@ -105,11 +108,13 @@ void freeObjAsync(robj *o) { * create a new empty set of hash tables and scheduling the old ones for * lazy freeing. */ void emptyDbAsync(redisDb *db) { - dict *oldht1 = db->pdict, *oldht2 = db->expires; + dict *oldht1 = db->pdict; + auto *set = db->setexpire; + db->setexpire = new (MALLOC_LOCAL) semiorderedset(); + db->expireitr = db->setexpire->end(); db->pdict = dictCreate(&dbDictType,NULL); - db->expires = dictCreate(&keyptrDictType,NULL); atomicIncr(lazyfree_objects,dictSize(oldht1)); - bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2); + bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,set); } /* Empty the slots-keys map of Redis CLuster by creating a new empty one @@ -136,10 +141,10 @@ void lazyfreeFreeObjectFromBioThread(robj *o) { * when the database was logically deleted. 'sl' is a skiplist used by * Redis Cluster in order to take the hash slots -> keys mapping. This * may be NULL if Redis Cluster is disabled. */ -void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) { +void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset *set) { size_t numkeys = dictSize(ht1); dictRelease(ht1); - dictRelease(ht2); + delete set; atomicDecr(lazyfree_objects,numkeys); } diff --git a/src/module.cpp b/src/module.cpp index ee31cf7a5..7863ca4cf 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -566,7 +566,7 @@ void RedisModuleCommandDispatcher(client *c) { for (int i = 0; i < c->argc; i++) { /* Only do the work if the module took ownership of the object: * in that case the refcount is no longer 1. */ - if (c->argv[i]->refcount > 1) + if (c->argv[i]->getrefcount(std::memory_order_relaxed) > 1) trimStringObjectIfNeeded(c->argv[i]); } } @@ -1037,7 +1037,7 @@ int RM_StringCompare(RedisModuleString *a, RedisModuleString *b) { /* Return the (possibly modified in encoding) input 'str' object if * the string is unshared, otherwise NULL is returned. */ RedisModuleString *moduleAssertUnsharedString(RedisModuleString *str) { - if (str->refcount != 1) { + if (str->getrefcount(std::memory_order_relaxed) != 1) { serverLog(LL_WARNING, "Module attempted to use an in-place string modify operation " "with a string referenced multiple times. Please check the code " diff --git a/src/object.cpp b/src/object.cpp index 6e65ec52b..900a9058c 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -39,11 +39,11 @@ /* ===================== Creation and parsing of objects ==================== */ robj *createObject(int type, void *ptr) { - robj *o = (robj*)zmalloc(sizeof(*o), MALLOC_SHARED); + robj *o = (robj*)zcalloc(sizeof(*o), MALLOC_SHARED); o->type = type; o->encoding = OBJ_ENCODING_RAW; o->m_ptr = ptr; - o->refcount.store(1, std::memory_order_relaxed); + o->setrefcount(1); o->mvcc_tstamp = OBJ_MVCC_INVALID; /* Set the LRU to the current lruclock (minutes resolution), or @@ -68,8 +68,9 @@ robj *createObject(int type, void *ptr) { * */ robj *makeObjectShared(robj *o) { - serverAssert(o->refcount == 1); - o->refcount.store(OBJ_SHARED_REFCOUNT, std::memory_order_relaxed); + serverAssert(o->getrefcount(std::memory_order_relaxed) == 1); + serverAssert(!o->FExpires()); + o->setrefcount(OBJ_SHARED_REFCOUNT); return o; } @@ -86,12 +87,12 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { size_t allocsize = sizeof(struct sdshdr8)+len+1; if (allocsize < sizeof(void*)) allocsize = sizeof(void*); - robj *o = (robj*)zmalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED); + robj *o = (robj*)zcalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED); struct sdshdr8 *sh = (sdshdr8*)(&o->m_ptr); o->type = OBJ_STRING; o->encoding = OBJ_ENCODING_EMBSTR; - o->refcount.store(1, std::memory_order_relaxed); + o->setrefcount(1); o->mvcc_tstamp = OBJ_MVCC_INVALID; if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { @@ -352,11 +353,14 @@ void freeStreamObject(robj_roptr o) { } void incrRefCount(robj_roptr o) { - if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount.fetch_add(1, std::memory_order_acquire); + if (o->getrefcount(std::memory_order_relaxed) != OBJ_SHARED_REFCOUNT) o->addref(); } void decrRefCount(robj_roptr o) { - if (o->refcount.load(std::memory_order_acquire) == 1) { + if (o->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) + return; + unsigned prev = o->release(); + if (prev == 1) { switch(o->type) { case OBJ_STRING: freeStringObject(o); break; case OBJ_LIST: freeListObject(o); break; @@ -369,8 +373,7 @@ void decrRefCount(robj_roptr o) { } zfree(o.unsafe_robjcast()); } else { - if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0"); - if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount.fetch_sub(1, std::memory_order_acquire); + if (prev <= 0) serverPanic("decrRefCount against refcount <= 0"); } } @@ -394,7 +397,7 @@ void decrRefCountVoid(const void *o) { * decrRefCount(obj); */ robj *resetRefCount(robj *obj) { - obj->refcount = 0; + obj->setrefcount(0); return obj; } @@ -452,7 +455,7 @@ robj *tryObjectEncoding(robj *o) { /* It's not safe to encode shared objects: shared objects can be shared * everywhere in the "object space" of Redis and may end in places where * they are not handled. We handle them only as values in the keyspace. */ - if (o->refcount > 1) return o; + if (o->getrefcount(std::memory_order_relaxed) > 1) return o; /* Check if we can represent this string as a long integer. * Note that we are sure that a string larger than 20 chars is not @@ -1064,8 +1067,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) { mh->db[mh->num_dbs].overhead_ht_main = mem; mem_total+=mem; - mem = dictSize(db->expires) * sizeof(dictEntry) + - dictSlots(db->expires) * sizeof(dictEntry*); + mem = db->setexpire->bytes_used(); mh->db[mh->num_dbs].overhead_ht_expires = mem; mem_total+=mem; @@ -1275,7 +1277,7 @@ NULL } else if (!strcasecmp(szFromObj(c->argv[1]),"refcount") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) == NULL) return; - addReplyLongLong(c,o->refcount); + addReplyLongLong(c,o->getrefcount(std::memory_order_relaxed)); } else if (!strcasecmp(szFromObj(c->argv[1]),"encoding") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) == NULL) return; @@ -1474,3 +1476,18 @@ NULL addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try MEMORY HELP", (char*)ptrFromObj(c->argv[1])); } } + +void redisObject::SetFExpires(bool fExpire) +{ + serverAssert(this->refcount != OBJ_SHARED_REFCOUNT); + if (fExpire) + this->refcount.fetch_or(1U << 31, std::memory_order_relaxed); + else + this->refcount.fetch_and(~(1U << 31), std::memory_order_relaxed); +} + +void redisObject::setrefcount(unsigned ref) +{ + serverAssert(!FExpires()); + refcount.store(ref, std::memory_order_relaxed); +} \ No newline at end of file diff --git a/src/rdb.cpp b/src/rdb.cpp index d4d91ff1f..5443ca064 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1096,6 +1096,29 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { return 1; } +int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *keystr, robj *o) +{ + robj key; + long long expire; + + initStaticStringObject(key,(char*)keystr); + expire = getExpire(db, &key); + + if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) + return 0; + + /* When this RDB is produced as part of an AOF rewrite, move + * accumulated diff from parent to child while rewriting in + * order to have a smaller final write. */ + if (flags & RDB_SAVE_AOF_PREAMBLE && + rdb->processed_bytes > *processed+AOF_READ_DIFF_INTERVAL_BYTES) + { + *processed = rdb->processed_bytes; + aofReadDiffFromParent(); + } + return 1; +} + /* Produces a dump of the database in RDB format sending it to the specified * Redis I/O channel. On success C_OK is returned, otherwise C_ERR * is returned and part of the output, or all the output, can be @@ -1134,31 +1157,24 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { * these sizes are just hints to resize the hash tables. */ uint64_t db_size, expires_size; db_size = dictSize(db->pdict); - expires_size = dictSize(db->expires); + expires_size = db->setexpire->size(); if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr; if (rdbSaveLen(rdb,db_size) == -1) goto werr; if (rdbSaveLen(rdb,expires_size) == -1) goto werr; - + /* Iterate this DB writing every entry */ + size_t ckeysExpired = 0; while((de = dictNext(di)) != NULL) { sds keystr = (sds)dictGetKey(de); - robj key, *o = (robj*)dictGetVal(de); - long long expire; + robj *o = (robj*)dictGetVal(de); - initStaticStringObject(key,keystr); - expire = getExpire(db,&key); - if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr; - - /* When this RDB is produced as part of an AOF rewrite, move - * accumulated diff from parent to child while rewriting in - * order to have a smaller final write. */ - if (flags & RDB_SAVE_AOF_PREAMBLE && - rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) - { - processed = rdb->processed_bytes; - aofReadDiffFromParent(); - } + if (o->FExpires()) + ++ckeysExpired; + + if (!saveKey(rdb, db, flags, &processed, keystr, o)) + goto werr; } + serverAssert(ckeysExpired == db->setexpire->size()); dictReleaseIterator(di); di = NULL; /* So that we don't release it again on error. */ } @@ -1822,6 +1838,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) { } o->mvcc_tstamp = mvcc_tstamp; + serverAssert(!o->FExpires()); return o; } @@ -1909,7 +1926,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { now = mstime(); lru_clock = LRU_CLOCK(); - + while(1) { robj *key, *val; @@ -1965,7 +1982,6 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr; dictExpand(db->pdict,db_size); - dictExpand(db->expires,expires_size); continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_AUX) { /* AUX: generic string-string fields. Use to add state to RDB @@ -2079,7 +2095,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { if (fInserted) { /* Set the expire time if needed */ - if (expiretime != -1) setExpire(NULL,db,key,expiretime); + if (expiretime != -1) + setExpire(NULL,db,key,expiretime); /* Set usage information (for eviction). */ objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); @@ -2101,6 +2118,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { lfu_freq = -1; lru_idle = -1; } + /* Verify the checksum if RDB version is >= 5 */ if (rdbver >= 5) { uint64_t cksum, expected = rdb->cksum; diff --git a/src/scripting.cpp b/src/scripting.cpp index 1548044e2..5ba336374 100644 --- a/src/scripting.cpp +++ b/src/scripting.cpp @@ -665,7 +665,7 @@ cleanup: * The object must be small, SDS-encoded, and with refcount = 1 * (we must be the only owner) for us to cache it. */ if (j < LUA_CMD_OBJCACHE_SIZE && - o->refcount == 1 && + o->getrefcount(std::memory_order_relaxed) == 1 && (o->encoding == OBJ_ENCODING_RAW || o->encoding == OBJ_ENCODING_EMBSTR) && sdslen((sds)ptrFromObj(o)) <= LUA_CMD_OBJCACHE_MAX_LEN) diff --git a/src/server.cpp b/src/server.cpp index ebdca3234..008459034 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1428,8 +1428,6 @@ int htNeedsResize(dict *dict) { void tryResizeHashTables(int dbid) { if (htNeedsResize(g_pserver->db[dbid].pdict)) dictResize(g_pserver->db[dbid].pdict); - if (htNeedsResize(g_pserver->db[dbid].expires)) - dictResize(g_pserver->db[dbid].expires); } /* Our hash table implementation performs rehashing incrementally while @@ -1445,11 +1443,6 @@ int incrementallyRehash(int dbid) { dictRehashMilliseconds(g_pserver->db[dbid].pdict,1); return 1; /* already used our millisecond for this loop... */ } - /* Expires */ - if (dictIsRehashing(g_pserver->db[dbid].expires)) { - dictRehashMilliseconds(g_pserver->db[dbid].expires,1); - return 1; /* already used our millisecond for this loop... */ - } return 0; } @@ -1889,7 +1882,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { size = dictSlots(g_pserver->db[j].pdict); used = dictSize(g_pserver->db[j].pdict); - vkeys = dictSize(g_pserver->db[j].expires); + vkeys = g_pserver->db[j].setexpire->size(); if (used || vkeys) { serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); /* dictPrintStats(g_pserver->dict); */ @@ -2926,7 +2919,8 @@ void initServer(void) { /* Create the Redis databases, and initialize other internal state. */ for (int j = 0; j < cserver.dbnum; j++) { g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL); - g_pserver->db[j].expires = dictCreate(&keyptrDictType,NULL); + g_pserver->db[j].setexpire = new(MALLOC_LOCAL) semiorderedset; + g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL); g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); g_pserver->db[j].watched_keys = dictCreate(&keylistDictType,NULL); @@ -4571,7 +4565,7 @@ sds genRedisInfoString(const char *section) { long long keys, vkeys; keys = dictSize(g_pserver->db[j].pdict); - vkeys = dictSize(g_pserver->db[j].expires); + vkeys = g_pserver->db[j].setexpire->size(); if (keys || vkeys) { info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n", diff --git a/src/server.h b/src/server.h index 5c66aaba6..74960bbab 100644 --- a/src/server.h +++ b/src/server.h @@ -81,6 +81,7 @@ typedef long long mstime_t; /* millisecond time type. */ N-elements flat arrays */ #include "rax.h" /* Radix tree */ #include "uuid.h" +#include "semiorderedset.h" /* Following includes allow test functions to be called from Redis main() */ #include "zipmap.h" @@ -243,7 +244,7 @@ public: #define CONFIG_DEFAULT_ACTIVE_REPLICA 0 #define CONFIG_DEFAULT_ENABLE_MULTIMASTER 0 -#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ +#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 64 /* Loopkups per loop. */ #define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */ #define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* CPU max % for keys collection */ #define ACTIVE_EXPIRE_CYCLE_SLOW 0 @@ -717,7 +718,7 @@ typedef struct RedisModuleDigest { #define LRU_CLOCK_MAX ((1<lru */ #define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */ -#define OBJ_SHARED_REFCOUNT INT_MAX +#define OBJ_SHARED_REFCOUNT (0x7FFFFFFF) #define OBJ_MVCC_INVALID (0xFFFFFFFFFFFFFFFFULL) typedef struct redisObject { @@ -726,10 +727,21 @@ typedef struct redisObject { unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ - mutable std::atomic refcount; +private: + mutable std::atomic refcount; +public: uint64_t mvcc_tstamp; void *m_ptr; + + inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; } + void SetFExpires(bool fExpires); + + void setrefcount(unsigned ref); + unsigned getrefcount(std::memory_order order) const { return (refcount.load(order) & ~(1U << 31)); } + void addref() const { refcount.fetch_add(1, std::memory_order_acq_rel); } + unsigned release() const { return refcount.fetch_sub(1, std::memory_order_acq_rel) & ~(1U << 31); } } robj; +static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase"); __attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o) { @@ -755,6 +767,38 @@ __attribute__((always_inline)) inline char *szFromObj(const robj *o) return (char*)ptrFromObj(o); } +class expireEntry { + sds m_key; + long long m_when; + +public: + expireEntry(sds key, long long when) + { + m_key = key; + m_when = when; + } + + bool operator!=(const expireEntry &e) const noexcept + { + return m_when != e.m_when || m_key != e.m_key; + } + bool operator==(const expireEntry &e) const noexcept + { + return m_when == e.m_when && m_key == e.m_key; + } + bool operator==(const char *key) const noexcept { return m_key == key; } + + bool operator<(const expireEntry &e) const noexcept { return m_when < e.m_when; } + bool operator<(const char *key) const noexcept { return m_key < key; } + bool operator<(long long when) const noexcept { return m_when < when; } + + const char *key() const noexcept { return m_key; } + long long when() const noexcept { return m_when; } + + + explicit operator const char*() const noexcept { return m_key; } + explicit operator long long() const noexcept { return m_when; } +}; /* The a string name for an object's type as listed above * Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines, @@ -766,7 +810,7 @@ const char *getObjectTypeName(robj_roptr o); * we'll update it when the structure is changed, to avoid bugs like * bug #85 introduced exactly in this way. */ #define initStaticStringObject(_var,_ptr) do { \ - _var.refcount = 1; \ + _var.setrefcount(1); \ _var.type = OBJ_STRING; \ _var.encoding = OBJ_ENCODING_RAW; \ _var.m_ptr = _ptr; \ @@ -793,7 +837,9 @@ typedef struct clientReplyBlock { * database. The database number is the 'id' field in the structure. */ typedef struct redisDb { dict *pdict; /* The keyspace for this DB */ - dict *expires; /* Timeout of keys with a timeout set */ + semiorderedset *setexpire; + semiorderedset::setiter expireitr; + dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ @@ -2174,6 +2220,7 @@ int rewriteConfig(char *path); /* db.c -- Keyspace access API */ int removeExpire(redisDb *db, robj *key); +int removeExpireCore(redisDb *db, robj *key, dictEntry *de); void propagateExpire(redisDb *db, robj *key, int lazy); int expireIfNeeded(redisDb *db, robj *key); long long getExpire(redisDb *db, robj_roptr key); diff --git a/src/slowlog.cpp b/src/slowlog.cpp index 4f338b341..08a2e62e9 100644 --- a/src/slowlog.cpp +++ b/src/slowlog.cpp @@ -72,7 +72,7 @@ slowlogEntry *slowlogCreateEntry(client *c, robj **argv, int argc, long long dur (unsigned long) sdslen(szFromObj(argv[j])) - SLOWLOG_ENTRY_MAX_STRING); se->argv[j] = createObject(OBJ_STRING,s); - } else if (argv[j]->refcount == OBJ_SHARED_REFCOUNT) { + } else if (argv[j]->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) { se->argv[j] = argv[j]; } else { /* Here we need to dupliacate the string objects composing the diff --git a/src/t_string.cpp b/src/t_string.cpp index 4cb30eac6..a254f4f53 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -353,7 +353,7 @@ void incrDecrCommand(client *c, long long incr) { } value += incr; - if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT && + if (o && o->getrefcount(std::memory_order_relaxed) == 1 && o->encoding == OBJ_ENCODING_INT && (value < 0 || value >= OBJ_SHARED_INTEGERS) && value >= LONG_MIN && value <= LONG_MAX) {