Isolate the persistent parts of the database from the runtime parts

Former-commit-id: 08995b065cb0ed6df16528e73c7acb28bcf3c1f4
This commit is contained in:
John Sully 2019-09-19 22:36:29 -04:00
parent ceaaf39c96
commit f10e05f1bf
10 changed files with 397 additions and 218 deletions

View File

@ -41,7 +41,6 @@
int keyIsExpired(redisDb *db, robj *key);
int expireIfNeeded(redisDb *db, robj *key, robj *o);
void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire);
/* Update LFU when an object is accessed.
* Firstly, decrement the counter if the decrement time is reached.
@ -57,8 +56,7 @@ void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew)
serverAssert(valOld->FExpires());
serverAssert(!valNew->FExpires());
auto itr = db->setexpire->find(key);
serverAssert(itr != db->setexpire->end());
serverAssert(db->FKeyExpires((const char*)key));
valNew->SetFExpires(true);
valOld->SetFExpires(false);
@ -70,8 +68,9 @@ void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew)
* implementations that should instead rely on lookupKeyRead(),
* lookupKeyWrite() and lookupKeyReadWithFlags(). */
static robj *lookupKey(redisDb *db, robj *key, int flags) {
robj *val = db->find(key);
if (val) {
auto itr = db->find(key);
if (itr) {
robj *val = itr.val();
/* Update the access time for the ageing algorithm.
* Don't do it if we have a saving child, as this will trigger
* a copy on write madness. */
@ -193,13 +192,13 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
return o;
}
int dbAddCore(redisDb *db, robj *key, robj *val) {
bool dbAddCore(redisDb *db, robj *key, robj *val) {
serverAssert(!val->FExpires());
sds copy = sdsdup(szFromObj(key));
int retval = dictAdd(db->pdict, copy, val);
bool fInserted = db->insert(copy, val);
val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp();
if (retval == DICT_OK)
if (fInserted)
{
if (val->type == OBJ_LIST ||
val->type == OBJ_ZSET)
@ -211,7 +210,7 @@ int dbAddCore(redisDb *db, robj *key, robj *val) {
sdsfree(copy);
}
return retval;
return fInserted;
}
/* Add the key to the DB. It's up to the caller to increment the reference
@ -220,23 +219,22 @@ int dbAddCore(redisDb *db, robj *key, robj *val) {
* The program is aborted if the key already exists. */
void dbAdd(redisDb *db, robj *key, robj *val)
{
int retval = dbAddCore(db, key, val);
serverAssertWithInfo(NULL,key,retval == DICT_OK);
bool fInserted = dbAddCore(db, key, val);
serverAssertWithInfo(NULL,key,fInserted);
}
void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire)
void redisDb::dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire)
{
dictEntry auxentry = *de;
robj *old = (robj*)dictGetVal(de);
robj *old = itr.val();
if (old->FExpires()) {
if (fRemoveExpire) {
removeExpire(db, key);
removeExpire(this, key);
}
else {
if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
val = dupStringObject(val);
updateExpire(db, (sds)dictGetKey(de), old, val);
updateExpire(this, itr.key(), old, val);
}
}
@ -249,14 +247,12 @@ void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpd
val->mvcc_tstamp = getMvccTstamp();
}
dictSetVal(db->pdict, de, val);
if (g_pserver->lazyfree_lazy_server_del)
freeObjAsync(itr.val());
else
decrRefCount(itr.val());
if (g_pserver->lazyfree_lazy_server_del) {
freeObjAsync(old);
dictSetVal(db->pdict, &auxentry, NULL);
}
dictFreeVal(db->pdict, &auxentry);
m_persistentData.updateValue(itr, val);
}
/* Overwrite an existing key with a new value. Incrementing the reference
@ -265,10 +261,10 @@ void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpd
*
* The program is aborted if the key was not already present. */
void dbOverwrite(redisDb *db, robj *key, robj *val) {
dictEntry *de = dictFind(db->pdict,ptrFromObj(key));
auto itr = db->find(key);
serverAssertWithInfo(NULL,key,de != NULL);
dbOverwriteCore(db, de, key, val, !!g_pserver->fActiveReplica, false);
serverAssertWithInfo(NULL,key,itr != nullptr);
db->dbOverwriteCore(itr, key, val, !!g_pserver->fActiveReplica, false);
}
/* Insert a key, handling duplicate keys according to fReplace */
@ -276,14 +272,14 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
{
if (fReplace)
{
dictEntry *de = dictFind(db->pdict, ptrFromObj(key));
if (de == nullptr)
return (dbAddCore(db, key, val) == DICT_OK);
auto itr = db->find(key);
if (itr == nullptr)
return (dbAddCore(db, key, val) == true);
robj *old = (robj*)dictGetVal(de);
robj *old = itr.val();
if (old->mvcc_tstamp <= val->mvcc_tstamp)
{
dbOverwriteCore(db, de, key, val, false, true);
db->dbOverwriteCore(itr, key, val, false, true);
return true;
}
@ -291,7 +287,7 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
}
else
{
return (dbAddCore(db, key, val) == DICT_OK);
return (dbAddCore(db, key, val) == true);
}
}
@ -304,11 +300,11 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
*
* All the new keys in the database should be created via this interface. */
void setKey(redisDb *db, robj *key, robj *val) {
dictEntry *de = dictFind(db->pdict, ptrFromObj(key));
if (de == NULL) {
auto itr = db->find(key);
if (itr == NULL) {
dbAdd(db,key,val);
} else {
dbOverwriteCore(db,de,key,val,!!g_pserver->fActiveReplica,true);
db->dbOverwriteCore(itr,key,val,!!g_pserver->fActiveReplica,true);
}
incrRefCount(val);
signalModifiedKey(db,key);
@ -324,19 +320,19 @@ int dbExists(redisDb *db, robj *key) {
* The function makes sure to return keys not already expired. */
robj *dbRandomKey(redisDb *db) {
int maxtries = 100;
bool allvolatile = db->size() == db->setexpire->size();
bool allvolatile = db->expireSize() == db->size();
while(1) {
sds key;
robj *keyobj;
auto pair = db->random();
if (pair.first == NULL) return NULL;
auto itr = db->random();
if (itr == nullptr) return NULL;
key = (sds)pair.first;
key = itr.key();
keyobj = createStringObject(key,sdslen(key));
if (pair.second->FExpires())
if (itr.val()->FExpires())
{
if (allvolatile && listLength(g_pserver->masters) && --maxtries == 0) {
/* If the DB is composed only of keys with an expire set,
@ -351,7 +347,7 @@ robj *dbRandomKey(redisDb *db) {
}
}
if (pair.second->FExpires())
if (itr.val()->FExpires())
{
if (expireIfNeeded(db,keyobj)) {
decrRefCount(keyobj);
@ -363,15 +359,16 @@ robj *dbRandomKey(redisDb *db) {
}
}
/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(redisDb *db, robj *key) {
bool redisDbPersistentData::syncDelete(robj *key)
{
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
dictEntry *de = dictFind(db->pdict, szFromObj(key));
if (de != nullptr && ((robj*)dictGetVal(de))->FExpires())
removeExpireCore(db, key, de);
if (dictDelete(db->pdict,ptrFromObj(key)) == DICT_OK) {
auto itr = find(szFromObj(key));
trackkey(szFromObj(key));
if (itr != nullptr && itr.val()->FExpires())
removeExpire(key, itr);
if (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) {
if (g_pserver->cluster_enabled) slotToKeyDel(key);
return 1;
} else {
@ -379,6 +376,11 @@ int dbSyncDelete(redisDb *db, robj *key) {
}
}
/* Delete a key, value, and associated expiration entry if any, from the DB */
int dbSyncDelete(redisDb *db, robj *key) {
return db->m_persistentData.syncDelete(key);
}
/* This is a wrapper whose behavior depends on the Redis lazy free
* configuration. Deletes the key synchronously or asynchronously. */
int dbDelete(redisDb *db, robj *key) {
@ -456,15 +458,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) {
}
for (int j = startdb; j <= enddb; j++) {
removed += dictSize(g_pserver->db[j].pdict);
if (async) {
emptyDbAsync(&g_pserver->db[j]);
} else {
dictEmpty(g_pserver->db[j].pdict,callback);
delete g_pserver->db[j].setexpire;
g_pserver->db[j].setexpire = new (MALLOC_LOCAL) expireset();
g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end();
}
removed += g_pserver->db[j].clear(!!async, callback);
}
if (g_pserver->cluster_enabled) {
if (async) {
@ -633,9 +627,9 @@ void randomkeyCommand(client *c) {
}
bool redisDb::iterate(std::function<bool(const char*, robj*)> fn)
bool redisDbPersistentData::iterate(std::function<bool(const char*, robj*)> &fn)
{
dictIterator *di = dictGetSafeIterator(pdict);
dictIterator *di = dictGetSafeIterator(m_pdict);
dictEntry *de = nullptr;
bool fResult = true;
while((de = dictNext(di)) != nullptr)
@ -806,7 +800,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
/* Handle the case of a hash table. */
ht = NULL;
if (o == nullptr) {
ht = c->db->pdict;
ht = c->db->dictUnsafe();
} else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) {
ht = (dict*)ptrFromObj(o);
} else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) {
@ -1159,17 +1153,14 @@ int dbSwapDatabases(int id1, int id2) {
/* Swap hash tables. Note that we don't swap blocking_keys,
* ready_keys and watched_keys, since we want clients to
* remain in the same DB they were. */
db1->pdict = db2->pdict;
db1->setexpire = db2->setexpire;
db1->expireitr = db2->expireitr;
redisDbPersistentData::swap(&db1->m_persistentData, &db2->m_persistentData);
db1->avg_ttl = db2->avg_ttl;
db1->last_expire_set = db2->last_expire_set;
db1->expireitr = db2->expireitr;
db2->pdict = aux.pdict;
db2->setexpire = aux.setexpire;
db2->expireitr = aux.expireitr;
db2->avg_ttl = aux.avg_ttl;
db2->last_expire_set = aux.last_expire_set;
db2->expireitr = aux.expireitr;
/* Now we need to handle clients blocked on lists: as an effect
* of swapping the two DBs, a client that was waiting for list
@ -1218,23 +1209,24 @@ void swapdbCommand(client *c) {
* Expires API
*----------------------------------------------------------------------------*/
int removeExpire(redisDb *db, robj *key) {
dictEntry *de = dictFind(db->pdict,ptrFromObj(key));
return removeExpireCore(db, key, de);
auto itr = db->find(key);
return db->m_persistentData.removeExpire(key, itr);
}
int removeExpireCore(redisDb *db, robj *key, dictEntry *de) {
int redisDbPersistentData::removeExpire(robj *key, dict_iter itr) {
/* An expire may only be removed if there is a corresponding entry in the
* main dict. Otherwise, the key will never be freed. */
serverAssertWithInfo(NULL,key,de != NULL);
serverAssertWithInfo(NULL,key,itr != nullptr);
robj *val = (robj*)dictGetVal(de);
robj *val = itr.val();
if (!val->FExpires())
return 0;
auto itr = db->setexpire->find((sds)dictGetKey(de));
serverAssert(itr != db->setexpire->end());
serverAssert(itr->key() == (sds)dictGetKey(de));
db->setexpire->erase(itr);
auto itrExpire = m_setexpire->find(itr.key());
serverAssert(itrExpire != m_setexpire->end());
serverAssert(itrExpire->key() == itr.key());
m_setexpire->erase(itrExpire);
val->SetFExpires(false);
trackkey(key);
return 1;
}
@ -1243,49 +1235,23 @@ int removeExpireCore(redisDb *db, robj *key, dictEntry *de) {
* to NULL. The 'when' parameter is the absolute unix time in milliseconds
* after which the key will no longer be considered valid. */
void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) {
dictEntry *kde;
serverAssert(GlobalLocksAcquired());
/* Reuse the sds from the main dict in the expire dict */
kde = dictFind(db->pdict,ptrFromObj(key));
serverAssertWithInfo(NULL,key,kde != NULL);
if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
{
// shared objects cannot have the expire bit set, create a real object
dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde)));
}
/* Update TTL stats (exponential moving average) */
/* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */
long long now = g_pserver->mstime;
db->avg_ttl -= (now - db->last_expire_set); // reduce the TTL by the time that has elapsed
if (db->setexpire->empty())
if (db->expireSize() == 0)
db->avg_ttl = 0;
else
db->avg_ttl -= db->avg_ttl / db->setexpire->size(); // slide one entry out the window
db->avg_ttl -= db->avg_ttl / db->expireSize(); // slide one entry out the window
if (db->avg_ttl < 0)
db->avg_ttl = 0; // TTLs are never negative
db->avg_ttl += (double)(when-now) / (db->setexpire->size()+1); // add the new entry
db->avg_ttl += (double)(when-now) / (db->expireSize()+1); // add the new entry
db->last_expire_set = now;
/* Update the expire set */
const char *szSubKey = (subkey != nullptr) ? szFromObj(subkey) : nullptr;
if (((robj*)dictGetVal(kde))->FExpires()) {
auto itr = db->setexpire->find((sds)dictGetKey(kde));
serverAssert(itr != db->setexpire->end());
expireEntry eNew(std::move(*itr));
eNew.update(szSubKey, when);
db->setexpire->erase(itr);
db->setexpire->insert(eNew);
}
else
{
expireEntry e((sds)dictGetKey(kde), szSubKey, when);
((robj*)dictGetVal(kde))->SetFExpires(true);
db->setexpire->insert(e);
}
db->m_persistentData.setExpire(key, subkey, when);
int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0;
if (c && writable_slave && !(c->flags & CLIENT_MASTER))
@ -1294,26 +1260,24 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when)
void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e)
{
dictEntry *kde;
serverAssert(GlobalLocksAcquired());
/* Reuse the sds from the main dict in the expire dict */
kde = dictFind(db->pdict,ptrFromObj(key));
auto kde = db->find(key);
serverAssertWithInfo(NULL,key,kde != NULL);
if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
if (kde.val()->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
{
// shared objects cannot have the expire bit set, create a real object
dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde)));
db->m_persistentData.updateValue(kde, dupStringObject(kde.val()));
}
if (((robj*)dictGetVal(kde))->FExpires())
if (kde.val()->FExpires())
removeExpire(db, key);
e.setKeyUnsafe((sds)dictGetKey(kde));
db->setexpire->insert(e);
((robj*)dictGetVal(kde))->SetFExpires(true);
e.setKeyUnsafe(kde.key());
db->m_persistentData.setExpire(std::move(e));
kde.val()->SetFExpires(true);
int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0;
@ -1323,19 +1287,24 @@ void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e)
/* Return the expire time of the specified key, or null if no expire
* is associated with this key (i.e. the key is non volatile) */
expireEntry *getExpire(redisDb *db, robj_roptr key) {
expireEntry *redisDb::getExpire(robj_roptr key) {
/* No expire? return ASAP */
if (db->setexpire->size() == 0)
if (expireSize() == 0)
return nullptr;
auto pair = db->lookup_tuple(key);
if (pair.first == nullptr)
auto itr = find(key);
if (itr == nullptr)
return nullptr;
if (!pair.second->FExpires())
if (!itr.val()->FExpires())
return nullptr;
auto itr = db->setexpire->find(pair.first);
return itr.operator->();
auto itrExpire = m_persistentData.findExpire(itr.key());
return itrExpire.operator->();
}
expireEntry *getExpire(redisDb *db, robj_roptr key)
{
return db->getExpire(key);
}
/* Propagate expires into slaves and the AOF file.
@ -1798,11 +1767,18 @@ unsigned int countKeysInSlot(unsigned int hashslot) {
return g_pserver->cluster->slots_keys_count[hashslot];
}
void redisDbPersistentData::initialize()
{
m_pdict = dictCreate(&dbDictType,NULL);
m_setexpire = new(MALLOC_LOCAL) expireset();
m_fAllChanged = false;
m_fTrackingChanges = false;
}
void redisDb::initialize(int id)
{
this->pdict = dictCreate(&dbDictType,NULL);
this->setexpire = new(MALLOC_LOCAL) expireset();
this->expireitr = this->setexpire->end();
m_persistentData.initialize();
this->expireitr = m_persistentData.setexpireUnsafe()->end();
this->blocking_keys = dictCreate(&keylistDictType,NULL);
this->ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
this->watched_keys = dictCreate(&keylistDictType,NULL);
@ -1811,3 +1787,96 @@ void redisDb::initialize(int id)
this->last_expire_set = 0;
this->defrag_later = listCreate();
}
bool redisDbPersistentData::insert(char *key, robj *o)
{
int res = dictAdd(m_pdict, key, o);
if (res == DICT_OK)
trackkey(key);
return (res == DICT_OK);
}
void redisDbPersistentData::tryResize()
{
if (htNeedsResize(m_pdict))
dictResize(m_pdict);
}
size_t redisDb::clear(bool fAsync, void(callback)(void*))
{
size_t removed = m_persistentData.size();
if (fAsync) {
m_persistentData.emptyDbAsync();
} else {
m_persistentData.clear(callback);
}
expireitr = m_persistentData.setexpireUnsafe()->end();
return removed;
}
void redisDbPersistentData::clear(void(callback)(void*))
{
dictEmpty(m_pdict,callback);
if (m_fTrackingChanges)
m_fAllChanged = true;
delete m_setexpire;
m_setexpire = new (MALLOC_LOCAL) expireset();
}
/* static */ void redisDbPersistentData::swap(redisDbPersistentData *db1, redisDbPersistentData *db2)
{
redisDbPersistentData aux = *db1;
db1->m_pdict = db2->m_pdict;
db1->m_fTrackingChanges = db2->m_fTrackingChanges;
db1->m_fAllChanged = db2->m_fAllChanged;
db1->m_setexpire = db2->m_setexpire;
db2->m_pdict = aux.m_pdict;
db2->m_fTrackingChanges = aux.m_fTrackingChanges;
db2->m_fAllChanged = aux.m_fAllChanged;
db2->m_setexpire = aux.m_setexpire;
}
void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when)
{
/* Reuse the sds from the main dict in the expire dict */
dictEntry *kde = dictFind(m_pdict,ptrFromObj(key));
serverAssertWithInfo(NULL,key,kde != NULL);
if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
{
// shared objects cannot have the expire bit set, create a real object
dictSetVal(m_pdict, kde, dupStringObject((robj*)dictGetVal(kde)));
}
const char *szSubKey = (subkey != nullptr) ? szFromObj(subkey) : nullptr;
if (((robj*)dictGetVal(kde))->FExpires()) {
auto itr = m_setexpire->find((sds)dictGetKey(kde));
serverAssert(itr != m_setexpire->end());
expireEntry eNew(std::move(*itr));
eNew.update(szSubKey, when);
m_setexpire->erase(itr);
m_setexpire->insert(eNew);
}
else
{
expireEntry e((sds)dictGetKey(kde), szSubKey, when);
((robj*)dictGetVal(kde))->SetFExpires(true);
m_setexpire->insert(e);
}
}
void redisDbPersistentData::setExpire(expireEntry &&e)
{
m_setexpire->insert(e);
}
bool redisDb::FKeyExpires(const char *key)
{
return m_persistentData.setexpireUnsafe()->find(key) != m_persistentData.setexpireUnsafe()->end();
}
void redisDbPersistentData::updateValue(dict_iter itr, robj *val)
{
dictSetVal(m_pdict, itr.de, val);
}

View File

@ -439,9 +439,9 @@ NULL
strenc, rdbSavedObjectLen(val),
val->lru, estimateObjectIdleTime(val)/1000, extra);
} else if (!strcasecmp(szFromObj(c->argv[1]),"sdslen") && c->argc == 3) {
auto pair = c->db->lookup_tuple(c->argv[2]);
robj *val = pair.second;
const char *key = pair.first;
auto itr = c->db->find(c->argv[2]);
robj *val = itr.val();
const char *key = itr.key();
if (val == NULL) {
addReply(c,shared.nokeyerr);
@ -636,7 +636,7 @@ NULL
stats = sdscat(stats,buf);
stats = sdscatprintf(stats,"[Expires set]\n");
g_pserver->db[dbid].setexpire->getstats(buf, sizeof(buf));
g_pserver->db[dbid].getExpireStats(buf, sizeof(buf));
stats = sdscat(stats, buf);
addReplyBulkSds(c,stats);

View File

@ -781,8 +781,8 @@ long defragKey(redisDb *db, dictEntry *de) {
newsds = activeDefragSds(keysds);
if (newsds)
defragged++, de->key = newsds;
if (!db->setexpire->empty()) {
replaceSateliteOSetKeyPtr(*db->setexpire, keysds, newsds);
if (!db->setexpire()->empty()) {
replaceSateliteOSetKeyPtr(*const_cast<expireset*>(db->setexpire()), keysds, newsds);
}
/* Try to defrag robj and / or string value. */
@ -1111,7 +1111,7 @@ void activeDefragCycle(void) {
break; /* this will exit the function and we'll continue on the next cycle */
}
cursor = dictScan(db->pdict, cursor, defragScanCallback, defragDictBucketCallback, db);
cursor = dictScan(db->dictUnsafe(), cursor, defragScanCallback, defragDictBucketCallback, db);
/* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys
* (if we have a lot of pointers in one hash bucket or rehasing),

View File

@ -256,13 +256,13 @@ void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct ev
{
if (setexpire != nullptr)
{
visitFunctor visitor { dbid, db->pdict, pool, 0 };
visitFunctor visitor { dbid, db->m_persistentData.dictUnsafe(), pool, 0 };
setexpire->random_visit(visitor);
}
else
{
dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*));
int count = dictGetSomeKeys(db->pdict,samples,g_pserver->maxmemory_samples);
int count = dictGetSomeKeys(db->m_persistentData.dictUnsafe(),samples,g_pserver->maxmemory_samples);
for (int j = 0; j < count; j++) {
robj *o = (robj*)dictGetVal(samples[j]);
processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool);
@ -511,9 +511,9 @@ int freeMemoryIfNeeded(void) {
}
else
{
keys = db->setexpire->size();
keys = db->expireSize();
if (keys != 0)
evictionPoolPopulate(i, db, db->setexpire, pool);
evictionPoolPopulate(i, db, db->m_persistentData.setexpireUnsafe(), pool);
total_keys += keys;
}
}
@ -525,9 +525,9 @@ int freeMemoryIfNeeded(void) {
bestdbid = pool[k].dbid;
sds key = nullptr;
auto pair = g_pserver->db[pool[k].dbid].lookup_tuple(pool[k].key);
if (pair.first != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || pair.second->FExpires()))
key = (sds)pair.first;
auto itr = g_pserver->db[pool[k].dbid].find(pool[k].key);
if (itr != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || itr.val()->FExpires()))
key = itr.key();
/* Remove the entry from the pool. */
if (pool[k].key != pool[k].cached)
@ -560,17 +560,17 @@ int freeMemoryIfNeeded(void) {
if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM)
{
if (db->size() != 0) {
auto pair = db->random();
bestkey = (sds)pair.first;
auto itr = db->random();
bestkey = itr.key();
bestdbid = j;
break;
}
}
else
{
if (!db->setexpire->empty())
if (db->expireSize())
{
bestkey = (sds)db->setexpire->random_value().key();
bestkey = (sds)db->random_expire().key();
bestdbid = j;
break;
}

View File

@ -248,7 +248,7 @@ void activeExpireCycle(int type) {
now = mstime();
/* If there is nothing to expire try next DB ASAP. */
if (db->setexpire->empty())
if (db->m_persistentData.setexpireUnsafe()->empty())
{
db->avg_ttl = 0;
db->last_expire_set = now;
@ -258,7 +258,7 @@ void activeExpireCycle(int type) {
size_t expired = 0;
size_t tried = 0;
long long check = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; // assume a check is roughly 1us. It isn't but good enough
db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](expireEntry &e) __attribute__((always_inline)) {
db->expireitr = db->m_persistentData.setexpireUnsafe()->enumerate(db->expireitr, now, [&](expireEntry &e) __attribute__((always_inline)) {
if (e.when() < now)
{
activeExpireCycleExpire(db, e, now);
@ -358,16 +358,16 @@ void expireSlaveKeys(void) {
redisDb *db = g_pserver->db+dbid;
// the expire is hashed based on the key pointer, so we need the point in the main db
auto pairMain = db->lookup_tuple(keyname);
auto itr = db->setexpire->end();
if (pairMain.first != nullptr)
itr = db->setexpire->find((sds)pairMain.first);
auto itrDB = db->find(keyname);
auto itrExpire = db->m_persistentData.setexpireUnsafe()->end();
if (itrDB != nullptr)
itrExpire = db->m_persistentData.setexpireUnsafe()->find(itrDB.key());
int expired = 0;
if (itr != db->setexpire->end())
if (itrExpire != db->m_persistentData.setexpireUnsafe()->end())
{
if (itr->when() < start) {
activeExpireCycleExpire(g_pserver->db+dbid,*itr,start);
if (itrExpire->when() < start) {
activeExpireCycleExpire(g_pserver->db+dbid,*itrExpire,start);
expired = 1;
}
}
@ -376,7 +376,7 @@ void expireSlaveKeys(void) {
* corresponding bit in the new bitmap we set as value.
* At the end of the loop if the bitmap is zero, it means we
* no longer need to keep track of this key. */
if (itr != db->setexpire->end() && !expired) {
if (itrExpire != db->m_persistentData.setexpireUnsafe()->end() && !expired) {
noexpire++;
new_dbids |= (uint64_t)1 << dbid;
}

View File

@ -51,18 +51,18 @@ size_t lazyfreeGetFreeEffort(robj *obj) {
* a lazy free list instead of being freed synchronously. The lazy free list
* will be reclaimed in a different bio.c thread. */
#define LAZYFREE_THRESHOLD 64
int dbAsyncDelete(redisDb *db, robj *key) {
bool redisDbPersistentData::asyncDelete(robj *key) {
/* If the value is composed of a few allocations, to free in a lazy way
* is actually just slower... So under a certain limit we just free
* the object synchronously. */
dictEntry *de = dictUnlink(db->pdict,ptrFromObj(key));
dictEntry *de = dictUnlink(m_pdict,ptrFromObj(key));
if (de) {
robj *val = (robj*)dictGetVal(de);
if (val->FExpires())
{
/* Deleting an entry from the expires dict will not free the sds of
* the key, because it is shared with the main dictionary. */
removeExpireCore(db,key,de);
removeExpire(key,dict_iter(de));
}
size_t free_effort = lazyfreeGetFreeEffort(val);
@ -78,21 +78,25 @@ int dbAsyncDelete(redisDb *db, robj *key) {
if (free_effort > LAZYFREE_THRESHOLD && val->getrefcount(std::memory_order_relaxed) == 1) {
atomicIncr(lazyfree_objects,1);
bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
dictSetVal(db->pdict,de,NULL);
dictSetVal(m_pdict,de,NULL);
}
}
/* Release the key-val pair, or just the key if we set the val
* field to NULL in order to lazy free it later. */
if (de) {
dictFreeUnlinkedEntry(db->pdict,de);
dictFreeUnlinkedEntry(m_pdict,de);
if (g_pserver->cluster_enabled) slotToKeyDel(key);
return 1;
return true;
} else {
return 0;
return false;
}
}
int dbAsyncDelete(redisDb *db, robj *key) {
return db->m_persistentData.asyncDelete(key);
}
/* Free an object, if the object is huge enough, free it in async way. */
void freeObjAsync(robj *o) {
size_t free_effort = lazyfreeGetFreeEffort(o);
@ -107,12 +111,13 @@ void freeObjAsync(robj *o) {
/* Empty a Redis DB asynchronously. What the function does actually is to
* create a new empty set of hash tables and scheduling the old ones for
* lazy freeing. */
void emptyDbAsync(redisDb *db) {
dict *oldht1 = db->pdict;
auto *set = db->setexpire;
db->setexpire = new (MALLOC_LOCAL) expireset();
db->expireitr = db->setexpire->end();
db->pdict = dictCreate(&dbDictType,NULL);
void redisDbPersistentData::emptyDbAsync() {
dict *oldht1 = m_pdict;
auto *set = m_setexpire;
m_setexpire = new (MALLOC_LOCAL) expireset();
m_pdict = dictCreate(&dbDictType,NULL);
if (m_fTrackingChanges)
m_fAllChanged = true;
atomicIncr(lazyfree_objects,dictSize(oldht1));
bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,set);
}

View File

@ -1067,7 +1067,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
mh->db[mh->num_dbs].overhead_ht_main = mem;
mem_total+=mem;
mem = db->setexpire->bytes_used();
mem = db->setexpire()->bytes_used();
mh->db[mh->num_dbs].overhead_ht_expires = mem;
mem_total+=mem;
@ -1339,13 +1339,13 @@ NULL
}
}
auto pair = c->db->lookup_tuple(c->argv[2]);
if (pair.first == NULL) {
auto itr = c->db->find(c->argv[2]);
if (itr == nullptr) {
addReplyNull(c, shared.nullbulk);
return;
}
size_t usage = objectComputeSize(pair.second,samples);
usage += sdsAllocSize((sds)pair.first);
size_t usage = objectComputeSize(itr.val(),samples);
usage += sdsAllocSize(itr.key());
usage += sizeof(dictEntry);
addReplyLongLong(c,usage);
} else if (!strcasecmp(szFromObj(c->argv[1]),"stats") && c->argc == 2) {

View File

@ -1170,7 +1170,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
* these sizes are just hints to resize the hash tables. */
uint64_t db_size, expires_size;
db_size = db->size();
expires_size = db->setexpire->size();
expires_size = db->expireSize();
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
@ -1187,7 +1187,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
});
if (!fSavedAll)
goto werr;
serverAssert(ckeysExpired == db->setexpire->size());
serverAssert(ckeysExpired == db->expireSize());
}
/* If we are storing the replication information on disk, persist

View File

@ -1431,8 +1431,7 @@ int htNeedsResize(dict *dict) {
/* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL
* we resize the hash table to save memory */
void tryResizeHashTables(int dbid) {
if (htNeedsResize(g_pserver->db[dbid].pdict))
dictResize(g_pserver->db[dbid].pdict);
g_pserver->db[dbid].tryResize();
}
/* Our hash table implementation performs rehashing incrementally while
@ -1442,10 +1441,10 @@ void tryResizeHashTables(int dbid) {
*
* The function returns 1 if some rehashing was performed, otherwise 0
* is returned. */
int incrementallyRehash(int dbid) {
int redisDbPersistentData::incrementallyRehash() {
/* Keys dictionary */
if (dictIsRehashing(g_pserver->db[dbid].pdict)) {
dictRehashMilliseconds(g_pserver->db[dbid].pdict,1);
if (dictIsRehashing(m_pdict)) {
dictRehashMilliseconds(m_pdict,1);
return 1; /* already used our millisecond for this loop... */
}
return 0;
@ -1726,7 +1725,7 @@ void databasesCron(void) {
/* Rehash */
if (g_pserver->activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
int work_done = incrementallyRehash(rehash_db);
int work_done = g_pserver->db[rehash_db].incrementallyRehash();
if (work_done) {
/* If the function did some work, stop here, we'll do
* more at the next cron loop. */
@ -1887,7 +1886,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
size = g_pserver->db[j].slots();
used = g_pserver->db[j].size();
vkeys = g_pserver->db[j].setexpire->size();
vkeys = g_pserver->db[j].expireSize();
if (used || vkeys) {
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
/* dictPrintStats(g_pserver->dict); */
@ -4567,7 +4566,7 @@ sds genRedisInfoString(const char *section) {
long long keys, vkeys;
keys = g_pserver->db[j].size();
vkeys = g_pserver->db[j].setexpire->size();
vkeys = g_pserver->db[j].expireSize();
// Adjust TTL by the current time
g_pserver->db[j].avg_ttl -= (g_pserver->mstime - g_pserver->db[j].last_expire_set);

View File

@ -54,6 +54,7 @@
#include <vector>
#include <algorithm>
#include <memory>
#include <set>
#ifdef __cplusplus
extern "C" {
#include <lua.h>
@ -1015,22 +1016,123 @@ typedef struct clientReplyBlock {
#endif
} clientReplyBlock;
struct dictEntry;
class dict_const_iter
{
friend class redisDb;
friend class redisDbPersistentData;
protected:
dictEntry *de;
public:
explicit dict_const_iter(dictEntry *de)
: de(de)
{}
const char *key() const { return de ? (const char*)dictGetKey(de) : nullptr; }
robj_roptr val() const { return de ? (robj*)dictGetVal(de) : nullptr; }
const robj* operator->() const { return de ? (robj*)dictGetVal(de) : nullptr; }
operator robj_roptr() const { return de ? (robj*)dictGetVal(de) : nullptr; }
bool operator==(std::nullptr_t) const { return de == nullptr; }
bool operator!=(std::nullptr_t) const { return de != nullptr; }
bool operator==(const dict_const_iter &other) { return de == other.de; }
};
class dict_iter : public dict_const_iter
{
public:
explicit dict_iter(dictEntry *de)
: dict_const_iter(de)
{}
sds key() { return de ? (sds)dictGetKey(de) : nullptr; }
robj *val() { return de ? (robj*)dictGetVal(de) : nullptr; }
robj *operator->() { return de ? (robj*)dictGetVal(de) : nullptr; }
operator robj*() const { return de ? (robj*)dictGetVal(de) : nullptr; }
};
class redisDbPersistentData
{
public:
static void swap(redisDbPersistentData *db1, redisDbPersistentData *db2);
size_t slots() const { return dictSlots(m_pdict); }
size_t size() const { return dictSize(m_pdict); }
void expand(uint64_t slots) { dictExpand(m_pdict, slots); }
void trackkey(robj_roptr o)
{
trackkey(szFromObj(o));
}
void trackkey(const char *key)
{
if (m_fTrackingChanges)
m_setchanged.insert(key);
}
dict_iter find(const char *key)
{
dictEntry *de = dictFind(m_pdict, key);
return dict_iter(de);
}
dict_iter random()
{
dictEntry *de = dictGetRandomKey(m_pdict);
return dict_iter(de);
}
const expireEntry &random_expire()
{
return m_setexpire->random_value();
}
auto findExpire(const char *key)
{
return m_setexpire->find(key);
}
void getStats(char *buf, size_t bufsize) { dictGetStats(buf, bufsize, m_pdict); }
void getExpireStats(char *buf, size_t bufsize) { m_setexpire->getstats(buf, bufsize); }
bool insert(char *k, robj *o);
void tryResize();
int incrementallyRehash();
void updateValue(dict_iter itr, robj *val);
bool syncDelete(robj *key);
bool asyncDelete(robj *key);
size_t expireSize() const { return m_setexpire->size(); }
int removeExpire(robj *key, dict_iter itr);
void clear(void(callback)(void*));
void emptyDbAsync();
bool iterate(std::function<bool(const char*, robj*)> &fn);
void setExpire(robj *key, robj *subkey, long long when);
void setExpire(expireEntry &&e);
void initialize();
dict *dictUnsafe() { return m_pdict; }
expireset *setexpireUnsafe() { return m_setexpire; }
const expireset *setexpire() { return m_setexpire; }
private:
// Keyspace
dict *m_pdict; /* The keyspace for this DB */
bool m_fTrackingChanges = false;
bool m_fAllChanged = false;
std::set<std::string> m_setchanged;
// Expire
expireset *m_setexpire;
};
/* Redis database representation. There are multiple databases identified
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
// Legacy C API, Do not add more
friend void tryResizeHashTables(int);
friend int incrementallyRehash(int);
friend int dbAddCore(redisDb *db, robj *key, robj *val);
friend void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire);
friend void dbOverwrite(redisDb *db, robj *key, robj *val);
friend int dbMerge(redisDb *db, robj *key, robj *val, int fReplace);
friend void setKey(redisDb *db, robj *key, robj *val);
friend int dbSyncDelete(redisDb *db, robj *key);
friend int dbAsyncDelete(redisDb *db, robj *key);
friend long long emptyDb(int dbnum, int flags, void(callback)(void*));
friend void emptyDbAsync(redisDb *db);
friend void scanGenericCommand(struct client *c, robj_roptr o, unsigned long cursor);
friend int dbSwapDatabases(int id1, int id2);
friend int removeExpire(redisDb *db, robj *key);
@ -1038,58 +1140,64 @@ typedef struct redisDb {
friend void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e);
friend void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool);
friend void activeDefragCycle(void);
friend int freeMemoryIfNeeded(void);
friend void activeExpireCycle(int);
friend void expireSlaveKeys(void);
redisDb()
typedef ::dict_const_iter const_iter;
typedef ::dict_iter iter;
redisDb()
: expireitr(nullptr)
{};
{}
void initialize(int id);
size_t slots() const { return dictSlots(pdict); }
size_t size() const { return dictSize(pdict); }
void expand(uint64_t slots) { dictExpand(pdict, slots); }
size_t slots() const { return m_persistentData.slots(); }
size_t size() const { return m_persistentData.size(); }
size_t expireSize() const { return m_persistentData.expireSize(); }
void expand(uint64_t slots) { m_persistentData.expand(slots); }
void tryResize() { m_persistentData.tryResize(); }
const expireset *setexpire() { return m_persistentData.setexpire(); }
robj *find(robj_roptr key)
iter find(robj_roptr key)
{
return find(szFromObj(key));
}
robj *find(const char *key)
iter find(const char *key)
{
dictEntry *de = dictFind(pdict, key);
if (de != nullptr)
return (robj*)dictGetVal(de);
return nullptr;
return m_persistentData.find(key);
}
std::pair<const char*,robj*> lookup_tuple(robj_roptr key)
iter random()
{
return lookup_tuple(szFromObj(key));
}
std::pair<const char*,robj*> lookup_tuple(const char *key)
{
dictEntry *de = dictFind(pdict, key);
if (de != nullptr)
return std::make_pair<const char*,robj*>((const char*)dictGetKey(de), (robj*)dictGetVal(de));
return std::make_pair<const char*,robj*>(nullptr, nullptr);
return m_persistentData.random();
}
std::pair<const char*,robj*> random()
const expireEntry &random_expire()
{
dictEntry *de = dictGetRandomKey(pdict);
if (de != nullptr)
return std::make_pair<const char*,robj*>((const char*)dictGetKey(de), (robj*)dictGetVal(de));
return std::make_pair<const char*,robj*>(nullptr, nullptr);
return m_persistentData.random_expire();
}
bool iterate(std::function<bool(const char*, robj*)> fn);
void getStats(char *buf, size_t bufsize) { dictGetStats(buf, bufsize, pdict); }
const_iter end() { return const_iter(nullptr); }
bool iterate(std::function<bool(const char*, robj*)> fn) { return m_persistentData.iterate(fn); }
void getStats(char *buf, size_t bufsize) { m_persistentData.getStats(buf, bufsize); }
void getExpireStats(char *buf, size_t bufsize) { m_persistentData.getExpireStats(buf, bufsize); }
bool insert(char *key, robj *o) { return m_persistentData.insert(key, o); }
void dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire);
int incrementallyRehash() { return m_persistentData.incrementallyRehash(); };
bool FKeyExpires(const char *key);
size_t clear(bool fAsync, void(callback)(void*));
dict *dictUnsafe() { return m_persistentData.dictUnsafe(); }
expireEntry *getExpire(robj_roptr key);
private:
dict *pdict; /* The keyspace for this DB */
redisDbPersistentData m_persistentData;
public:
expireset *setexpire;
expireset::setiter expireitr;
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
dict *ready_keys; /* Blocked keys that received a PUSH */
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
@ -2476,7 +2584,6 @@ int rewriteConfig(char *path);
/* db.c -- Keyspace access API */
int removeExpire(redisDb *db, robj *key);
int removeExpireCore(redisDb *db, robj *key, dictEntry *de);
void propagateExpire(redisDb *db, robj *key, int lazy);
int expireIfNeeded(redisDb *db, robj *key);
expireEntry *getExpire(redisDb *db, robj_roptr key);
@ -2521,7 +2628,6 @@ void slotToKeyAdd(robj *key);
void slotToKeyDel(robj *key);
void slotToKeyFlush(void);
int dbAsyncDelete(redisDb *db, robj *key);
void emptyDbAsync(redisDb *db);
void slotToKeyFlushAsync(void);
size_t lazyfreeGetPendingObjectsCount(void);
void freeObjAsync(robj *o);