Hide the database dict

Former-commit-id: 065846a9c32aeb55901c194d824828fb70805350
This commit is contained in:
John Sully 2019-09-19 17:44:42 -04:00
parent 4669d4d0d0
commit ceaaf39c96
12 changed files with 246 additions and 188 deletions

View File

@ -1300,29 +1300,23 @@ ssize_t aofReadDiffFromParent(void) {
int rewriteAppendOnlyFileRio(rio *aof) {
dictIterator *di = NULL;
dictEntry *de;
size_t processed = 0;
int j;
for (j = 0; j < cserver.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
redisDb *db = g_pserver->db+j;
dict *d = db->pdict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
if (db->size() == 0) continue;
/* SELECT the new DB */
if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr;
if (rioWriteBulkLongLong(aof,j) == 0) goto werr;
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
sds keystr;
robj key, *o;
bool fComplete = db->iterate([&](const char *keystr, robj *o)->bool{
robj key;
keystr = (sds)dictGetKey(de);
o = (robj*)dictGetVal(de);
initStaticStringObject(key,keystr);
initStaticStringObject(key,(sds)keystr);
expireEntry *pexpire = getExpire(db,&key);
@ -1330,22 +1324,22 @@ int rewriteAppendOnlyFileRio(rio *aof) {
if (o->type == OBJ_STRING) {
/* Emit a SET command */
char cmd[]="*3\r\n$3\r\nSET\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) return false;
/* Key and value */
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWriteBulkObject(aof,o) == 0) goto werr;
if (rioWriteBulkObject(aof,&key) == 0) return false;
if (rioWriteBulkObject(aof,o) == 0) return false;
} else if (o->type == OBJ_LIST) {
if (rewriteListObject(aof,&key,o) == 0) goto werr;
if (rewriteListObject(aof,&key,o) == 0) return false;
} else if (o->type == OBJ_SET) {
if (rewriteSetObject(aof,&key,o) == 0) goto werr;
if (rewriteSetObject(aof,&key,o) == 0) return false;
} else if (o->type == OBJ_ZSET) {
if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr;
if (rewriteSortedSetObject(aof,&key,o) == 0) return false;
} else if (o->type == OBJ_HASH) {
if (rewriteHashObject(aof,&key,o) == 0) goto werr;
if (rewriteHashObject(aof,&key,o) == 0) return false;
} else if (o->type == OBJ_STREAM) {
if (rewriteStreamObject(aof,&key,o) == 0) goto werr;
if (rewriteStreamObject(aof,&key,o) == 0) return false;
} else if (o->type == OBJ_MODULE) {
if (rewriteModuleObject(aof,&key,o) == 0) goto werr;
if (rewriteModuleObject(aof,&key,o) == 0) return false;
} else {
serverPanic("Unknown object type");
}
@ -1355,17 +1349,17 @@ int rewriteAppendOnlyFileRio(rio *aof) {
if (subExpire.subkey() == nullptr)
{
char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) return false;
if (rioWriteBulkObject(aof,&key) == 0) return false;
}
else
{
char cmd[]="*4\r\n$12\r\nEXPIREMEMBER\r\n";
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr;
if (rioWriteBulkObject(aof,&key) == 0) goto werr;
if (rioWrite(aof,subExpire.subkey(),sdslen(subExpire.subkey())) == 0) goto werr;
if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) return false;
if (rioWriteBulkObject(aof,&key) == 0) return false;
if (rioWrite(aof,subExpire.subkey(),sdslen(subExpire.subkey())) == 0) return false;
}
if (rioWriteBulkLongLong(aof,subExpire.when()) == 0) goto werr; // common
if (rioWriteBulkLongLong(aof,subExpire.when()) == 0) return false; // common
}
}
/* Read some diff from the parent process from time to time. */
@ -1373,9 +1367,10 @@ int rewriteAppendOnlyFileRio(rio *aof) {
processed = aof->processed_bytes;
aofReadDiffFromParent();
}
}
dictReleaseIterator(di);
di = NULL;
return true;
});
if (!fComplete)
goto werr;
}
return C_OK;

View File

@ -3910,7 +3910,7 @@ int verifyClusterConfigWithData(void) {
/* Make sure we only have keys in DB0. */
for (j = 1; j < cserver.dbnum; j++) {
if (dictSize(g_pserver->db[j].pdict)) return C_ERR;
if (g_pserver->db[j].size()) return C_ERR;
}
/* Check that all the slots we see populated memory have a corresponding
@ -4286,7 +4286,7 @@ NULL
clusterReplyMultiBulkSlots(c);
} else if (!strcasecmp(szFromObj(c->argv[1]),"flushslots") && c->argc == 2) {
/* CLUSTER FLUSHSLOTS */
if (dictSize(g_pserver->db[0].pdict) != 0) {
if (g_pserver->db[0].size() != 0) {
addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS.");
return;
}
@ -4621,7 +4621,7 @@ NULL
* slots nor keys to accept to replicate some other node.
* Slaves can switch to another master without issues. */
if (nodeIsMaster(myself) &&
(myself->numslots != 0 || dictSize(g_pserver->db[0].pdict) != 0)) {
(myself->numslots != 0 || g_pserver->db[0].size() != 0)) {
addReplyError(c,
"To set a master the node must be empty and "
"without assigned slots.");
@ -4778,7 +4778,7 @@ NULL
/* Slaves can be reset while containing data, but not master nodes
* that must be empty. */
if (nodeIsMaster(myself) && dictSize(c->db->pdict) != 0) {
if (nodeIsMaster(myself) && c->db->size() != 0) {
addReplyError(c,"CLUSTER RESET can't be called with "
"master nodes containing keys");
return;

View File

@ -70,10 +70,8 @@ void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew)
* implementations that should instead rely on lookupKeyRead(),
* lookupKeyWrite() and lookupKeyReadWithFlags(). */
static robj *lookupKey(redisDb *db, robj *key, int flags) {
dictEntry *de = dictFind(db->pdict,ptrFromObj(key));
if (de) {
robj *val = (robj*)dictGetVal(de);
robj *val = db->find(key);
if (val) {
/* Update the access time for the ageing algorithm.
* Don't do it if we have a saving child, as this will trigger
* a copy on write madness. */
@ -317,7 +315,7 @@ void setKey(redisDb *db, robj *key, robj *val) {
}
int dbExists(redisDb *db, robj *key) {
return dictFind(db->pdict,ptrFromObj(key)) != NULL;
return (db->find(key) != nullptr);
}
/* Return a random key, in form of a Redis object.
@ -325,21 +323,20 @@ int dbExists(redisDb *db, robj *key) {
*
* The function makes sure to return keys not already expired. */
robj *dbRandomKey(redisDb *db) {
dictEntry *de;
int maxtries = 100;
int allvolatile = dictSize(db->pdict) == db->setexpire->size();
bool allvolatile = db->size() == db->setexpire->size();
while(1) {
sds key;
robj *keyobj;
de = dictGetRandomKey(db->pdict);
if (de == NULL) return NULL;
auto pair = db->random();
if (pair.first == NULL) return NULL;
key = (sds)dictGetKey(de);
key = (sds)pair.first;
keyobj = createStringObject(key,sdslen(key));
if (((robj*)dictGetVal(de))->FExpires())
if (pair.second->FExpires())
{
if (allvolatile && listLength(g_pserver->masters) && --maxtries == 0) {
/* If the DB is composed only of keys with an expire set,
@ -354,7 +351,7 @@ robj *dbRandomKey(redisDb *db) {
}
}
if (((robj*)dictGetVal(de))->FExpires())
if (pair.second->FExpires())
{
if (expireIfNeeded(db,keyobj)) {
decrRefCount(keyobj);
@ -427,7 +424,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) {
return o;
}
/* Remove all keys from all the databases in a Redis g_pserver->
/* Remove all keys from all the databases in a Redis DB
* If callback is given the function is called from time to time to
* signal that work is in progress.
*
@ -635,9 +632,25 @@ void randomkeyCommand(client *c) {
decrRefCount(key);
}
bool redisDb::iterate(std::function<bool(const char*, robj*)> fn)
{
dictIterator *di = dictGetSafeIterator(pdict);
dictEntry *de = nullptr;
bool fResult = true;
while((de = dictNext(di)) != nullptr)
{
if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de)))
{
fResult = false;
break;
}
}
dictReleaseIterator(di);
return fResult;
}
void keysCommand(client *c) {
dictIterator *di;
dictEntry *de;
sds pattern = szFromObj(c->argv[1]);
int plen = sdslen(pattern), allkeys;
unsigned long numkeys = 0;
@ -645,10 +658,8 @@ void keysCommand(client *c) {
aeReleaseLock();
di = dictGetSafeIterator(c->db->pdict);
allkeys = (pattern[0] == '*' && pattern[1] == '\0');
while((de = dictNext(di)) != NULL) {
sds key = (sds)dictGetKey(de);
c->db->iterate([&](const char *key, robj *)->bool {
robj *keyobj;
if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) {
@ -659,8 +670,8 @@ void keysCommand(client *c) {
}
decrRefCount(keyobj);
}
}
dictReleaseIterator(di);
return true;
});
setDeferredArrayLen(c,replylen,numkeys);
fastlock_unlock(&c->db->lock); // we must release the DB lock before acquiring the AE lock to prevent deadlocks
@ -928,7 +939,7 @@ void scanCommand(client *c) {
}
void dbsizeCommand(client *c) {
addReplyLongLong(c,dictSize(c->db->pdict));
addReplyLongLong(c,c->db->size());
}
void lastsaveCommand(client *c) {
@ -1313,20 +1324,17 @@ void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e)
/* Return the expire time of the specified key, or null if no expire
* is associated with this key (i.e. the key is non volatile) */
expireEntry *getExpire(redisDb *db, robj_roptr key) {
dictEntry *de;
/* No expire? return ASAP */
if (db->setexpire->size() == 0)
return nullptr;
de = dictFind(db->pdict, ptrFromObj(key));
if (de == NULL)
auto pair = db->lookup_tuple(key);
if (pair.first == nullptr)
return nullptr;
robj *obj = (robj*)dictGetVal(de);
if (!obj->FExpires())
if (!pair.second->FExpires())
return nullptr;
auto itr = db->setexpire->find((sds)dictGetKey(de));
auto itr = db->setexpire->find(pair.first);
return itr.operator->();
}
@ -1789,3 +1797,17 @@ unsigned int delKeysInSlot(unsigned int hashslot) {
unsigned int countKeysInSlot(unsigned int hashslot) {
return g_pserver->cluster->slots_keys_count[hashslot];
}
void redisDb::initialize(int id)
{
this->pdict = dictCreate(&dbDictType,NULL);
this->setexpire = new(MALLOC_LOCAL) expireset();
this->expireitr = this->setexpire->end();
this->blocking_keys = dictCreate(&keylistDictType,NULL);
this->ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
this->watched_keys = dictCreate(&keylistDictType,NULL);
this->id = id;
this->avg_ttl = 0;
this->last_expire_set = 0;
this->defrag_later = listCreate();
}

View File

@ -266,8 +266,6 @@ void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj
* a different digest. */
void computeDatasetDigest(unsigned char *final) {
unsigned char digest[20];
dictIterator *di = NULL;
dictEntry *de;
int j;
uint32_t aux;
@ -276,8 +274,7 @@ void computeDatasetDigest(unsigned char *final) {
for (j = 0; j < cserver.dbnum; j++) {
redisDb *db = g_pserver->db+j;
if (dictSize(db->pdict) == 0) continue;
di = dictGetSafeIterator(db->pdict);
if (db->size() == 0) continue;
/* hash the DB id, so the same dataset moved in a different
* DB will lead to a different digest */
@ -285,24 +282,21 @@ void computeDatasetDigest(unsigned char *final) {
mixDigest(final,&aux,sizeof(aux));
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
sds key;
robj *keyobj, *o;
db->iterate([&](const char *key, robj *o)->bool {
robj *keyobj;
memset(digest,0,20); /* This key-val digest */
key = (sds)dictGetKey(de);
keyobj = createStringObject(key,sdslen(key));
mixDigest(digest,key,sdslen(key));
o = (robj*)dictGetVal(de);
xorObjectDigest(db,keyobj,digest,o);
/* We can finally xor the key-val digest to the final digest */
xorDigest(final,digest,20);
decrRefCount(keyobj);
}
dictReleaseIterator(di);
return true;
});
}
}
@ -394,15 +388,14 @@ NULL
serverLog(LL_WARNING,"Append Only File loaded by DEBUG LOADAOF");
addReply(c,shared.ok);
} else if (!strcasecmp(szFromObj(c->argv[1]),"object") && c->argc == 3) {
dictEntry *de;
robj *val;
const char *strenc;
if ((de = dictFind(c->db->pdict,ptrFromObj(c->argv[2]))) == NULL) {
val = c->db->find(c->argv[2]);
if (val == NULL) {
addReply(c,shared.nokeyerr);
return;
}
val = (robj*)dictGetVal(de);
strenc = strEncoding(val->encoding);
char extra[138] = {0};
@ -446,16 +439,14 @@ NULL
strenc, rdbSavedObjectLen(val),
val->lru, estimateObjectIdleTime(val)/1000, extra);
} else if (!strcasecmp(szFromObj(c->argv[1]),"sdslen") && c->argc == 3) {
dictEntry *de;
robj *val;
sds key;
auto pair = c->db->lookup_tuple(c->argv[2]);
robj *val = pair.second;
const char *key = pair.first;
if ((de = dictFind(c->db->pdict,ptrFromObj(c->argv[2]))) == NULL) {
if (val == NULL) {
addReply(c,shared.nokeyerr);
return;
}
val = (robj*)dictGetVal(de);
key = (sds)dictGetKey(de);
if (val->type != OBJ_STRING || !sdsEncodedObject(val)) {
addReplyError(c,"Not an sds encoded string.");
@ -465,16 +456,16 @@ NULL
"val_sds_len:%lld, val_sds_avail:%lld, val_zmalloc: %lld",
(long long) sdslen(key),
(long long) sdsavail(key),
(long long) sdsZmallocSize(key),
(long long) sdsZmallocSize((sds)key),
(long long) sdslen(szFromObj(val)),
(long long) sdsavail(szFromObj(val)),
(long long) getStringObjectSdsUsedMemory(val));
}
} else if (!strcasecmp(szFromObj(c->argv[1]),"ziplist") && c->argc == 3) {
robj *o;
robj_roptr o;
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nokeyerr))
== NULL) return;
== nullptr) return;
if (o->encoding != OBJ_ENCODING_ZIPLIST) {
addReplyError(c,"Not an sds encoded string.");
@ -490,7 +481,7 @@ NULL
if (getLongFromObjectOrReply(c, c->argv[2], &keys, NULL) != C_OK)
return;
dictExpand(c->db->pdict,keys);
c->db->expand(keys);
for (j = 0; j < keys; j++) {
long valsize = 0;
snprintf(buf,sizeof(buf),"%s:%lu",
@ -641,7 +632,7 @@ NULL
}
stats = sdscatprintf(stats,"[Dictionary HT]\n");
dictGetStats(buf,sizeof(buf),g_pserver->db[dbid].pdict);
g_pserver->db[dbid].getStats(buf,sizeof(buf));
stats = sdscat(stats,buf);
stats = sdscatprintf(stats,"[Expires set]\n");
@ -650,11 +641,11 @@ NULL
addReplyBulkSds(c,stats);
} else if (!strcasecmp(szFromObj(c->argv[1]),"htstats-key") && c->argc == 3) {
robj *o;
robj_roptr o;
dict *ht = NULL;
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nokeyerr))
== NULL) return;
== nullptr) return;
/* Get the hash table reference from the object, if possible. */
switch (o->encoding) {
@ -1254,12 +1245,10 @@ void logCurrentClient(void) {
* selected DB, and if so print info about the associated object. */
if (cc->argc >= 1) {
robj *val, *key;
dictEntry *de;
key = getDecodedObject(cc->argv[1]);
de = dictFind(cc->db->pdict, ptrFromObj(key));
if (de) {
val = (robj*)dictGetVal(de);
val = cc->db->find(key);
if (val) {
serverLog(LL_WARNING,"key '%s' found in DB containing the following object:", (char*)ptrFromObj(key));
serverLogObjectDebugInfo(val);
}

View File

@ -902,9 +902,8 @@ long defragOtherGlobals() {
/* returns 0 more work may or may not be needed (see non-zero cursor),
* and 1 if time is up and more work is needed. */
int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) {
if (de) {
robj *ob = (robj*)dictGetVal(de);
int defragLaterItem(robj *ob, unsigned long *cursor, long long endtime) {
if (ob) {
if (ob->type == OBJ_LIST) {
g_pserver->stat_active_defrag_hits += scanLaterList(ob);
*cursor = 0; /* list has no scan, we must finish it in one go */
@ -959,11 +958,11 @@ int defragLaterStep(redisDb *db, long long endtime) {
}
/* each time we enter this function we need to fetch the key from the dict again (if it still exists) */
dictEntry *de = dictFind(db->pdict, current_key);
robj *o = db->find(current_key);
key_defragged = g_pserver->stat_active_defrag_hits;
do {
int quit = 0;
if (defragLaterItem(de, &cursor, endtime))
if (defragLaterItem(o, &cursor, endtime))
quit = 1; /* time is up, we didn't finish all the work */
/* Don't start a new BIG key in this loop, this is because the

View File

@ -87,7 +87,7 @@ unsigned int LRU_CLOCK(void) {
/* Given an object returns the min number of milliseconds the object was never
* requested, using an approximated LRU algorithm. */
unsigned long long estimateObjectIdleTime(robj *o) {
unsigned long long estimateObjectIdleTime(robj_roptr o) {
unsigned long long lruclock = LRU_CLOCK();
if (lruclock >= o->lru) {
return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION;
@ -252,17 +252,17 @@ struct visitFunctor
return count < g_pserver->maxmemory_samples;
}
};
void evictionPoolPopulate(int dbid, dict *dbdict, expireset *setexpire, struct evictionPoolEntry *pool)
void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool)
{
if (setexpire != nullptr)
{
visitFunctor visitor { dbid, dbdict, pool, 0 };
visitFunctor visitor { dbid, db->pdict, pool, 0 };
setexpire->random_visit(visitor);
}
else
{
dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*));
int count = dictGetSomeKeys(dbdict,samples,g_pserver->maxmemory_samples);
int count = dictGetSomeKeys(db->pdict,samples,g_pserver->maxmemory_samples);
for (int j = 0; j < count; j++) {
robj *o = (robj*)dictGetVal(samples[j]);
processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool);
@ -504,8 +504,8 @@ int freeMemoryIfNeeded(void) {
db = g_pserver->db+i;
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS)
{
if ((keys = dictSize(db->pdict)) != 0) {
evictionPoolPopulate(i, db->pdict, nullptr, pool);
if ((keys = db->size()) != 0) {
evictionPoolPopulate(i, db, nullptr, pool);
total_keys += keys;
}
}
@ -513,7 +513,7 @@ int freeMemoryIfNeeded(void) {
{
keys = db->setexpire->size();
if (keys != 0)
evictionPoolPopulate(i, db->pdict, db->setexpire, pool);
evictionPoolPopulate(i, db, db->setexpire, pool);
total_keys += keys;
}
}
@ -525,9 +525,9 @@ int freeMemoryIfNeeded(void) {
bestdbid = pool[k].dbid;
sds key = nullptr;
dictEntry *de = dictFind(g_pserver->db[pool[k].dbid].pdict,pool[k].key);
if (de != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || ((robj*)dictGetVal(de))->FExpires()))
key = (sds)dictGetKey(de);
auto pair = g_pserver->db[pool[k].dbid].lookup_tuple(pool[k].key);
if (pair.first != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || pair.second->FExpires()))
key = (sds)pair.first;
/* Remove the entry from the pool. */
if (pool[k].key != pool[k].cached)
@ -559,9 +559,9 @@ int freeMemoryIfNeeded(void) {
db = g_pserver->db+j;
if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM)
{
if (dictSize(db->pdict) != 0) {
dictEntry *de = dictGetRandomKey(db->pdict);
bestkey = (sds)dictGetKey(de);
if (db->size() != 0) {
auto pair = db->random();
bestkey = (sds)pair.first;
bestdbid = j;
break;
}
@ -581,16 +581,17 @@ int freeMemoryIfNeeded(void) {
/* Finally remove the selected key. */
if (bestkey) {
db = g_pserver->db+bestdbid;
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_eviction);
/* We compute the amount of memory freed by db*Delete() alone.
* It is possible that actually the memory needed to propagate
* the DEL in AOF and replication link is greater than the one
* we are freeing removing the key, but we can't account for
* that otherwise we would never exit the loop.
*
* AOF and Output buffer memory will be freed eventually so
* we only care about memory used by the key space. */
* It is possible that actually the memory needed to propagate
* the DEL in AOF and replication link is greater than the one
* we are freeing removing the key, but we can't account for
* that otherwise we would never exit the loop.
*
* AOF and Output buffer memory will be freed eventually so
* we only care about memory used by the key space. */
delta = (long long) zmalloc_used_memory();
latencyStartMonitor(eviction_latency);
if (g_pserver->lazyfree_lazy_eviction)
@ -609,18 +610,18 @@ int freeMemoryIfNeeded(void) {
keys_freed++;
/* When the memory to free starts to be big enough, we may
* start spending so much time here that is impossible to
* deliver data to the slaves fast enough, so we force the
* transmission here inside the loop. */
* start spending so much time here that is impossible to
* deliver data to the slaves fast enough, so we force the
* transmission here inside the loop. */
if (slaves) flushSlavesOutputBuffers();
/* Normally our stop condition is the ability to release
* a fixed, pre-computed amount of memory. However when we
* are deleting objects in another thread, it's better to
* check, from time to time, if we already reached our target
* memory, since the "mem_freed" amount is computed only
* across the dbAsyncDelete() call, while the thread can
* release the memory all the time. */
* a fixed, pre-computed amount of memory. However when we
* are deleting objects in another thread, it's better to
* check, from time to time, if we already reached our target
* memory, since the "mem_freed" amount is computed only
* across the dbAsyncDelete() call, while the thread can
* release the memory all the time. */
if (g_pserver->lazyfree_lazy_eviction && !(keys_freed % 16)) {
if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) {
/* Let's satisfy our stop condition. */

View File

@ -74,8 +74,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
}
expireEntryFat *pfat = e.pfatentry();
dictEntry *de = dictFind(db->pdict, e.key());
robj *val = (robj*)dictGetVal(de);
robj *val = db->find(e.key());
int deleted = 0;
while (!pfat->FEmpty())
{
@ -140,14 +139,12 @@ void expireMemberCommand(client *c)
when += mstime();
/* No key, return zero. */
dictEntry *de = dictFind(c->db->pdict, szFromObj(c->argv[1]));
if (de == NULL) {
robj *val = c->db->find(c->argv[1]);
if (val == nullptr) {
addReply(c,shared.czero);
return;
}
robj *val = (robj*)dictGetVal(de);
switch (val->type)
{
case OBJ_SET:
@ -361,10 +358,10 @@ void expireSlaveKeys(void) {
redisDb *db = g_pserver->db+dbid;
// the expire is hashed based on the key pointer, so we need the point in the main db
dictEntry *deMain = dictFind(db->pdict, keyname);
auto pairMain = db->lookup_tuple(keyname);
auto itr = db->setexpire->end();
if (deMain != nullptr)
itr = db->setexpire->find((sds)dictGetKey(deMain));
if (pairMain.first != nullptr)
itr = db->setexpire->find((sds)pairMain.first);
int expired = 0;
if (itr != db->setexpire->end())

View File

@ -333,7 +333,7 @@ void touchWatchedKeysOnFlush(int dbid) {
* key exists, mark the client as dirty, as the key will be
* removed. */
if (dbid == -1 || wk->db->id == dbid) {
if (dictFind(wk->db->pdict, ptrFromObj(wk->key)) != NULL)
if (wk->db->find(wk->key) != NULL)
c->flags |= CLIENT_DIRTY_CAS;
}
}

View File

@ -789,7 +789,7 @@ size_t streamRadixTreeMemoryUsage(rax *rax) {
* case of aggregated data types where only "sample_size" elements
* are checked and averaged to estimate the total size. */
#define OBJ_COMPUTE_SIZE_DEF_SAMPLES 5 /* Default sample size. */
size_t objectComputeSize(robj *o, size_t sample_size) {
size_t objectComputeSize(robj_roptr o, size_t sample_size) {
sds ele, ele2;
dict *d;
dictIterator *di;
@ -800,7 +800,7 @@ size_t objectComputeSize(robj *o, size_t sample_size) {
if(o->encoding == OBJ_ENCODING_INT) {
asize = sizeof(*o);
} else if(o->encoding == OBJ_ENCODING_RAW) {
asize = sdsAllocSize(szFromObj(o))+sizeof(*o);
asize = sdsAllocSize((sds)szFromObj(o))+sizeof(*o);
} else if(o->encoding == OBJ_ENCODING_EMBSTR) {
asize = sdslen(szFromObj(o))+2+sizeof(*o);
} else {
@ -1054,16 +1054,16 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
for (j = 0; j < cserver.dbnum; j++) {
redisDb *db = g_pserver->db+j;
long long keyscount = dictSize(db->pdict);
long long keyscount = db->size();
if (keyscount==0) continue;
mh->total_keys += keyscount;
mh->db = (decltype(mh->db))zrealloc(mh->db,sizeof(mh->db[0])*(mh->num_dbs+1), MALLOC_LOCAL);
mh->db[mh->num_dbs].dbid = j;
mem = dictSize(db->pdict) * sizeof(dictEntry) +
dictSlots(db->pdict) * sizeof(dictEntry*) +
dictSize(db->pdict) * sizeof(robj);
mem = db->size() * sizeof(dictEntry) +
db->slots() * sizeof(dictEntry*) +
db->size() * sizeof(robj);
mh->db[mh->num_dbs].overhead_ht_main = mem;
mem_total+=mem;
@ -1246,15 +1246,12 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
/* This is a helper function for the OBJECT command. We need to lookup keys
* without any modification of LRU or other parameters. */
robj *objectCommandLookup(client *c, robj *key) {
dictEntry *de;
if ((de = dictFind(c->db->pdict,ptrFromObj(key))) == NULL) return NULL;
return (robj*) dictGetVal(de);
robj_roptr objectCommandLookup(client *c, robj *key) {
return c->db->find(key);
}
robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply) {
robj *o = objectCommandLookup(c,key);
robj_roptr objectCommandLookupOrReply(client *c, robj *key, robj *reply) {
robj_roptr o = objectCommandLookup(c,key);
if (!o) addReply(c, reply);
return o;
@ -1263,7 +1260,7 @@ robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply) {
/* Object command allows to inspect the internals of an Redis Object.
* Usage: OBJECT <refcount|encoding|idletime|freq> <key> */
void objectCommand(client *c) {
robj *o;
robj_roptr o;
if (c->argc == 2 && !strcasecmp(szFromObj(c->argv[1]),"help")) {
const char *help[] = {
@ -1276,15 +1273,15 @@ NULL
addReplyHelp(c, help);
} else if (!strcasecmp(szFromObj(c->argv[1]),"refcount") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
== nullptr) return;
addReplyLongLong(c,o->getrefcount(std::memory_order_relaxed));
} else if (!strcasecmp(szFromObj(c->argv[1]),"encoding") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
== nullptr) return;
addReplyBulkCString(c,strEncoding(o->encoding));
} else if (!strcasecmp(szFromObj(c->argv[1]),"idletime") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
== nullptr) return;
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
addReplyError(c,"An LFU maxmemory policy is selected, idle time not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust.");
return;
@ -1292,7 +1289,7 @@ NULL
addReplyLongLong(c,estimateObjectIdleTime(o)/1000);
} else if (!strcasecmp(szFromObj(c->argv[1]),"freq") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== NULL) return;
== nullptr) return;
if (!(g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU)) {
addReplyError(c,"An LFU maxmemory policy is not selected, access frequency not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust.");
return;
@ -1301,7 +1298,7 @@ NULL
* in case of the key has not been accessed for a long time,
* because we update the access time only
* when the key is read or overwritten. */
addReplyLongLong(c,LFUDecrAndReturn(o));
addReplyLongLong(c,LFUDecrAndReturn(o.unsafe_robjcast()));
} else {
addReplySubcommandSyntaxError(c);
}
@ -1323,7 +1320,6 @@ NULL
};
addReplyHelp(c, help);
} else if (!strcasecmp(szFromObj(c->argv[1]),"usage") && c->argc >= 3) {
dictEntry *de;
long long samples = OBJ_COMPUTE_SIZE_DEF_SAMPLES;
for (int j = 3; j < c->argc; j++) {
if (!strcasecmp(szFromObj(c->argv[j]),"samples") &&
@ -1342,12 +1338,14 @@ NULL
return;
}
}
if ((de = dictFind(c->db->pdict,ptrFromObj(c->argv[2]))) == NULL) {
auto pair = c->db->lookup_tuple(c->argv[2]);
if (pair.first == NULL) {
addReplyNull(c, shared.nullbulk);
return;
}
size_t usage = objectComputeSize((robj*)dictGetVal(de),samples);
usage += sdsAllocSize((sds)dictGetKey(de));
size_t usage = objectComputeSize(pair.second,samples);
usage += sdsAllocSize((sds)pair.first);
usage += sizeof(dictEntry);
addReplyLongLong(c,usage);
} else if (!strcasecmp(szFromObj(c->argv[1]),"stats") && c->argc == 2) {

View File

@ -1143,8 +1143,8 @@ int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *key
* integer pointed by 'error' is set to the value of errno just after the I/O
* error. */
int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
dictIterator *di = NULL;
char magic[10];
int j;
uint64_t cksum;
@ -1158,9 +1158,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
for (j = 0; j < cserver.dbnum; j++) {
redisDb *db = g_pserver->db+j;
dict *d = db->pdict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
if (db->size() == 0) continue;
/* Write the SELECT DB opcode */
if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
@ -1171,7 +1169,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
* However this does not limit the actual size of the DB to load since
* these sizes are just hints to resize the hash tables. */
uint64_t db_size, expires_size;
db_size = dictSize(db->pdict);
db_size = db->size();
expires_size = db->setexpire->size();
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
@ -1179,19 +1177,17 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) {
/* Iterate this DB writing every entry */
size_t ckeysExpired = 0;
while((de = dictNext(di)) != NULL) {
sds keystr = (sds)dictGetKey(de);
robj *o = (robj*)dictGetVal(de);
bool fSavedAll = db->iterate([&](const char *keystr, robj *o)->bool{
if (o->FExpires())
++ckeysExpired;
if (!saveKey(rdb, db, flags, &processed, keystr, o))
goto werr;
}
return false;
return true;
});
if (!fSavedAll)
goto werr;
serverAssert(ckeysExpired == db->setexpire->size());
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
/* If we are storing the replication information on disk, persist
@ -1998,7 +1994,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
goto eoferr;
if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr;
dictExpand(db->pdict,db_size);
db->expand(db_size);
continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_AUX) {
/* AUX: generic string-string fields. Use to add state to RDB

View File

@ -1885,8 +1885,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
for (j = 0; j < cserver.dbnum; j++) {
long long size, used, vkeys;
size = dictSlots(g_pserver->db[j].pdict);
used = dictSize(g_pserver->db[j].pdict);
size = g_pserver->db[j].slots();
used = g_pserver->db[j].size();
vkeys = g_pserver->db[j].setexpire->size();
if (used || vkeys) {
serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size);
@ -2924,16 +2924,7 @@ void initServer(void) {
/* Create the Redis databases, and initialize other internal state. */
for (int j = 0; j < cserver.dbnum; j++) {
new (&g_pserver->db[j]) redisDb;
g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL);
g_pserver->db[j].setexpire = new(MALLOC_LOCAL) expireset();
g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end();
g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
g_pserver->db[j].watched_keys = dictCreate(&keylistDictType,NULL);
g_pserver->db[j].id = j;
g_pserver->db[j].avg_ttl = 0;
g_pserver->db[j].last_expire_set = 0;
g_pserver->db[j].defrag_later = listCreate();
g_pserver->db[j].initialize(j);
}
/* Fixup Master Client Database */
@ -4575,7 +4566,7 @@ sds genRedisInfoString(const char *section) {
for (j = 0; j < cserver.dbnum; j++) {
long long keys, vkeys;
keys = dictSize(g_pserver->db[j].pdict);
keys = g_pserver->db[j].size();
vkeys = g_pserver->db[j].setexpire->size();
// Adjust TTL by the current time

View File

@ -129,6 +129,11 @@ public:
return m_ptr;
}
const redisObject& operator*() const
{
return *m_ptr;
}
bool operator!() const
{
return !m_ptr;
@ -1014,10 +1019,74 @@ typedef struct clientReplyBlock {
* by integers from 0 (the default database) up to the max configured
* database. The database number is the 'id' field in the structure. */
typedef struct redisDb {
// Legacy C API, Do not add more
friend void tryResizeHashTables(int);
friend int incrementallyRehash(int);
friend int dbAddCore(redisDb *db, robj *key, robj *val);
friend void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire);
friend void dbOverwrite(redisDb *db, robj *key, robj *val);
friend int dbMerge(redisDb *db, robj *key, robj *val, int fReplace);
friend void setKey(redisDb *db, robj *key, robj *val);
friend int dbSyncDelete(redisDb *db, robj *key);
friend int dbAsyncDelete(redisDb *db, robj *key);
friend long long emptyDb(int dbnum, int flags, void(callback)(void*));
friend void emptyDbAsync(redisDb *db);
friend void scanGenericCommand(struct client *c, robj_roptr o, unsigned long cursor);
friend int dbSwapDatabases(int id1, int id2);
friend int removeExpire(redisDb *db, robj *key);
friend void setExpire(struct client *c, redisDb *db, robj *key, robj *subkey, long long when);
friend void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e);
friend void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool);
friend void activeDefragCycle(void);
redisDb()
: expireitr(nullptr)
{};
void initialize(int id);
size_t slots() const { return dictSlots(pdict); }
size_t size() const { return dictSize(pdict); }
void expand(uint64_t slots) { dictExpand(pdict, slots); }
robj *find(robj_roptr key)
{
return find(szFromObj(key));
}
robj *find(const char *key)
{
dictEntry *de = dictFind(pdict, key);
if (de != nullptr)
return (robj*)dictGetVal(de);
return nullptr;
}
std::pair<const char*,robj*> lookup_tuple(robj_roptr key)
{
return lookup_tuple(szFromObj(key));
}
std::pair<const char*,robj*> lookup_tuple(const char *key)
{
dictEntry *de = dictFind(pdict, key);
if (de != nullptr)
return std::make_pair<const char*,robj*>((const char*)dictGetKey(de), (robj*)dictGetVal(de));
return std::make_pair<const char*,robj*>(nullptr, nullptr);
}
std::pair<const char*,robj*> random()
{
dictEntry *de = dictGetRandomKey(pdict);
if (de != nullptr)
return std::make_pair<const char*,robj*>((const char*)dictGetKey(de), (robj*)dictGetVal(de));
return std::make_pair<const char*,robj*>(nullptr, nullptr);
}
bool iterate(std::function<bool(const char*, robj*)> fn);
void getStats(char *buf, size_t bufsize) { dictGetStats(buf, bufsize, pdict); }
private:
dict *pdict; /* The keyspace for this DB */
public:
expireset *setexpire;
expireset::setiter expireitr;
@ -1907,6 +1976,7 @@ extern dictType dbDictType;
extern dictType shaScriptObjectDictType;
extern double R_Zero, R_PosInf, R_NegInf, R_Nan;
extern dictType hashDictType;
extern dictType keylistDictType;
extern dictType replScriptCacheDictType;
extern dictType keyptrDictType;
extern dictType modulesDictType;
@ -2135,7 +2205,7 @@ const char *strEncoding(int encoding);
int compareStringObjects(robj *a, robj *b);
int collateStringObjects(robj *a, robj *b);
int equalStringObjects(robj *a, robj *b);
unsigned long long estimateObjectIdleTime(robj *o);
unsigned long long estimateObjectIdleTime(robj_roptr o);
void trimStringObjectIfNeeded(robj *o);
#define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR)
@ -2417,8 +2487,8 @@ robj *lookupKeyWrite(redisDb *db, robj *key);
robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply);
robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply);
robj_roptr lookupKeyReadWithFlags(redisDb *db, robj *key, int flags);
robj *objectCommandLookup(client *c, robj *key);
robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply);
robj_roptr objectCommandLookup(client *c, robj *key);
robj_roptr objectCommandLookupOrReply(client *c, robj *key, robj *reply);
void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
long long lru_clock);
#define LOOKUP_NONE 0