diff --git a/.vscode/settings.json b/.vscode/settings.json index 56bf76d11..e4d7c4c9a 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -51,6 +51,7 @@ "tuple": "cpp", "type_traits": "cpp", "typeinfo": "cpp", - "utility": "cpp" + "utility": "cpp", + "set": "cpp" } } diff --git a/src/bio.cpp b/src/bio.cpp index 62f6615a6..844464e77 100644 --- a/src/bio.cpp +++ b/src/bio.cpp @@ -85,7 +85,7 @@ struct bio_job { void *bioProcessBackgroundJobs(void *arg); void lazyfreeFreeObjectFromBioThread(robj *o); -void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2); +void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset *set); void lazyfreeFreeSlotsMapFromBioThread(rax *rt); /* Make sure we have enough stack to perform all the things we do in the @@ -196,7 +196,7 @@ void *bioProcessBackgroundJobs(void *arg) { if (job->arg1) lazyfreeFreeObjectFromBioThread((robj*)job->arg1); else if (job->arg2 && job->arg3) - lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(dict*)job->arg3); + lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(semiorderedset*)job->arg3); else if (job->arg3) lazyfreeFreeSlotsMapFromBioThread((rax*)job->arg3); } else { diff --git a/src/compactvector.h b/src/compactvector.h new file mode 100644 index 000000000..65a40f114 --- /dev/null +++ b/src/compactvector.h @@ -0,0 +1,153 @@ +#pragma once + +#include +#include + +/************************************************* + * compactvector - similar to std::vector but optimized for minimal memory + * + * Notable differences: + * - Limited to 2^32 elements + * - Grows linearly not exponentially + * + *************************************************/ + +template +class compactvector +{ + static_assert(MEMMOVE_SAFE || std::is_trivially_copyable::value, "compactvector requires trivially copyable types"); + T *m_data = nullptr; + unsigned m_celem = 0; + unsigned m_max = 0; + +public: + typedef T* iterator; + + compactvector() noexcept = default; + ~compactvector() noexcept + { + clear(); // call dtors + zfree(m_data); + } + + compactvector(compactvector &) noexcept = delete; + + compactvector(compactvector &&src) noexcept + { + m_data = src.m_data; + m_celem = src.m_celem; + m_max = src.m_max; + src.m_data = nullptr; + src.m_celem = 0; + src.m_max = 0; + } + + compactvector &operator=(const compactvector&) noexcept = delete; + compactvector &operator=(compactvector &&src) noexcept + { + zfree(m_data); + m_data = src.m_data; + m_celem = src.m_celem; + m_max = src.m_max; + src.m_data = nullptr; + src.m_celem = 0; + src.m_max = 0; + return *this; + } + + inline T* begin() { return m_data; } + inline const T* begin() const { return m_data; } + + inline T* end() { return m_data + m_celem; } + inline const T* end() const { return m_data + m_celem; } + + T* insert(T* where, T &val) + { + assert(where >= m_data); + size_t idx = where - m_data; + if (m_celem >= m_max) + { + if (m_max < 2) + m_max = 2; + else + m_max = m_max + 4; + + m_data = (T*)zrealloc(m_data, sizeof(T) * m_max, MALLOC_LOCAL); + m_max = zmalloc_usable(m_data) / sizeof(T); + } + assert(idx < m_max); + where = m_data + idx; + memmove(m_data + idx + 1, m_data + idx, (m_celem - idx)*sizeof(T)); + new(m_data + idx) T(std::move(val)); + ++m_celem; + return where; + } + + T &operator[](size_t idx) + { + assert(idx < m_celem); + return m_data[idx]; + } + const T &operator[](size_t idx) const + { + assert(idx < m_celem); + return m_data[idx]; + } + + T& back() { assert(m_celem > 0); return m_data[m_celem-1]; } + const T& back() const { assert(m_celem > 0); return m_data[m_celem-1]; } + + void erase(T* where) + { + assert(where >= m_data); + size_t idx = where - m_data; + assert(idx < m_celem); + where->~T(); + memmove(where, where+1, ((m_celem - idx - 1)*sizeof(T))); + --m_celem; + + if (m_celem == 0) + { + zfree(m_data); + m_data = nullptr; + m_max = 0; + } + } + + void shrink_to_fit() + { + if (m_max == m_celem) + return; + m_data = (T*)zrealloc(m_data, sizeof(T) * m_celem, MALLOC_LOCAL); + m_max = m_celem; // NOTE: We do not get the usable size here, because this could cause us to continually realloc + } + + size_t bytes_used() const + { + return sizeof(this) + (m_max * sizeof(T)); + } + + void clear() + { + for (size_t idx = 0; idx < m_celem; ++idx) + m_data[idx].~T(); + zfree(m_data); + m_data = nullptr; + m_celem = 0; + m_max = 0; + } + + bool empty() const noexcept + { + return m_celem == 0; + } + + size_t size() const noexcept + { + return m_celem; + } + + T* data() noexcept { return m_data; } + const T* data() const noexcept { return m_data; } +}; +static_assert(sizeof(compactvector) <= 16, "not compact"); diff --git a/src/db.cpp b/src/db.cpp index 114b84297..dd75c28e9 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -39,6 +39,8 @@ *----------------------------------------------------------------------------*/ int keyIsExpired(redisDb *db, robj *key); +int expireIfNeeded(redisDb *db, robj *key, robj *o); +void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire); /* Update LFU when an object is accessed. * Firstly, decrement the counter if the decrement time is reached. @@ -49,6 +51,20 @@ void updateLFU(robj *val) { val->lru = (LFUGetTimeInMinutes()<<8) | counter; } +void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew) +{ + serverAssert(valOld->FExpires()); + serverAssert(!valNew->FExpires()); + + auto itr = db->setexpire->find(key); + serverAssert(itr != db->setexpire->end()); + + valNew->SetFExpires(true); + valOld->SetFExpires(false); + return; +} + + /* Low level key lookup API, not actually called directly from commands * implementations that should instead rely on lookupKeyRead(), * lookupKeyWrite() and lookupKeyReadWithFlags(). */ @@ -160,8 +176,10 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key) { * Returns the linked value object if the key exists or NULL if the key * does not exist in the specified DB. */ robj *lookupKeyWrite(redisDb *db, robj *key) { - expireIfNeeded(db,key); - return lookupKey(db,key,LOOKUP_UPDATEMVCC); + robj *o = lookupKey(db,key,LOOKUP_UPDATEMVCC); + if (expireIfNeeded(db,key)) + o = NULL; + return o; } robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply) { @@ -177,6 +195,7 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { } int dbAddCore(redisDb *db, robj *key, robj *val) { + serverAssert(!val->FExpires()); sds copy = sdsdup(szFromObj(key)); int retval = dictAdd(db->pdict, copy, val); val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp(); @@ -206,10 +225,18 @@ void dbAdd(redisDb *db, robj *key, robj *val) serverAssertWithInfo(NULL,key,retval == DICT_OK); } -void dbOverwriteCore(redisDb *db, dictEntry *de, robj *val, bool fUpdateMvcc) +void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire) { dictEntry auxentry = *de; robj *old = (robj*)dictGetVal(de); + + if (old->FExpires()) { + if (fRemoveExpire) + removeExpire(db, key); + else + updateExpire(db, (sds)dictGetKey(de), old, val); + } + if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { val->lru = old->lru; } @@ -235,7 +262,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { dictEntry *de = dictFind(db->pdict,ptrFromObj(key)); serverAssertWithInfo(NULL,key,de != NULL); - dbOverwriteCore(db, de, val, true); + dbOverwriteCore(db, de, key, val, true, false); } /* Insert a key, handling duplicate keys according to fReplace */ @@ -250,7 +277,7 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) robj *old = (robj*)dictGetVal(de); if (old->mvcc_tstamp <= val->mvcc_tstamp) { - dbOverwriteCore(db, de, val, false); + dbOverwriteCore(db, de, key, val, false, true); return true; } @@ -271,13 +298,13 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) * * All the new keys in the database should be created via this interface. */ void setKey(redisDb *db, robj *key, robj *val) { - if (lookupKeyWrite(db,key) == NULL) { + dictEntry *de = dictFind(db->pdict, ptrFromObj(key)); + if (de == NULL) { dbAdd(db,key,val); } else { - dbOverwrite(db,key,val); + dbOverwriteCore(db,de,key,val,true,true); } incrRefCount(val); - removeExpire(db,key); signalModifiedKey(db,key); } @@ -292,7 +319,7 @@ int dbExists(redisDb *db, robj *key) { robj *dbRandomKey(redisDb *db) { dictEntry *de; int maxtries = 100; - int allvolatile = dictSize(db->pdict) == dictSize(db->expires); + int allvolatile = dictSize(db->pdict) == db->setexpire->size(); while(1) { sds key; @@ -303,23 +330,30 @@ robj *dbRandomKey(redisDb *db) { key = (sds)dictGetKey(de); keyobj = createStringObject(key,sdslen(key)); - if (dictFind(db->expires,key)) { + + if (((robj*)dictGetVal(de))->FExpires()) + { if (allvolatile && listLength(g_pserver->masters) && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, - * it could happen that all the keys are already logically - * expired in the slave, so the function cannot stop because - * expireIfNeeded() is false, nor it can stop because - * dictGetRandomKey() returns NULL (there are keys to return). - * To prevent the infinite loop we do some tries, but if there - * are the conditions for an infinite loop, eventually we - * return a key name that may be already expired. */ + * it could happen that all the keys are already logically + * expired in the slave, so the function cannot stop because + * expireIfNeeded() is false, nor it can stop because + * dictGetRandomKey() returns NULL (there are keys to return). + * To prevent the infinite loop we do some tries, but if there + * are the conditions for an infinite loop, eventually we + * return a key name that may be already expired. */ return keyobj; } - if (expireIfNeeded(db,keyobj)) { + } + + if (((robj*)dictGetVal(de))->FExpires()) + { + if (expireIfNeeded(db,keyobj)) { decrRefCount(keyobj); continue; /* search for another key. This expired. */ - } + } } + return keyobj; } } @@ -328,7 +362,10 @@ robj *dbRandomKey(redisDb *db) { int dbSyncDelete(redisDb *db, robj *key) { /* Deleting an entry from the expires dict will not free the sds of * the key, because it is shared with the main dictionary. */ - if (dictSize(db->expires) > 0) dictDelete(db->expires,ptrFromObj(key)); + + dictEntry *de = dictFind(db->pdict, szFromObj(key)); + if (de != nullptr && ((robj*)dictGetVal(de))->FExpires()) + removeExpireCore(db, key, de); if (dictDelete(db->pdict,ptrFromObj(key)) == DICT_OK) { if (g_pserver->cluster_enabled) slotToKeyDel(key); return 1; @@ -373,7 +410,7 @@ int dbDelete(redisDb *db, robj *key) { */ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { serverAssert(o->type == OBJ_STRING); - if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) { + if (o->getrefcount(std::memory_order_relaxed) != 1 || o->encoding != OBJ_ENCODING_RAW) { robj *decoded = getDecodedObject(o); o = createRawStringObject(szFromObj(decoded), sdslen(szFromObj(decoded))); decrRefCount(decoded); @@ -419,7 +456,9 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { emptyDbAsync(&g_pserver->db[j]); } else { dictEmpty(g_pserver->db[j].pdict,callback); - dictEmpty(g_pserver->db[j].expires,callback); + delete g_pserver->db[j].setexpire; + g_pserver->db[j].setexpire = new (MALLOC_LOCAL) semiorderedset(); + g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); } } if (g_pserver->cluster_enabled) { @@ -964,9 +1003,10 @@ void renameGenericCommand(client *c, int nx) { * with the same name. */ dbDelete(c->db,c->argv[2]); } - dbAdd(c->db,c->argv[2],o); - if (expire != -1) setExpire(c,c->db,c->argv[2],expire); dbDelete(c->db,c->argv[1]); + dbAdd(c->db,c->argv[2],o); + if (expire != -1) + setExpire(c,c->db,c->argv[2],expire); signalModifiedKey(c->db,c->argv[1]); signalModifiedKey(c->db,c->argv[2]); notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from", @@ -1024,6 +1064,12 @@ void moveCommand(client *c) { return; } expire = getExpire(c->db,c->argv[1]); + if (o->FExpires()) + removeExpire(c->db,c->argv[1]); + serverAssert(!o->FExpires()); + incrRefCount(o); + dbDelete(src,c->argv[1]); + g_pserver->dirty++; /* Return zero if the key already exists in the target DB */ if (lookupKeyWrite(dst,c->argv[1]) != NULL) { @@ -1032,11 +1078,7 @@ void moveCommand(client *c) { } dbAdd(dst,c->argv[1],o); if (expire != -1) setExpire(c,dst,c->argv[1],expire); - incrRefCount(o); - /* OK! key moved, free the entry in the source DB */ - dbDelete(src,c->argv[1]); - g_pserver->dirty++; addReply(c,shared.cone); } @@ -1077,12 +1119,16 @@ int dbSwapDatabases(int id1, int id2) { * ready_keys and watched_keys, since we want clients to * remain in the same DB they were. */ db1->pdict = db2->pdict; - db1->expires = db2->expires; + db1->setexpire = db2->setexpire; + db1->expireitr = db2->expireitr; db1->avg_ttl = db2->avg_ttl; + db1->last_expire_set = db2->last_expire_set; db2->pdict = aux.pdict; - db2->expires = aux.expires; + db2->setexpire = aux.setexpire; + db2->expireitr = aux.expireitr; db2->avg_ttl = aux.avg_ttl; + db2->last_expire_set = aux.last_expire_set; /* Now we need to handle clients blocked on lists: as an effect * of swapping the two DBs, a client that was waiting for list @@ -1130,12 +1176,25 @@ void swapdbCommand(client *c) { /*----------------------------------------------------------------------------- * Expires API *----------------------------------------------------------------------------*/ - int removeExpire(redisDb *db, robj *key) { + dictEntry *de = dictFind(db->pdict,ptrFromObj(key)); + return removeExpireCore(db, key, de); +} +int removeExpireCore(redisDb *db, robj *key, dictEntry *de) { /* An expire may only be removed if there is a corresponding entry in the * main dict. Otherwise, the key will never be freed. */ - serverAssertWithInfo(NULL,key,dictFind(db->pdict,ptrFromObj(key)) != NULL); - return dictDelete(db->expires,ptrFromObj(key)) == DICT_OK; + serverAssertWithInfo(NULL,key,de != NULL); + + robj *val = (robj*)dictGetVal(de); + if (!val->FExpires()) + return 0; + + auto itr = db->setexpire->find((sds)dictGetKey(de)); + serverAssert(itr != db->setexpire->end()); + serverAssert(itr->key() == (sds)dictGetKey(de)); + db->setexpire->erase(itr); + val->SetFExpires(false); + return 1; } /* Set an expire to the specified key. If the expire is set in the context @@ -1143,14 +1202,40 @@ int removeExpire(redisDb *db, robj *key) { * to NULL. The 'when' parameter is the absolute unix time in milliseconds * after which the key will no longer be considered valid. */ void setExpire(client *c, redisDb *db, robj *key, long long when) { - dictEntry *kde, *de; + dictEntry *kde; + serverAssert(GlobalLocksAcquired()); /* Reuse the sds from the main dict in the expire dict */ kde = dictFind(db->pdict,ptrFromObj(key)); serverAssertWithInfo(NULL,key,kde != NULL); - de = dictAddOrFind(db->expires,dictGetKey(kde)); - dictSetSignedIntegerVal(de,when); + + if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) + { + // shared objects cannot have the expire bit set, create a real object + dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde))); + } + + if (((robj*)dictGetVal(kde))->FExpires()) + removeExpire(db, key); // should we optimize for when this is called with an already set expiry? + + expireEntry e((sds)dictGetKey(kde), when); + ((robj*)dictGetVal(kde))->SetFExpires(true); + + /* Update TTL stats (exponential moving average) */ + /* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */ + long long now = g_pserver->mstime; + db->avg_ttl -= (now - db->last_expire_set); // reduce the TTL by the time that has elapsed + if (db->setexpire->empty()) + db->avg_ttl = 0; + else + db->avg_ttl -= db->avg_ttl / db->setexpire->size(); // slide one entry out the window + if (db->avg_ttl < 0) + db->avg_ttl = 0; // TTLs are never negative + db->avg_ttl += (double)(when-now) / (db->setexpire->size()+1); // add the new entry + db->last_expire_set = now; + + db->setexpire->insert(e); int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; if (c && writable_slave && !(c->flags & CLIENT_MASTER)) @@ -1163,13 +1248,18 @@ long long getExpire(redisDb *db, robj_roptr key) { dictEntry *de; /* No expire? return ASAP */ - if (dictSize(db->expires) == 0 || - (de = dictFind(db->expires,ptrFromObj(key))) == NULL) return -1; + if (db->setexpire->size() == 0) + return -1; - /* The entry was found in the expire dict, this means it should also - * be present in the main dict (safety check). */ - serverAssertWithInfo(NULL,key,dictFind(db->pdict,ptrFromObj(key)) != NULL); - return dictGetSignedIntegerVal(de); + de = dictFind(db->pdict, ptrFromObj(key)); + if (de == NULL) + return -1; + robj *obj = (robj*)dictGetVal(de); + if (!obj->FExpires()) + return -1; + + auto itr = db->setexpire->find((sds)dictGetKey(de)); + return itr->when(); } /* Propagate expires into slaves and the AOF file. diff --git a/src/debug.cpp b/src/debug.cpp index 3485df967..4d2f4bbca 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -436,7 +436,7 @@ NULL "Value at:%p refcount:%d " "encoding:%s serializedlength:%zu " "lru:%d lru_seconds_idle:%llu%s", - (void*)val, static_cast(val->refcount), + (void*)val, static_cast(val->getrefcount(std::memory_order_relaxed)), strenc, rdbSavedObjectLen(val), val->lru, estimateObjectIdleTime(val)/1000, extra); } else if (!strcasecmp(szFromObj(c->argv[1]),"sdslen") && c->argc == 3) { @@ -638,9 +638,9 @@ NULL dictGetStats(buf,sizeof(buf),g_pserver->db[dbid].pdict); stats = sdscat(stats,buf); - stats = sdscatprintf(stats,"[Expires HT]\n"); - dictGetStats(buf,sizeof(buf),g_pserver->db[dbid].expires); - stats = sdscat(stats,buf); + stats = sdscatprintf(stats,"[Expires set]\n"); + g_pserver->db[dbid].setexpire->getstats(buf, sizeof(buf)); + stats = sdscat(stats, buf); addReplyBulkSds(c,stats); } else if (!strcasecmp(szFromObj(c->argv[1]),"htstats-key") && c->argc == 3) { @@ -721,14 +721,14 @@ void _serverAssertPrintClientInfo(const client *c) { arg = buf; } serverLog(LL_WARNING,"client->argv[%d] = \"%s\" (refcount: %d)", - j, arg, static_cast(c->argv[j]->refcount)); + j, arg, static_cast(c->argv[j]->getrefcount(std::memory_order_relaxed))); } } void serverLogObjectDebugInfo(robj_roptr o) { serverLog(LL_WARNING,"Object type: %d", o->type); serverLog(LL_WARNING,"Object encoding: %d", o->encoding); - serverLog(LL_WARNING,"Object refcount: %d", static_cast(o->refcount)); + serverLog(LL_WARNING,"Object refcount: %d", static_cast(o->getrefcount(std::memory_order_relaxed))); if (o->type == OBJ_STRING && sdsEncodedObject(o)) { serverLog(LL_WARNING,"Object raw string len: %zu", sdslen(szFromObj(o))); if (sdslen(szFromObj(o)) < 4096) { diff --git a/src/defrag.cpp b/src/defrag.cpp index 2e9abd290..b11564c4b 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -48,6 +48,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util); /* forward declarations*/ void defragDictBucketCallback(void *privdata, dictEntry **bucketref); dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); +void replaceSateliteOSetKeyPtr(semiorderedset &set, sds oldkey, sds newkey); /* Defrag helper for generic allocations. * @@ -102,7 +103,7 @@ sds activeDefragSds(sds sdsptr) { * and should NOT be accessed. */ robj *activeDefragStringOb(robj* ob, long *defragged) { robj *ret = NULL; - if (ob->refcount!=1) + if (ob->getrefcount(std::memory_order_relaxed)!=1) return NULL; /* try to defrag robj (only if not an EMBSTR type (handled below). */ @@ -406,6 +407,16 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd return NULL; } +void replaceSateliteOSetKeyPtr(semiorderedset &set, sds oldkey, sds newkey) { + auto itr = set.find(oldkey); + if (itr != set.end()) + { + expireEntry eNew(newkey, itr->when()); + set.erase(itr); + set.insert(eNew); + } +} + long activeDefragQuickListNodes(quicklist *ql) { quicklistNode *node = ql->head, *newnode; long defragged = 0; @@ -769,12 +780,8 @@ long defragKey(redisDb *db, dictEntry *de) { newsds = activeDefragSds(keysds); if (newsds) defragged++, de->key = newsds; - if (dictSize(db->expires)) { - /* Dirty code: - * I can't search in db->expires for that key after i already released - * the pointer it holds it won't be able to do the string compare */ - uint64_t hash = dictGetHash(db->pdict, de->key); - replaceSateliteDictKeyPtrAndOrDefragDictEntry(db->expires, keysds, newsds, hash, &defragged); + if (!db->setexpire->empty()) { + replaceSateliteOSetKeyPtr(*db->setexpire, keysds, newsds); } /* Try to defrag robj and / or string value. */ diff --git a/src/evict.cpp b/src/evict.cpp index 4be6bf761..4acdb5ad0 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -150,6 +150,84 @@ void evictionPoolAlloc(void) { EvictionPoolLRU = ep; } +void processEvictionCandidate(int dbid, sds key, robj *o, const expireEntry *e, struct evictionPoolEntry *pool) +{ + unsigned long long idle; + + /* Calculate the idle time according to the policy. This is called + * idle just because the code initially handled LRU, but is in fact + * just a score where an higher score means better candidate. */ + if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LRU) { + idle = (o != nullptr) ? estimateObjectIdleTime(o) : 0; + } else if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { + /* When we use an LRU policy, we sort the keys by idle time + * so that we expire keys starting from greater idle time. + * However when the policy is an LFU one, we have a frequency + * estimation, and we want to evict keys with lower frequency + * first. So inside the pool we put objects using the inverted + * frequency subtracting the actual frequency to the maximum + * frequency of 255. */ + idle = 255-LFUDecrAndReturn(o); + } else if (g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { + /* In this case the sooner the expire the better. */ + idle = ULLONG_MAX - e->when(); + } else { + serverPanic("Unknown eviction policy in evictionPoolPopulate()"); + } + + /* Insert the element inside the pool. + * First, find the first empty bucket or the first populated + * bucket that has an idle time smaller than our idle time. */ + int k = 0; + while (k < EVPOOL_SIZE && + pool[k].key && + pool[k].idle < idle) k++; + if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) { + /* Can't insert if the element is < the worst element we have + * and there are no empty buckets. */ + return; + } else if (k < EVPOOL_SIZE && pool[k].key == NULL) { + /* Inserting into empty position. No setup needed before insert. */ + } else { + /* Inserting in the middle. Now k points to the first element + * greater than the element to insert. */ + if (pool[EVPOOL_SIZE-1].key == NULL) { + /* Free space on the right? Insert at k shifting + * all the elements from k to end to the right. */ + + /* Save SDS before overwriting. */ + sds cached = pool[EVPOOL_SIZE-1].cached; + memmove(pool+k+1,pool+k, + sizeof(pool[0])*(EVPOOL_SIZE-k-1)); + pool[k].cached = cached; + } else { + /* No free space on right? Insert at k-1 */ + k--; + /* Shift all elements on the left of k (included) to the + * left, so we discard the element with smaller idle time. */ + sds cached = pool[0].cached; /* Save SDS before overwriting. */ + if (pool[0].key != pool[0].cached) sdsfree(pool[0].key); + memmove(pool,pool+1,sizeof(pool[0])*k); + pool[k].cached = cached; + } + } + + /* Try to reuse the cached SDS string allocated in the pool entry, + * because allocating and deallocating this object is costly + * (according to the profiler, not my fantasy. Remember: + * premature optimizbla bla bla bla. */ + int klen = sdslen(key); + if (klen > EVPOOL_CACHED_SDS_SIZE) { + pool[k].key = sdsdup(key); + } else { + memcpy(pool[k].cached,key,klen+1); + sdssetlen(pool[k].cached,klen); + pool[k].key = pool[k].cached; + } + pool[k].idle = idle; + pool[k].dbid = dbid; +} + /* This is an helper function for freeMemoryIfNeeded(), it is used in order * to populate the evictionPool with a few entries every time we want to * expire a key. Keys with idle time smaller than one of the current @@ -159,100 +237,36 @@ void evictionPoolAlloc(void) { * idle time are on the left, and keys with the higher idle time on the * right. */ -void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) { - int j, k, count; - dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*)); +struct visitFunctor +{ + int dbid; + dict *dbdict; + struct evictionPoolEntry *pool; + int count; - count = dictGetSomeKeys(sampledict,samples,g_pserver->maxmemory_samples); - for (j = 0; j < count; j++) { - unsigned long long idle; - sds key; - robj *o = nullptr; - dictEntry *de; - - de = samples[j]; - key = (sds)dictGetKey(de); - - /* If the dictionary we are sampling from is not the main - * dictionary (but the expires one) we need to lookup the key - * again in the key dictionary to obtain the value object. */ - if (g_pserver->maxmemory_policy != MAXMEMORY_VOLATILE_TTL) { - if (sampledict != keydict) de = dictFind(keydict, key); - o = (robj*)dictGetVal(de); + bool operator()(const expireEntry &e) + { + dictEntry *de = dictFind(dbdict, e.key()); + processEvictionCandidate(dbid, (sds)dictGetKey(de), (robj*)dictGetVal(de), &e, pool); + ++count; + return count < g_pserver->maxmemory_samples; + } +}; +void evictionPoolPopulate(int dbid, dict *dbdict, semiorderedset *setexpire, struct evictionPoolEntry *pool) +{ + if (setexpire != nullptr) + { + visitFunctor visitor { dbid, dbdict, pool, 0 }; + setexpire->random_visit(visitor); + } + else + { + dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*)); + int count = dictGetSomeKeys(dbdict,samples,g_pserver->maxmemory_samples); + for (int j = 0; j < count; j++) { + robj *o = (robj*)dictGetVal(samples[j]); + processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool); } - - /* Calculate the idle time according to the policy. This is called - * idle just because the code initially handled LRU, but is in fact - * just a score where an higher score means better candidate. */ - if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LRU) { - idle = (o != nullptr) ? estimateObjectIdleTime(o) : 0; - } else if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { - /* When we use an LRU policy, we sort the keys by idle time - * so that we expire keys starting from greater idle time. - * However when the policy is an LFU one, we have a frequency - * estimation, and we want to evict keys with lower frequency - * first. So inside the pool we put objects using the inverted - * frequency subtracting the actual frequency to the maximum - * frequency of 255. */ - idle = 255-LFUDecrAndReturn(o); - } else if (g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { - /* In this case the sooner the expire the better. */ - idle = ULLONG_MAX - (long)dictGetVal(de); - } else { - serverPanic("Unknown eviction policy in evictionPoolPopulate()"); - } - - /* Insert the element inside the pool. - * First, find the first empty bucket or the first populated - * bucket that has an idle time smaller than our idle time. */ - k = 0; - while (k < EVPOOL_SIZE && - pool[k].key && - pool[k].idle < idle) k++; - if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) { - /* Can't insert if the element is < the worst element we have - * and there are no empty buckets. */ - continue; - } else if (k < EVPOOL_SIZE && pool[k].key == NULL) { - /* Inserting into empty position. No setup needed before insert. */ - } else { - /* Inserting in the middle. Now k points to the first element - * greater than the element to insert. */ - if (pool[EVPOOL_SIZE-1].key == NULL) { - /* Free space on the right? Insert at k shifting - * all the elements from k to end to the right. */ - - /* Save SDS before overwriting. */ - sds cached = pool[EVPOOL_SIZE-1].cached; - memmove(pool+k+1,pool+k, - sizeof(pool[0])*(EVPOOL_SIZE-k-1)); - pool[k].cached = cached; - } else { - /* No free space on right? Insert at k-1 */ - k--; - /* Shift all elements on the left of k (included) to the - * left, so we discard the element with smaller idle time. */ - sds cached = pool[0].cached; /* Save SDS before overwriting. */ - if (pool[0].key != pool[0].cached) sdsfree(pool[0].key); - memmove(pool,pool+1,sizeof(pool[0])*k); - pool[k].cached = cached; - } - } - - /* Try to reuse the cached SDS string allocated in the pool entry, - * because allocating and deallocating this object is costly - * (according to the profiler, not my fantasy. Remember: - * premature optimizbla bla bla bla. */ - int klen = sdslen(key); - if (klen > EVPOOL_CACHED_SDS_SIZE) { - pool[k].key = sdsdup(key); - } else { - memcpy(pool[k].cached,key,klen+1); - sdssetlen(pool[k].cached,klen); - pool[k].key = pool[k].cached; - } - pool[k].idle = idle; - pool[k].dbid = dbid; } } @@ -474,8 +488,6 @@ int freeMemoryIfNeeded(void) { sds bestkey = NULL; int bestdbid; redisDb *db; - dict *dict; - dictEntry *de; if (g_pserver->maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) || g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) @@ -490,10 +502,18 @@ int freeMemoryIfNeeded(void) { * every DB. */ for (i = 0; i < cserver.dbnum; i++) { db = g_pserver->db+i; - dict = (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ? - db->pdict : db->expires; - if ((keys = dictSize(dict)) != 0) { - evictionPoolPopulate(i, dict, db->pdict, pool); + if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) + { + if ((keys = dictSize(db->pdict)) != 0) { + evictionPoolPopulate(i, db->pdict, nullptr, pool); + total_keys += keys; + } + } + else + { + keys = db->setexpire->size(); + if (keys != 0) + evictionPoolPopulate(i, db->pdict, db->setexpire, pool); total_keys += keys; } } @@ -503,14 +523,11 @@ int freeMemoryIfNeeded(void) { for (k = EVPOOL_SIZE-1; k >= 0; k--) { if (pool[k].key == NULL) continue; bestdbid = pool[k].dbid; + sds key = nullptr; - if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { - de = dictFind(g_pserver->db[pool[k].dbid].pdict, - pool[k].key); - } else { - de = dictFind(g_pserver->db[pool[k].dbid].expires, - pool[k].key); - } + dictEntry *de = dictFind(g_pserver->db[pool[k].dbid].pdict,pool[k].key); + if (de != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || ((robj*)dictGetVal(de))->FExpires())) + key = (sds)dictGetKey(de); /* Remove the entry from the pool. */ if (pool[k].key != pool[k].cached) @@ -520,8 +537,8 @@ int freeMemoryIfNeeded(void) { /* If the key exists, is our pick. Otherwise it is * a ghost and we need to try the next element. */ - if (de) { - bestkey = (sds)dictGetKey(de); + if (key) { + bestkey = key; break; } else { /* Ghost... Iterate again. */ @@ -540,13 +557,23 @@ int freeMemoryIfNeeded(void) { for (i = 0; i < cserver.dbnum; i++) { j = (++next_db) % cserver.dbnum; db = g_pserver->db+j; - dict = (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ? - db->pdict : db->expires; - if (dictSize(dict) != 0) { - de = dictGetRandomKey(dict); - bestkey = (sds)dictGetKey(de); - bestdbid = j; - break; + if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) + { + if (dictSize(db->pdict) != 0) { + dictEntry *de = dictGetRandomKey(db->pdict); + bestkey = (sds)dictGetKey(de); + bestdbid = j; + break; + } + } + else + { + if (!db->setexpire->empty()) + { + bestkey = (sds)db->setexpire->random_value().key(); + bestdbid = j; + break; + } } } } diff --git a/src/expire.cpp b/src/expire.cpp index 5a0abbb06..38a65cf44 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -51,26 +51,19 @@ * * The parameter 'now' is the current time in milliseconds as is passed * to the function to avoid too many gettimeofday() syscalls. */ -int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) { - long long t = dictGetSignedIntegerVal(de); - if (now > t) { - sds key = (sds)dictGetKey(de); - robj *keyobj = createStringObject(key,sdslen(key)); +void activeExpireCycleExpire(redisDb *db, const char *key) { + robj *keyobj = createStringObject(key,sdslen(key)); - propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire); - if (g_pserver->lazyfree_lazy_expire) - dbAsyncDelete(db,keyobj); - else - dbSyncDelete(db,keyobj); - notifyKeyspaceEvent(NOTIFY_EXPIRED, - "expired",keyobj,db->id); - if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj); - decrRefCount(keyobj); - g_pserver->stat_expiredkeys++; - return 1; - } else { - return 0; - } + propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire); + if (g_pserver->lazyfree_lazy_expire) + dbAsyncDelete(db,keyobj); + else + dbSyncDelete(db,keyobj); + notifyKeyspaceEvent(NOTIFY_EXPIRED, + "expired",keyobj,db->id); + if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj); + decrRefCount(keyobj); + g_pserver->stat_expiredkeys++; } /* Try to expire a few timed out keys. The algorithm used is adaptive and @@ -148,7 +141,6 @@ void activeExpireCycle(int type) { long total_expired = 0; for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) { - int expired; redisDb *db = g_pserver->db+(current_db % cserver.dbnum); /* Increment the DB now so we are sure if we run out of time @@ -156,78 +148,44 @@ void activeExpireCycle(int type) { * distribute the time evenly across DBs. */ current_db++; - /* Continue to expire if at the end of the cycle more than 25% - * of the keys were expired. */ - do { - unsigned long num, slots; - long long now, ttl_sum; - int ttl_samples; - iteration++; + long long now; + iteration++; + now = mstime(); - /* If there is nothing to expire try next DB ASAP. */ - if ((num = dictSize(db->expires)) == 0) { - db->avg_ttl = 0; - break; + /* If there is nothing to expire try next DB ASAP. */ + if (db->setexpire->empty()) + { + db->avg_ttl = 0; + db->last_expire_set = now; + continue; + } + + size_t expired = 0; + size_t tried = 0; + db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](const expireEntry &e) __attribute__((always_inline)) { + if (e.when() < now) + { + activeExpireCycleExpire(db, e.key()); + ++expired; } - slots = dictSlots(db->expires); - now = mstime(); + ++tried; - /* When there are less than 1% filled slots getting random - * keys is expensive, so stop here waiting for better times... - * The dictionary will be resized asap. */ - if (num && slots > DICT_HT_INITIAL_SIZE && - (num*100/slots < 1)) break; - - /* The main collection cycle. Sample random keys among keys - * with an expire set, checking for expired ones. */ - expired = 0; - ttl_sum = 0; - ttl_samples = 0; - - if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) - num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP; - - while (num--) { - dictEntry *de; - long long ttl; - - if ((de = dictGetRandomKey(db->expires)) == NULL) break; - ttl = dictGetSignedIntegerVal(de)-now; - if (activeExpireCycleTryExpire(db,de,now)) expired++; - if (ttl > 0) { - /* We want the average TTL of keys yet not expired. */ - ttl_sum += ttl; - ttl_samples++; - } - total_sampled++; - } - total_expired += expired; - - /* Update the average TTL stats for this database. */ - if (ttl_samples) { - long long avg_ttl = ttl_sum/ttl_samples; - - /* Do a simple running average with a few samples. - * We just use the current estimate with a weight of 2% - * and the previous estimate with a weight of 98%. */ - if (db->avg_ttl == 0) db->avg_ttl = avg_ttl; - db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50); - } - - /* We can't block forever here even if there are many keys to - * expire. So after a given amount of milliseconds return to the - * caller waiting for the other active expire cycle. */ - if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */ + if ((tried % ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) == 0) + { + /* We can't block forever here even if there are many keys to + * expire. So after a given amount of milliseconds return to the + * caller waiting for the other active expire cycle. */ elapsed = ustime()-start; if (elapsed > timelimit) { timelimit_exit = 1; g_pserver->stat_expired_time_cap_reached_count++; - break; + return false; } } - /* We don't repeat the cycle if there are less than 25% of keys - * found expired in the current DB. */ - } while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4); + return true; + }); + + total_expired += expired; } elapsed = ustime()-start; @@ -301,20 +259,27 @@ void expireSlaveKeys(void) { while(dbids && dbid < cserver.dbnum) { if ((dbids & 1) != 0) { redisDb *db = g_pserver->db+dbid; - dictEntry *expire = dictFind(db->expires,keyname); + + // the expire is hashed based on the key pointer, so we need the point in the main db + dictEntry *deMain = dictFind(db->pdict, keyname); + auto itr = db->setexpire->end(); + if (deMain != nullptr) + itr = db->setexpire->find((sds)dictGetKey(deMain)); int expired = 0; - if (expire && - activeExpireCycleTryExpire(g_pserver->db+dbid,expire,start)) + if (itr != db->setexpire->end()) { - expired = 1; + if (itr->when() < start) { + activeExpireCycleExpire(g_pserver->db+dbid,itr->key()); + expired = 1; + } } /* If the key was not expired in this DB, we need to set the * corresponding bit in the new bitmap we set as value. * At the end of the loop if the bitmap is zero, it means we * no longer need to keep track of this key. */ - if (expire && !expired) { + if (itr != db->setexpire->end() && !expired) { noexpire++; new_dbids |= (uint64_t)1 << dbid; } diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index 6d56ec86d..0dbfd57d1 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -52,16 +52,19 @@ size_t lazyfreeGetFreeEffort(robj *obj) { * will be reclaimed in a different bio.c thread. */ #define LAZYFREE_THRESHOLD 64 int dbAsyncDelete(redisDb *db, robj *key) { - /* Deleting an entry from the expires dict will not free the sds of - * the key, because it is shared with the main dictionary. */ - if (dictSize(db->expires) > 0) dictDelete(db->expires,ptrFromObj(key)); - /* If the value is composed of a few allocations, to free in a lazy way * is actually just slower... So under a certain limit we just free * the object synchronously. */ dictEntry *de = dictUnlink(db->pdict,ptrFromObj(key)); if (de) { robj *val = (robj*)dictGetVal(de); + if (val->FExpires()) + { + /* Deleting an entry from the expires dict will not free the sds of + * the key, because it is shared with the main dictionary. */ + removeExpireCore(db,key,de); + } + size_t free_effort = lazyfreeGetFreeEffort(val); /* If releasing the object is too much work, do it in the background @@ -72,7 +75,7 @@ int dbAsyncDelete(redisDb *db, robj *key) { * objects, and then call dbDelete(). In this case we'll fall * through and reach the dictFreeUnlinkedEntry() call, that will be * equivalent to just calling decrRefCount(). */ - if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) { + if (free_effort > LAZYFREE_THRESHOLD && val->getrefcount(std::memory_order_relaxed) == 1) { atomicIncr(lazyfree_objects,1); bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL); dictSetVal(db->pdict,de,NULL); @@ -93,7 +96,7 @@ int dbAsyncDelete(redisDb *db, robj *key) { /* Free an object, if the object is huge enough, free it in async way. */ void freeObjAsync(robj *o) { size_t free_effort = lazyfreeGetFreeEffort(o); - if (free_effort > LAZYFREE_THRESHOLD && o->refcount == 1) { + if (free_effort > LAZYFREE_THRESHOLD && o->getrefcount(std::memory_order_relaxed) == 1) { atomicIncr(lazyfree_objects,1); bioCreateBackgroundJob(BIO_LAZY_FREE,o,NULL,NULL); } else { @@ -105,11 +108,13 @@ void freeObjAsync(robj *o) { * create a new empty set of hash tables and scheduling the old ones for * lazy freeing. */ void emptyDbAsync(redisDb *db) { - dict *oldht1 = db->pdict, *oldht2 = db->expires; + dict *oldht1 = db->pdict; + auto *set = db->setexpire; + db->setexpire = new (MALLOC_LOCAL) semiorderedset(); + db->expireitr = db->setexpire->end(); db->pdict = dictCreate(&dbDictType,NULL); - db->expires = dictCreate(&keyptrDictType,NULL); atomicIncr(lazyfree_objects,dictSize(oldht1)); - bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2); + bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,set); } /* Empty the slots-keys map of Redis CLuster by creating a new empty one @@ -136,10 +141,10 @@ void lazyfreeFreeObjectFromBioThread(robj *o) { * when the database was logically deleted. 'sl' is a skiplist used by * Redis Cluster in order to take the hash slots -> keys mapping. This * may be NULL if Redis Cluster is disabled. */ -void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) { +void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset *set) { size_t numkeys = dictSize(ht1); dictRelease(ht1); - dictRelease(ht2); + delete set; atomicDecr(lazyfree_objects,numkeys); } diff --git a/src/module.cpp b/src/module.cpp index ee31cf7a5..7863ca4cf 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -566,7 +566,7 @@ void RedisModuleCommandDispatcher(client *c) { for (int i = 0; i < c->argc; i++) { /* Only do the work if the module took ownership of the object: * in that case the refcount is no longer 1. */ - if (c->argv[i]->refcount > 1) + if (c->argv[i]->getrefcount(std::memory_order_relaxed) > 1) trimStringObjectIfNeeded(c->argv[i]); } } @@ -1037,7 +1037,7 @@ int RM_StringCompare(RedisModuleString *a, RedisModuleString *b) { /* Return the (possibly modified in encoding) input 'str' object if * the string is unshared, otherwise NULL is returned. */ RedisModuleString *moduleAssertUnsharedString(RedisModuleString *str) { - if (str->refcount != 1) { + if (str->getrefcount(std::memory_order_relaxed) != 1) { serverLog(LL_WARNING, "Module attempted to use an in-place string modify operation " "with a string referenced multiple times. Please check the code " diff --git a/src/object.cpp b/src/object.cpp index 6e65ec52b..900a9058c 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -39,11 +39,11 @@ /* ===================== Creation and parsing of objects ==================== */ robj *createObject(int type, void *ptr) { - robj *o = (robj*)zmalloc(sizeof(*o), MALLOC_SHARED); + robj *o = (robj*)zcalloc(sizeof(*o), MALLOC_SHARED); o->type = type; o->encoding = OBJ_ENCODING_RAW; o->m_ptr = ptr; - o->refcount.store(1, std::memory_order_relaxed); + o->setrefcount(1); o->mvcc_tstamp = OBJ_MVCC_INVALID; /* Set the LRU to the current lruclock (minutes resolution), or @@ -68,8 +68,9 @@ robj *createObject(int type, void *ptr) { * */ robj *makeObjectShared(robj *o) { - serverAssert(o->refcount == 1); - o->refcount.store(OBJ_SHARED_REFCOUNT, std::memory_order_relaxed); + serverAssert(o->getrefcount(std::memory_order_relaxed) == 1); + serverAssert(!o->FExpires()); + o->setrefcount(OBJ_SHARED_REFCOUNT); return o; } @@ -86,12 +87,12 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { size_t allocsize = sizeof(struct sdshdr8)+len+1; if (allocsize < sizeof(void*)) allocsize = sizeof(void*); - robj *o = (robj*)zmalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED); + robj *o = (robj*)zcalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED); struct sdshdr8 *sh = (sdshdr8*)(&o->m_ptr); o->type = OBJ_STRING; o->encoding = OBJ_ENCODING_EMBSTR; - o->refcount.store(1, std::memory_order_relaxed); + o->setrefcount(1); o->mvcc_tstamp = OBJ_MVCC_INVALID; if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { @@ -352,11 +353,14 @@ void freeStreamObject(robj_roptr o) { } void incrRefCount(robj_roptr o) { - if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount.fetch_add(1, std::memory_order_acquire); + if (o->getrefcount(std::memory_order_relaxed) != OBJ_SHARED_REFCOUNT) o->addref(); } void decrRefCount(robj_roptr o) { - if (o->refcount.load(std::memory_order_acquire) == 1) { + if (o->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) + return; + unsigned prev = o->release(); + if (prev == 1) { switch(o->type) { case OBJ_STRING: freeStringObject(o); break; case OBJ_LIST: freeListObject(o); break; @@ -369,8 +373,7 @@ void decrRefCount(robj_roptr o) { } zfree(o.unsafe_robjcast()); } else { - if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0"); - if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount.fetch_sub(1, std::memory_order_acquire); + if (prev <= 0) serverPanic("decrRefCount against refcount <= 0"); } } @@ -394,7 +397,7 @@ void decrRefCountVoid(const void *o) { * decrRefCount(obj); */ robj *resetRefCount(robj *obj) { - obj->refcount = 0; + obj->setrefcount(0); return obj; } @@ -452,7 +455,7 @@ robj *tryObjectEncoding(robj *o) { /* It's not safe to encode shared objects: shared objects can be shared * everywhere in the "object space" of Redis and may end in places where * they are not handled. We handle them only as values in the keyspace. */ - if (o->refcount > 1) return o; + if (o->getrefcount(std::memory_order_relaxed) > 1) return o; /* Check if we can represent this string as a long integer. * Note that we are sure that a string larger than 20 chars is not @@ -1064,8 +1067,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) { mh->db[mh->num_dbs].overhead_ht_main = mem; mem_total+=mem; - mem = dictSize(db->expires) * sizeof(dictEntry) + - dictSlots(db->expires) * sizeof(dictEntry*); + mem = db->setexpire->bytes_used(); mh->db[mh->num_dbs].overhead_ht_expires = mem; mem_total+=mem; @@ -1275,7 +1277,7 @@ NULL } else if (!strcasecmp(szFromObj(c->argv[1]),"refcount") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) == NULL) return; - addReplyLongLong(c,o->refcount); + addReplyLongLong(c,o->getrefcount(std::memory_order_relaxed)); } else if (!strcasecmp(szFromObj(c->argv[1]),"encoding") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) == NULL) return; @@ -1474,3 +1476,18 @@ NULL addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try MEMORY HELP", (char*)ptrFromObj(c->argv[1])); } } + +void redisObject::SetFExpires(bool fExpire) +{ + serverAssert(this->refcount != OBJ_SHARED_REFCOUNT); + if (fExpire) + this->refcount.fetch_or(1U << 31, std::memory_order_relaxed); + else + this->refcount.fetch_and(~(1U << 31), std::memory_order_relaxed); +} + +void redisObject::setrefcount(unsigned ref) +{ + serverAssert(!FExpires()); + refcount.store(ref, std::memory_order_relaxed); +} \ No newline at end of file diff --git a/src/rdb.cpp b/src/rdb.cpp index d4d91ff1f..5443ca064 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1096,6 +1096,29 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { return 1; } +int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *keystr, robj *o) +{ + robj key; + long long expire; + + initStaticStringObject(key,(char*)keystr); + expire = getExpire(db, &key); + + if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) + return 0; + + /* When this RDB is produced as part of an AOF rewrite, move + * accumulated diff from parent to child while rewriting in + * order to have a smaller final write. */ + if (flags & RDB_SAVE_AOF_PREAMBLE && + rdb->processed_bytes > *processed+AOF_READ_DIFF_INTERVAL_BYTES) + { + *processed = rdb->processed_bytes; + aofReadDiffFromParent(); + } + return 1; +} + /* Produces a dump of the database in RDB format sending it to the specified * Redis I/O channel. On success C_OK is returned, otherwise C_ERR * is returned and part of the output, or all the output, can be @@ -1134,31 +1157,24 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { * these sizes are just hints to resize the hash tables. */ uint64_t db_size, expires_size; db_size = dictSize(db->pdict); - expires_size = dictSize(db->expires); + expires_size = db->setexpire->size(); if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr; if (rdbSaveLen(rdb,db_size) == -1) goto werr; if (rdbSaveLen(rdb,expires_size) == -1) goto werr; - + /* Iterate this DB writing every entry */ + size_t ckeysExpired = 0; while((de = dictNext(di)) != NULL) { sds keystr = (sds)dictGetKey(de); - robj key, *o = (robj*)dictGetVal(de); - long long expire; + robj *o = (robj*)dictGetVal(de); - initStaticStringObject(key,keystr); - expire = getExpire(db,&key); - if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr; - - /* When this RDB is produced as part of an AOF rewrite, move - * accumulated diff from parent to child while rewriting in - * order to have a smaller final write. */ - if (flags & RDB_SAVE_AOF_PREAMBLE && - rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) - { - processed = rdb->processed_bytes; - aofReadDiffFromParent(); - } + if (o->FExpires()) + ++ckeysExpired; + + if (!saveKey(rdb, db, flags, &processed, keystr, o)) + goto werr; } + serverAssert(ckeysExpired == db->setexpire->size()); dictReleaseIterator(di); di = NULL; /* So that we don't release it again on error. */ } @@ -1822,6 +1838,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) { } o->mvcc_tstamp = mvcc_tstamp; + serverAssert(!o->FExpires()); return o; } @@ -1909,7 +1926,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { now = mstime(); lru_clock = LRU_CLOCK(); - + while(1) { robj *key, *val; @@ -1965,7 +1982,6 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr; dictExpand(db->pdict,db_size); - dictExpand(db->expires,expires_size); continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_AUX) { /* AUX: generic string-string fields. Use to add state to RDB @@ -2079,7 +2095,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { if (fInserted) { /* Set the expire time if needed */ - if (expiretime != -1) setExpire(NULL,db,key,expiretime); + if (expiretime != -1) + setExpire(NULL,db,key,expiretime); /* Set usage information (for eviction). */ objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); @@ -2101,6 +2118,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { lfu_freq = -1; lru_idle = -1; } + /* Verify the checksum if RDB version is >= 5 */ if (rdbver >= 5) { uint64_t cksum, expected = rdb->cksum; diff --git a/src/scripting.cpp b/src/scripting.cpp index 1548044e2..5ba336374 100644 --- a/src/scripting.cpp +++ b/src/scripting.cpp @@ -665,7 +665,7 @@ cleanup: * The object must be small, SDS-encoded, and with refcount = 1 * (we must be the only owner) for us to cache it. */ if (j < LUA_CMD_OBJCACHE_SIZE && - o->refcount == 1 && + o->getrefcount(std::memory_order_relaxed) == 1 && (o->encoding == OBJ_ENCODING_RAW || o->encoding == OBJ_ENCODING_EMBSTR) && sdslen((sds)ptrFromObj(o)) <= LUA_CMD_OBJCACHE_MAX_LEN) diff --git a/src/semiorderedset.h b/src/semiorderedset.h new file mode 100644 index 000000000..7713d5533 --- /dev/null +++ b/src/semiorderedset.h @@ -0,0 +1,344 @@ +#pragma once +#include +#include "compactvector.h" + +/**************************************** + * semiorderedset.h: + * + * The ordered set is a hash set that maintains semi-ordering, that is you can iterate in sub-linear time over the set comparing a value. + * It has a few other useful properties vs the traditional set: + * 1. The key need not be the underlying type, the only requirement is the value type is castable to the key + * 2. The key need not have total ordering. The set will iterate until it finds an exact match with operator== on the value + * This provides additional flexibility on insert allowing us to optimize this case. + * + */ + +extern uint64_t dictGenHashFunction(const void *key, int len); + +template +class semiorderedset +{ + friend struct setiter; + std::vector> m_data; + size_t celem = 0; + static const size_t bits_min = 8; + size_t bits = bits_min; + size_t idxRehash = (1ULL << bits_min); + bool fPauseRehash = false; + + constexpr size_t targetElementsPerBucket() + { + // Aim for roughly 4 cache lines per bucket (determined by imperical testing) + // lower values are faster but use more memory + return std::max((64/sizeof(T))*4, (size_t)2); + } + +public: + semiorderedset() + { + m_data.resize((1ULL << bits)); + } + + struct setiter + { + semiorderedset *set; + size_t idxPrimary = 0; + size_t idxSecondary = 0; + + setiter(semiorderedset *set) + { + this->set = set; + } + + bool operator==(const setiter &other) const + { + return (idxPrimary == other.idxPrimary) && (idxSecondary == other.idxSecondary); + } + + bool operator!=(const setiter &other) const { return !operator==(other); } + + inline T &operator*() { return set->m_data[idxPrimary][idxSecondary]; } + inline const T &operator*() const { return set->m_data[idxPrimary][idxSecondary]; } + + inline T *operator->() { return &set->m_data[idxPrimary][idxSecondary]; } + inline const T *operator->() const { return &set->m_data[idxPrimary][idxSecondary]; } + }; + + setiter find(const T_KEY &key) + { + RehashStep(); + setiter itr(this); + itr.idxPrimary = idxFromObj(key); + + for (int hashset = 0; hashset < 2; ++hashset) // rehashing may only be 1 resize behind, so we check up to two slots + { + auto &vecBucket = m_data[itr.idxPrimary]; + + auto itrFind = std::find(vecBucket.begin(), vecBucket.end(), key); + if (itrFind != vecBucket.end()) + { + itr.idxSecondary = itrFind - vecBucket.begin(); + return itr; + } + + // See if we have to check the older slot + size_t mask = (hashmask() >> 1); + if (itr.idxPrimary == (itr.idxPrimary & mask)) + break; // same bucket we just checked + itr.idxPrimary &= mask; + if (FRehashedRow(itr.idxPrimary)) + break; + } + + return end(); + } + + setiter end() + { + setiter itr(this); + itr.idxPrimary = m_data.size(); + return itr; + } + + void insert(T &e, bool fRehash = false) + { + if (!fRehash) + RehashStep(); + + auto idx = idxFromObj(static_cast(e)); + if (!fRehash) + ++celem; + + typename compactvector::iterator itrInsert; + if (!m_data[idx].empty() && !(e < m_data[idx].back())) + itrInsert = m_data[idx].end(); + else + itrInsert = std::upper_bound(m_data[idx].begin(), m_data[idx].end(), e); + itrInsert = m_data[idx].insert(itrInsert, e); + + if (celem > ((1ULL << bits)*targetElementsPerBucket())) + grow(); + } + + // enumeration starting from the 'itrStart'th key. Note that the iter is a hint, and need no be valid anymore + template + setiter enumerate(const setiter &itrStart, const T_MAX &max, T_VISITOR fn) + { + setiter itr(itrStart); + + if (itrStart.set == this) // really if this case isn't true its probably a bug + itr = itrStart; // but why crash the program when we can easily fix this? + + fPauseRehash = true; + if (itr.idxPrimary >= m_data.size()) + itr.idxPrimary = 0; + + for (size_t ibucket = 0; ibucket < m_data.size(); ++ibucket) + { + if (!enumerate_bucket(itr, max, fn)) + break; + itr.idxSecondary = 0; + + ++itr.idxPrimary; + if (itr.idxPrimary >= m_data.size()) + itr.idxPrimary = 0; + } + fPauseRehash = false; + return itr; + } + + // This will "randomly" visit nodes biased towards lower values first + template + size_t random_visit(T_VISITOR &fn) + { + bool fSawAny = true; + size_t visited = 0; + size_t basePrimary = rand() % m_data.size(); + for (size_t idxSecondary = 0; fSawAny; ++idxSecondary) + { + fSawAny = false; + for (size_t idxPrimaryCount = 0; idxPrimaryCount < m_data.size(); ++idxPrimaryCount) + { + size_t idxPrimary = (basePrimary + idxPrimaryCount) % m_data.size(); + if (idxSecondary < m_data[idxPrimary].size()) + { + ++visited; + fSawAny = true; + if (!fn(m_data[idxPrimary][idxSecondary])) + return visited; + } + } + } + return visited; + } + + const T& random_value() const + { + assert(!empty()); + for (;;) + { + size_t idxPrimary = rand() % m_data.size(); + if (m_data[idxPrimary].empty()) + continue; + + return m_data[idxPrimary][rand() % m_data[idxPrimary].size()]; + } + } + + void erase(const setiter &itr) + { + auto &vecRow = m_data[itr.idxPrimary]; + vecRow.erase(vecRow.begin() + itr.idxSecondary); + --celem; + RehashStep(); + } + + void clear() + { + m_data = decltype(m_data)(); + bits = bits_min; + m_data.resize(1ULL << bits); + idxRehash = m_data.size(); + } + + bool empty() const noexcept { return celem == 0; } + size_t size() const noexcept { return celem; } + + size_t bytes_used() const + { + size_t cb = sizeof(this) + (m_data.capacity()-m_data.size())*sizeof(T); + for (auto &vec : m_data) + { + cb += vec.bytes_used(); + } + return cb; + } + + #define DICT_STATS_VECTLEN 50 + size_t getstats(char *buf, size_t bufsize) const + { + unsigned long i, slots = 0, chainlen, maxchainlen = 0; + unsigned long totchainlen = 0; + unsigned long clvector[DICT_STATS_VECTLEN] = {0}; + size_t l = 0; + + if (empty()) { + return snprintf(buf,bufsize, + "No stats available for empty dictionaries\n"); + } + + /* Compute stats. */ + for (auto &vec : m_data) { + if (vec.empty()) { + clvector[0]++; + continue; + } + slots++; + /* For each hash entry on this slot... */ + chainlen = vec.size(); + + clvector[(chainlen < DICT_STATS_VECTLEN) ? chainlen : (DICT_STATS_VECTLEN-1)]++; + if (chainlen > maxchainlen) maxchainlen = chainlen; + totchainlen += chainlen; + } + + size_t used = m_data.size()-clvector[0]; + /* Generate human readable stats. */ + l += snprintf(buf+l,bufsize-l, + "semiordered set stats:\n" + " table size: %ld\n" + " number of slots: %ld\n" + " used slots: %ld\n" + " max chain length: %ld\n" + " avg chain length (counted): %.02f\n" + " avg chain length (computed): %.02f\n" + " Chain length distribution:\n", + size(), used, slots, maxchainlen, + (float)totchainlen/slots, (float)size()/m_data.size()); + + for (i = 0; i < DICT_STATS_VECTLEN; i++) { + if (clvector[i] == 0) continue; + if (l >= bufsize) break; + l += snprintf(buf+l,bufsize-l, + " %s%ld: %ld (%.02f%%)\n", + (i == DICT_STATS_VECTLEN-1)?">= ":"", + i, clvector[i], ((float)clvector[i]/m_data.size())*100); + } + + /* Unlike snprintf(), teturn the number of characters actually written. */ + if (bufsize) buf[bufsize-1] = '\0'; + return strlen(buf); + } + +private: + inline size_t hashmask() const { return (1ULL << bits) - 1; } + + size_t idxFromObj(const T_KEY &key) + { + size_t v = (size_t)dictGenHashFunction(&key, sizeof(key)); + return v & hashmask(); + } + + bool FRehashedRow(size_t idx) const + { + return (idx >= (m_data.size()/2)) || (idx < idxRehash); + } + + void RehashStep() + { + if (fPauseRehash) + return; + + int steps = 0; + for (; idxRehash < (m_data.size()/2); ++idxRehash) + { + compactvector vecT; + std::swap(m_data[idxRehash], vecT); + + for (auto &v : vecT) + insert(v, true); + + if (++steps > 1024) + break; + } + } + + void grow() + { + assert(idxRehash >= (m_data.size()/2)); // we should have finished rehashing by the time we need to grow again + + ++bits; + m_data.resize(1ULL << bits); + idxRehash = 0; + RehashStep(); + } + + template + inline bool enumerate_bucket(setiter &itr, const T_MAX &max, T_VISITOR &fn) + { + auto &vec = m_data[itr.idxPrimary]; + for (; itr.idxSecondary < vec.size(); ++itr.idxSecondary) + { + // Assert we're ordered by T_MAX + assert((itr.idxSecondary+1) >= vec.size() + || static_cast(vec[itr.idxSecondary]) <= static_cast(vec[itr.idxSecondary+1])); + + if (max < static_cast(*itr)) + return true; + + size_t sizeBefore = vec.size(); + if (!fn(*itr)) + { + itr.idxSecondary++; // we still visited this node + return false; + } + if (vec.size() != sizeBefore) + { + assert(vec.size() == (sizeBefore-1)); // they may only remove the element passed to them + --itr.idxSecondary; // they deleted the element + } + } + vec.shrink_to_fit(); + return true; + } +}; diff --git a/src/server.cpp b/src/server.cpp index ebdca3234..e6e86f6ea 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1428,8 +1428,6 @@ int htNeedsResize(dict *dict) { void tryResizeHashTables(int dbid) { if (htNeedsResize(g_pserver->db[dbid].pdict)) dictResize(g_pserver->db[dbid].pdict); - if (htNeedsResize(g_pserver->db[dbid].expires)) - dictResize(g_pserver->db[dbid].expires); } /* Our hash table implementation performs rehashing incrementally while @@ -1445,11 +1443,6 @@ int incrementallyRehash(int dbid) { dictRehashMilliseconds(g_pserver->db[dbid].pdict,1); return 1; /* already used our millisecond for this loop... */ } - /* Expires */ - if (dictIsRehashing(g_pserver->db[dbid].expires)) { - dictRehashMilliseconds(g_pserver->db[dbid].expires,1); - return 1; /* already used our millisecond for this loop... */ - } return 0; } @@ -1889,7 +1882,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { size = dictSlots(g_pserver->db[j].pdict); used = dictSize(g_pserver->db[j].pdict); - vkeys = dictSize(g_pserver->db[j].expires); + vkeys = g_pserver->db[j].setexpire->size(); if (used || vkeys) { serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); /* dictPrintStats(g_pserver->dict); */ @@ -2926,12 +2919,14 @@ void initServer(void) { /* Create the Redis databases, and initialize other internal state. */ for (int j = 0; j < cserver.dbnum; j++) { g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL); - g_pserver->db[j].expires = dictCreate(&keyptrDictType,NULL); + g_pserver->db[j].setexpire = new(MALLOC_LOCAL) semiorderedset; + g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL); g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); g_pserver->db[j].watched_keys = dictCreate(&keylistDictType,NULL); g_pserver->db[j].id = j; g_pserver->db[j].avg_ttl = 0; + g_pserver->db[j].last_expire_set = 0; g_pserver->db[j].defrag_later = listCreate(); } @@ -4571,11 +4566,18 @@ sds genRedisInfoString(const char *section) { long long keys, vkeys; keys = dictSize(g_pserver->db[j].pdict); - vkeys = dictSize(g_pserver->db[j].expires); + vkeys = g_pserver->db[j].setexpire->size(); + + // Adjust TTL by the current time + g_pserver->db[j].avg_ttl -= (g_pserver->mstime - g_pserver->db[j].last_expire_set); + if (g_pserver->db[j].avg_ttl < 0) + g_pserver->db[j].avg_ttl = 0; + g_pserver->db[j].last_expire_set = g_pserver->mstime; + if (keys || vkeys) { info = sdscatprintf(info, "db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n", - j, keys, vkeys, g_pserver->db[j].avg_ttl); + j, keys, vkeys, static_cast(g_pserver->db[j].avg_ttl)); } } } diff --git a/src/server.h b/src/server.h index 5c66aaba6..8bc30a0f5 100644 --- a/src/server.h +++ b/src/server.h @@ -81,6 +81,7 @@ typedef long long mstime_t; /* millisecond time type. */ N-elements flat arrays */ #include "rax.h" /* Radix tree */ #include "uuid.h" +#include "semiorderedset.h" /* Following includes allow test functions to be called from Redis main() */ #include "zipmap.h" @@ -243,7 +244,7 @@ public: #define CONFIG_DEFAULT_ACTIVE_REPLICA 0 #define CONFIG_DEFAULT_ENABLE_MULTIMASTER 0 -#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ +#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 64 /* Loopkups per loop. */ #define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */ #define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* CPU max % for keys collection */ #define ACTIVE_EXPIRE_CYCLE_SLOW 0 @@ -717,7 +718,7 @@ typedef struct RedisModuleDigest { #define LRU_CLOCK_MAX ((1<lru */ #define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */ -#define OBJ_SHARED_REFCOUNT INT_MAX +#define OBJ_SHARED_REFCOUNT (0x7FFFFFFF) #define OBJ_MVCC_INVALID (0xFFFFFFFFFFFFFFFFULL) typedef struct redisObject { @@ -726,10 +727,21 @@ typedef struct redisObject { unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ - mutable std::atomic refcount; +private: + mutable std::atomic refcount; +public: uint64_t mvcc_tstamp; void *m_ptr; + + inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; } + void SetFExpires(bool fExpires); + + void setrefcount(unsigned ref); + unsigned getrefcount(std::memory_order order) const { return (refcount.load(order) & ~(1U << 31)); } + void addref() const { refcount.fetch_add(1, std::memory_order_acq_rel); } + unsigned release() const { return refcount.fetch_sub(1, std::memory_order_acq_rel) & ~(1U << 31); } } robj; +static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase"); __attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o) { @@ -755,6 +767,38 @@ __attribute__((always_inline)) inline char *szFromObj(const robj *o) return (char*)ptrFromObj(o); } +class expireEntry { + sds m_key; + long long m_when; + +public: + expireEntry(sds key, long long when) + { + m_key = key; + m_when = when; + } + + bool operator!=(const expireEntry &e) const noexcept + { + return m_when != e.m_when || m_key != e.m_key; + } + bool operator==(const expireEntry &e) const noexcept + { + return m_when == e.m_when && m_key == e.m_key; + } + bool operator==(const char *key) const noexcept { return m_key == key; } + + bool operator<(const expireEntry &e) const noexcept { return m_when < e.m_when; } + bool operator<(const char *key) const noexcept { return m_key < key; } + bool operator<(long long when) const noexcept { return m_when < when; } + + const char *key() const noexcept { return m_key; } + long long when() const noexcept { return m_when; } + + + explicit operator const char*() const noexcept { return m_key; } + explicit operator long long() const noexcept { return m_when; } +}; /* The a string name for an object's type as listed above * Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines, @@ -766,7 +810,7 @@ const char *getObjectTypeName(robj_roptr o); * we'll update it when the structure is changed, to avoid bugs like * bug #85 introduced exactly in this way. */ #define initStaticStringObject(_var,_ptr) do { \ - _var.refcount = 1; \ + _var.setrefcount(1); \ _var.type = OBJ_STRING; \ _var.encoding = OBJ_ENCODING_RAW; \ _var.m_ptr = _ptr; \ @@ -793,12 +837,15 @@ typedef struct clientReplyBlock { * database. The database number is the 'id' field in the structure. */ typedef struct redisDb { dict *pdict; /* The keyspace for this DB */ - dict *expires; /* Timeout of keys with a timeout set */ + semiorderedset *setexpire; + semiorderedset::setiter expireitr; + dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; /* Database ID */ - long long avg_ttl; /* Average TTL, just for stats */ + long long last_expire_set; /* when the last expire was set */ + double avg_ttl; /* Average TTL, just for stats */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ } redisDb; @@ -2174,6 +2221,7 @@ int rewriteConfig(char *path); /* db.c -- Keyspace access API */ int removeExpire(redisDb *db, robj *key); +int removeExpireCore(redisDb *db, robj *key, dictEntry *de); void propagateExpire(redisDb *db, robj *key, int lazy); int expireIfNeeded(redisDb *db, robj *key); long long getExpire(redisDb *db, robj_roptr key); diff --git a/src/slowlog.cpp b/src/slowlog.cpp index 4f338b341..08a2e62e9 100644 --- a/src/slowlog.cpp +++ b/src/slowlog.cpp @@ -72,7 +72,7 @@ slowlogEntry *slowlogCreateEntry(client *c, robj **argv, int argc, long long dur (unsigned long) sdslen(szFromObj(argv[j])) - SLOWLOG_ENTRY_MAX_STRING); se->argv[j] = createObject(OBJ_STRING,s); - } else if (argv[j]->refcount == OBJ_SHARED_REFCOUNT) { + } else if (argv[j]->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) { se->argv[j] = argv[j]; } else { /* Here we need to dupliacate the string objects composing the diff --git a/src/t_string.cpp b/src/t_string.cpp index 4cb30eac6..a254f4f53 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -353,7 +353,7 @@ void incrDecrCommand(client *c, long long incr) { } value += incr; - if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT && + if (o && o->getrefcount(std::memory_order_relaxed) == 1 && o->encoding == OBJ_ENCODING_INT && (value < 0 || value >= OBJ_SHARED_INTEGERS) && value >= LONG_MIN && value <= LONG_MAX) {