Merge pull request #50 from JohnSully/expire
New Expire Datastructure which is faster and more memory efficient. This allows us to process expiries in sublinear time. Former-commit-id: d45edc493d111d4be81f2ce24e3022c8fffb3e2f
This commit is contained in:
commit
a3dbe03a66
3
.vscode/settings.json
vendored
3
.vscode/settings.json
vendored
@ -51,6 +51,7 @@
|
|||||||
"tuple": "cpp",
|
"tuple": "cpp",
|
||||||
"type_traits": "cpp",
|
"type_traits": "cpp",
|
||||||
"typeinfo": "cpp",
|
"typeinfo": "cpp",
|
||||||
"utility": "cpp"
|
"utility": "cpp",
|
||||||
|
"set": "cpp"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -85,7 +85,7 @@ struct bio_job {
|
|||||||
|
|
||||||
void *bioProcessBackgroundJobs(void *arg);
|
void *bioProcessBackgroundJobs(void *arg);
|
||||||
void lazyfreeFreeObjectFromBioThread(robj *o);
|
void lazyfreeFreeObjectFromBioThread(robj *o);
|
||||||
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2);
|
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset<expireEntry, const char *> *set);
|
||||||
void lazyfreeFreeSlotsMapFromBioThread(rax *rt);
|
void lazyfreeFreeSlotsMapFromBioThread(rax *rt);
|
||||||
|
|
||||||
/* Make sure we have enough stack to perform all the things we do in the
|
/* Make sure we have enough stack to perform all the things we do in the
|
||||||
@ -196,7 +196,7 @@ void *bioProcessBackgroundJobs(void *arg) {
|
|||||||
if (job->arg1)
|
if (job->arg1)
|
||||||
lazyfreeFreeObjectFromBioThread((robj*)job->arg1);
|
lazyfreeFreeObjectFromBioThread((robj*)job->arg1);
|
||||||
else if (job->arg2 && job->arg3)
|
else if (job->arg2 && job->arg3)
|
||||||
lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(dict*)job->arg3);
|
lazyfreeFreeDatabaseFromBioThread((dict*)job->arg2,(semiorderedset<expireEntry, const char *>*)job->arg3);
|
||||||
else if (job->arg3)
|
else if (job->arg3)
|
||||||
lazyfreeFreeSlotsMapFromBioThread((rax*)job->arg3);
|
lazyfreeFreeSlotsMapFromBioThread((rax*)job->arg3);
|
||||||
} else {
|
} else {
|
||||||
|
153
src/compactvector.h
Normal file
153
src/compactvector.h
Normal file
@ -0,0 +1,153 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <type_traits>
|
||||||
|
#include <assert.h>
|
||||||
|
|
||||||
|
/*************************************************
|
||||||
|
* compactvector - similar to std::vector but optimized for minimal memory
|
||||||
|
*
|
||||||
|
* Notable differences:
|
||||||
|
* - Limited to 2^32 - 1 elements (size and capacity are stored as unsigned)
|
||||||
|
* - Grows linearly not exponentially
|
||||||
|
*
|
||||||
|
*************************************************/
|
||||||
|
|
||||||
|
template<typename T, bool MEMMOVE_SAFE=false>
|
||||||
|
class compactvector
|
||||||
|
{
|
||||||
|
static_assert(MEMMOVE_SAFE || std::is_trivially_copyable<T>::value, "compactvector requires trivially copyable types");
|
||||||
|
T *m_data = nullptr;
|
||||||
|
unsigned m_celem = 0;
|
||||||
|
unsigned m_max = 0;
|
||||||
|
|
||||||
|
public:
|
||||||
|
typedef T* iterator;
|
||||||
|
|
||||||
|
compactvector() noexcept = default;
|
||||||
|
~compactvector() noexcept
|
||||||
|
{
|
||||||
|
clear(); // call dtors
|
||||||
|
zfree(m_data);
|
||||||
|
}
|
||||||
|
|
||||||
|
compactvector(compactvector &) noexcept = delete;
|
||||||
|
|
||||||
|
compactvector(compactvector &&src) noexcept
|
||||||
|
{
|
||||||
|
m_data = src.m_data;
|
||||||
|
m_celem = src.m_celem;
|
||||||
|
m_max = src.m_max;
|
||||||
|
src.m_data = nullptr;
|
||||||
|
src.m_celem = 0;
|
||||||
|
src.m_max = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
compactvector &operator=(const compactvector&) noexcept = delete;
|
||||||
|
compactvector &operator=(compactvector &&src) noexcept
|
||||||
|
{
|
||||||
|
zfree(m_data);
|
||||||
|
m_data = src.m_data;
|
||||||
|
m_celem = src.m_celem;
|
||||||
|
m_max = src.m_max;
|
||||||
|
src.m_data = nullptr;
|
||||||
|
src.m_celem = 0;
|
||||||
|
src.m_max = 0;
|
||||||
|
return *this;
|
||||||
|
}
|
||||||
|
|
||||||
|
inline T* begin() { return m_data; }
|
||||||
|
inline const T* begin() const { return m_data; }
|
||||||
|
|
||||||
|
inline T* end() { return m_data + m_celem; }
|
||||||
|
inline const T* end() const { return m_data + m_celem; }
|
||||||
|
|
||||||
|
T* insert(T* where, T &val)
|
||||||
|
{
|
||||||
|
assert(where >= m_data);
|
||||||
|
size_t idx = where - m_data;
|
||||||
|
if (m_celem >= m_max)
|
||||||
|
{
|
||||||
|
if (m_max < 2)
|
||||||
|
m_max = 2;
|
||||||
|
else
|
||||||
|
m_max = m_max + 4;
|
||||||
|
|
||||||
|
m_data = (T*)zrealloc(m_data, sizeof(T) * m_max, MALLOC_LOCAL);
|
||||||
|
m_max = zmalloc_usable(m_data) / sizeof(T);
|
||||||
|
}
|
||||||
|
assert(idx < m_max);
|
||||||
|
where = m_data + idx;
|
||||||
|
memmove(m_data + idx + 1, m_data + idx, (m_celem - idx)*sizeof(T));
|
||||||
|
new(m_data + idx) T(std::move(val));
|
||||||
|
++m_celem;
|
||||||
|
return where;
|
||||||
|
}
|
||||||
|
|
||||||
|
T &operator[](size_t idx)
|
||||||
|
{
|
||||||
|
assert(idx < m_celem);
|
||||||
|
return m_data[idx];
|
||||||
|
}
|
||||||
|
const T &operator[](size_t idx) const
|
||||||
|
{
|
||||||
|
assert(idx < m_celem);
|
||||||
|
return m_data[idx];
|
||||||
|
}
|
||||||
|
|
||||||
|
T& back() { assert(m_celem > 0); return m_data[m_celem-1]; }
|
||||||
|
const T& back() const { assert(m_celem > 0); return m_data[m_celem-1]; }
|
||||||
|
|
||||||
|
void erase(T* where)
|
||||||
|
{
|
||||||
|
assert(where >= m_data);
|
||||||
|
size_t idx = where - m_data;
|
||||||
|
assert(idx < m_celem);
|
||||||
|
where->~T();
|
||||||
|
memmove(where, where+1, ((m_celem - idx - 1)*sizeof(T)));
|
||||||
|
--m_celem;
|
||||||
|
|
||||||
|
if (m_celem == 0)
|
||||||
|
{
|
||||||
|
zfree(m_data);
|
||||||
|
m_data = nullptr;
|
||||||
|
m_max = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void shrink_to_fit()
|
||||||
|
{
|
||||||
|
if (m_max == m_celem)
|
||||||
|
return;
|
||||||
|
m_data = (T*)zrealloc(m_data, sizeof(T) * m_celem, MALLOC_LOCAL);
|
||||||
|
m_max = m_celem; // NOTE: We do not get the usable size here, because this could cause us to continually realloc
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t bytes_used() const
|
||||||
|
{
|
||||||
|
return sizeof(this) + (m_max * sizeof(T));
|
||||||
|
}
|
||||||
|
|
||||||
|
void clear()
|
||||||
|
{
|
||||||
|
for (size_t idx = 0; idx < m_celem; ++idx)
|
||||||
|
m_data[idx].~T();
|
||||||
|
zfree(m_data);
|
||||||
|
m_data = nullptr;
|
||||||
|
m_celem = 0;
|
||||||
|
m_max = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool empty() const noexcept
|
||||||
|
{
|
||||||
|
return m_celem == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t size() const noexcept
|
||||||
|
{
|
||||||
|
return m_celem;
|
||||||
|
}
|
||||||
|
|
||||||
|
T* data() noexcept { return m_data; }
|
||||||
|
const T* data() const noexcept { return m_data; }
|
||||||
|
};
|
||||||
|
static_assert(sizeof(compactvector<void*>) <= 16, "not compact");
|
156
src/db.cpp
156
src/db.cpp
@ -39,6 +39,8 @@
|
|||||||
*----------------------------------------------------------------------------*/
|
*----------------------------------------------------------------------------*/
|
||||||
|
|
||||||
int keyIsExpired(redisDb *db, robj *key);
|
int keyIsExpired(redisDb *db, robj *key);
|
||||||
|
int expireIfNeeded(redisDb *db, robj *key, robj *o);
|
||||||
|
void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire);
|
||||||
|
|
||||||
/* Update LFU when an object is accessed.
|
/* Update LFU when an object is accessed.
|
||||||
* Firstly, decrement the counter if the decrement time is reached.
|
* Firstly, decrement the counter if the decrement time is reached.
|
||||||
@ -49,6 +51,20 @@ void updateLFU(robj *val) {
|
|||||||
val->lru = (LFUGetTimeInMinutes()<<8) | counter;
|
val->lru = (LFUGetTimeInMinutes()<<8) | counter;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Moves the "has an expire" flag of `key` from the value being replaced
 * (valOld) to its replacement (valNew). The entry in db->setexpire is looked
 * up only to verify that it exists; it is not modified here. */
void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew)
{
    // Preconditions: only the outgoing value may carry the flag
    serverAssert(valOld->FExpires());
    serverAssert(!valNew->FExpires());

    // The key must already be tracked by the expire set
    auto *expireSet = db->setexpire;
    auto entry = expireSet->find(key);
    serverAssert(entry != expireSet->end());

    // Hand the flag over: set it on the new value first, then clear the old one
    valNew->SetFExpires(true);
    valOld->SetFExpires(false);
}
|
||||||
|
|
||||||
|
|
||||||
/* Low level key lookup API, not actually called directly from commands
|
/* Low level key lookup API, not actually called directly from commands
|
||||||
* implementations that should instead rely on lookupKeyRead(),
|
* implementations that should instead rely on lookupKeyRead(),
|
||||||
* lookupKeyWrite() and lookupKeyReadWithFlags(). */
|
* lookupKeyWrite() and lookupKeyReadWithFlags(). */
|
||||||
@ -160,8 +176,10 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key) {
|
|||||||
* Returns the linked value object if the key exists or NULL if the key
|
* Returns the linked value object if the key exists or NULL if the key
|
||||||
* does not exist in the specified DB. */
|
* does not exist in the specified DB. */
|
||||||
robj *lookupKeyWrite(redisDb *db, robj *key) {
|
robj *lookupKeyWrite(redisDb *db, robj *key) {
|
||||||
expireIfNeeded(db,key);
|
robj *o = lookupKey(db,key,LOOKUP_UPDATEMVCC);
|
||||||
return lookupKey(db,key,LOOKUP_UPDATEMVCC);
|
if (expireIfNeeded(db,key))
|
||||||
|
o = NULL;
|
||||||
|
return o;
|
||||||
}
|
}
|
||||||
|
|
||||||
robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
|
robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
|
||||||
@ -177,6 +195,7 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
int dbAddCore(redisDb *db, robj *key, robj *val) {
|
int dbAddCore(redisDb *db, robj *key, robj *val) {
|
||||||
|
serverAssert(!val->FExpires());
|
||||||
sds copy = sdsdup(szFromObj(key));
|
sds copy = sdsdup(szFromObj(key));
|
||||||
int retval = dictAdd(db->pdict, copy, val);
|
int retval = dictAdd(db->pdict, copy, val);
|
||||||
val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp();
|
val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp();
|
||||||
@ -206,10 +225,18 @@ void dbAdd(redisDb *db, robj *key, robj *val)
|
|||||||
serverAssertWithInfo(NULL,key,retval == DICT_OK);
|
serverAssertWithInfo(NULL,key,retval == DICT_OK);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dbOverwriteCore(redisDb *db, dictEntry *de, robj *val, bool fUpdateMvcc)
|
void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire)
|
||||||
{
|
{
|
||||||
dictEntry auxentry = *de;
|
dictEntry auxentry = *de;
|
||||||
robj *old = (robj*)dictGetVal(de);
|
robj *old = (robj*)dictGetVal(de);
|
||||||
|
|
||||||
|
if (old->FExpires()) {
|
||||||
|
if (fRemoveExpire)
|
||||||
|
removeExpire(db, key);
|
||||||
|
else
|
||||||
|
updateExpire(db, (sds)dictGetKey(de), old, val);
|
||||||
|
}
|
||||||
|
|
||||||
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
|
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
|
||||||
val->lru = old->lru;
|
val->lru = old->lru;
|
||||||
}
|
}
|
||||||
@ -235,7 +262,7 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
|
|||||||
dictEntry *de = dictFind(db->pdict,ptrFromObj(key));
|
dictEntry *de = dictFind(db->pdict,ptrFromObj(key));
|
||||||
|
|
||||||
serverAssertWithInfo(NULL,key,de != NULL);
|
serverAssertWithInfo(NULL,key,de != NULL);
|
||||||
dbOverwriteCore(db, de, val, true);
|
dbOverwriteCore(db, de, key, val, true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Insert a key, handling duplicate keys according to fReplace */
|
/* Insert a key, handling duplicate keys according to fReplace */
|
||||||
@ -250,7 +277,7 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
|
|||||||
robj *old = (robj*)dictGetVal(de);
|
robj *old = (robj*)dictGetVal(de);
|
||||||
if (old->mvcc_tstamp <= val->mvcc_tstamp)
|
if (old->mvcc_tstamp <= val->mvcc_tstamp)
|
||||||
{
|
{
|
||||||
dbOverwriteCore(db, de, val, false);
|
dbOverwriteCore(db, de, key, val, false, true);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -271,13 +298,13 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
|
|||||||
*
|
*
|
||||||
* All the new keys in the database should be created via this interface. */
|
* All the new keys in the database should be created via this interface. */
|
||||||
void setKey(redisDb *db, robj *key, robj *val) {
|
void setKey(redisDb *db, robj *key, robj *val) {
|
||||||
if (lookupKeyWrite(db,key) == NULL) {
|
dictEntry *de = dictFind(db->pdict, ptrFromObj(key));
|
||||||
|
if (de == NULL) {
|
||||||
dbAdd(db,key,val);
|
dbAdd(db,key,val);
|
||||||
} else {
|
} else {
|
||||||
dbOverwrite(db,key,val);
|
dbOverwriteCore(db,de,key,val,true,true);
|
||||||
}
|
}
|
||||||
incrRefCount(val);
|
incrRefCount(val);
|
||||||
removeExpire(db,key);
|
|
||||||
signalModifiedKey(db,key);
|
signalModifiedKey(db,key);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -292,7 +319,7 @@ int dbExists(redisDb *db, robj *key) {
|
|||||||
robj *dbRandomKey(redisDb *db) {
|
robj *dbRandomKey(redisDb *db) {
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
int maxtries = 100;
|
int maxtries = 100;
|
||||||
int allvolatile = dictSize(db->pdict) == dictSize(db->expires);
|
int allvolatile = dictSize(db->pdict) == db->setexpire->size();
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
sds key;
|
sds key;
|
||||||
@ -303,7 +330,9 @@ robj *dbRandomKey(redisDb *db) {
|
|||||||
|
|
||||||
key = (sds)dictGetKey(de);
|
key = (sds)dictGetKey(de);
|
||||||
keyobj = createStringObject(key,sdslen(key));
|
keyobj = createStringObject(key,sdslen(key));
|
||||||
if (dictFind(db->expires,key)) {
|
|
||||||
|
if (((robj*)dictGetVal(de))->FExpires())
|
||||||
|
{
|
||||||
if (allvolatile && listLength(g_pserver->masters) && --maxtries == 0) {
|
if (allvolatile && listLength(g_pserver->masters) && --maxtries == 0) {
|
||||||
/* If the DB is composed only of keys with an expire set,
|
/* If the DB is composed only of keys with an expire set,
|
||||||
* it could happen that all the keys are already logically
|
* it could happen that all the keys are already logically
|
||||||
@ -315,11 +344,16 @@ robj *dbRandomKey(redisDb *db) {
|
|||||||
* return a key name that may be already expired. */
|
* return a key name that may be already expired. */
|
||||||
return keyobj;
|
return keyobj;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (((robj*)dictGetVal(de))->FExpires())
|
||||||
|
{
|
||||||
if (expireIfNeeded(db,keyobj)) {
|
if (expireIfNeeded(db,keyobj)) {
|
||||||
decrRefCount(keyobj);
|
decrRefCount(keyobj);
|
||||||
continue; /* search for another key. This expired. */
|
continue; /* search for another key. This expired. */
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return keyobj;
|
return keyobj;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -328,7 +362,10 @@ robj *dbRandomKey(redisDb *db) {
|
|||||||
int dbSyncDelete(redisDb *db, robj *key) {
|
int dbSyncDelete(redisDb *db, robj *key) {
|
||||||
/* Deleting an entry from the expires dict will not free the sds of
|
/* Deleting an entry from the expires dict will not free the sds of
|
||||||
* the key, because it is shared with the main dictionary. */
|
* the key, because it is shared with the main dictionary. */
|
||||||
if (dictSize(db->expires) > 0) dictDelete(db->expires,ptrFromObj(key));
|
|
||||||
|
dictEntry *de = dictFind(db->pdict, szFromObj(key));
|
||||||
|
if (de != nullptr && ((robj*)dictGetVal(de))->FExpires())
|
||||||
|
removeExpireCore(db, key, de);
|
||||||
if (dictDelete(db->pdict,ptrFromObj(key)) == DICT_OK) {
|
if (dictDelete(db->pdict,ptrFromObj(key)) == DICT_OK) {
|
||||||
if (g_pserver->cluster_enabled) slotToKeyDel(key);
|
if (g_pserver->cluster_enabled) slotToKeyDel(key);
|
||||||
return 1;
|
return 1;
|
||||||
@ -373,7 +410,7 @@ int dbDelete(redisDb *db, robj *key) {
|
|||||||
*/
|
*/
|
||||||
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
|
robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
|
||||||
serverAssert(o->type == OBJ_STRING);
|
serverAssert(o->type == OBJ_STRING);
|
||||||
if (o->refcount != 1 || o->encoding != OBJ_ENCODING_RAW) {
|
if (o->getrefcount(std::memory_order_relaxed) != 1 || o->encoding != OBJ_ENCODING_RAW) {
|
||||||
robj *decoded = getDecodedObject(o);
|
robj *decoded = getDecodedObject(o);
|
||||||
o = createRawStringObject(szFromObj(decoded), sdslen(szFromObj(decoded)));
|
o = createRawStringObject(szFromObj(decoded), sdslen(szFromObj(decoded)));
|
||||||
decrRefCount(decoded);
|
decrRefCount(decoded);
|
||||||
@ -419,7 +456,9 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
|
|||||||
emptyDbAsync(&g_pserver->db[j]);
|
emptyDbAsync(&g_pserver->db[j]);
|
||||||
} else {
|
} else {
|
||||||
dictEmpty(g_pserver->db[j].pdict,callback);
|
dictEmpty(g_pserver->db[j].pdict,callback);
|
||||||
dictEmpty(g_pserver->db[j].expires,callback);
|
delete g_pserver->db[j].setexpire;
|
||||||
|
g_pserver->db[j].setexpire = new (MALLOC_LOCAL) semiorderedset<expireEntry, const char*>();
|
||||||
|
g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (g_pserver->cluster_enabled) {
|
if (g_pserver->cluster_enabled) {
|
||||||
@ -964,9 +1003,10 @@ void renameGenericCommand(client *c, int nx) {
|
|||||||
* with the same name. */
|
* with the same name. */
|
||||||
dbDelete(c->db,c->argv[2]);
|
dbDelete(c->db,c->argv[2]);
|
||||||
}
|
}
|
||||||
dbAdd(c->db,c->argv[2],o);
|
|
||||||
if (expire != -1) setExpire(c,c->db,c->argv[2],expire);
|
|
||||||
dbDelete(c->db,c->argv[1]);
|
dbDelete(c->db,c->argv[1]);
|
||||||
|
dbAdd(c->db,c->argv[2],o);
|
||||||
|
if (expire != -1)
|
||||||
|
setExpire(c,c->db,c->argv[2],expire);
|
||||||
signalModifiedKey(c->db,c->argv[1]);
|
signalModifiedKey(c->db,c->argv[1]);
|
||||||
signalModifiedKey(c->db,c->argv[2]);
|
signalModifiedKey(c->db,c->argv[2]);
|
||||||
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
|
notifyKeyspaceEvent(NOTIFY_GENERIC,"rename_from",
|
||||||
@ -1024,6 +1064,12 @@ void moveCommand(client *c) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
expire = getExpire(c->db,c->argv[1]);
|
expire = getExpire(c->db,c->argv[1]);
|
||||||
|
if (o->FExpires())
|
||||||
|
removeExpire(c->db,c->argv[1]);
|
||||||
|
serverAssert(!o->FExpires());
|
||||||
|
incrRefCount(o);
|
||||||
|
dbDelete(src,c->argv[1]);
|
||||||
|
g_pserver->dirty++;
|
||||||
|
|
||||||
/* Return zero if the key already exists in the target DB */
|
/* Return zero if the key already exists in the target DB */
|
||||||
if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
|
if (lookupKeyWrite(dst,c->argv[1]) != NULL) {
|
||||||
@ -1032,11 +1078,7 @@ void moveCommand(client *c) {
|
|||||||
}
|
}
|
||||||
dbAdd(dst,c->argv[1],o);
|
dbAdd(dst,c->argv[1],o);
|
||||||
if (expire != -1) setExpire(c,dst,c->argv[1],expire);
|
if (expire != -1) setExpire(c,dst,c->argv[1],expire);
|
||||||
incrRefCount(o);
|
|
||||||
|
|
||||||
/* OK! key moved, free the entry in the source DB */
|
|
||||||
dbDelete(src,c->argv[1]);
|
|
||||||
g_pserver->dirty++;
|
|
||||||
addReply(c,shared.cone);
|
addReply(c,shared.cone);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1077,12 +1119,16 @@ int dbSwapDatabases(int id1, int id2) {
|
|||||||
* ready_keys and watched_keys, since we want clients to
|
* ready_keys and watched_keys, since we want clients to
|
||||||
* remain in the same DB they were. */
|
* remain in the same DB they were. */
|
||||||
db1->pdict = db2->pdict;
|
db1->pdict = db2->pdict;
|
||||||
db1->expires = db2->expires;
|
db1->setexpire = db2->setexpire;
|
||||||
|
db1->expireitr = db2->expireitr;
|
||||||
db1->avg_ttl = db2->avg_ttl;
|
db1->avg_ttl = db2->avg_ttl;
|
||||||
|
db1->last_expire_set = db2->last_expire_set;
|
||||||
|
|
||||||
db2->pdict = aux.pdict;
|
db2->pdict = aux.pdict;
|
||||||
db2->expires = aux.expires;
|
db2->setexpire = aux.setexpire;
|
||||||
|
db2->expireitr = aux.expireitr;
|
||||||
db2->avg_ttl = aux.avg_ttl;
|
db2->avg_ttl = aux.avg_ttl;
|
||||||
|
db2->last_expire_set = aux.last_expire_set;
|
||||||
|
|
||||||
/* Now we need to handle clients blocked on lists: as an effect
|
/* Now we need to handle clients blocked on lists: as an effect
|
||||||
* of swapping the two DBs, a client that was waiting for list
|
* of swapping the two DBs, a client that was waiting for list
|
||||||
@ -1130,12 +1176,25 @@ void swapdbCommand(client *c) {
|
|||||||
/*-----------------------------------------------------------------------------
|
/*-----------------------------------------------------------------------------
|
||||||
* Expires API
|
* Expires API
|
||||||
*----------------------------------------------------------------------------*/
|
*----------------------------------------------------------------------------*/
|
||||||
|
|
||||||
int removeExpire(redisDb *db, robj *key) {
|
int removeExpire(redisDb *db, robj *key) {
|
||||||
|
dictEntry *de = dictFind(db->pdict,ptrFromObj(key));
|
||||||
|
return removeExpireCore(db, key, de);
|
||||||
|
}
|
||||||
|
int removeExpireCore(redisDb *db, robj *key, dictEntry *de) {
|
||||||
/* An expire may only be removed if there is a corresponding entry in the
|
/* An expire may only be removed if there is a corresponding entry in the
|
||||||
* main dict. Otherwise, the key will never be freed. */
|
* main dict. Otherwise, the key will never be freed. */
|
||||||
serverAssertWithInfo(NULL,key,dictFind(db->pdict,ptrFromObj(key)) != NULL);
|
serverAssertWithInfo(NULL,key,de != NULL);
|
||||||
return dictDelete(db->expires,ptrFromObj(key)) == DICT_OK;
|
|
||||||
|
robj *val = (robj*)dictGetVal(de);
|
||||||
|
if (!val->FExpires())
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
auto itr = db->setexpire->find((sds)dictGetKey(de));
|
||||||
|
serverAssert(itr != db->setexpire->end());
|
||||||
|
serverAssert(itr->key() == (sds)dictGetKey(de));
|
||||||
|
db->setexpire->erase(itr);
|
||||||
|
val->SetFExpires(false);
|
||||||
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Set an expire to the specified key. If the expire is set in the context
|
/* Set an expire to the specified key. If the expire is set in the context
|
||||||
@ -1143,14 +1202,40 @@ int removeExpire(redisDb *db, robj *key) {
|
|||||||
* to NULL. The 'when' parameter is the absolute unix time in milliseconds
|
* to NULL. The 'when' parameter is the absolute unix time in milliseconds
|
||||||
* after which the key will no longer be considered valid. */
|
* after which the key will no longer be considered valid. */
|
||||||
void setExpire(client *c, redisDb *db, robj *key, long long when) {
|
void setExpire(client *c, redisDb *db, robj *key, long long when) {
|
||||||
dictEntry *kde, *de;
|
dictEntry *kde;
|
||||||
|
|
||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
|
|
||||||
/* Reuse the sds from the main dict in the expire dict */
|
/* Reuse the sds from the main dict in the expire dict */
|
||||||
kde = dictFind(db->pdict,ptrFromObj(key));
|
kde = dictFind(db->pdict,ptrFromObj(key));
|
||||||
serverAssertWithInfo(NULL,key,kde != NULL);
|
serverAssertWithInfo(NULL,key,kde != NULL);
|
||||||
de = dictAddOrFind(db->expires,dictGetKey(kde));
|
|
||||||
dictSetSignedIntegerVal(de,when);
|
if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
|
||||||
|
{
|
||||||
|
// shared objects cannot have the expire bit set, create a real object
|
||||||
|
dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde)));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (((robj*)dictGetVal(kde))->FExpires())
|
||||||
|
removeExpire(db, key); // should we optimize for when this is called with an already set expiry?
|
||||||
|
|
||||||
|
expireEntry e((sds)dictGetKey(kde), when);
|
||||||
|
((robj*)dictGetVal(kde))->SetFExpires(true);
|
||||||
|
|
||||||
|
/* Update TTL stats (exponential moving average) */
|
||||||
|
/* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */
|
||||||
|
long long now = g_pserver->mstime;
|
||||||
|
db->avg_ttl -= (now - db->last_expire_set); // reduce the TTL by the time that has elapsed
|
||||||
|
if (db->setexpire->empty())
|
||||||
|
db->avg_ttl = 0;
|
||||||
|
else
|
||||||
|
db->avg_ttl -= db->avg_ttl / db->setexpire->size(); // slide one entry out the window
|
||||||
|
if (db->avg_ttl < 0)
|
||||||
|
db->avg_ttl = 0; // TTLs are never negative
|
||||||
|
db->avg_ttl += (double)(when-now) / (db->setexpire->size()+1); // add the new entry
|
||||||
|
db->last_expire_set = now;
|
||||||
|
|
||||||
|
db->setexpire->insert(e);
|
||||||
|
|
||||||
int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0;
|
int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0;
|
||||||
if (c && writable_slave && !(c->flags & CLIENT_MASTER))
|
if (c && writable_slave && !(c->flags & CLIENT_MASTER))
|
||||||
@ -1163,13 +1248,18 @@ long long getExpire(redisDb *db, robj_roptr key) {
|
|||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
|
|
||||||
/* No expire? return ASAP */
|
/* No expire? return ASAP */
|
||||||
if (dictSize(db->expires) == 0 ||
|
if (db->setexpire->size() == 0)
|
||||||
(de = dictFind(db->expires,ptrFromObj(key))) == NULL) return -1;
|
return -1;
|
||||||
|
|
||||||
/* The entry was found in the expire dict, this means it should also
|
de = dictFind(db->pdict, ptrFromObj(key));
|
||||||
* be present in the main dict (safety check). */
|
if (de == NULL)
|
||||||
serverAssertWithInfo(NULL,key,dictFind(db->pdict,ptrFromObj(key)) != NULL);
|
return -1;
|
||||||
return dictGetSignedIntegerVal(de);
|
robj *obj = (robj*)dictGetVal(de);
|
||||||
|
if (!obj->FExpires())
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
auto itr = db->setexpire->find((sds)dictGetKey(de));
|
||||||
|
return itr->when();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Propagate expires into slaves and the AOF file.
|
/* Propagate expires into slaves and the AOF file.
|
||||||
|
@ -436,7 +436,7 @@ NULL
|
|||||||
"Value at:%p refcount:%d "
|
"Value at:%p refcount:%d "
|
||||||
"encoding:%s serializedlength:%zu "
|
"encoding:%s serializedlength:%zu "
|
||||||
"lru:%d lru_seconds_idle:%llu%s",
|
"lru:%d lru_seconds_idle:%llu%s",
|
||||||
(void*)val, static_cast<int>(val->refcount),
|
(void*)val, static_cast<int>(val->getrefcount(std::memory_order_relaxed)),
|
||||||
strenc, rdbSavedObjectLen(val),
|
strenc, rdbSavedObjectLen(val),
|
||||||
val->lru, estimateObjectIdleTime(val)/1000, extra);
|
val->lru, estimateObjectIdleTime(val)/1000, extra);
|
||||||
} else if (!strcasecmp(szFromObj(c->argv[1]),"sdslen") && c->argc == 3) {
|
} else if (!strcasecmp(szFromObj(c->argv[1]),"sdslen") && c->argc == 3) {
|
||||||
@ -638,8 +638,8 @@ NULL
|
|||||||
dictGetStats(buf,sizeof(buf),g_pserver->db[dbid].pdict);
|
dictGetStats(buf,sizeof(buf),g_pserver->db[dbid].pdict);
|
||||||
stats = sdscat(stats,buf);
|
stats = sdscat(stats,buf);
|
||||||
|
|
||||||
stats = sdscatprintf(stats,"[Expires HT]\n");
|
stats = sdscatprintf(stats,"[Expires set]\n");
|
||||||
dictGetStats(buf,sizeof(buf),g_pserver->db[dbid].expires);
|
g_pserver->db[dbid].setexpire->getstats(buf, sizeof(buf));
|
||||||
stats = sdscat(stats, buf);
|
stats = sdscat(stats, buf);
|
||||||
|
|
||||||
addReplyBulkSds(c,stats);
|
addReplyBulkSds(c,stats);
|
||||||
@ -721,14 +721,14 @@ void _serverAssertPrintClientInfo(const client *c) {
|
|||||||
arg = buf;
|
arg = buf;
|
||||||
}
|
}
|
||||||
serverLog(LL_WARNING,"client->argv[%d] = \"%s\" (refcount: %d)",
|
serverLog(LL_WARNING,"client->argv[%d] = \"%s\" (refcount: %d)",
|
||||||
j, arg, static_cast<int>(c->argv[j]->refcount));
|
j, arg, static_cast<int>(c->argv[j]->getrefcount(std::memory_order_relaxed)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void serverLogObjectDebugInfo(robj_roptr o) {
|
void serverLogObjectDebugInfo(robj_roptr o) {
|
||||||
serverLog(LL_WARNING,"Object type: %d", o->type);
|
serverLog(LL_WARNING,"Object type: %d", o->type);
|
||||||
serverLog(LL_WARNING,"Object encoding: %d", o->encoding);
|
serverLog(LL_WARNING,"Object encoding: %d", o->encoding);
|
||||||
serverLog(LL_WARNING,"Object refcount: %d", static_cast<int>(o->refcount));
|
serverLog(LL_WARNING,"Object refcount: %d", static_cast<int>(o->getrefcount(std::memory_order_relaxed)));
|
||||||
if (o->type == OBJ_STRING && sdsEncodedObject(o)) {
|
if (o->type == OBJ_STRING && sdsEncodedObject(o)) {
|
||||||
serverLog(LL_WARNING,"Object raw string len: %zu", sdslen(szFromObj(o)));
|
serverLog(LL_WARNING,"Object raw string len: %zu", sdslen(szFromObj(o)));
|
||||||
if (sdslen(szFromObj(o)) < 4096) {
|
if (sdslen(szFromObj(o)) < 4096) {
|
||||||
|
@ -48,6 +48,7 @@ extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util);
|
|||||||
/* forward declarations*/
|
/* forward declarations*/
|
||||||
void defragDictBucketCallback(void *privdata, dictEntry **bucketref);
|
void defragDictBucketCallback(void *privdata, dictEntry **bucketref);
|
||||||
dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged);
|
dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged);
|
||||||
|
void replaceSateliteOSetKeyPtr(semiorderedset<expireEntry, const char*> &set, sds oldkey, sds newkey);
|
||||||
|
|
||||||
/* Defrag helper for generic allocations.
|
/* Defrag helper for generic allocations.
|
||||||
*
|
*
|
||||||
@ -102,7 +103,7 @@ sds activeDefragSds(sds sdsptr) {
|
|||||||
* and should NOT be accessed. */
|
* and should NOT be accessed. */
|
||||||
robj *activeDefragStringOb(robj* ob, long *defragged) {
|
robj *activeDefragStringOb(robj* ob, long *defragged) {
|
||||||
robj *ret = NULL;
|
robj *ret = NULL;
|
||||||
if (ob->refcount!=1)
|
if (ob->getrefcount(std::memory_order_relaxed)!=1)
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
||||||
/* try to defrag robj (only if not an EMBSTR type (handled below). */
|
/* try to defrag robj (only if not an EMBSTR type (handled below). */
|
||||||
@ -406,6 +407,16 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Re-keys the expire entry stored under `oldkey` so that it refers to
 * `newkey`, keeping its expiry time. Does nothing when the set holds no
 * entry for `oldkey`.
 *
 * A NULL `newkey` is ignored and the set is left untouched. Without this
 * guard a caller that passes a NULL `newkey` would cause the entry to be
 * re-inserted with a null key pointer.
 *
 * NOTE(review): callers appear to pass an `oldkey` whose memory may already
 * have been released by the defragger - confirm that find() does not
 * dereference the key it is given. */
void replaceSateliteOSetKeyPtr(semiorderedset<expireEntry, const char*> &set, sds oldkey, sds newkey) {
    if (newkey == nullptr)
        return;     // nothing to repoint to

    auto itr = set.find(oldkey);
    if (itr != set.end())
    {
        // Capture the expiry time before the entry is erased
        expireEntry eNew(newkey, itr->when());
        set.erase(itr);
        set.insert(eNew);
    }
}
|
||||||
|
|
||||||
long activeDefragQuickListNodes(quicklist *ql) {
|
long activeDefragQuickListNodes(quicklist *ql) {
|
||||||
quicklistNode *node = ql->head, *newnode;
|
quicklistNode *node = ql->head, *newnode;
|
||||||
long defragged = 0;
|
long defragged = 0;
|
||||||
@ -769,12 +780,8 @@ long defragKey(redisDb *db, dictEntry *de) {
|
|||||||
newsds = activeDefragSds(keysds);
|
newsds = activeDefragSds(keysds);
|
||||||
if (newsds)
|
if (newsds)
|
||||||
defragged++, de->key = newsds;
|
defragged++, de->key = newsds;
|
||||||
if (dictSize(db->expires)) {
|
if (!db->setexpire->empty()) {
|
||||||
/* Dirty code:
|
replaceSateliteOSetKeyPtr(*db->setexpire, keysds, newsds);
|
||||||
* I can't search in db->expires for that key after i already released
|
|
||||||
* the pointer it holds it won't be able to do the string compare */
|
|
||||||
uint64_t hash = dictGetHash(db->pdict, de->key);
|
|
||||||
replaceSateliteDictKeyPtrAndOrDefragDictEntry(db->expires, keysds, newsds, hash, &defragged);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Try to defrag robj and / or string value. */
|
/* Try to defrag robj and / or string value. */
|
||||||
|
129
src/evict.cpp
129
src/evict.cpp
@ -150,36 +150,9 @@ void evictionPoolAlloc(void) {
|
|||||||
EvictionPoolLRU = ep;
|
EvictionPoolLRU = ep;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This is an helper function for freeMemoryIfNeeded(), it is used in order
|
void processEvictionCandidate(int dbid, sds key, robj *o, const expireEntry *e, struct evictionPoolEntry *pool)
|
||||||
* to populate the evictionPool with a few entries every time we want to
|
{
|
||||||
* expire a key. Keys with idle time smaller than one of the current
|
|
||||||
* keys are added. Keys are always added if there are free entries.
|
|
||||||
*
|
|
||||||
* We insert keys on place in ascending order, so keys with the smaller
|
|
||||||
* idle time are on the left, and keys with the higher idle time on the
|
|
||||||
* right. */
|
|
||||||
|
|
||||||
void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evictionPoolEntry *pool) {
|
|
||||||
int j, k, count;
|
|
||||||
dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*));
|
|
||||||
|
|
||||||
count = dictGetSomeKeys(sampledict,samples,g_pserver->maxmemory_samples);
|
|
||||||
for (j = 0; j < count; j++) {
|
|
||||||
unsigned long long idle;
|
unsigned long long idle;
|
||||||
sds key;
|
|
||||||
robj *o = nullptr;
|
|
||||||
dictEntry *de;
|
|
||||||
|
|
||||||
de = samples[j];
|
|
||||||
key = (sds)dictGetKey(de);
|
|
||||||
|
|
||||||
/* If the dictionary we are sampling from is not the main
|
|
||||||
* dictionary (but the expires one) we need to lookup the key
|
|
||||||
* again in the key dictionary to obtain the value object. */
|
|
||||||
if (g_pserver->maxmemory_policy != MAXMEMORY_VOLATILE_TTL) {
|
|
||||||
if (sampledict != keydict) de = dictFind(keydict, key);
|
|
||||||
o = (robj*)dictGetVal(de);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Calculate the idle time according to the policy. This is called
|
/* Calculate the idle time according to the policy. This is called
|
||||||
* idle just because the code initially handled LRU, but is in fact
|
* idle just because the code initially handled LRU, but is in fact
|
||||||
@ -197,7 +170,7 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic
|
|||||||
idle = 255-LFUDecrAndReturn(o);
|
idle = 255-LFUDecrAndReturn(o);
|
||||||
} else if (g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
|
} else if (g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) {
|
||||||
/* In this case the sooner the expire the better. */
|
/* In this case the sooner the expire the better. */
|
||||||
idle = ULLONG_MAX - (long)dictGetVal(de);
|
idle = ULLONG_MAX - e->when();
|
||||||
} else {
|
} else {
|
||||||
serverPanic("Unknown eviction policy in evictionPoolPopulate()");
|
serverPanic("Unknown eviction policy in evictionPoolPopulate()");
|
||||||
}
|
}
|
||||||
@ -205,14 +178,14 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic
|
|||||||
/* Insert the element inside the pool.
|
/* Insert the element inside the pool.
|
||||||
* First, find the first empty bucket or the first populated
|
* First, find the first empty bucket or the first populated
|
||||||
* bucket that has an idle time smaller than our idle time. */
|
* bucket that has an idle time smaller than our idle time. */
|
||||||
k = 0;
|
int k = 0;
|
||||||
while (k < EVPOOL_SIZE &&
|
while (k < EVPOOL_SIZE &&
|
||||||
pool[k].key &&
|
pool[k].key &&
|
||||||
pool[k].idle < idle) k++;
|
pool[k].idle < idle) k++;
|
||||||
if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
|
if (k == 0 && pool[EVPOOL_SIZE-1].key != NULL) {
|
||||||
/* Can't insert if the element is < the worst element we have
|
/* Can't insert if the element is < the worst element we have
|
||||||
* and there are no empty buckets. */
|
* and there are no empty buckets. */
|
||||||
continue;
|
return;
|
||||||
} else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
|
} else if (k < EVPOOL_SIZE && pool[k].key == NULL) {
|
||||||
/* Inserting into empty position. No setup needed before insert. */
|
/* Inserting into empty position. No setup needed before insert. */
|
||||||
} else {
|
} else {
|
||||||
@ -254,6 +227,47 @@ void evictionPoolPopulate(int dbid, dict *sampledict, dict *keydict, struct evic
|
|||||||
pool[k].idle = idle;
|
pool[k].idle = idle;
|
||||||
pool[k].dbid = dbid;
|
pool[k].dbid = dbid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* This is a helper function for freeMemoryIfNeeded(), it is used in order
|
||||||
|
* to populate the evictionPool with a few entries every time we want to
|
||||||
|
* expire a key. Keys with idle time smaller than one of the current
|
||||||
|
* keys are added. Keys are always added if there are free entries.
|
||||||
|
*
|
||||||
|
* We insert keys on place in ascending order, so keys with the smaller
|
||||||
|
* idle time are on the left, and keys with the higher idle time on the
|
||||||
|
* right. */
|
||||||
|
|
||||||
|
struct visitFunctor
|
||||||
|
{
|
||||||
|
int dbid;
|
||||||
|
dict *dbdict;
|
||||||
|
struct evictionPoolEntry *pool;
|
||||||
|
int count;
|
||||||
|
|
||||||
|
bool operator()(const expireEntry &e)
|
||||||
|
{
|
||||||
|
dictEntry *de = dictFind(dbdict, e.key());
|
||||||
|
processEvictionCandidate(dbid, (sds)dictGetKey(de), (robj*)dictGetVal(de), &e, pool);
|
||||||
|
++count;
|
||||||
|
return count < g_pserver->maxmemory_samples;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
void evictionPoolPopulate(int dbid, dict *dbdict, semiorderedset<expireEntry,const char*> *setexpire, struct evictionPoolEntry *pool)
|
||||||
|
{
|
||||||
|
if (setexpire != nullptr)
|
||||||
|
{
|
||||||
|
visitFunctor visitor { dbid, dbdict, pool, 0 };
|
||||||
|
setexpire->random_visit(visitor);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*));
|
||||||
|
int count = dictGetSomeKeys(dbdict,samples,g_pserver->maxmemory_samples);
|
||||||
|
for (int j = 0; j < count; j++) {
|
||||||
|
robj *o = (robj*)dictGetVal(samples[j]);
|
||||||
|
processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ----------------------------------------------------------------------------
|
/* ----------------------------------------------------------------------------
|
||||||
@ -474,8 +488,6 @@ int freeMemoryIfNeeded(void) {
|
|||||||
sds bestkey = NULL;
|
sds bestkey = NULL;
|
||||||
int bestdbid;
|
int bestdbid;
|
||||||
redisDb *db;
|
redisDb *db;
|
||||||
dict *dict;
|
|
||||||
dictEntry *de;
|
|
||||||
|
|
||||||
if (g_pserver->maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
|
if (g_pserver->maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) ||
|
||||||
g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
|
g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL)
|
||||||
@ -490,10 +502,18 @@ int freeMemoryIfNeeded(void) {
|
|||||||
* every DB. */
|
* every DB. */
|
||||||
for (i = 0; i < cserver.dbnum; i++) {
|
for (i = 0; i < cserver.dbnum; i++) {
|
||||||
db = g_pserver->db+i;
|
db = g_pserver->db+i;
|
||||||
dict = (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) ?
|
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS)
|
||||||
db->pdict : db->expires;
|
{
|
||||||
if ((keys = dictSize(dict)) != 0) {
|
if ((keys = dictSize(db->pdict)) != 0) {
|
||||||
evictionPoolPopulate(i, dict, db->pdict, pool);
|
evictionPoolPopulate(i, db->pdict, nullptr, pool);
|
||||||
|
total_keys += keys;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
keys = db->setexpire->size();
|
||||||
|
if (keys != 0)
|
||||||
|
evictionPoolPopulate(i, db->pdict, db->setexpire, pool);
|
||||||
total_keys += keys;
|
total_keys += keys;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -503,14 +523,11 @@ int freeMemoryIfNeeded(void) {
|
|||||||
for (k = EVPOOL_SIZE-1; k >= 0; k--) {
|
for (k = EVPOOL_SIZE-1; k >= 0; k--) {
|
||||||
if (pool[k].key == NULL) continue;
|
if (pool[k].key == NULL) continue;
|
||||||
bestdbid = pool[k].dbid;
|
bestdbid = pool[k].dbid;
|
||||||
|
sds key = nullptr;
|
||||||
|
|
||||||
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) {
|
dictEntry *de = dictFind(g_pserver->db[pool[k].dbid].pdict,pool[k].key);
|
||||||
de = dictFind(g_pserver->db[pool[k].dbid].pdict,
|
if (de != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || ((robj*)dictGetVal(de))->FExpires()))
|
||||||
pool[k].key);
|
key = (sds)dictGetKey(de);
|
||||||
} else {
|
|
||||||
de = dictFind(g_pserver->db[pool[k].dbid].expires,
|
|
||||||
pool[k].key);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Remove the entry from the pool. */
|
/* Remove the entry from the pool. */
|
||||||
if (pool[k].key != pool[k].cached)
|
if (pool[k].key != pool[k].cached)
|
||||||
@ -520,8 +537,8 @@ int freeMemoryIfNeeded(void) {
|
|||||||
|
|
||||||
/* If the key exists, is our pick. Otherwise it is
|
/* If the key exists, is our pick. Otherwise it is
|
||||||
* a ghost and we need to try the next element. */
|
* a ghost and we need to try the next element. */
|
||||||
if (de) {
|
if (key) {
|
||||||
bestkey = (sds)dictGetKey(de);
|
bestkey = key;
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
/* Ghost... Iterate again. */
|
/* Ghost... Iterate again. */
|
||||||
@ -540,15 +557,25 @@ int freeMemoryIfNeeded(void) {
|
|||||||
for (i = 0; i < cserver.dbnum; i++) {
|
for (i = 0; i < cserver.dbnum; i++) {
|
||||||
j = (++next_db) % cserver.dbnum;
|
j = (++next_db) % cserver.dbnum;
|
||||||
db = g_pserver->db+j;
|
db = g_pserver->db+j;
|
||||||
dict = (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) ?
|
if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM)
|
||||||
db->pdict : db->expires;
|
{
|
||||||
if (dictSize(dict) != 0) {
|
if (dictSize(db->pdict) != 0) {
|
||||||
de = dictGetRandomKey(dict);
|
dictEntry *de = dictGetRandomKey(db->pdict);
|
||||||
bestkey = (sds)dictGetKey(de);
|
bestkey = (sds)dictGetKey(de);
|
||||||
bestdbid = j;
|
bestdbid = j;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
if (!db->setexpire->empty())
|
||||||
|
{
|
||||||
|
bestkey = (sds)db->setexpire->random_value().key();
|
||||||
|
bestdbid = j;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Finally remove the selected key. */
|
/* Finally remove the selected key. */
|
||||||
|
107
src/expire.cpp
107
src/expire.cpp
@ -51,10 +51,7 @@
|
|||||||
*
|
*
|
||||||
* The parameter 'now' is the current time in milliseconds as is passed
|
* The parameter 'now' is the current time in milliseconds as is passed
|
||||||
* to the function to avoid too many gettimeofday() syscalls. */
|
* to the function to avoid too many gettimeofday() syscalls. */
|
||||||
int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
|
void activeExpireCycleExpire(redisDb *db, const char *key) {
|
||||||
long long t = dictGetSignedIntegerVal(de);
|
|
||||||
if (now > t) {
|
|
||||||
sds key = (sds)dictGetKey(de);
|
|
||||||
robj *keyobj = createStringObject(key,sdslen(key));
|
robj *keyobj = createStringObject(key,sdslen(key));
|
||||||
|
|
||||||
propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire);
|
propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_expire);
|
||||||
@ -67,10 +64,6 @@ int activeExpireCycleTryExpire(redisDb *db, dictEntry *de, long long now) {
|
|||||||
if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj);
|
if (g_pserver->tracking_clients) trackingInvalidateKey(keyobj);
|
||||||
decrRefCount(keyobj);
|
decrRefCount(keyobj);
|
||||||
g_pserver->stat_expiredkeys++;
|
g_pserver->stat_expiredkeys++;
|
||||||
return 1;
|
|
||||||
} else {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Try to expire a few timed out keys. The algorithm used is adaptive and
|
/* Try to expire a few timed out keys. The algorithm used is adaptive and
|
||||||
@ -148,7 +141,6 @@ void activeExpireCycle(int type) {
|
|||||||
long total_expired = 0;
|
long total_expired = 0;
|
||||||
|
|
||||||
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
|
for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
|
||||||
int expired;
|
|
||||||
redisDb *db = g_pserver->db+(current_db % cserver.dbnum);
|
redisDb *db = g_pserver->db+(current_db % cserver.dbnum);
|
||||||
|
|
||||||
/* Increment the DB now so we are sure if we run out of time
|
/* Increment the DB now so we are sure if we run out of time
|
||||||
@ -156,78 +148,44 @@ void activeExpireCycle(int type) {
|
|||||||
* distribute the time evenly across DBs. */
|
* distribute the time evenly across DBs. */
|
||||||
current_db++;
|
current_db++;
|
||||||
|
|
||||||
/* Continue to expire if at the end of the cycle more than 25%
|
long long now;
|
||||||
* of the keys were expired. */
|
|
||||||
do {
|
|
||||||
unsigned long num, slots;
|
|
||||||
long long now, ttl_sum;
|
|
||||||
int ttl_samples;
|
|
||||||
iteration++;
|
iteration++;
|
||||||
|
|
||||||
/* If there is nothing to expire try next DB ASAP. */
|
|
||||||
if ((num = dictSize(db->expires)) == 0) {
|
|
||||||
db->avg_ttl = 0;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
slots = dictSlots(db->expires);
|
|
||||||
now = mstime();
|
now = mstime();
|
||||||
|
|
||||||
/* When there are less than 1% filled slots getting random
|
/* If there is nothing to expire try next DB ASAP. */
|
||||||
* keys is expensive, so stop here waiting for better times...
|
if (db->setexpire->empty())
|
||||||
* The dictionary will be resized asap. */
|
{
|
||||||
if (num && slots > DICT_HT_INITIAL_SIZE &&
|
db->avg_ttl = 0;
|
||||||
(num*100/slots < 1)) break;
|
db->last_expire_set = now;
|
||||||
|
continue;
|
||||||
/* The main collection cycle. Sample random keys among keys
|
|
||||||
* with an expire set, checking for expired ones. */
|
|
||||||
expired = 0;
|
|
||||||
ttl_sum = 0;
|
|
||||||
ttl_samples = 0;
|
|
||||||
|
|
||||||
if (num > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP)
|
|
||||||
num = ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP;
|
|
||||||
|
|
||||||
while (num--) {
|
|
||||||
dictEntry *de;
|
|
||||||
long long ttl;
|
|
||||||
|
|
||||||
if ((de = dictGetRandomKey(db->expires)) == NULL) break;
|
|
||||||
ttl = dictGetSignedIntegerVal(de)-now;
|
|
||||||
if (activeExpireCycleTryExpire(db,de,now)) expired++;
|
|
||||||
if (ttl > 0) {
|
|
||||||
/* We want the average TTL of keys yet not expired. */
|
|
||||||
ttl_sum += ttl;
|
|
||||||
ttl_samples++;
|
|
||||||
}
|
|
||||||
total_sampled++;
|
|
||||||
}
|
|
||||||
total_expired += expired;
|
|
||||||
|
|
||||||
/* Update the average TTL stats for this database. */
|
|
||||||
if (ttl_samples) {
|
|
||||||
long long avg_ttl = ttl_sum/ttl_samples;
|
|
||||||
|
|
||||||
/* Do a simple running average with a few samples.
|
|
||||||
* We just use the current estimate with a weight of 2%
|
|
||||||
* and the previous estimate with a weight of 98%. */
|
|
||||||
if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
|
|
||||||
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
size_t expired = 0;
|
||||||
|
size_t tried = 0;
|
||||||
|
db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](const expireEntry &e) __attribute__((always_inline)) {
|
||||||
|
if (e.when() < now)
|
||||||
|
{
|
||||||
|
activeExpireCycleExpire(db, e.key());
|
||||||
|
++expired;
|
||||||
|
}
|
||||||
|
++tried;
|
||||||
|
|
||||||
|
if ((tried % ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP) == 0)
|
||||||
|
{
|
||||||
/* We can't block forever here even if there are many keys to
|
/* We can't block forever here even if there are many keys to
|
||||||
* expire. So after a given amount of milliseconds return to the
|
* expire. So after a given amount of milliseconds return to the
|
||||||
* caller waiting for the other active expire cycle. */
|
* caller waiting for the other active expire cycle. */
|
||||||
if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
|
|
||||||
elapsed = ustime()-start;
|
elapsed = ustime()-start;
|
||||||
if (elapsed > timelimit) {
|
if (elapsed > timelimit) {
|
||||||
timelimit_exit = 1;
|
timelimit_exit = 1;
|
||||||
g_pserver->stat_expired_time_cap_reached_count++;
|
g_pserver->stat_expired_time_cap_reached_count++;
|
||||||
break;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/* We don't repeat the cycle if there are less than 25% of keys
|
return true;
|
||||||
* found expired in the current DB. */
|
});
|
||||||
} while (expired > ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP/4);
|
|
||||||
|
total_expired += expired;
|
||||||
}
|
}
|
||||||
|
|
||||||
elapsed = ustime()-start;
|
elapsed = ustime()-start;
|
||||||
@ -301,20 +259,27 @@ void expireSlaveKeys(void) {
|
|||||||
while(dbids && dbid < cserver.dbnum) {
|
while(dbids && dbid < cserver.dbnum) {
|
||||||
if ((dbids & 1) != 0) {
|
if ((dbids & 1) != 0) {
|
||||||
redisDb *db = g_pserver->db+dbid;
|
redisDb *db = g_pserver->db+dbid;
|
||||||
dictEntry *expire = dictFind(db->expires,keyname);
|
|
||||||
|
// the expire is hashed based on the key pointer, so we need the point in the main db
|
||||||
|
dictEntry *deMain = dictFind(db->pdict, keyname);
|
||||||
|
auto itr = db->setexpire->end();
|
||||||
|
if (deMain != nullptr)
|
||||||
|
itr = db->setexpire->find((sds)dictGetKey(deMain));
|
||||||
int expired = 0;
|
int expired = 0;
|
||||||
|
|
||||||
if (expire &&
|
if (itr != db->setexpire->end())
|
||||||
activeExpireCycleTryExpire(g_pserver->db+dbid,expire,start))
|
|
||||||
{
|
{
|
||||||
|
if (itr->when() < start) {
|
||||||
|
activeExpireCycleExpire(g_pserver->db+dbid,itr->key());
|
||||||
expired = 1;
|
expired = 1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* If the key was not expired in this DB, we need to set the
|
/* If the key was not expired in this DB, we need to set the
|
||||||
* corresponding bit in the new bitmap we set as value.
|
* corresponding bit in the new bitmap we set as value.
|
||||||
* At the end of the loop if the bitmap is zero, it means we
|
* At the end of the loop if the bitmap is zero, it means we
|
||||||
* no longer need to keep track of this key. */
|
* no longer need to keep track of this key. */
|
||||||
if (expire && !expired) {
|
if (itr != db->setexpire->end() && !expired) {
|
||||||
noexpire++;
|
noexpire++;
|
||||||
new_dbids |= (uint64_t)1 << dbid;
|
new_dbids |= (uint64_t)1 << dbid;
|
||||||
}
|
}
|
||||||
|
@ -52,16 +52,19 @@ size_t lazyfreeGetFreeEffort(robj *obj) {
|
|||||||
* will be reclaimed in a different bio.c thread. */
|
* will be reclaimed in a different bio.c thread. */
|
||||||
#define LAZYFREE_THRESHOLD 64
|
#define LAZYFREE_THRESHOLD 64
|
||||||
int dbAsyncDelete(redisDb *db, robj *key) {
|
int dbAsyncDelete(redisDb *db, robj *key) {
|
||||||
/* Deleting an entry from the expires dict will not free the sds of
|
|
||||||
* the key, because it is shared with the main dictionary. */
|
|
||||||
if (dictSize(db->expires) > 0) dictDelete(db->expires,ptrFromObj(key));
|
|
||||||
|
|
||||||
/* If the value is composed of a few allocations, to free in a lazy way
|
/* If the value is composed of a few allocations, to free in a lazy way
|
||||||
* is actually just slower... So under a certain limit we just free
|
* is actually just slower... So under a certain limit we just free
|
||||||
* the object synchronously. */
|
* the object synchronously. */
|
||||||
dictEntry *de = dictUnlink(db->pdict,ptrFromObj(key));
|
dictEntry *de = dictUnlink(db->pdict,ptrFromObj(key));
|
||||||
if (de) {
|
if (de) {
|
||||||
robj *val = (robj*)dictGetVal(de);
|
robj *val = (robj*)dictGetVal(de);
|
||||||
|
if (val->FExpires())
|
||||||
|
{
|
||||||
|
/* Deleting an entry from the expires dict will not free the sds of
|
||||||
|
* the key, because it is shared with the main dictionary. */
|
||||||
|
removeExpireCore(db,key,de);
|
||||||
|
}
|
||||||
|
|
||||||
size_t free_effort = lazyfreeGetFreeEffort(val);
|
size_t free_effort = lazyfreeGetFreeEffort(val);
|
||||||
|
|
||||||
/* If releasing the object is too much work, do it in the background
|
/* If releasing the object is too much work, do it in the background
|
||||||
@ -72,7 +75,7 @@ int dbAsyncDelete(redisDb *db, robj *key) {
|
|||||||
* objects, and then call dbDelete(). In this case we'll fall
|
* objects, and then call dbDelete(). In this case we'll fall
|
||||||
* through and reach the dictFreeUnlinkedEntry() call, that will be
|
* through and reach the dictFreeUnlinkedEntry() call, that will be
|
||||||
* equivalent to just calling decrRefCount(). */
|
* equivalent to just calling decrRefCount(). */
|
||||||
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
|
if (free_effort > LAZYFREE_THRESHOLD && val->getrefcount(std::memory_order_relaxed) == 1) {
|
||||||
atomicIncr(lazyfree_objects,1);
|
atomicIncr(lazyfree_objects,1);
|
||||||
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
|
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
|
||||||
dictSetVal(db->pdict,de,NULL);
|
dictSetVal(db->pdict,de,NULL);
|
||||||
@ -93,7 +96,7 @@ int dbAsyncDelete(redisDb *db, robj *key) {
|
|||||||
/* Free an object, if the object is huge enough, free it in async way. */
|
/* Free an object, if the object is huge enough, free it in async way. */
|
||||||
void freeObjAsync(robj *o) {
|
void freeObjAsync(robj *o) {
|
||||||
size_t free_effort = lazyfreeGetFreeEffort(o);
|
size_t free_effort = lazyfreeGetFreeEffort(o);
|
||||||
if (free_effort > LAZYFREE_THRESHOLD && o->refcount == 1) {
|
if (free_effort > LAZYFREE_THRESHOLD && o->getrefcount(std::memory_order_relaxed) == 1) {
|
||||||
atomicIncr(lazyfree_objects,1);
|
atomicIncr(lazyfree_objects,1);
|
||||||
bioCreateBackgroundJob(BIO_LAZY_FREE,o,NULL,NULL);
|
bioCreateBackgroundJob(BIO_LAZY_FREE,o,NULL,NULL);
|
||||||
} else {
|
} else {
|
||||||
@ -105,11 +108,13 @@ void freeObjAsync(robj *o) {
|
|||||||
* create a new empty set of hash tables and scheduling the old ones for
|
* create a new empty set of hash tables and scheduling the old ones for
|
||||||
* lazy freeing. */
|
* lazy freeing. */
|
||||||
void emptyDbAsync(redisDb *db) {
|
void emptyDbAsync(redisDb *db) {
|
||||||
dict *oldht1 = db->pdict, *oldht2 = db->expires;
|
dict *oldht1 = db->pdict;
|
||||||
|
auto *set = db->setexpire;
|
||||||
|
db->setexpire = new (MALLOC_LOCAL) semiorderedset<expireEntry, const char*>();
|
||||||
|
db->expireitr = db->setexpire->end();
|
||||||
db->pdict = dictCreate(&dbDictType,NULL);
|
db->pdict = dictCreate(&dbDictType,NULL);
|
||||||
db->expires = dictCreate(&keyptrDictType,NULL);
|
|
||||||
atomicIncr(lazyfree_objects,dictSize(oldht1));
|
atomicIncr(lazyfree_objects,dictSize(oldht1));
|
||||||
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,oldht2);
|
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,set);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Empty the slots-keys map of Redis CLuster by creating a new empty one
|
/* Empty the slots-keys map of Redis CLuster by creating a new empty one
|
||||||
@ -136,10 +141,10 @@ void lazyfreeFreeObjectFromBioThread(robj *o) {
|
|||||||
* when the database was logically deleted. 'sl' is a skiplist used by
|
* when the database was logically deleted. 'sl' is a skiplist used by
|
||||||
* Redis Cluster in order to take the hash slots -> keys mapping. This
|
* Redis Cluster in order to take the hash slots -> keys mapping. This
|
||||||
* may be NULL if Redis Cluster is disabled. */
|
* may be NULL if Redis Cluster is disabled. */
|
||||||
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, dict *ht2) {
|
void lazyfreeFreeDatabaseFromBioThread(dict *ht1, semiorderedset<expireEntry, const char *> *set) {
|
||||||
size_t numkeys = dictSize(ht1);
|
size_t numkeys = dictSize(ht1);
|
||||||
dictRelease(ht1);
|
dictRelease(ht1);
|
||||||
dictRelease(ht2);
|
delete set;
|
||||||
atomicDecr(lazyfree_objects,numkeys);
|
atomicDecr(lazyfree_objects,numkeys);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -566,7 +566,7 @@ void RedisModuleCommandDispatcher(client *c) {
|
|||||||
for (int i = 0; i < c->argc; i++) {
|
for (int i = 0; i < c->argc; i++) {
|
||||||
/* Only do the work if the module took ownership of the object:
|
/* Only do the work if the module took ownership of the object:
|
||||||
* in that case the refcount is no longer 1. */
|
* in that case the refcount is no longer 1. */
|
||||||
if (c->argv[i]->refcount > 1)
|
if (c->argv[i]->getrefcount(std::memory_order_relaxed) > 1)
|
||||||
trimStringObjectIfNeeded(c->argv[i]);
|
trimStringObjectIfNeeded(c->argv[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1037,7 +1037,7 @@ int RM_StringCompare(RedisModuleString *a, RedisModuleString *b) {
|
|||||||
/* Return the (possibly modified in encoding) input 'str' object if
|
/* Return the (possibly modified in encoding) input 'str' object if
|
||||||
* the string is unshared, otherwise NULL is returned. */
|
* the string is unshared, otherwise NULL is returned. */
|
||||||
RedisModuleString *moduleAssertUnsharedString(RedisModuleString *str) {
|
RedisModuleString *moduleAssertUnsharedString(RedisModuleString *str) {
|
||||||
if (str->refcount != 1) {
|
if (str->getrefcount(std::memory_order_relaxed) != 1) {
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
"Module attempted to use an in-place string modify operation "
|
"Module attempted to use an in-place string modify operation "
|
||||||
"with a string referenced multiple times. Please check the code "
|
"with a string referenced multiple times. Please check the code "
|
||||||
|
@ -39,11 +39,11 @@
|
|||||||
/* ===================== Creation and parsing of objects ==================== */
|
/* ===================== Creation and parsing of objects ==================== */
|
||||||
|
|
||||||
robj *createObject(int type, void *ptr) {
|
robj *createObject(int type, void *ptr) {
|
||||||
robj *o = (robj*)zmalloc(sizeof(*o), MALLOC_SHARED);
|
robj *o = (robj*)zcalloc(sizeof(*o), MALLOC_SHARED);
|
||||||
o->type = type;
|
o->type = type;
|
||||||
o->encoding = OBJ_ENCODING_RAW;
|
o->encoding = OBJ_ENCODING_RAW;
|
||||||
o->m_ptr = ptr;
|
o->m_ptr = ptr;
|
||||||
o->refcount.store(1, std::memory_order_relaxed);
|
o->setrefcount(1);
|
||||||
o->mvcc_tstamp = OBJ_MVCC_INVALID;
|
o->mvcc_tstamp = OBJ_MVCC_INVALID;
|
||||||
|
|
||||||
/* Set the LRU to the current lruclock (minutes resolution), or
|
/* Set the LRU to the current lruclock (minutes resolution), or
|
||||||
@ -68,8 +68,9 @@ robj *createObject(int type, void *ptr) {
|
|||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
robj *makeObjectShared(robj *o) {
|
robj *makeObjectShared(robj *o) {
|
||||||
serverAssert(o->refcount == 1);
|
serverAssert(o->getrefcount(std::memory_order_relaxed) == 1);
|
||||||
o->refcount.store(OBJ_SHARED_REFCOUNT, std::memory_order_relaxed);
|
serverAssert(!o->FExpires());
|
||||||
|
o->setrefcount(OBJ_SHARED_REFCOUNT);
|
||||||
return o;
|
return o;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,12 +87,12 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
|
|||||||
size_t allocsize = sizeof(struct sdshdr8)+len+1;
|
size_t allocsize = sizeof(struct sdshdr8)+len+1;
|
||||||
if (allocsize < sizeof(void*))
|
if (allocsize < sizeof(void*))
|
||||||
allocsize = sizeof(void*);
|
allocsize = sizeof(void*);
|
||||||
robj *o = (robj*)zmalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED);
|
robj *o = (robj*)zcalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED);
|
||||||
struct sdshdr8 *sh = (sdshdr8*)(&o->m_ptr);
|
struct sdshdr8 *sh = (sdshdr8*)(&o->m_ptr);
|
||||||
|
|
||||||
o->type = OBJ_STRING;
|
o->type = OBJ_STRING;
|
||||||
o->encoding = OBJ_ENCODING_EMBSTR;
|
o->encoding = OBJ_ENCODING_EMBSTR;
|
||||||
o->refcount.store(1, std::memory_order_relaxed);
|
o->setrefcount(1);
|
||||||
o->mvcc_tstamp = OBJ_MVCC_INVALID;
|
o->mvcc_tstamp = OBJ_MVCC_INVALID;
|
||||||
|
|
||||||
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
|
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
|
||||||
@ -352,11 +353,14 @@ void freeStreamObject(robj_roptr o) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void incrRefCount(robj_roptr o) {
|
void incrRefCount(robj_roptr o) {
|
||||||
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount.fetch_add(1, std::memory_order_acquire);
|
if (o->getrefcount(std::memory_order_relaxed) != OBJ_SHARED_REFCOUNT) o->addref();
|
||||||
}
|
}
|
||||||
|
|
||||||
void decrRefCount(robj_roptr o) {
|
void decrRefCount(robj_roptr o) {
|
||||||
if (o->refcount.load(std::memory_order_acquire) == 1) {
|
if (o->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
|
||||||
|
return;
|
||||||
|
unsigned prev = o->release();
|
||||||
|
if (prev == 1) {
|
||||||
switch(o->type) {
|
switch(o->type) {
|
||||||
case OBJ_STRING: freeStringObject(o); break;
|
case OBJ_STRING: freeStringObject(o); break;
|
||||||
case OBJ_LIST: freeListObject(o); break;
|
case OBJ_LIST: freeListObject(o); break;
|
||||||
@ -369,8 +373,7 @@ void decrRefCount(robj_roptr o) {
|
|||||||
}
|
}
|
||||||
zfree(o.unsafe_robjcast());
|
zfree(o.unsafe_robjcast());
|
||||||
} else {
|
} else {
|
||||||
if (o->refcount <= 0) serverPanic("decrRefCount against refcount <= 0");
|
if (prev <= 0) serverPanic("decrRefCount against refcount <= 0");
|
||||||
if (o->refcount != OBJ_SHARED_REFCOUNT) o->refcount.fetch_sub(1, std::memory_order_acquire);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -394,7 +397,7 @@ void decrRefCountVoid(const void *o) {
|
|||||||
* decrRefCount(obj);
|
* decrRefCount(obj);
|
||||||
*/
|
*/
|
||||||
robj *resetRefCount(robj *obj) {
|
robj *resetRefCount(robj *obj) {
|
||||||
obj->refcount = 0;
|
obj->setrefcount(0);
|
||||||
return obj;
|
return obj;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -452,7 +455,7 @@ robj *tryObjectEncoding(robj *o) {
|
|||||||
/* It's not safe to encode shared objects: shared objects can be shared
|
/* It's not safe to encode shared objects: shared objects can be shared
|
||||||
* everywhere in the "object space" of Redis and may end in places where
|
* everywhere in the "object space" of Redis and may end in places where
|
||||||
* they are not handled. We handle them only as values in the keyspace. */
|
* they are not handled. We handle them only as values in the keyspace. */
|
||||||
if (o->refcount > 1) return o;
|
if (o->getrefcount(std::memory_order_relaxed) > 1) return o;
|
||||||
|
|
||||||
/* Check if we can represent this string as a long integer.
|
/* Check if we can represent this string as a long integer.
|
||||||
* Note that we are sure that a string larger than 20 chars is not
|
* Note that we are sure that a string larger than 20 chars is not
|
||||||
@ -1064,8 +1067,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
|
|||||||
mh->db[mh->num_dbs].overhead_ht_main = mem;
|
mh->db[mh->num_dbs].overhead_ht_main = mem;
|
||||||
mem_total+=mem;
|
mem_total+=mem;
|
||||||
|
|
||||||
mem = dictSize(db->expires) * sizeof(dictEntry) +
|
mem = db->setexpire->bytes_used();
|
||||||
dictSlots(db->expires) * sizeof(dictEntry*);
|
|
||||||
mh->db[mh->num_dbs].overhead_ht_expires = mem;
|
mh->db[mh->num_dbs].overhead_ht_expires = mem;
|
||||||
mem_total+=mem;
|
mem_total+=mem;
|
||||||
|
|
||||||
@ -1275,7 +1277,7 @@ NULL
|
|||||||
} else if (!strcasecmp(szFromObj(c->argv[1]),"refcount") && c->argc == 3) {
|
} else if (!strcasecmp(szFromObj(c->argv[1]),"refcount") && c->argc == 3) {
|
||||||
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
|
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
|
||||||
== NULL) return;
|
== NULL) return;
|
||||||
addReplyLongLong(c,o->refcount);
|
addReplyLongLong(c,o->getrefcount(std::memory_order_relaxed));
|
||||||
} else if (!strcasecmp(szFromObj(c->argv[1]),"encoding") && c->argc == 3) {
|
} else if (!strcasecmp(szFromObj(c->argv[1]),"encoding") && c->argc == 3) {
|
||||||
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
|
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
|
||||||
== NULL) return;
|
== NULL) return;
|
||||||
@ -1474,3 +1476,18 @@ NULL
|
|||||||
addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try MEMORY HELP", (char*)ptrFromObj(c->argv[1]));
|
addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try MEMORY HELP", (char*)ptrFromObj(c->argv[1]));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void redisObject::SetFExpires(bool fExpire)
|
||||||
|
{
|
||||||
|
serverAssert(this->refcount != OBJ_SHARED_REFCOUNT);
|
||||||
|
if (fExpire)
|
||||||
|
this->refcount.fetch_or(1U << 31, std::memory_order_relaxed);
|
||||||
|
else
|
||||||
|
this->refcount.fetch_and(~(1U << 31), std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
void redisObject::setrefcount(unsigned ref)
|
||||||
|
{
|
||||||
|
serverAssert(!FExpires());
|
||||||
|
refcount.store(ref, std::memory_order_relaxed);
|
||||||
|
}
|
52
src/rdb.cpp
52
src/rdb.cpp
@ -1096,6 +1096,29 @@ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Writes one key/value pair, together with its expire time, to the RDB stream.
 *
 *   rdb       : output stream
 *   db        : database the key belongs to (used to look up the expire)
 *   flags     : RDB_SAVE_* flags; only RDB_SAVE_AOF_PREAMBLE is examined here
 *   processed : in/out, the rdb->processed_bytes value at which the diff from
 *               the parent was last read
 *   keystr    : key name
 *   o         : value object
 *
 * Returns 1 on success and 0 when rdbSaveKeyValuePair() reports -1. */
int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *keystr, robj *o)
{
    /* Value-initialize the stack object: initStaticStringObject() goes through
     * setrefcount(), which asserts on the expire flag stored in the refcount
     * word. With a plain `robj key;` that word is indeterminate, so the
     * assertion would read uninitialized memory and could fire spuriously. */
    robj key{};
    initStaticStringObject(key,(char*)keystr);

    const long long expire = getExpire(db, &key);
    if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1)
        return 0;

    /* When this RDB is produced as part of an AOF rewrite, move
     * accumulated diff from parent to child while rewriting in
     * order to have a smaller final write. */
    if ((flags & RDB_SAVE_AOF_PREAMBLE) &&
        rdb->processed_bytes > *processed+AOF_READ_DIFF_INTERVAL_BYTES)
    {
        *processed = rdb->processed_bytes;
        aofReadDiffFromParent();
    }
    return 1;
}
|
||||||
|
|
||||||
/* Produces a dump of the database in RDB format sending it to the specified
|
/* Produces a dump of the database in RDB format sending it to the specified
|
||||||
* Redis I/O channel. On success C_OK is returned, otherwise C_ERR
|
* Redis I/O channel. On success C_OK is returned, otherwise C_ERR
|
||||||
* is returned and part of the output, or all the output, can be
|
* is returned and part of the output, or all the output, can be
|
||||||
@ -1134,31 +1157,24 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
|
|||||||
* these sizes are just hints to resize the hash tables. */
|
* these sizes are just hints to resize the hash tables. */
|
||||||
uint64_t db_size, expires_size;
|
uint64_t db_size, expires_size;
|
||||||
db_size = dictSize(db->pdict);
|
db_size = dictSize(db->pdict);
|
||||||
expires_size = dictSize(db->expires);
|
expires_size = db->setexpire->size();
|
||||||
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
|
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
|
||||||
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
|
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
|
||||||
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
|
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
|
||||||
|
|
||||||
/* Iterate this DB writing every entry */
|
/* Iterate this DB writing every entry */
|
||||||
|
size_t ckeysExpired = 0;
|
||||||
while((de = dictNext(di)) != NULL) {
|
while((de = dictNext(di)) != NULL) {
|
||||||
sds keystr = (sds)dictGetKey(de);
|
sds keystr = (sds)dictGetKey(de);
|
||||||
robj key, *o = (robj*)dictGetVal(de);
|
robj *o = (robj*)dictGetVal(de);
|
||||||
long long expire;
|
|
||||||
|
|
||||||
initStaticStringObject(key,keystr);
|
if (o->FExpires())
|
||||||
expire = getExpire(db,&key);
|
++ckeysExpired;
|
||||||
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
|
|
||||||
|
|
||||||
/* When this RDB is produced as part of an AOF rewrite, move
|
if (!saveKey(rdb, db, flags, &processed, keystr, o))
|
||||||
* accumulated diff from parent to child while rewriting in
|
goto werr;
|
||||||
* order to have a smaller final write. */
|
|
||||||
if (flags & RDB_SAVE_AOF_PREAMBLE &&
|
|
||||||
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
|
|
||||||
{
|
|
||||||
processed = rdb->processed_bytes;
|
|
||||||
aofReadDiffFromParent();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
serverAssert(ckeysExpired == db->setexpire->size());
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
di = NULL; /* So that we don't release it again on error. */
|
di = NULL; /* So that we don't release it again on error. */
|
||||||
}
|
}
|
||||||
@ -1822,6 +1838,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
o->mvcc_tstamp = mvcc_tstamp;
|
o->mvcc_tstamp = mvcc_tstamp;
|
||||||
|
serverAssert(!o->FExpires());
|
||||||
return o;
|
return o;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1965,7 +1982,6 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
|
|||||||
if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
|
if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
|
||||||
goto eoferr;
|
goto eoferr;
|
||||||
dictExpand(db->pdict,db_size);
|
dictExpand(db->pdict,db_size);
|
||||||
dictExpand(db->expires,expires_size);
|
|
||||||
continue; /* Read next opcode. */
|
continue; /* Read next opcode. */
|
||||||
} else if (type == RDB_OPCODE_AUX) {
|
} else if (type == RDB_OPCODE_AUX) {
|
||||||
/* AUX: generic string-string fields. Use to add state to RDB
|
/* AUX: generic string-string fields. Use to add state to RDB
|
||||||
@ -2079,7 +2095,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
|
|||||||
if (fInserted)
|
if (fInserted)
|
||||||
{
|
{
|
||||||
/* Set the expire time if needed */
|
/* Set the expire time if needed */
|
||||||
if (expiretime != -1) setExpire(NULL,db,key,expiretime);
|
if (expiretime != -1)
|
||||||
|
setExpire(NULL,db,key,expiretime);
|
||||||
|
|
||||||
/* Set usage information (for eviction). */
|
/* Set usage information (for eviction). */
|
||||||
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock);
|
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock);
|
||||||
@ -2101,6 +2118,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
|
|||||||
lfu_freq = -1;
|
lfu_freq = -1;
|
||||||
lru_idle = -1;
|
lru_idle = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Verify the checksum if RDB version is >= 5 */
|
/* Verify the checksum if RDB version is >= 5 */
|
||||||
if (rdbver >= 5) {
|
if (rdbver >= 5) {
|
||||||
uint64_t cksum, expected = rdb->cksum;
|
uint64_t cksum, expected = rdb->cksum;
|
||||||
|
@ -665,7 +665,7 @@ cleanup:
|
|||||||
* The object must be small, SDS-encoded, and with refcount = 1
|
* The object must be small, SDS-encoded, and with refcount = 1
|
||||||
* (we must be the only owner) for us to cache it. */
|
* (we must be the only owner) for us to cache it. */
|
||||||
if (j < LUA_CMD_OBJCACHE_SIZE &&
|
if (j < LUA_CMD_OBJCACHE_SIZE &&
|
||||||
o->refcount == 1 &&
|
o->getrefcount(std::memory_order_relaxed) == 1 &&
|
||||||
(o->encoding == OBJ_ENCODING_RAW ||
|
(o->encoding == OBJ_ENCODING_RAW ||
|
||||||
o->encoding == OBJ_ENCODING_EMBSTR) &&
|
o->encoding == OBJ_ENCODING_EMBSTR) &&
|
||||||
sdslen((sds)ptrFromObj(o)) <= LUA_CMD_OBJCACHE_MAX_LEN)
|
sdslen((sds)ptrFromObj(o)) <= LUA_CMD_OBJCACHE_MAX_LEN)
|
||||||
|
344
src/semiorderedset.h
Normal file
344
src/semiorderedset.h
Normal file
@ -0,0 +1,344 @@
|
|||||||
|
#pragma once
|
||||||
|
#include <assert.h>
|
||||||
|
#include "compactvector.h"
|
||||||
|
|
||||||
|
/****************************************
|
||||||
|
* semiorderedset.h:
|
||||||
|
*
|
||||||
|
 * The semi-ordered set is a hash set that maintains semi-ordering; that is, you can iterate over the set in sub-linear time while comparing against a value.
|
||||||
|
* It has a few other useful properties vs the traditional set:
|
||||||
|
* 1. The key need not be the underlying type, the only requirement is the value type is castable to the key
|
||||||
|
* 2. The key need not have total ordering. The set will iterate until it finds an exact match with operator== on the value
|
||||||
|
* This provides additional flexibility on insert allowing us to optimize this case.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
|
||||||
|
extern uint64_t dictGenHashFunction(const void *key, int len);
|
||||||
|
|
||||||
|
template<typename T, typename T_KEY = T>
|
||||||
|
class semiorderedset
|
||||||
|
{
|
||||||
|
friend struct setiter;
|
||||||
|
std::vector<compactvector<T>> m_data;
|
||||||
|
size_t celem = 0;
|
||||||
|
static const size_t bits_min = 8;
|
||||||
|
size_t bits = bits_min;
|
||||||
|
size_t idxRehash = (1ULL << bits_min);
|
||||||
|
bool fPauseRehash = false;
|
||||||
|
|
||||||
|
constexpr size_t targetElementsPerBucket()
|
||||||
|
{
|
||||||
|
// Aim for roughly 4 cache lines per bucket (determined by imperical testing)
|
||||||
|
// lower values are faster but use more memory
|
||||||
|
return std::max((64/sizeof(T))*4, (size_t)2);
|
||||||
|
}
|
||||||
|
|
||||||
|
public:
|
||||||
|
semiorderedset()
|
||||||
|
{
|
||||||
|
m_data.resize((1ULL << bits));
|
||||||
|
}
|
||||||
|
|
||||||
|
struct setiter
|
||||||
|
{
|
||||||
|
semiorderedset *set;
|
||||||
|
size_t idxPrimary = 0;
|
||||||
|
size_t idxSecondary = 0;
|
||||||
|
|
||||||
|
setiter(semiorderedset *set)
|
||||||
|
{
|
||||||
|
this->set = set;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool operator==(const setiter &other) const
|
||||||
|
{
|
||||||
|
return (idxPrimary == other.idxPrimary) && (idxSecondary == other.idxSecondary);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool operator!=(const setiter &other) const { return !operator==(other); }
|
||||||
|
|
||||||
|
inline T &operator*() { return set->m_data[idxPrimary][idxSecondary]; }
|
||||||
|
inline const T &operator*() const { return set->m_data[idxPrimary][idxSecondary]; }
|
||||||
|
|
||||||
|
inline T *operator->() { return &set->m_data[idxPrimary][idxSecondary]; }
|
||||||
|
inline const T *operator->() const { return &set->m_data[idxPrimary][idxSecondary]; }
|
||||||
|
};
|
||||||
|
|
||||||
|
setiter find(const T_KEY &key)
|
||||||
|
{
|
||||||
|
RehashStep();
|
||||||
|
setiter itr(this);
|
||||||
|
itr.idxPrimary = idxFromObj(key);
|
||||||
|
|
||||||
|
for (int hashset = 0; hashset < 2; ++hashset) // rehashing may only be 1 resize behind, so we check up to two slots
|
||||||
|
{
|
||||||
|
auto &vecBucket = m_data[itr.idxPrimary];
|
||||||
|
|
||||||
|
auto itrFind = std::find(vecBucket.begin(), vecBucket.end(), key);
|
||||||
|
if (itrFind != vecBucket.end())
|
||||||
|
{
|
||||||
|
itr.idxSecondary = itrFind - vecBucket.begin();
|
||||||
|
return itr;
|
||||||
|
}
|
||||||
|
|
||||||
|
// See if we have to check the older slot
|
||||||
|
size_t mask = (hashmask() >> 1);
|
||||||
|
if (itr.idxPrimary == (itr.idxPrimary & mask))
|
||||||
|
break; // same bucket we just checked
|
||||||
|
itr.idxPrimary &= mask;
|
||||||
|
if (FRehashedRow(itr.idxPrimary))
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
return end();
|
||||||
|
}
|
||||||
|
|
||||||
|
setiter end()
|
||||||
|
{
|
||||||
|
setiter itr(this);
|
||||||
|
itr.idxPrimary = m_data.size();
|
||||||
|
return itr;
|
||||||
|
}
|
||||||
|
|
||||||
|
void insert(T &e, bool fRehash = false)
|
||||||
|
{
|
||||||
|
if (!fRehash)
|
||||||
|
RehashStep();
|
||||||
|
|
||||||
|
auto idx = idxFromObj(static_cast<T_KEY>(e));
|
||||||
|
if (!fRehash)
|
||||||
|
++celem;
|
||||||
|
|
||||||
|
typename compactvector<T>::iterator itrInsert;
|
||||||
|
if (!m_data[idx].empty() && !(e < m_data[idx].back()))
|
||||||
|
itrInsert = m_data[idx].end();
|
||||||
|
else
|
||||||
|
itrInsert = std::upper_bound(m_data[idx].begin(), m_data[idx].end(), e);
|
||||||
|
itrInsert = m_data[idx].insert(itrInsert, e);
|
||||||
|
|
||||||
|
if (celem > ((1ULL << bits)*targetElementsPerBucket()))
|
||||||
|
grow();
|
||||||
|
}
|
||||||
|
|
||||||
|
// enumeration starting from the 'itrStart'th key. Note that the iter is a hint, and need no be valid anymore
|
||||||
|
template<typename T_VISITOR, typename T_MAX>
|
||||||
|
setiter enumerate(const setiter &itrStart, const T_MAX &max, T_VISITOR fn)
|
||||||
|
{
|
||||||
|
setiter itr(itrStart);
|
||||||
|
|
||||||
|
if (itrStart.set == this) // really if this case isn't true its probably a bug
|
||||||
|
itr = itrStart; // but why crash the program when we can easily fix this?
|
||||||
|
|
||||||
|
fPauseRehash = true;
|
||||||
|
if (itr.idxPrimary >= m_data.size())
|
||||||
|
itr.idxPrimary = 0;
|
||||||
|
|
||||||
|
for (size_t ibucket = 0; ibucket < m_data.size(); ++ibucket)
|
||||||
|
{
|
||||||
|
if (!enumerate_bucket(itr, max, fn))
|
||||||
|
break;
|
||||||
|
itr.idxSecondary = 0;
|
||||||
|
|
||||||
|
++itr.idxPrimary;
|
||||||
|
if (itr.idxPrimary >= m_data.size())
|
||||||
|
itr.idxPrimary = 0;
|
||||||
|
}
|
||||||
|
fPauseRehash = false;
|
||||||
|
return itr;
|
||||||
|
}
|
||||||
|
|
||||||
|
// This will "randomly" visit nodes biased towards lower values first
|
||||||
|
template<typename T_VISITOR>
|
||||||
|
size_t random_visit(T_VISITOR &fn)
|
||||||
|
{
|
||||||
|
bool fSawAny = true;
|
||||||
|
size_t visited = 0;
|
||||||
|
size_t basePrimary = rand() % m_data.size();
|
||||||
|
for (size_t idxSecondary = 0; fSawAny; ++idxSecondary)
|
||||||
|
{
|
||||||
|
fSawAny = false;
|
||||||
|
for (size_t idxPrimaryCount = 0; idxPrimaryCount < m_data.size(); ++idxPrimaryCount)
|
||||||
|
{
|
||||||
|
size_t idxPrimary = (basePrimary + idxPrimaryCount) % m_data.size();
|
||||||
|
if (idxSecondary < m_data[idxPrimary].size())
|
||||||
|
{
|
||||||
|
++visited;
|
||||||
|
fSawAny = true;
|
||||||
|
if (!fn(m_data[idxPrimary][idxSecondary]))
|
||||||
|
return visited;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return visited;
|
||||||
|
}
|
||||||
|
|
||||||
|
const T& random_value() const
|
||||||
|
{
|
||||||
|
assert(!empty());
|
||||||
|
for (;;)
|
||||||
|
{
|
||||||
|
size_t idxPrimary = rand() % m_data.size();
|
||||||
|
if (m_data[idxPrimary].empty())
|
||||||
|
continue;
|
||||||
|
|
||||||
|
return m_data[idxPrimary][rand() % m_data[idxPrimary].size()];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void erase(const setiter &itr)
|
||||||
|
{
|
||||||
|
auto &vecRow = m_data[itr.idxPrimary];
|
||||||
|
vecRow.erase(vecRow.begin() + itr.idxSecondary);
|
||||||
|
--celem;
|
||||||
|
RehashStep();
|
||||||
|
}
|
||||||
|
|
||||||
|
void clear()
|
||||||
|
{
|
||||||
|
m_data = decltype(m_data)();
|
||||||
|
bits = bits_min;
|
||||||
|
m_data.resize(1ULL << bits);
|
||||||
|
idxRehash = m_data.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool empty() const noexcept { return celem == 0; }
|
||||||
|
size_t size() const noexcept { return celem; }
|
||||||
|
|
||||||
|
size_t bytes_used() const
|
||||||
|
{
|
||||||
|
size_t cb = sizeof(this) + (m_data.capacity()-m_data.size())*sizeof(T);
|
||||||
|
for (auto &vec : m_data)
|
||||||
|
{
|
||||||
|
cb += vec.bytes_used();
|
||||||
|
}
|
||||||
|
return cb;
|
||||||
|
}
|
||||||
|
|
||||||
|
#define DICT_STATS_VECTLEN 50
|
||||||
|
size_t getstats(char *buf, size_t bufsize) const
|
||||||
|
{
|
||||||
|
unsigned long i, slots = 0, chainlen, maxchainlen = 0;
|
||||||
|
unsigned long totchainlen = 0;
|
||||||
|
unsigned long clvector[DICT_STATS_VECTLEN] = {0};
|
||||||
|
size_t l = 0;
|
||||||
|
|
||||||
|
if (empty()) {
|
||||||
|
return snprintf(buf,bufsize,
|
||||||
|
"No stats available for empty dictionaries\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Compute stats. */
|
||||||
|
for (auto &vec : m_data) {
|
||||||
|
if (vec.empty()) {
|
||||||
|
clvector[0]++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
slots++;
|
||||||
|
/* For each hash entry on this slot... */
|
||||||
|
chainlen = vec.size();
|
||||||
|
|
||||||
|
clvector[(chainlen < DICT_STATS_VECTLEN) ? chainlen : (DICT_STATS_VECTLEN-1)]++;
|
||||||
|
if (chainlen > maxchainlen) maxchainlen = chainlen;
|
||||||
|
totchainlen += chainlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t used = m_data.size()-clvector[0];
|
||||||
|
/* Generate human readable stats. */
|
||||||
|
l += snprintf(buf+l,bufsize-l,
|
||||||
|
"semiordered set stats:\n"
|
||||||
|
" table size: %ld\n"
|
||||||
|
" number of slots: %ld\n"
|
||||||
|
" used slots: %ld\n"
|
||||||
|
" max chain length: %ld\n"
|
||||||
|
" avg chain length (counted): %.02f\n"
|
||||||
|
" avg chain length (computed): %.02f\n"
|
||||||
|
" Chain length distribution:\n",
|
||||||
|
size(), used, slots, maxchainlen,
|
||||||
|
(float)totchainlen/slots, (float)size()/m_data.size());
|
||||||
|
|
||||||
|
for (i = 0; i < DICT_STATS_VECTLEN; i++) {
|
||||||
|
if (clvector[i] == 0) continue;
|
||||||
|
if (l >= bufsize) break;
|
||||||
|
l += snprintf(buf+l,bufsize-l,
|
||||||
|
" %s%ld: %ld (%.02f%%)\n",
|
||||||
|
(i == DICT_STATS_VECTLEN-1)?">= ":"",
|
||||||
|
i, clvector[i], ((float)clvector[i]/m_data.size())*100);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Unlike snprintf(), teturn the number of characters actually written. */
|
||||||
|
if (bufsize) buf[bufsize-1] = '\0';
|
||||||
|
return strlen(buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
inline size_t hashmask() const { return (1ULL << bits) - 1; }
|
||||||
|
|
||||||
|
size_t idxFromObj(const T_KEY &key)
|
||||||
|
{
|
||||||
|
size_t v = (size_t)dictGenHashFunction(&key, sizeof(key));
|
||||||
|
return v & hashmask();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool FRehashedRow(size_t idx) const
|
||||||
|
{
|
||||||
|
return (idx >= (m_data.size()/2)) || (idx < idxRehash);
|
||||||
|
}
|
||||||
|
|
||||||
|
void RehashStep()
|
||||||
|
{
|
||||||
|
if (fPauseRehash)
|
||||||
|
return;
|
||||||
|
|
||||||
|
int steps = 0;
|
||||||
|
for (; idxRehash < (m_data.size()/2); ++idxRehash)
|
||||||
|
{
|
||||||
|
compactvector<T> vecT;
|
||||||
|
std::swap(m_data[idxRehash], vecT);
|
||||||
|
|
||||||
|
for (auto &v : vecT)
|
||||||
|
insert(v, true);
|
||||||
|
|
||||||
|
if (++steps > 1024)
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void grow()
|
||||||
|
{
|
||||||
|
assert(idxRehash >= (m_data.size()/2)); // we should have finished rehashing by the time we need to grow again
|
||||||
|
|
||||||
|
++bits;
|
||||||
|
m_data.resize(1ULL << bits);
|
||||||
|
idxRehash = 0;
|
||||||
|
RehashStep();
|
||||||
|
}
|
||||||
|
|
||||||
|
template<typename T_VISITOR, typename T_MAX>
|
||||||
|
inline bool enumerate_bucket(setiter &itr, const T_MAX &max, T_VISITOR &fn)
|
||||||
|
{
|
||||||
|
auto &vec = m_data[itr.idxPrimary];
|
||||||
|
for (; itr.idxSecondary < vec.size(); ++itr.idxSecondary)
|
||||||
|
{
|
||||||
|
// Assert we're ordered by T_MAX
|
||||||
|
assert((itr.idxSecondary+1) >= vec.size()
|
||||||
|
|| static_cast<T_MAX>(vec[itr.idxSecondary]) <= static_cast<T_MAX>(vec[itr.idxSecondary+1]));
|
||||||
|
|
||||||
|
if (max < static_cast<T_MAX>(*itr))
|
||||||
|
return true;
|
||||||
|
|
||||||
|
size_t sizeBefore = vec.size();
|
||||||
|
if (!fn(*itr))
|
||||||
|
{
|
||||||
|
itr.idxSecondary++; // we still visited this node
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (vec.size() != sizeBefore)
|
||||||
|
{
|
||||||
|
assert(vec.size() == (sizeBefore-1)); // they may only remove the element passed to them
|
||||||
|
--itr.idxSecondary; // they deleted the element
|
||||||
|
}
|
||||||
|
}
|
||||||
|
vec.shrink_to_fit();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
};
|
@ -1428,8 +1428,6 @@ int htNeedsResize(dict *dict) {
|
|||||||
void tryResizeHashTables(int dbid) {
|
void tryResizeHashTables(int dbid) {
|
||||||
if (htNeedsResize(g_pserver->db[dbid].pdict))
|
if (htNeedsResize(g_pserver->db[dbid].pdict))
|
||||||
dictResize(g_pserver->db[dbid].pdict);
|
dictResize(g_pserver->db[dbid].pdict);
|
||||||
if (htNeedsResize(g_pserver->db[dbid].expires))
|
|
||||||
dictResize(g_pserver->db[dbid].expires);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Our hash table implementation performs rehashing incrementally while
|
/* Our hash table implementation performs rehashing incrementally while
|
||||||
@ -1445,11 +1443,6 @@ int incrementallyRehash(int dbid) {
|
|||||||
dictRehashMilliseconds(g_pserver->db[dbid].pdict,1);
|
dictRehashMilliseconds(g_pserver->db[dbid].pdict,1);
|
||||||
return 1; /* already used our millisecond for this loop... */
|
return 1; /* already used our millisecond for this loop... */
|
||||||
}
|
}
|
||||||
/* Expires */
|
|
||||||
if (dictIsRehashing(g_pserver->db[dbid].expires)) {
|
|
||||||
dictRehashMilliseconds(g_pserver->db[dbid].expires,1);
|
|
||||||
return 1; /* already used our millisecond for this loop... */
|
|
||||||
}
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1889,7 +1882,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
|
|
||||||
size = dictSlots(g_pserver->db[j].pdict);
|
size = dictSlots(g_pserver->db[j].pdict);
|
||||||
used = dictSize(g_pserver->db[j].pdict);
|
used = dictSize(g_pserver->db[j].pdict);
|
||||||
vkeys = dictSize(g_pserver->db[j].expires);
|
vkeys = g_pserver->db[j].setexpire->size();
|
||||||
if (used || vkeys) {
|
if (used || vkeys) {
|
||||||
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
|
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
|
||||||
/* dictPrintStats(g_pserver->dict); */
|
/* dictPrintStats(g_pserver->dict); */
|
||||||
@ -2926,12 +2919,14 @@ void initServer(void) {
|
|||||||
/* Create the Redis databases, and initialize other internal state. */
|
/* Create the Redis databases, and initialize other internal state. */
|
||||||
for (int j = 0; j < cserver.dbnum; j++) {
|
for (int j = 0; j < cserver.dbnum; j++) {
|
||||||
g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL);
|
g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL);
|
||||||
g_pserver->db[j].expires = dictCreate(&keyptrDictType,NULL);
|
g_pserver->db[j].setexpire = new(MALLOC_LOCAL) semiorderedset<expireEntry, const char*>;
|
||||||
|
g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end();
|
||||||
g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
|
g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
|
||||||
g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
|
g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
|
||||||
g_pserver->db[j].watched_keys = dictCreate(&keylistDictType,NULL);
|
g_pserver->db[j].watched_keys = dictCreate(&keylistDictType,NULL);
|
||||||
g_pserver->db[j].id = j;
|
g_pserver->db[j].id = j;
|
||||||
g_pserver->db[j].avg_ttl = 0;
|
g_pserver->db[j].avg_ttl = 0;
|
||||||
|
g_pserver->db[j].last_expire_set = 0;
|
||||||
g_pserver->db[j].defrag_later = listCreate();
|
g_pserver->db[j].defrag_later = listCreate();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -4571,11 +4566,18 @@ sds genRedisInfoString(const char *section) {
|
|||||||
long long keys, vkeys;
|
long long keys, vkeys;
|
||||||
|
|
||||||
keys = dictSize(g_pserver->db[j].pdict);
|
keys = dictSize(g_pserver->db[j].pdict);
|
||||||
vkeys = dictSize(g_pserver->db[j].expires);
|
vkeys = g_pserver->db[j].setexpire->size();
|
||||||
|
|
||||||
|
// Adjust TTL by the current time
|
||||||
|
g_pserver->db[j].avg_ttl -= (g_pserver->mstime - g_pserver->db[j].last_expire_set);
|
||||||
|
if (g_pserver->db[j].avg_ttl < 0)
|
||||||
|
g_pserver->db[j].avg_ttl = 0;
|
||||||
|
g_pserver->db[j].last_expire_set = g_pserver->mstime;
|
||||||
|
|
||||||
if (keys || vkeys) {
|
if (keys || vkeys) {
|
||||||
info = sdscatprintf(info,
|
info = sdscatprintf(info,
|
||||||
"db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
|
"db%d:keys=%lld,expires=%lld,avg_ttl=%lld\r\n",
|
||||||
j, keys, vkeys, g_pserver->db[j].avg_ttl);
|
j, keys, vkeys, static_cast<long long>(g_pserver->db[j].avg_ttl));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
60
src/server.h
60
src/server.h
@ -81,6 +81,7 @@ typedef long long mstime_t; /* millisecond time type. */
|
|||||||
N-elements flat arrays */
|
N-elements flat arrays */
|
||||||
#include "rax.h" /* Radix tree */
|
#include "rax.h" /* Radix tree */
|
||||||
#include "uuid.h"
|
#include "uuid.h"
|
||||||
|
#include "semiorderedset.h"
|
||||||
|
|
||||||
/* Following includes allow test functions to be called from Redis main() */
|
/* Following includes allow test functions to be called from Redis main() */
|
||||||
#include "zipmap.h"
|
#include "zipmap.h"
|
||||||
@ -243,7 +244,7 @@ public:
|
|||||||
#define CONFIG_DEFAULT_ACTIVE_REPLICA 0
|
#define CONFIG_DEFAULT_ACTIVE_REPLICA 0
|
||||||
#define CONFIG_DEFAULT_ENABLE_MULTIMASTER 0
|
#define CONFIG_DEFAULT_ENABLE_MULTIMASTER 0
|
||||||
|
|
||||||
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */
|
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 64 /* Loopkups per loop. */
|
||||||
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
|
#define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */
|
||||||
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* CPU max % for keys collection */
|
#define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* CPU max % for keys collection */
|
||||||
#define ACTIVE_EXPIRE_CYCLE_SLOW 0
|
#define ACTIVE_EXPIRE_CYCLE_SLOW 0
|
||||||
@ -717,7 +718,7 @@ typedef struct RedisModuleDigest {
|
|||||||
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */
|
#define LRU_CLOCK_MAX ((1<<LRU_BITS)-1) /* Max value of obj->lru */
|
||||||
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */
|
#define LRU_CLOCK_RESOLUTION 1000 /* LRU clock resolution in ms */
|
||||||
|
|
||||||
#define OBJ_SHARED_REFCOUNT INT_MAX
|
#define OBJ_SHARED_REFCOUNT (0x7FFFFFFF)
|
||||||
#define OBJ_MVCC_INVALID (0xFFFFFFFFFFFFFFFFULL)
|
#define OBJ_MVCC_INVALID (0xFFFFFFFFFFFFFFFFULL)
|
||||||
|
|
||||||
typedef struct redisObject {
|
typedef struct redisObject {
|
||||||
@ -726,10 +727,21 @@ typedef struct redisObject {
|
|||||||
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
|
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
|
||||||
* LFU data (least significant 8 bits frequency
|
* LFU data (least significant 8 bits frequency
|
||||||
* and most significant 16 bits access time). */
|
* and most significant 16 bits access time). */
|
||||||
mutable std::atomic<int> refcount;
|
private:
|
||||||
|
mutable std::atomic<unsigned> refcount;
|
||||||
|
public:
|
||||||
uint64_t mvcc_tstamp;
|
uint64_t mvcc_tstamp;
|
||||||
void *m_ptr;
|
void *m_ptr;
|
||||||
|
|
||||||
|
inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; }
|
||||||
|
void SetFExpires(bool fExpires);
|
||||||
|
|
||||||
|
void setrefcount(unsigned ref);
|
||||||
|
unsigned getrefcount(std::memory_order order) const { return (refcount.load(order) & ~(1U << 31)); }
|
||||||
|
void addref() const { refcount.fetch_add(1, std::memory_order_acq_rel); }
|
||||||
|
unsigned release() const { return refcount.fetch_sub(1, std::memory_order_acq_rel) & ~(1U << 31); }
|
||||||
} robj;
|
} robj;
|
||||||
|
static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase");
|
||||||
|
|
||||||
__attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o)
|
__attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o)
|
||||||
{
|
{
|
||||||
@ -755,6 +767,38 @@ __attribute__((always_inline)) inline char *szFromObj(const robj *o)
|
|||||||
return (char*)ptrFromObj(o);
|
return (char*)ptrFromObj(o);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class expireEntry {
|
||||||
|
sds m_key;
|
||||||
|
long long m_when;
|
||||||
|
|
||||||
|
public:
|
||||||
|
expireEntry(sds key, long long when)
|
||||||
|
{
|
||||||
|
m_key = key;
|
||||||
|
m_when = when;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool operator!=(const expireEntry &e) const noexcept
|
||||||
|
{
|
||||||
|
return m_when != e.m_when || m_key != e.m_key;
|
||||||
|
}
|
||||||
|
bool operator==(const expireEntry &e) const noexcept
|
||||||
|
{
|
||||||
|
return m_when == e.m_when && m_key == e.m_key;
|
||||||
|
}
|
||||||
|
bool operator==(const char *key) const noexcept { return m_key == key; }
|
||||||
|
|
||||||
|
bool operator<(const expireEntry &e) const noexcept { return m_when < e.m_when; }
|
||||||
|
bool operator<(const char *key) const noexcept { return m_key < key; }
|
||||||
|
bool operator<(long long when) const noexcept { return m_when < when; }
|
||||||
|
|
||||||
|
const char *key() const noexcept { return m_key; }
|
||||||
|
long long when() const noexcept { return m_when; }
|
||||||
|
|
||||||
|
|
||||||
|
explicit operator const char*() const noexcept { return m_key; }
|
||||||
|
explicit operator long long() const noexcept { return m_when; }
|
||||||
|
};
|
||||||
|
|
||||||
/* The a string name for an object's type as listed above
|
/* The a string name for an object's type as listed above
|
||||||
* Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines,
|
* Native types are checked against the OBJ_STRING, OBJ_LIST, OBJ_* defines,
|
||||||
@ -766,7 +810,7 @@ const char *getObjectTypeName(robj_roptr o);
|
|||||||
* we'll update it when the structure is changed, to avoid bugs like
|
* we'll update it when the structure is changed, to avoid bugs like
|
||||||
* bug #85 introduced exactly in this way. */
|
* bug #85 introduced exactly in this way. */
|
||||||
#define initStaticStringObject(_var,_ptr) do { \
|
#define initStaticStringObject(_var,_ptr) do { \
|
||||||
_var.refcount = 1; \
|
_var.setrefcount(1); \
|
||||||
_var.type = OBJ_STRING; \
|
_var.type = OBJ_STRING; \
|
||||||
_var.encoding = OBJ_ENCODING_RAW; \
|
_var.encoding = OBJ_ENCODING_RAW; \
|
||||||
_var.m_ptr = _ptr; \
|
_var.m_ptr = _ptr; \
|
||||||
@ -793,12 +837,15 @@ typedef struct clientReplyBlock {
|
|||||||
* database. The database number is the 'id' field in the structure. */
|
* database. The database number is the 'id' field in the structure. */
|
||||||
typedef struct redisDb {
|
typedef struct redisDb {
|
||||||
dict *pdict; /* The keyspace for this DB */
|
dict *pdict; /* The keyspace for this DB */
|
||||||
dict *expires; /* Timeout of keys with a timeout set */
|
semiorderedset<expireEntry, const char*> *setexpire;
|
||||||
|
semiorderedset<expireEntry, const char*>::setiter expireitr;
|
||||||
|
|
||||||
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
|
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
|
||||||
dict *ready_keys; /* Blocked keys that received a PUSH */
|
dict *ready_keys; /* Blocked keys that received a PUSH */
|
||||||
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
|
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
|
||||||
int id; /* Database ID */
|
int id; /* Database ID */
|
||||||
long long avg_ttl; /* Average TTL, just for stats */
|
long long last_expire_set; /* when the last expire was set */
|
||||||
|
double avg_ttl; /* Average TTL, just for stats */
|
||||||
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
|
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
|
||||||
} redisDb;
|
} redisDb;
|
||||||
|
|
||||||
@ -2174,6 +2221,7 @@ int rewriteConfig(char *path);
|
|||||||
|
|
||||||
/* db.c -- Keyspace access API */
|
/* db.c -- Keyspace access API */
|
||||||
int removeExpire(redisDb *db, robj *key);
|
int removeExpire(redisDb *db, robj *key);
|
||||||
|
int removeExpireCore(redisDb *db, robj *key, dictEntry *de);
|
||||||
void propagateExpire(redisDb *db, robj *key, int lazy);
|
void propagateExpire(redisDb *db, robj *key, int lazy);
|
||||||
int expireIfNeeded(redisDb *db, robj *key);
|
int expireIfNeeded(redisDb *db, robj *key);
|
||||||
long long getExpire(redisDb *db, robj_roptr key);
|
long long getExpire(redisDb *db, robj_roptr key);
|
||||||
|
@ -72,7 +72,7 @@ slowlogEntry *slowlogCreateEntry(client *c, robj **argv, int argc, long long dur
|
|||||||
(unsigned long)
|
(unsigned long)
|
||||||
sdslen(szFromObj(argv[j])) - SLOWLOG_ENTRY_MAX_STRING);
|
sdslen(szFromObj(argv[j])) - SLOWLOG_ENTRY_MAX_STRING);
|
||||||
se->argv[j] = createObject(OBJ_STRING,s);
|
se->argv[j] = createObject(OBJ_STRING,s);
|
||||||
} else if (argv[j]->refcount == OBJ_SHARED_REFCOUNT) {
|
} else if (argv[j]->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) {
|
||||||
se->argv[j] = argv[j];
|
se->argv[j] = argv[j];
|
||||||
} else {
|
} else {
|
||||||
/* Here we need to dupliacate the string objects composing the
|
/* Here we need to dupliacate the string objects composing the
|
||||||
|
@ -353,7 +353,7 @@ void incrDecrCommand(client *c, long long incr) {
|
|||||||
}
|
}
|
||||||
value += incr;
|
value += incr;
|
||||||
|
|
||||||
if (o && o->refcount == 1 && o->encoding == OBJ_ENCODING_INT &&
|
if (o && o->getrefcount(std::memory_order_relaxed) == 1 && o->encoding == OBJ_ENCODING_INT &&
|
||||||
(value < 0 || value >= OBJ_SHARED_INTEGERS) &&
|
(value < 0 || value >= OBJ_SHARED_INTEGERS) &&
|
||||||
value >= LONG_MIN && value <= LONG_MAX)
|
value >= LONG_MIN && value <= LONG_MAX)
|
||||||
{
|
{
|
||||||
|
Loading…
x
Reference in New Issue
Block a user