Merge pull request #50 from JohnSully/expire

New Expire Datastructure which is faster and more memory efficient.  This allows us to process expiries in sublinear time.

Former-commit-id: d45edc493d111d4be81f2ce24e3022c8fffb3e2f
This commit is contained in:
John Sully 2019-07-23 18:40:48 -04:00 committed by GitHub
commit be123c44b9
18 changed files with 1005 additions and 328 deletions

View File

@ -51,6 +51,7 @@
"tuple": "cpp",
"type_traits": "cpp",
"typeinfo": "cpp",
"utility": "cpp"
"utility": "cpp",
"set": "cpp"
}
}

View File

@ -85,7 +85,7 @@ struct bio_job {
void *bioProcessBackgroundJobs(void *arg);
void lazyfreeFreeObjectFromBioThread(robj *o);
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2);
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset<expireEntry, const char *> *set);
void lazyfreeFreeSlotsMapFromBioThread(rax *rt);
/* Make sure we have enough stack to perform all the things we do in the
@ -196,7 +196,7 @@ void *bioProcessBackgroundJobs(void *arg) {
if (job->arg1)
lazyfreeFreeObjectFromBioThread((robj*)job->arg1);
else if (job->arg2 && job->arg3)
lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(dict*)job->arg3);
lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(semiorderedset<expireEntry, const char *>*)job->arg3);
else if (job->arg3)
lazyfreeFreeSlotsMapFromBioThread((rax*)job->arg3);
} else {

153
src/compactvector.h Normal file
View File

@ -0,0 +1,153 @@
#pragma once
#include <type_traits>
#include <assert.h>
/*************************************************
* compactvector - similar to std::vector but optimized for minimal memory
*
* Notable differences:
* - Limited to 2^32 elements
* - Grows linearly not exponentially
*
*************************************************/
template<typename T, bool MEMMOVE_SAFE=false>
class compactvector
{
static_assert(MEMMOVE_SAFE || std::is_trivially_copyable<T>::value, "compactvector requires trivially copyable types");
T *m_data = nullptr;
unsigned m_celem = 0;
unsigned m_max = 0;
public:
typedef T* iterator;
compactvector() noexcept = default;
~compactvector() noexcept
{
clear(); // call dtors
zfree(m_data);
}
compactvector(compactvector &) noexcept = delete;
compactvector(compactvector &&src) noexcept
{
m_data = src.m_data;
m_celem = src.m_celem;
m_max = src.m_max;
src.m_data = nullptr;
src.m_celem = 0;
src.m_max = 0;
}
compactvector &operator=(const compactvector&) noexcept = delete;
compactvector &operator=(compactvector &&src) noexcept
{
zfree(m_data);
m_data = src.m_data;
m_celem = src.m_celem;
m_max = src.m_max;
src.m_data = nullptr;
src.m_celem = 0;
src.m_max = 0;
return *this;
}
inline T* begin() { return m_data; }
inline const T* begin() const { return m_data; }
inline T* end() { return m_data + m_celem; }
inline const T* end() const { return m_data + m_celem; }
T* insert(T* where, T &val)
{
assert(where >= m_data);
size_t idx = where - m_data;
if (m_celem >= m_max)
{
if (m_max < 2)
m_max = 2;
else
m_max = m_max + 4;
m_data = (T*)zrealloc(m_data, sizeof(T) * m_max, MALLOC_LOCAL);
m_max = zmalloc_usable(m_data) / sizeof(T);
}
assert(idx < m_max);
where = m_data + idx;
memmove(m_data + idx + 1, m_data + idx, (m_celem - idx)*sizeof(T));
new(m_data + idx) T(std::move(val));
++m_celem;
return where;
}
T &operator[](size_t idx)
{
assert(idx < m_celem);
return m_data[idx];
}
const T &operator[](size_t idx) const
{
assert(idx < m_celem);
return m_data[idx];
}
T& back() { assert(m_celem > 0); return m_data[m_celem-1]; }
const T& back() const { assert(m_celem > 0); return m_data[m_celem-1]; }
void erase(T* where)
{
assert(where >= m_data);
size_t idx = where - m_data;
assert(idx < m_celem);
where->~T();
memmove(where, where+1, ((m_celem - idx - 1)*sizeof(T)));
--m_celem;
if (m_celem == 0)
{
zfree(m_data);
m_data = nullptr;
m_max = 0;
}
}
void shrink_to_fit()
{
if (m_max == m_celem)
return;
m_data = (T*)zrealloc(m_data, sizeof(T) * m_celem, MALLOC_LOCAL);
m_max = m_celem; // NOTE: We do not get the usable size here, because this could cause us to continually realloc
}
size_t bytes_used() const
{
return sizeof(this) + (m_max * sizeof(T));
}
void clear()
{
for (size_t idx = 0; idx < m_celem; ++idx)
m_data[idx].~T();
zfree(m_data);
m_data = nullptr;
m_celem = 0;
m_max = 0;
}
bool empty() const noexcept
{
return m_celem == 0;
}
size_t size() const noexcept
{
return m_celem;
}
T* data() noexcept { return m_data; }
const T* data() const noexcept { return m_data; }
};
static_assert(sizeof(compactvector<void*>) <= 16, "not compact");

View File

@ -39,6 +39,8 @@
*----------------------------------------------------------------------------*/
int keyIsExpired(redisDb *db, robj *key);
int expireIfNeeded(redisDb *db, robj *key, robj *o);
void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire);
/* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached.
@ -49,6 +51,20 @@ void updateLFU(robj *val) {
val->lru = (LFUGetTimeInMinutes()<<8) | counter;
}
void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew)
{
serverAssert(valOld->FExpires());
serverAssert(!valNew->FExpires());
auto itr = db->setexpire->find(key);
serverAssert(itr != db->setexpire->end());
valNew->SetFExpires(true);
valOld->SetFExpires(false);
return;
}
/* Low level key lookup API, not actually called directly from commands
* implementations that should instead rely on lookupKeyRead(),
* lookupKeyWrite() and lookupKeyReadWithFlags(). */
@ -160,8 +176,10 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key) {
* Returns the linked value object if the key exists or NULL if the key
* does not exist in the specified DB. */
robj *lookupKeyWrite(redisDb *db, robj *key) {
expireIfNeeded(db,key);
return lookupKey(db,key,LOOKUP_UPDATEMVCC);
robj *o = lookupKey(db,key,LOOKUP_UPDATEMVCC);
if (expireIfNeeded(db,key))
o = NULL;
return o;
}
robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
@ -177,6 +195,7 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
}
int dbAddCore(redisDb *db, robj *key, robj *val) {
serverAssert(!val->FExpires());
sds copy = sdsdup(szFromObj(key));
int retval = dictAdd(db->pdict, copy, val);
val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp();
@ -206,10 +225,18 @@ void dbAdd(redisDb *db, robj *key, robj *val)
serverAssertWithInfo(NULL,key,retval == DICT_OK);
}
void dbOverwriteCore(redisDb *db, dictEntry *de, robj *val, bool fUpdateMvcc)
void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire)
{
dictEntry auxentry = *de;
robj *old = (robj*)dictGetVal(de);
if (old->FExpires()) {
if (fRemoveExpire)
removeExpire(db, key);
else
updateExpire(db, (sds)dictGetKey(de), old, val);
}
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
val->lru = old->lru;
}
@ -235,7 +262,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
dictEntry *de = dictFind(db->pdict,ptrFromObj(key));
serverAssertWithInfo(NULL,key,de != NULL);
dbOverwriteCore(db, de, val, true);
dbOverwriteCore(db, de, key, val, true, false);
}
/* Insert a key, handling duplicate keys according to fReplace */
@ -250,7 +277,7 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
robj *old = (robj*)dictGetVal(de);
if (old->mvcc_tstamp <= val->mvcc_tstamp)
{
dbOverwriteCore(db, de, val, false);
dbOverwriteCore(db, de, key, val, false, true);
return true;
}
@ -271,13 +298,13 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
*
* All the new keys in the database should be created via this interface. */
void setKey(redisDb *db, robj *key, robj *val) {
if (lookupKeyWrite(db,key) == NULL) {
dictEntry *de = dictFind(db->pdict, ptrFromObj(key));
if (de == NULL) {
dbAdd(db,key,val);
} else {
dbOverwrite(db,key,val);
dbOverwriteCore(db,de,key,val,true,true);
}
incrRefCount(val);
removeExpire(db,key);
signalModifiedKey(db,key);
}
@ -292,7 +319,7 @@ int dbExists(redisDb *db, robj *key) {
robj *dbRandomKey(redisDb *db) {
dictEntry *de;
int maxtries = 100;
int allvolatile = dictSize(db->pdict) == dictSize(db->expires);
int allvolatile = dictSize(db->pdict) == db->setexpire->size();
while(1) {
sds key;
@ -303,23 +330,30 @@ robj *dbRandomKey(redisDb *db) {
key = (sds)dictGetKey(de);
keyobj = createStringObject(key,sdslen(key));
if (dictFind(db->expires,key)) {
if (((robj*)dictGetVal(de))->FExpires())
{
if (allvolatile && listLength(g_pserver->masters) && --maxtries == 0) {
/* If the DB is composed only of keys with an expire set,
* it could happen that all the keys are already logically
* expired in the slave, so the function cannot stop because
* expireIfNeeded() is false, nor it can stop because
* dictGetRandomKey() returns NULL (there are keys to return).
* To prevent the infinite loop we do some tries, but if there
* are the conditions for an infinite loop, eventually we
* return a key name that may be already expired. */
* it could happen that all the keys are already logically
* expired in the slave, so the function cannot stop because
* expireIfNeeded() is false, nor it can stop because
* dictGetRandomKey() returns NULL (there are keys to return).
* To prevent the infinite loop we do some tries, but if there
* are the conditions for an infinite loop, eventually we
* return a key name that may be already expired. */
return keyobj;
}
if (expireIfNeeded(db,keyobj)) {
}
if (((robj*)dictGetVal(de))->FExpires())
{
if (expireIfNeeded(db,keyobj)) {
decrRefCount(keyobj);
continue; /* search for another key. This expired. */
}
}
}
return keyobj;
}
}
@ -328,7 +362,10 @@ robj *dbRandomKey(redisDb *db) {
int dbSyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,ptrFromObj(key));
dictEntry *de = dictFind(db->pdict, szFromObj(key));
if (de != nullptr && ((robj*)dictGetVal(de))->FExpires())
removeExpireCore(db, key, de);
if (dictDelete(db->pdict,ptrFromObj(key)) == DICT_OK) {
if (g_pserver->cluster_enabled) slotToKeyDel(key);
return 1;
@ -373,7 +410,7 @@ int dbDelete(redisDb *db, robj *key) {
*/
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
serverAssert(o->type == OBJ_STRING);
if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
if (o->getrefcount(std::memory_order_relaxed) != 1 || o->encoding != OBJ_ENCODING_RAW) {
robj *decoded = getDecodedObject(o);
o = createRawStringObject(szFromObj(decoded), sdslen(szFromObj(decoded)));
decrRefCount(decoded);
@ -419,7 +456,9 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
emptyDbAsync(&g_pserver->db[j]);
} else {
dictEmpty(g_pserver->db[j].pdict,callback);
dictEmpty(g_pserver->db[j].expires,callback);
delete g_pserver->db[j].setexpire;
g_pserver->db[j].setexpire = new (MALLOC_LOCAL) semiorderedset<expireEntry, const char*>();
g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end();
}
}
if (g_pserver->cluster_enabled) {
@ -964,9 +1003,10 @@ void renameGenericCommand(client *c, int nx) {
* with the same name. */
dbDelete(c->db,c->argv[2]);
}
dbAdd(c->db,c->argv[2],o);
if (expire != -1) setExpire(c,c->db,c->argv[2],expire);
dbDelete(c->db,c->argv[1]);
dbAdd(c->db,c->argv[2],o);
if (expire != -1)
setExpire(c,c->db,c->argv[2],expire);
signalModifiedKey(c->db,c->argv[1]);
signalModifiedKey(c->db,c->argv[2]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
@ -1024,6 +1064,12 @@ void moveCommand(client *c) {
return;
}
expire = getExpire(c->db,c->argv[1]);
if (o->FExpires())
removeExpire(c->db,c->argv[1]);
serverAssert(!o->FExpires());
incrRefCount(o);
dbDelete(src,c->argv[1]);
g_pserver->dirty++;
/* Return zero if the key already exists in the target DB */
if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
@ -1032,11 +1078,7 @@ void moveCommand(client *c) {
}
dbAdd(dst,c->argv[1],o);
if (expire != -1) setExpire(c,dst,c->argv[1],expire);
incrRefCount(o);
/* OK! key moved, free the entry in the source DB */
dbDelete(src,c->argv[1]);
g_pserver->dirty++;
addReply(c,shared.cone);
}
@ -1077,12 +1119,16 @@ int dbSwapDatabases(int id1, int id2) {
* ready_keys and watched_keys, since we want clients to
* remain in the same DB they were. */
db1->pdict = db2->pdict;
db1->expires = db2->expires;
db1->setexpire = db2->setexpire;
db1->expireitr = db2->expireitr;
db1->avg_ttl = db2->avg_ttl;
db1->last_expire_set = db2->last_expire_set;
db2->pdict = aux.pdict;
db2->expires = aux.expires;
db2->setexpire = aux.setexpire;
db2->expireitr = aux.expireitr;
db2->avg_ttl = aux.avg_ttl;
db2->last_expire_set = aux.last_expire_set;
/* Now we need to handle clients blocked on lists: as an effect
* of swapping the two DBs, a client that was waiting for list
@ -1130,12 +1176,25 @@ void swapdbCommand(client *c) {
/*-----------------------------------------------------------------------------
* Expires API
*----------------------------------------------------------------------------*/
int removeExpire(redisDb *db, robj *key) {
dictEntry *de = dictFind(db->pdict,ptrFromObj(key));
return removeExpireCore(db, key, de);
}
int removeExpireCore(redisDb *db, robj *key, dictEntry *de) {
/* An expire may only be removed if there is a corresponding entry in the
* main dict. Otherwise, the key will never be freed. */
serverAssertWithInfo(NULL,key,dictFind(db->pdict,ptrFromObj(key)) != NULL);
return dictDelete(db->expires,ptrFromObj(key)) == DICT_OK;
serverAssertWithInfo(NULL,key,de != NULL);
robj *val = (robj*)dictGetVal(de);
if (!val->FExpires())
return 0;
auto itr = db->setexpire->find((sds)dictGetKey(de));
serverAssert(itr != db->setexpire->end());
serverAssert(itr->key() == (sds)dictGetKey(de));
db->setexpire->erase(itr);
val->SetFExpires(false);
return 1;
}
/* Set an expire to the specified key. If the expire is set in the context
@ -1143,14 +1202,40 @@ int removeExpire(redisDb *db, robj *key) {
* to NULL. The 'when' parameter is the absolute unix time in milliseconds
* after which the key will no longer be considered valid. */
void setExpire(client *c, redisDb *db, robj *key, long long when) {
dictEntry *kde, *de;
dictEntry *kde;
serverAssert(GlobalLocksAcquired());
/* Reuse the sds from the main dict in the expire dict */
kde = dictFind(db->pdict,ptrFromObj(key));
serverAssertWithInfo(NULL,key,kde != NULL);
de = dictAddOrFind(db->expires,dictGetKey(kde));
dictSetSignedIntegerVal(de,when);
if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
{
// shared objects cannot have the expire bit set, create a real object
dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde)));
}
if (((robj*)dictGetVal(kde))->FExpires())
removeExpire(db, key); // should we optimize for when this is called with an already set expiry?
expireEntry e((sds)dictGetKey(kde), when);
((robj*)dictGetVal(kde))->SetFExpires(true);
/* Update TTL stats (exponential moving average) */
/* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */
long long now = g_pserver->mstime;
db->avg_ttl -= (now - db->last_expire_set); // reduce the TTL by the time that has elapsed
if (db->setexpire->empty())
db->avg_ttl = 0;
else
db->avg_ttl -= db->avg_ttl / db->setexpire->size(); // slide one entry out the window
if (db->avg_ttl < 0)
db->avg_ttl = 0; // TTLs are never negative
db->avg_ttl += (double)(when-now) / (db->setexpire->size()+1); // add the new entry
db->last_expire_set = now;
db->setexpire->insert(e);
int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0;
if (c && writable_slave && !(c->flags & CLIENT_MASTER))
@ -1163,13 +1248,18 @@ long long getExpire(redisDb *db, robj_roptr key) {
dictEntry *de;
/* No expire? return ASAP */
if (dictSize(db->expires) == 0 ||
(de = dictFind(db->expires,ptrFromObj(key))) == NULL) return -1;
if (db->setexpire->size() == 0)
return -1;
/* The entry was found in the expire dict, this means it should also
* be present in the main dict (safety check). */
serverAssertWithInfo(NULL,key,dictFind(db->pdict,ptrFromObj(key)) != NULL);
return dictGetSignedIntegerVal(de);
de = dictFind(db->pdict, ptrFromObj(key));
if (de == NULL)
return -1;
robj *obj = (robj*)dictGetVal(de);
if (!obj->FExpires())
return -1;
auto itr = db->setexpire->find((sds)dictGetKey(de));
return itr->when();
}
/* Propagate expires into slaves and the AOF file.

View File

@ -436,7 +436,7 @@ NULL
"Value at:%p refcount:%d "
"encoding:%s serializedlength:%zu "
"lru:%d lru_seconds_idle:%llu%s",
(void*)val, static_cast<int>(val->refcount),
(void*)val, static_cast<int>(val->getrefcount(std::memory_order_relaxed)),
strenc, rdbSavedObjectLen(val),
val->lru, estimateObjectIdleTime(val)/1000, extra);
} else if (!strcasecmp(szFromObj(c->argv[1]),"sdslen") && c->argc == 3) {
@ -638,9 +638,9 @@ NULL
dictGetStats(buf,sizeof(buf),g_pserver->db[dbid].pdict);
stats = sdscat(stats,buf);
stats = sdscatprintf(stats,"[Expires HT]\n");
dictGetStats(buf,sizeof(buf),g_pserver->db[dbid].expires);
stats = sdscat(stats,buf);
stats = sdscatprintf(stats,"[Expires set]\n");
g_pserver->db[dbid].setexpire->getstats(buf, sizeof(buf));
stats = sdscat(stats, buf);
addReplyBulkSds(c,stats);
} else if (!strcasecmp(szFromObj(c->argv[1]),"htstats-key") && c->argc == 3) {
@ -721,14 +721,14 @@ void _serverAssertPrintClientInfo(const client *c) {
arg = buf;
}
serverLog(LL_WARNING,"client->argv[%d] = \"%s\" (refcount: %d)",
j, arg, static_cast<int>(c->argv[j]->refcount));
j, arg, static_cast<int>(c->argv[j]->getrefcount(std::memory_order_relaxed)));
}
}
void serverLogObjectDebugInfo(robj_roptr o) {
serverLog(LL_WARNING,"Object type: %d", o->type);
serverLog(LL_WARNING,"Object encoding: %d", o->encoding);
serverLog(LL_WARNING,"Object refcount: %d", static_cast<int>(o->refcount));
serverLog(LL_WARNING,"Object refcount: %d", static_cast<int>(o->getrefcount(std::memory_order_relaxed)));
if (o->type == OBJ_STRING && sdsEncodedObject(o)) {
serverLog(LL_WARNING,"Object raw string len: %zu", sdslen(szFromObj(o)));
if (sdslen(szFromObj(o)) < 4096) {

View File

@ -48,6 +48,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util);
/* forward declarations*/
void defragDictBucketCallback(void *privdata, dictEntry **bucketref);
dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged);
void replaceSateliteOSetKeyPtr(semiorderedset<expireEntry, const char*> &set, sds oldkey, sds newkey);
/* Defrag helper for generic allocations.
*
@ -102,7 +103,7 @@ sds activeDefragSds(sds sdsptr) {
* and should NOT be accessed. */
robj *activeDefragStringOb(robj* ob, long *defragged) {
robj *ret = NULL;
if (ob->refcount!=1)
if (ob->getrefcount(std::memory_order_relaxed)!=1)
return NULL;
/* try to defrag robj (only if not an EMBSTR type (handled below). */
@ -406,6 +407,16 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd
return NULL;
}
void replaceSateliteOSetKeyPtr(semiorderedset<expireEntry, const char*> &set, sds oldkey, sds newkey) {
auto itr = set.find(oldkey);
if (itr != set.end())
{
expireEntry eNew(newkey, itr->when());
set.erase(itr);
set.insert(eNew);
}
}
long activeDefragQuickListNodes(quicklist *ql) {
quicklistNode *node = ql->head, *newnode;
long defragged = 0;
@ -769,12 +780,8 @@ long defragKey(redisDb *db, dictEntry *de) {
newsds = activeDefragSds(keysds);
if (newsds)
defragged++, de->key = newsds;
if (dictSize(db->expires)) {
/* Dirty code:
* I can't search in db->expires for that key after i already released
* the pointer it holds it won't be able to do the string compare */
uint64_t hash = dictGetHash(db->pdict, de->key);
replaceSateliteDictKeyPtrAndOrDefragDictEntry(db->expires, keysds, newsds, hash, &defragged);
if (!db->setexpire->empty()) {
replaceSateliteOSetKeyPtr(*db->setexpire, keysds, newsds);
}
/* Try to defrag robj and / or string value. */

View File

@ -150,6 +150,84 @@ void evictionPoolAlloc(void) {
EvictionPoolLRU = ep;
}
void processEvictionCandidate(int dbid, sds key, robj *o, const expireEntry *e, struct evictionPoolEntry *pool)
{
unsigned long long idle;
/* Calculate the idle time according to the policy. This is called
* idle just because the code initially handled LRU, but is in fact
* just a score where an higher score means better candidate. */
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LRU) {
idle = (o != nullptr) ? estimateObjectIdleTime(o) : 0;
} else if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
/* When we use an LRU policy, we sort the keys by idle time
* so that we expire keys starting from greater idle time.
* However when the policy is an LFU one, we have a frequency
* estimation, and we want to evict keys with lower frequency
* first. So inside the pool we put objects using the inverted
* frequency subtracting the actual frequency to the maximum
* frequency of 255. */
idle = 255-LFUDecrAndReturn(o);
} else if (g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
/* In this case the sooner the expire the better. */
idle = ULLONG_MAX - e->when();
} else {
serverPanic("Unknown eviction policy in evictionPoolPopulate()");
}
/* Insert the element inside the pool.
* First, find the first empty bucket or the first populated
* bucket that has an idle time smaller than our idle time. */
int k = 0;
while (k < EVPOOL_SIZE &&
pool[k].key &&
pool[k].idle < idle) k++;
if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
/* Can't insert if the element is < the worst element we have
* and there are no empty buckets. */
return;
} else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
/* Inserting into empty position. No setup needed before insert. */
} else {
/* Inserting in the middle. Now k points to the first element
* greater than the element to insert. */
if (pool[EVPOOL_SIZE-1].key == NULL) {
/* Free space on the right? Insert at k shifting
* all the elements from k to end to the right. */
/* Save SDS before overwriting. */
sds cached = pool[EVPOOL_SIZE-1].cached;
memmove(pool+k+1,pool+k,
sizeof(pool[0])*(EVPOOL_SIZE-k-1));
pool[k].cached = cached;
} else {
/* No free space on right? Insert at k-1 */
k--;
/* Shift all elements on the left of k (included) to the
* left, so we discard the element with smaller idle time. */
sds cached = pool[0].cached; /* Save SDS before overwriting. */
if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
memmove(pool,pool+1,sizeof(pool[0])*k);
pool[k].cached = cached;
}
}
/* Try to reuse the cached SDS string allocated in the pool entry,
* because allocating and deallocating this object is costly
* (according to the profiler, not my fantasy. Remember:
* premature optimizbla bla bla bla. */
int klen = sdslen(key);
if (klen > EVPOOL_CACHED_SDS_SIZE) {
pool[k].key = sdsdup(key);
} else {
memcpy(pool[k].cached,key,klen+1);
sdssetlen(pool[k].cached,klen);
pool[k].key = pool[k].cached;
}
pool[k].idle = idle;
pool[k].dbid = dbid;
}
/* This is an helper function for freeMemoryIfNeeded(), it is used in order
* to populate the evictionPool with a few entries every time we want to
* expire a key. Keys with idle time smaller than one of the current
@ -159,100 +237,36 @@ void evictionPoolAlloc(void) {
* idle time are on the left, and keys with the higher idle time on the
* right. */
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
int j, k, count;
dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*));
struct visitFunctor
{
int dbid;
dict *dbdict;
struct evictionPoolEntry *pool;
int count;
count = dictGetSomeKeys(sampledict,samples,g_pserver->maxmemory_samples);
for (j = 0; j < count; j++) {
unsigned long long idle;
sds key;
robj *o = nullptr;
dictEntry *de;
de = samples[j];
key = (sds)dictGetKey(de);
/* If the dictionary we are sampling from is not the main
* dictionary (but the expires one) we need to lookup the key
* again in the key dictionary to obtain the value object. */
if (g_pserver->maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
if (sampledict != keydict) de = dictFind(keydict, key);
o = (robj*)dictGetVal(de);
bool operator()(const expireEntry &e)
{
dictEntry *de = dictFind(dbdict, e.key());
processEvictionCandidate(dbid, (sds)dictGetKey(de), (robj*)dictGetVal(de), &e, pool);
++count;
return count < g_pserver->maxmemory_samples;
}
};
void evictionPoolPopulate(int dbid, dict *dbdict, semiorderedset<expireEntry,const char*> *setexpire, struct evictionPoolEntry *pool)
{
if (setexpire != nullptr)
{
visitFunctor visitor { dbid, dbdict, pool, 0 };
setexpire->random_visit(visitor);
}
else
{
dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*));
int count = dictGetSomeKeys(dbdict,samples,g_pserver->maxmemory_samples);
for (int j = 0; j < count; j++) {
robj *o = (robj*)dictGetVal(samples[j]);
processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool);
}
/* Calculate the idle time according to the policy. This is called
* idle just because the code initially handled LRU, but is in fact
* just a score where an higher score means better candidate. */
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LRU) {
idle = (o != nullptr) ? estimateObjectIdleTime(o) : 0;
} else if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
/* When we use an LRU policy, we sort the keys by idle time
* so that we expire keys starting from greater idle time.
* However when the policy is an LFU one, we have a frequency
* estimation, and we want to evict keys with lower frequency
* first. So inside the pool we put objects using the inverted
* frequency subtracting the actual frequency to the maximum
* frequency of 255. */
idle = 255-LFUDecrAndReturn(o);
} else if (g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
/* In this case the sooner the expire the better. */
idle = ULLONG_MAX - (long)dictGetVal(de);
} else {
serverPanic("Unknown eviction policy in evictionPoolPopulate()");
}
/* Insert the element inside the pool.
* First, find the first empty bucket or the first populated
* bucket that has an idle time smaller than our idle time. */
k = 0;
while (k < EVPOOL_SIZE &&
pool[k].key &&
pool[k].idle < idle) k++;
if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
/* Can't insert if the element is < the worst element we have
* and there are no empty buckets. */
continue;
} else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
/* Inserting into empty position. No setup needed before insert. */
} else {
/* Inserting in the middle. Now k points to the first element
* greater than the element to insert. */
if (pool[EVPOOL_SIZE-1].key == NULL) {
/* Free space on the right? Insert at k shifting
* all the elements from k to end to the right. */
/* Save SDS before overwriting. */
sds cached = pool[EVPOOL_SIZE-1].cached;
memmove(pool+k+1,pool+k,
sizeof(pool[0])*(EVPOOL_SIZE-k-1));
pool[k].cached = cached;
} else {
/* No free space on right? Insert at k-1 */
k--;
/* Shift all elements on the left of k (included) to the
* left, so we discard the element with smaller idle time. */
sds cached = pool[0].cached; /* Save SDS before overwriting. */
if (pool[0].key != pool[0].cached) sdsfree(pool[0].key);
memmove(pool,pool+1,sizeof(pool[0])*k);
pool[k].cached = cached;
}
}
/* Try to reuse the cached SDS string allocated in the pool entry,
* because allocating and deallocating this object is costly
* (according to the profiler, not my fantasy. Remember:
* premature optimizbla bla bla bla. */
int klen = sdslen(key);
if (klen > EVPOOL_CACHED_SDS_SIZE) {
pool[k].key = sdsdup(key);
} else {
memcpy(pool[k].cached,key,klen+1);
sdssetlen(pool[k].cached,klen);
pool[k].key = pool[k].cached;
}
pool[k].idle = idle;
pool[k].dbid = dbid;
}
}
@ -474,8 +488,6 @@ int freeMemoryIfNeeded(void) {
sds bestkey = NULL;
int bestdbid;
redisDb *db;
dict *dict;
dictEntry *de;
if (g_pserver->maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
@ -490,10 +502,18 @@ int freeMemoryIfNeeded(void) {
* every DB. */
for (i = 0; i < cserver.dbnum; i++) {
db = g_pserver->db+i;
dict = (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
db->pdict : db->expires;
if ((keys = dictSize(dict)) != 0) {
evictionPoolPopulate(i, dict, db->pdict, pool);
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS)
{
if ((keys = dictSize(db->pdict)) != 0) {
evictionPoolPopulate(i, db->pdict, nullptr, pool);
total_keys += keys;
}
}
else
{
keys = db->setexpire->size();
if (keys != 0)
evictionPoolPopulate(i, db->pdict, db->setexpire, pool);
total_keys += keys;
}
}
@ -503,14 +523,11 @@ int freeMemoryIfNeeded(void) {
for (k = EVPOOL_SIZE-1; k >= 0; k--) {
if (pool[k].key == NULL) continue;
bestdbid = pool[k].dbid;
sds key = nullptr;
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
de = dictFind(g_pserver->db[pool[k].dbid].pdict,
pool[k].key);
} else {
de = dictFind(g_pserver->db[pool[k].dbid].expires,
pool[k].key);
}
dictEntry *de = dictFind(g_pserver->db[pool[k].dbid].pdict,pool[k].key);
if (de != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || ((robj*)dictGetVal(de))->FExpires()))
key = (sds)dictGetKey(de);
/* Remove the entry from the pool. */
if (pool[k].key != pool[k].cached)
@ -520,8 +537,8 @@ int freeMemoryIfNeeded(void) {
/* If the key exists, is our pick. Otherwise it is
* a ghost and we need to try the next element. */
if (de) {
bestkey = (sds)dictGetKey(de);
if (key) {
bestkey = key;
break;
} else {
/* Ghost... Iterate again. */
@ -540,13 +557,23 @@ int freeMemoryIfNeeded(void) {
for (i = 0; i < cserver.dbnum; i++) {
j = (++next_db) % cserver.dbnum;
db = g_pserver->db+j;
dict = (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
db->pdict : db->expires;
if (dictSize(dict) != 0) {
de = dictGetRandomKey(dict);
bestkey = (sds)dictGetKey(de);
bestdbid = j;
break;
if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM)
{
if (dictSize(db->pdict) != 0) {
dictEntry *de = dictGetRandomKey(db->pdict);
bestkey = (sds)dictGetKey(de);
bestdbid = j;
break;
}
}
else
{
if (!db->setexpire->empty())
{
bestkey = (sds)db->setexpire->random_value().key();
bestdbid = j;
break;
}
}
}
}

View File

@ -51,26 +51,19 @@
*
* The parameter 'now' is the current time in milliseconds as is passed
* to the function to avoid too many gettimeofday() syscalls. */
int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
long long t = dictGetSignedIntegerVal(de);
if (now > t) {
sds key = (sds)dictGetKey(de);
robj *keyobj = createStringObject(key,sdslen(key));
void activeExpireCycleExpire(redisDb *db, const char *key) {
robj *keyobj = createStringObject(key,sdslen(key));
propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire);
if (g_pserver->lazyfree_lazy_expire)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",keyobj,db->id);
if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj);
decrRefCount(keyobj);
g_pserver->stat_expiredkeys++;
return 1;
} else {
return 0;
}
propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire);
if (g_pserver->lazyfree_lazy_expire)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
notifyKeyspaceEvent(NOTIFY_EXPIRED,
"expired",keyobj,db->id);
if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj);
decrRefCount(keyobj);
g_pserver->stat_expiredkeys++;
}
/* Try to expire a few timed out keys. The algorithm used is adaptive and
@ -148,7 +141,6 @@ void activeExpireCycle(int type) {
long total_expired = 0;
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
int expired;
redisDb *db = g_pserver->db+(current_db % cserver.dbnum);
/* Increment the DB now so we are sure if we run out of time
@ -156,78 +148,44 @@ void activeExpireCycle(int type) {
* distribute the time evenly across DBs. */
current_db++;
/* Continue to expire if at the end of the cycle more than 25%
* of the keys were expired. */
do {
unsigned long num, slots;
long long now, ttl_sum;
int ttl_samples;
iteration++;
long long now;
iteration++;
now = mstime();
/* If there is nothing to expire try next DB ASAP. */
if ((num = dictSize(db->expires)) == 0) {
db->avg_ttl = 0;
break;
/* If there is nothing to expire try next DB ASAP. */
if (db->setexpire->empty())
{
db->avg_ttl = 0;
db->last_expire_set = now;
continue;
}
size_t expired = 0;
size_t tried = 0;
db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](const expireEntry &e) __attribute__((always_inline)) {
if (e.when() < now)
{
activeExpireCycleExpire(db, e.key());
++expired;
}
slots = dictSlots(db->expires);
now = mstime();
++tried;
/* When there are less than 1% filled slots getting random
* keys is expensive, so stop here waiting for better times...
* The dictionary will be resized asap. */
if (num && slots > DICT_HT_INITIAL_SIZE &&
(num*100/slots < 1)) break;
/* The main collection cycle. Sample random keys among keys
* with an expire set, checking for expired ones. */
expired = 0;
ttl_sum = 0;
ttl_samples = 0;
if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
while (num--) {
dictEntry *de;
long long ttl;
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
ttl = dictGetSignedIntegerVal(de)-now;
if (activeExpireCycleTryExpire(db,de,now)) expired++;
if (ttl > 0) {
/* We want the average TTL of keys yet not expired. */
ttl_sum += ttl;
ttl_samples++;
}
total_sampled++;
}
total_expired += expired;
/* Update the average TTL stats for this database. */
if (ttl_samples) {
long long avg_ttl = ttl_sum/ttl_samples;
/* Do a simple running average with a few samples.
* We just use the current estimate with a weight of 2%
* and the previous estimate with a weight of 98%. */
if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
}
/* We can't block forever here even if there are many keys to
* expire. So after a given amount of milliseconds return to the
* caller waiting for the other active expire cycle. */
if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
if ((tried % ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) == 0)
{
/* We can't block forever here even if there are many keys to
* expire. So after a given amount of milliseconds return to the
* caller waiting for the other active expire cycle. */
elapsed = ustime()-start;
if (elapsed > timelimit) {
timelimit_exit = 1;
g_pserver->stat_expired_time_cap_reached_count++;
break;
return false;
}
}
/* We don't repeat the cycle if there are less than 25% of keys
* found expired in the current DB. */
} while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
return true;
});
total_expired += expired;
}
elapsed = ustime()-start;
@ -301,20 +259,27 @@ void expireSlaveKeys(void) {
while(dbids && dbid < cserver.dbnum) {
if ((dbids & 1) != 0) {
redisDb *db = g_pserver->db+dbid;
dictEntry *expire = dictFind(db->expires,keyname);
// the expire is hashed based on the key pointer, so we need the point in the main db
dictEntry *deMain = dictFind(db->pdict, keyname);
auto itr = db->setexpire->end();
if (deMain != nullptr)
itr = db->setexpire->find((sds)dictGetKey(deMain));
int expired = 0;
if (expire &&
activeExpireCycleTryExpire(g_pserver->db+dbid,expire,start))
if (itr != db->setexpire->end())
{
expired = 1;
if (itr->when() < start) {
activeExpireCycleExpire(g_pserver->db+dbid,itr->key());
expired = 1;
}
}
/* If the key was not expired in this DB, we need to set the
* corresponding bit in the new bitmap we set as value.
* At the end of the loop if the bitmap is zero, it means we
* no longer need to keep track of this key. */
if (expire && !expired) {
if (itr != db->setexpire->end() && !expired) {
noexpire++;
new_dbids |= (uint64_t)1 << dbid;
}

View File

@ -52,16 +52,19 @@ size_t lazyfreeGetFreeEffort(robj *obj) {
* will be reclaimed in a different bio.c thread. */
#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
if (dictSize(db->expires) > 0) dictDelete(db->expires,ptrFromObj(key));
/* If the value is composed of a few allocations, to free in a lazy way
* is actually just slower... So under a certain limit we just free
* the object synchronously. */
dictEntry *de = dictUnlink(db->pdict,ptrFromObj(key));
if (de) {
robj *val = (robj*)dictGetVal(de);
if (val->FExpires())
{
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
removeExpireCore(db,key,de);
}
size_t free_effort = lazyfreeGetFreeEffort(val);
/* If releasing the object is too much work, do it in the background
@ -72,7 +75,7 @@ int dbAsyncDelete(redisDb *db, robj *key) {
* objects, and then call dbDelete(). In this case we'll fall
* through and reach the dictFreeUnlinkedEntry() call, that will be
* equivalent to just calling decrRefCount(). */
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
if (free_effort > LAZYFREE_THRESHOLD && val->getrefcount(std::memory_order_relaxed) == 1) {
atomicIncr(lazyfree_objects,1);
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
dictSetVal(db->pdict,de,NULL);
@ -93,7 +96,7 @@ int dbAsyncDelete(redisDb *db, robj *key) {
/* Free an object, if the object is huge enough, free it in async way. */
void freeObjAsync(robj *o) {
size_t free_effort = lazyfreeGetFreeEffort(o);
if (free_effort > LAZYFREE_THRESHOLD && o->refcount == 1) {
if (free_effort > LAZYFREE_THRESHOLD && o->getrefcount(std::memory_order_relaxed) == 1) {
atomicIncr(lazyfree_objects,1);
bioCreateBackgroundJob(BIO_LAZY_FREE,o,NULL,NULL);
} else {
@ -105,11 +108,13 @@ void freeObjAsync(robj *o) {
* create a new empty set of hash tables and scheduling the old ones for
* lazy freeing. */
void emptyDbAsync(redisDb *db) {
dict *oldht1 = db->pdict, *oldht2 = db->expires;
dict *oldht1 = db->pdict;
auto *set = db->setexpire;
db->setexpire = new (MALLOC_LOCAL) semiorderedset<expireEntry, const char*>();
db->expireitr = db->setexpire->end();
db->pdict = dictCreate(&dbDictType,NULL);
db->expires = dictCreate(&keyptrDictType,NULL);
atomicIncr(lazyfree_objects,dictSize(oldht1));
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,set);
}
/* Empty the slots-keys map of Redis CLuster by creating a new empty one
@ -136,10 +141,10 @@ void lazyfreeFreeObjectFromBioThread(robj *o) {
* when the database was logically deleted. 'sl' is a skiplist used by
* Redis Cluster in order to take the hash slots -> keys mapping. This
* may be NULL if Redis Cluster is disabled. */
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset<expireEntry, const char *> *set) {
size_t numkeys = dictSize(ht1);
dictRelease(ht1);
dictRelease(ht2);
delete set;
atomicDecr(lazyfree_objects,numkeys);
}

View File

@ -566,7 +566,7 @@ void RedisModuleCommandDispatcher(client *c) {
for (int i = 0; i < c->argc; i++) {
/* Only do the work if the module took ownership of the object:
* in that case the refcount is no longer 1. */
if (c->argv[i]->refcount > 1)
if (c->argv[i]->getrefcount(std::memory_order_relaxed) > 1)
trimStringObjectIfNeeded(c->argv[i]);
}
}
@ -1037,7 +1037,7 @@ int RM_StringCompare(RedisModuleString *a, RedisModuleString *b) {
/* Return the (possibly modified in encoding) input 'str' object if
* the string is unshared, otherwise NULL is returned. */
RedisModuleString *moduleAssertUnsharedString(RedisModuleString *str) {
if (str->refcount != 1) {
if (str->getrefcount(std::memory_order_relaxed) != 1) {
serverLog(LL_WARNING,
"Module attempted to use an in-place string modify operation "
"with a string referenced multiple times. Please check the code "

View File

@ -39,11 +39,11 @@
/* ===================== Creation and parsing of objects ==================== */
robj *createObject(int type, void *ptr) {
robj *o = (robj*)zmalloc(sizeof(*o), MALLOC_SHARED);
robj *o = (robj*)zcalloc(sizeof(*o), MALLOC_SHARED);
o->type = type;
o->encoding = OBJ_ENCODING_RAW;
o->m_ptr = ptr;
o->refcount.store(1, std::memory_order_relaxed);
o->setrefcount(1);
o->mvcc_tstamp = OBJ_MVCC_INVALID;
/* Set the LRU to the current lruclock (minutes resolution), or
@ -68,8 +68,9 @@ robj *createObject(int type, void *ptr) {
*
*/
robj *makeObjectShared(robj *o) {
serverAssert(o->refcount == 1);
o->refcount.store(OBJ_SHARED_REFCOUNT, std::memory_order_relaxed);
serverAssert(o->getrefcount(std::memory_order_relaxed) == 1);
serverAssert(!o->FExpires());
o->setrefcount(OBJ_SHARED_REFCOUNT);
return o;
}
@ -86,12 +87,12 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
size_t allocsize = sizeof(struct sdshdr8)+len+1;
if (allocsize < sizeof(void*))
allocsize = sizeof(void*);
robj *o = (robj*)zmalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED);
robj *o = (robj*)zcalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED);
struct sdshdr8 *sh = (sdshdr8*)(&o->m_ptr);
o->type = OBJ_STRING;
o->encoding = OBJ_ENCODING_EMBSTR;
o->refcount.store(1, std::memory_order_relaxed);
o->setrefcount(1);
o->mvcc_tstamp = OBJ_MVCC_INVALID;
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
@ -352,11 +353,14 @@ void freeStreamObject(robj_roptr o) {
}
void incrRefCount(robj_roptr o) {
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount.fetch_add(1, std::memory_order_acquire);
if (o->getrefcount(std::memory_order_relaxed) != OBJ_SHARED_REFCOUNT) o->addref();
}
void decrRefCount(robj_roptr o) {
if (o->refcount.load(std::memory_order_acquire) == 1) {
if (o->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
return;
unsigned prev = o->release();
if (prev == 1) {
switch(o->type) {
case OBJ_STRING: freeStringObject(o); break;
case OBJ_LIST: freeListObject(o); break;
@ -369,8 +373,7 @@ void decrRefCount(robj_roptr o) {
}
zfree(o.unsafe_robjcast());
} else {
if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount.fetch_sub(1, std::memory_order_acquire);
if (prev <= 0) serverPanic("decrRefCount against refcount <= 0");
}
}
@ -394,7 +397,7 @@ void decrRefCountVoid(const void *o) {
* decrRefCount(obj);
*/
robj *resetRefCount(robj *obj) {
obj->refcount = 0;
obj->setrefcount(0);
return obj;
}
@ -452,7 +455,7 @@ robj *tryObjectEncoding(robj *o) {
/* It's not safe to encode shared objects: shared objects can be shared
* everywhere in the "object space" of Redis and may end in places where
* they are not handled. We handle them only as values in the keyspace. */
if (o->refcount > 1) return o;
if (o->getrefcount(std::memory_order_relaxed) > 1) return o;
/* Check if we can represent this string as a long integer.
* Note that we are sure that a string larger than 20 chars is not
@ -1064,8 +1067,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
mh->db[mh->num_dbs].overhead_ht_main = mem;
mem_total+=mem;
mem = dictSize(db->expires) * sizeof(dictEntry) +
dictSlots(db->expires) * sizeof(dictEntry*);
mem = db->setexpire->bytes_used();
mh->db[mh->num_dbs].overhead_ht_expires = mem;
mem_total+=mem;
@ -1275,7 +1277,7 @@ NULL
} else if (!strcasecmp(szFromObj(c->argv[1]),"refcount") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
addReplyLongLong(c,o->refcount);
addReplyLongLong(c,o->getrefcount(std::memory_order_relaxed));
} else if (!strcasecmp(szFromObj(c->argv[1]),"encoding") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
@ -1474,3 +1476,18 @@ NULL
addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try MEMORY HELP", (char*)ptrFromObj(c->argv[1]));
}
}
void redisObject::SetFExpires(bool fExpire)
{
serverAssert(this->refcount != OBJ_SHARED_REFCOUNT);
if (fExpire)
this->refcount.fetch_or(1U << 31, std::memory_order_relaxed);
else
this->refcount.fetch_and(~(1U << 31), std::memory_order_relaxed);
}
void redisObject::setrefcount(unsigned ref)
{
serverAssert(!FExpires());
refcount.store(ref, std::memory_order_relaxed);
}

View File

@ -1096,6 +1096,29 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
return 1;
}
int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *keystr, robj *o)
{
robj key;
long long expire;
initStaticStringObject(key,(char*)keystr);
expire = getExpire(db, &key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1)
return 0;
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
if (flags & RDB_SAVE_AOF_PREAMBLE &&
rdb->processed_bytes > *processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
*processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
return 1;
}
/* Produces a dump of the database in RDB format sending it to the specified
* Redis I/O channel. On success C_OK is returned, otherwise C_ERR
* is returned and part of the output, or all the output, can be
@ -1134,31 +1157,24 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
* these sizes are just hints to resize the hash tables. */
uint64_t db_size, expires_size;
db_size = dictSize(db->pdict);
expires_size = dictSize(db->expires);
expires_size = db->setexpire->size();
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
/* Iterate this DB writing every entry */
size_t ckeysExpired = 0;
while((de = dictNext(di)) != NULL) {
sds keystr = (sds)dictGetKey(de);
robj key, *o = (robj*)dictGetVal(de);
long long expire;
robj *o = (robj*)dictGetVal(de);
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
if (flags & RDB_SAVE_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
if (o->FExpires())
++ckeysExpired;
if (!saveKey(rdb, db, flags, &processed, keystr, o))
goto werr;
}
serverAssert(ckeysExpired == db->setexpire->size());
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
@ -1822,6 +1838,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) {
}
o->mvcc_tstamp = mvcc_tstamp;
serverAssert(!o->FExpires());
return o;
}
@ -1909,7 +1926,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
now = mstime();
lru_clock = LRU_CLOCK();
while(1) {
robj *key, *val;
@ -1965,7 +1982,6 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
dictExpand(db->pdict,db_size);
dictExpand(db->expires,expires_size);
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_AUX) {
/* AUX: generic string-string fields. Use to add state to RDB
@ -2079,7 +2095,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
if (fInserted)
{
/* Set the expire time if needed */
if (expiretime != -1) setExpire(NULL,db,key,expiretime);
if (expiretime != -1)
setExpire(NULL,db,key,expiretime);
/* Set usage information (for eviction). */
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock);
@ -2101,6 +2118,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
lfu_freq = -1;
lru_idle = -1;
}
/* Verify the checksum if RDB version is >= 5 */
if (rdbver >= 5) {
uint64_t cksum, expected = rdb->cksum;

View File

@ -665,7 +665,7 @@ cleanup:
* The object must be small, SDS-encoded, and with refcount = 1
* (we must be the only owner) for us to cache it. */
if (j < LUA_CMD_OBJCACHE_SIZE &&
o->refcount == 1 &&
o->getrefcount(std::memory_order_relaxed) == 1 &&
(o->encoding == OBJ_ENCODING_RAW ||
o->encoding == OBJ_ENCODING_EMBSTR) &&
sdslen((sds)ptrFromObj(o)) <= LUA_CMD_OBJCACHE_MAX_LEN)

344
src/semiorderedset.h Normal file
View File

@ -0,0 +1,344 @@
#pragma once
#include <assert.h>
#include "compactvector.h"
/****************************************
* semiorderedset.h:
*
* The ordered set is a hash set that maintains semi-ordering, that is you can iterate in sub-linear time over the set comparing a value.
* It has a few other useful properties vs the traditional set:
* 1. The key need not be the underlying type, the only requirement is the value type is castable to the key
* 2. The key need not have total ordering. The set will iterate until it finds an exact match with operator== on the value
* This provides additional flexibility on insert allowing us to optimize this case.
*
*/
extern uint64_t dictGenHashFunction(const void *key, int len);
template<typename T, typename T_KEY = T>
class semiorderedset
{
friend struct setiter;
std::vector<compactvector<T>> m_data;
size_t celem = 0;
static const size_t bits_min = 8;
size_t bits = bits_min;
size_t idxRehash = (1ULL << bits_min);
bool fPauseRehash = false;
constexpr size_t targetElementsPerBucket()
{
// Aim for roughly 4 cache lines per bucket (determined by imperical testing)
// lower values are faster but use more memory
return std::max((64/sizeof(T))*4, (size_t)2);
}
public:
semiorderedset()
{
m_data.resize((1ULL << bits));
}
struct setiter
{
semiorderedset *set;
size_t idxPrimary = 0;
size_t idxSecondary = 0;
setiter(semiorderedset *set)
{
this->set = set;
}
bool operator==(const setiter &other) const
{
return (idxPrimary == other.idxPrimary) && (idxSecondary == other.idxSecondary);
}
bool operator!=(const setiter &other) const { return !operator==(other); }
inline T &operator*() { return set->m_data[idxPrimary][idxSecondary]; }
inline const T &operator*() const { return set->m_data[idxPrimary][idxSecondary]; }
inline T *operator->() { return &set->m_data[idxPrimary][idxSecondary]; }
inline const T *operator->() const { return &set->m_data[idxPrimary][idxSecondary]; }
};
setiter find(const T_KEY &key)
{
RehashStep();
setiter itr(this);
itr.idxPrimary = idxFromObj(key);
for (int hashset = 0; hashset < 2; ++hashset) // rehashing may only be 1 resize behind, so we check up to two slots
{
auto &vecBucket = m_data[itr.idxPrimary];
auto itrFind = std::find(vecBucket.begin(), vecBucket.end(), key);
if (itrFind != vecBucket.end())
{
itr.idxSecondary = itrFind - vecBucket.begin();
return itr;
}
// See if we have to check the older slot
size_t mask = (hashmask() >> 1);
if (itr.idxPrimary == (itr.idxPrimary & mask))
break; // same bucket we just checked
itr.idxPrimary &= mask;
if (FRehashedRow(itr.idxPrimary))
break;
}
return end();
}
setiter end()
{
setiter itr(this);
itr.idxPrimary = m_data.size();
return itr;
}
void insert(T &e, bool fRehash = false)
{
if (!fRehash)
RehashStep();
auto idx = idxFromObj(static_cast<T_KEY>(e));
if (!fRehash)
++celem;
typename compactvector<T>::iterator itrInsert;
if (!m_data[idx].empty() && !(e < m_data[idx].back()))
itrInsert = m_data[idx].end();
else
itrInsert = std::upper_bound(m_data[idx].begin(), m_data[idx].end(), e);
itrInsert = m_data[idx].insert(itrInsert, e);
if (celem > ((1ULL << bits)*targetElementsPerBucket()))
grow();
}
// enumeration starting from the 'itrStart'th key. Note that the iter is a hint, and need no be valid anymore
template<typename T_VISITOR, typename T_MAX>
setiter enumerate(const setiter &itrStart, const T_MAX &max, T_VISITOR fn)
{
setiter itr(itrStart);
if (itrStart.set == this) // really if this case isn't true its probably a bug
itr = itrStart; // but why crash the program when we can easily fix this?
fPauseRehash = true;
if (itr.idxPrimary >= m_data.size())
itr.idxPrimary = 0;
for (size_t ibucket = 0; ibucket < m_data.size(); ++ibucket)
{
if (!enumerate_bucket(itr, max, fn))
break;
itr.idxSecondary = 0;
++itr.idxPrimary;
if (itr.idxPrimary >= m_data.size())
itr.idxPrimary = 0;
}
fPauseRehash = false;
return itr;
}
// This will "randomly" visit nodes biased towards lower values first
template<typename T_VISITOR>
size_t random_visit(T_VISITOR &fn)
{
bool fSawAny = true;
size_t visited = 0;
size_t basePrimary = rand() % m_data.size();
for (size_t idxSecondary = 0; fSawAny; ++idxSecondary)
{
fSawAny = false;
for (size_t idxPrimaryCount = 0; idxPrimaryCount < m_data.size(); ++idxPrimaryCount)
{
size_t idxPrimary = (basePrimary + idxPrimaryCount) % m_data.size();
if (idxSecondary < m_data[idxPrimary].size())
{
++visited;
fSawAny = true;
if (!fn(m_data[idxPrimary][idxSecondary]))
return visited;
}
}
}
return visited;
}
const T& random_value() const
{
assert(!empty());
for (;;)
{
size_t idxPrimary = rand() % m_data.size();
if (m_data[idxPrimary].empty())
continue;
return m_data[idxPrimary][rand() % m_data[idxPrimary].size()];
}
}
void erase(const setiter &itr)
{
auto &vecRow = m_data[itr.idxPrimary];
vecRow.erase(vecRow.begin() + itr.idxSecondary);
--celem;
RehashStep();
}
void clear()
{
m_data = decltype(m_data)();
bits = bits_min;
m_data.resize(1ULL << bits);
idxRehash = m_data.size();
}
bool empty() const noexcept { return celem == 0; }
size_t size() const noexcept { return celem; }
size_t bytes_used() const
{
size_t cb = sizeof(this) + (m_data.capacity()-m_data.size())*sizeof(T);
for (auto &vec : m_data)
{
cb += vec.bytes_used();
}
return cb;
}
#define DICT_STATS_VECTLEN 50
size_t getstats(char *buf, size_t bufsize) const
{
unsigned long i, slots = 0, chainlen, maxchainlen = 0;
unsigned long totchainlen = 0;
unsigned long clvector[DICT_STATS_VECTLEN] = {0};
size_t l = 0;
if (empty()) {
return snprintf(buf,bufsize,
"No stats available for empty dictionaries\n");
}
/* Compute stats. */
for (auto &vec : m_data) {
if (vec.empty()) {
clvector[0]++;
continue;
}
slots++;
/* For each hash entry on this slot... */
chainlen = vec.size();
clvector[(chainlen < DICT_STATS_VECTLEN) ? chainlen : (DICT_STATS_VECTLEN-1)]++;
if (chainlen > maxchainlen) maxchainlen = chainlen;
totchainlen += chainlen;
}
size_t used = m_data.size()-clvector[0];
/* Generate human readable stats. */
l += snprintf(buf+l,bufsize-l,
"semiordered set stats:\n"
" table size: %ld\n"
" number of slots: %ld\n"
" used slots: %ld\n"
" max chain length: %ld\n"
" avg chain length (counted): %.02f\n"
" avg chain length (computed): %.02f\n"
" Chain length distribution:\n",
size(), used, slots, maxchainlen,
(float)totchainlen/slots, (float)size()/m_data.size());
for (i = 0; i < DICT_STATS_VECTLEN; i++) {
if (clvector[i] == 0) continue;
if (l >= bufsize) break;
l += snprintf(buf+l,bufsize-l,
" %s%ld: %ld (%.02f%%)\n",
(i == DICT_STATS_VECTLEN-1)?">= ":"",
i, clvector[i], ((float)clvector[i]/m_data.size())*100);
}
/* Unlike snprintf(), teturn the number of characters actually written. */
if (bufsize) buf[bufsize-1] = '\0';
return strlen(buf);
}
private:
inline size_t hashmask() const { return (1ULL << bits) - 1; }
size_t idxFromObj(const T_KEY &key)
{
size_t v = (size_t)dictGenHashFunction(&key, sizeof(key));
return v & hashmask();
}
bool FRehashedRow(size_t idx) const
{
return (idx >= (m_data.size()/2)) || (idx < idxRehash);
}
void RehashStep()
{
if (fPauseRehash)
return;
int steps = 0;
for (; idxRehash < (m_data.size()/2); ++idxRehash)
{
compactvector<T> vecT;
std::swap(m_data[idxRehash], vecT);
for (auto &v : vecT)
insert(v, true);
if (++steps > 1024)
break;
}
}
void grow()
{
assert(idxRehash >= (m_data.size()/2)); // we should have finished rehashing by the time we need to grow again
++bits;
m_data.resize(1ULL << bits);
idxRehash = 0;
RehashStep();
}
template<typename T_VISITOR, typename T_MAX>
inline bool enumerate_bucket(setiter &itr, const T_MAX &max, T_VISITOR &fn)
{
auto &vec = m_data[itr.idxPrimary];
for (; itr.idxSecondary < vec.size(); ++itr.idxSecondary)
{
// Assert we're ordered by T_MAX
assert((itr.idxSecondary+1) >= vec.size()
|| static_cast<T_MAX>(vec[itr.idxSecondary]) <= static_cast<T_MAX>(vec[itr.idxSecondary+1]));
if (max < static_cast<T_MAX>(*itr))
return true;
size_t sizeBefore = vec.size();
if (!fn(*itr))
{
itr.idxSecondary++; // we still visited this node
return false;
}
if (vec.size() != sizeBefore)
{
assert(vec.size() == (sizeBefore-1)); // they may only remove the element passed to them
--itr.idxSecondary; // they deleted the element
}
}
vec.shrink_to_fit();
return true;
}
};

View File

@ -1428,8 +1428,6 @@ int htNeedsResize(dict *dict) {
void tryResizeHashTables(int dbid) {
if (htNeedsResize(g_pserver->db[dbid].pdict))
dictResize(g_pserver->db[dbid].pdict);
if (htNeedsResize(g_pserver->db[dbid].expires))
dictResize(g_pserver->db[dbid].expires);
}
/* Our hash table implementation performs rehashing incrementally while
@ -1445,11 +1443,6 @@ int incrementallyRehash(int dbid) {
dictRehashMilliseconds(g_pserver->db[dbid].pdict,1);
return 1; /* already used our millisecond for this loop... */
}
/* Expires */
if (dictIsRehashing(g_pserver->db[dbid].expires)) {
dictRehashMilliseconds(g_pserver->db[dbid].expires,1);
return 1; /* already used our millisecond for this loop... */
}
return 0;
}
@ -1889,7 +1882,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
size = dictSlots(g_pserver->db[j].pdict);
used = dictSize(g_pserver->db[j].pdict);
vkeys = dictSize(g_pserver->db[j].expires);
vkeys = g_pserver->db[j].setexpire->size();
if (used || vkeys) {
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
/* dictPrintStats(g_pserver->dict); */
@ -2926,12 +2919,14 @@ void initServer(void) {
/* Create the Redis databases, and initialize other internal state. */
for (int j = 0; j < cserver.dbnum; j++) {
g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL);
g_pserver->db[j].expires = dictCreate(&keyptrDictType,NULL);
g_pserver->db[j].setexpire = new(MALLOC_LOCAL) semiorderedset<expireEntry, const char*>;
g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end();
g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
g_pserver->db[j].watched_keys = dictCreate(&keylistDictType,NULL);
g_pserver->db[j].id = j;
g_pserver->db[j].avg_ttl = 0;
g_pserver->db[j].last_expire_set = 0;
g_pserver->db[j].defrag_later = listCreate();
}
@ -4571,11 +4566,18 @@ sds genRedisInfoString(const char *section) {
long long keys, vkeys;
keys = dictSize(g_pserver->db[j].pdict);
vkeys = dictSize(g_pserver->db[j].expires);
vkeys = g_pserver->db[j].setexpire->size();
// Adjust TTL by the current time
g_pserver->db[j].avg_ttl -= (g_pserver->mstime - g_pserver->db[j].last_expire_set);
if (g_pserver->db[j].avg_ttl < 0)
g_pserver->db[j].avg_ttl = 0;
g_pserver->db[j].last_expire_set = g_pserver->mstime;
if (keys || vkeys) {
info = sdscatprintf(info,
"db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
j, keys, vkeys, g_pserver->db[j].avg_ttl);
j, keys, vkeys, static_cast<long long>(g_pserver->db[j].avg_ttl));
}
}
}

View File

@ -81,6 +81,7 @@ typedef long long mstime_t; /* millisecond time type. */
N-elements flat arrays */
#include "rax.h" /* Radix tree */
#include "uuid.h"
#include "semiorderedset.h"
/* Following includes allow test functions to be called from Redis main() */
#include "zipmap.h"
@ -243,7 +244,7 @@ public:
#define CONFIG_DEFAULT_ACTIVE_REPLICA 0
#define CONFIG_DEFAULT_ENABLE_MULTIMASTER 0
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 64 /* Loopkups per loop. */
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* CPU max % for keys collection */
#define ACTIVE_EXPIRE_CYCLE_SLOW 0
@ -717,7 +718,7 @@ typedef struct RedisModuleDigest {
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */
#define OBJ_SHARED_REFCOUNT INT_MAX
#define OBJ_SHARED_REFCOUNT (0x7FFFFFFF)
#define OBJ_MVCC_INVALID (0xFFFFFFFFFFFFFFFFULL)
typedef struct redisObject {
@ -726,10 +727,21 @@ typedef struct redisObject {
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
mutable std::atomic<int> refcount;
private:
mutable std::atomic<unsigned> refcount;
public:
uint64_t mvcc_tstamp;
void *m_ptr;
inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; }
void SetFExpires(bool fExpires);
void setrefcount(unsigned ref);
unsigned getrefcount(std::memory_order order) const { return (refcount.load(order) & ~(1U << 31)); }
void addref() const { refcount.fetch_add(1, std::memory_order_acq_rel); }
unsigned release() const { return refcount.fetch_sub(1, std::memory_order_acq_rel) & ~(1U << 31); }
} robj;
static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase");
__attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o)
{
@ -755,6 +767,38 @@ __attribute__((always_inline)) inline char *szFromObj(const robj *o)
return (char*)ptrFromObj(o);
}
class expireEntry {
sds m_key;
long long m_when;
public:
expireEntry(sds key, long long when)
{
m_key = key;
m_when = when;
}
bool operator!=(const expireEntry &e) const noexcept
{
return m_when != e.m_when || m_key != e.m_key;
}
bool operator==(const expireEntry &e) const noexcept
{
return m_when == e.m_when && m_key == e.m_key;
}
bool operator==(const char *key) const noexcept { return m_key == key; }
bool operator<(const expireEntry &e) const noexcept { return m_when < e.m_when; }
bool operator<(const char *key) const noexcept { return m_key < key; }
bool operator<(long long when) const noexcept { return m_when < when; }
const char *key() const noexcept { return m_key; }
long long when() const noexcept { return m_when; }
explicit operator const char*() const noexcept { return m_key; }
explicit operator long long() const noexcept { return m_when; }
};
/* The a string name for an object's type as listed above
* Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines,
@ -766,7 +810,7 @@ const char *getObjectTypeName(robj_roptr o);
* we'll update it when the structure is changed, to avoid bugs like
* bug #85 introduced exactly in this way. */
#define initStaticStringObject(_var,_ptr) do { \
_var.refcount = 1; \
_var.setrefcount(1); \
_var.type = OBJ_STRING; \
_var.encoding = OBJ_ENCODING_RAW; \
_var.m_ptr = _ptr; \
@ -793,12 +837,15 @@ typedef struct clientReplyBlock {
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
dict *pdict; /* The keyspace for this DB */
dict *expires; /* Timeout of keys with a timeout set */
semiorderedset<expireEntry, const char*> *setexpire;
semiorderedset<expireEntry, const char*>::setiter expireitr;
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
long long last_expire_set; /* when the last expire was set */
double avg_ttl; /* Average TTL, just for stats */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} redisDb;
@ -2174,6 +2221,7 @@ int rewriteConfig(char *path);
/* db.c -- Keyspace access API */
int removeExpire(redisDb *db, robj *key);
int removeExpireCore(redisDb *db, robj *key, dictEntry *de);
void propagateExpire(redisDb *db, robj *key, int lazy);
int expireIfNeeded(redisDb *db, robj *key);
long long getExpire(redisDb *db, robj_roptr key);

View File

@ -72,7 +72,7 @@ slowlogEntry *slowlogCreateEntry(client *c, robj **argv, int argc, long long dur
(unsigned long)
sdslen(szFromObj(argv[j])) - SLOWLOG_ENTRY_MAX_STRING);
se->argv[j] = createObject(OBJ_STRING,s);
} else if (argv[j]->refcount == OBJ_SHARED_REFCOUNT) {
} else if (argv[j]->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) {
se->argv[j] = argv[j];
} else {
/* Here we need to dupliacate the string objects composing the

View File

@ -353,7 +353,7 @@ void incrDecrCommand(client *c, long long incr) {
}
value += incr;
if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT &&
if (o && o->getrefcount(std::memory_order_relaxed) == 1 && o->encoding == OBJ_ENCODING_INT &&
(value < 0 || value >= OBJ_SHARED_INTEGERS) &&
value >= LONG_MIN && value <= LONG_MAX)
{