diff --git a/src/aof.cpp b/src/aof.cpp index 637b2ce34..2907ee52d 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -1300,29 +1300,23 @@ ssize_t aofReadDiffFromParent(void) { int rewriteAppendOnlyFileRio(rio *aof) { dictIterator *di = NULL; - dictEntry *de; size_t processed = 0; int j; for (j = 0; j < cserver.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; redisDb *db = g_pserver->db+j; - dict *d = db->pdict; - if (dictSize(d) == 0) continue; - di = dictGetSafeIterator(d); + if (db->size() == 0) continue; /* SELECT the new DB */ if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; if (rioWriteBulkLongLong(aof,j) == 0) goto werr; /* Iterate this DB writing every entry */ - while((de = dictNext(di)) != NULL) { - sds keystr; - robj key, *o; + bool fComplete = db->iterate([&](const char *keystr, robj *o)->bool{ + robj key; - keystr = (sds)dictGetKey(de); - o = (robj*)dictGetVal(de); - initStaticStringObject(key,keystr); + initStaticStringObject(key,(sds)keystr); expireEntry *pexpire = getExpire(db,&key); @@ -1330,22 +1324,22 @@ int rewriteAppendOnlyFileRio(rio *aof) { if (o->type == OBJ_STRING) { /* Emit a SET command */ char cmd[]="*3\r\n$3\r\nSET\r\n"; - if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) return false; /* Key and value */ - if (rioWriteBulkObject(aof,&key) == 0) goto werr; - if (rioWriteBulkObject(aof,o) == 0) goto werr; + if (rioWriteBulkObject(aof,&key) == 0) return false; + if (rioWriteBulkObject(aof,o) == 0) return false; } else if (o->type == OBJ_LIST) { - if (rewriteListObject(aof,&key,o) == 0) goto werr; + if (rewriteListObject(aof,&key,o) == 0) return false; } else if (o->type == OBJ_SET) { - if (rewriteSetObject(aof,&key,o) == 0) goto werr; + if (rewriteSetObject(aof,&key,o) == 0) return false; } else if (o->type == OBJ_ZSET) { - if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr; + if (rewriteSortedSetObject(aof,&key,o) == 0) return false; } else if (o->type == OBJ_HASH) { - if (rewriteHashObject(aof,&key,o) == 0) goto werr; + if (rewriteHashObject(aof,&key,o) == 0) return false; } else if (o->type == OBJ_STREAM) { - if (rewriteStreamObject(aof,&key,o) == 0) goto werr; + if (rewriteStreamObject(aof,&key,o) == 0) return false; } else if (o->type == OBJ_MODULE) { - if (rewriteModuleObject(aof,&key,o) == 0) goto werr; + if (rewriteModuleObject(aof,&key,o) == 0) return false; } else { serverPanic("Unknown object type"); } @@ -1355,17 +1349,17 @@ int rewriteAppendOnlyFileRio(rio *aof) { if (subExpire.subkey() == nullptr) { char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; - if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(aof,&key) == 0) goto werr; + if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) return false; + if (rioWriteBulkObject(aof,&key) == 0) return false; } else { char cmd[]="*4\r\n$12\r\nEXPIREMEMBER\r\n"; - if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(aof,&key) == 0) goto werr; - if (rioWrite(aof,subExpire.subkey(),sdslen(subExpire.subkey())) == 0) goto werr; + if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) return false; + if (rioWriteBulkObject(aof,&key) == 0) return false; + if (rioWrite(aof,subExpire.subkey(),sdslen(subExpire.subkey())) == 0) return false; } - if (rioWriteBulkLongLong(aof,subExpire.when()) == 0) goto werr; // common + if (rioWriteBulkLongLong(aof,subExpire.when()) == 0) return false; // common } } /* Read some diff from the parent process from time to time. */ @@ -1373,9 +1367,10 @@ int rewriteAppendOnlyFileRio(rio *aof) { processed = aof->processed_bytes; aofReadDiffFromParent(); } - } - dictReleaseIterator(di); - di = NULL; + return true; + }); + if (!fComplete) + goto werr; } return C_OK; diff --git a/src/cluster.cpp b/src/cluster.cpp index 619ce3b3a..c58cdeb5f 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -3910,7 +3910,7 @@ int verifyClusterConfigWithData(void) { /* Make sure we only have keys in DB0. */ for (j = 1; j < cserver.dbnum; j++) { - if (dictSize(g_pserver->db[j].pdict)) return C_ERR; + if (g_pserver->db[j].size()) return C_ERR; } /* Check that all the slots we see populated memory have a corresponding @@ -4286,7 +4286,7 @@ NULL clusterReplyMultiBulkSlots(c); } else if (!strcasecmp(szFromObj(c->argv[1]),"flushslots") && c->argc == 2) { /* CLUSTER FLUSHSLOTS */ - if (dictSize(g_pserver->db[0].pdict) != 0) { + if (g_pserver->db[0].size() != 0) { addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS."); return; } @@ -4621,7 +4621,7 @@ NULL * slots nor keys to accept to replicate some other node. * Slaves can switch to another master without issues. */ if (nodeIsMaster(myself) && - (myself->numslots != 0 || dictSize(g_pserver->db[0].pdict) != 0)) { + (myself->numslots != 0 || g_pserver->db[0].size() != 0)) { addReplyError(c, "To set a master the node must be empty and " "without assigned slots."); @@ -4778,7 +4778,7 @@ NULL /* Slaves can be reset while containing data, but not master nodes * that must be empty. */ - if (nodeIsMaster(myself) && dictSize(c->db->pdict) != 0) { + if (nodeIsMaster(myself) && c->db->size() != 0) { addReplyError(c,"CLUSTER RESET can't be called with " "master nodes containing keys"); return; diff --git a/src/db.cpp b/src/db.cpp index b4ac46a2a..a87cedb29 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -70,10 +70,8 @@ void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew) * implementations that should instead rely on lookupKeyRead(), * lookupKeyWrite() and lookupKeyReadWithFlags(). */ static robj *lookupKey(redisDb *db, robj *key, int flags) { - dictEntry *de = dictFind(db->pdict,ptrFromObj(key)); - if (de) { - robj *val = (robj*)dictGetVal(de); - + robj *val = db->find(key); + if (val) { /* Update the access time for the ageing algorithm. * Don't do it if we have a saving child, as this will trigger * a copy on write madness. */ @@ -317,7 +315,7 @@ void setKey(redisDb *db, robj *key, robj *val) { } int dbExists(redisDb *db, robj *key) { - return dictFind(db->pdict,ptrFromObj(key)) != NULL; + return (db->find(key) != nullptr); } /* Return a random key, in form of a Redis object. @@ -325,21 +323,20 @@ int dbExists(redisDb *db, robj *key) { * * The function makes sure to return keys not already expired. */ robj *dbRandomKey(redisDb *db) { - dictEntry *de; int maxtries = 100; - int allvolatile = dictSize(db->pdict) == db->setexpire->size(); + bool allvolatile = db->size() == db->setexpire->size(); while(1) { sds key; robj *keyobj; - de = dictGetRandomKey(db->pdict); - if (de == NULL) return NULL; + auto pair = db->random(); + if (pair.first == NULL) return NULL; - key = (sds)dictGetKey(de); + key = (sds)pair.first; keyobj = createStringObject(key,sdslen(key)); - if (((robj*)dictGetVal(de))->FExpires()) + if (pair.second->FExpires()) { if (allvolatile && listLength(g_pserver->masters) && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, @@ -354,7 +351,7 @@ robj *dbRandomKey(redisDb *db) { } } - if (((robj*)dictGetVal(de))->FExpires()) + if (pair.second->FExpires()) { if (expireIfNeeded(db,keyobj)) { decrRefCount(keyobj); @@ -427,7 +424,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { return o; } -/* Remove all keys from all the databases in a Redis g_pserver-> +/* Remove all keys from all the databases in a Redis DB * If callback is given the function is called from time to time to * signal that work is in progress. * @@ -635,9 +632,25 @@ void randomkeyCommand(client *c) { decrRefCount(key); } + +bool redisDb::iterate(std::function fn) +{ + dictIterator *di = dictGetSafeIterator(pdict); + dictEntry *de = nullptr; + bool fResult = true; + while((de = dictNext(di)) != nullptr) + { + if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de))) + { + fResult = false; + break; + } + } + dictReleaseIterator(di); + return fResult; +} + void keysCommand(client *c) { - dictIterator *di; - dictEntry *de; sds pattern = szFromObj(c->argv[1]); int plen = sdslen(pattern), allkeys; unsigned long numkeys = 0; @@ -645,10 +658,8 @@ void keysCommand(client *c) { aeReleaseLock(); - di = dictGetSafeIterator(c->db->pdict); allkeys = (pattern[0] == '*' && pattern[1] == '\0'); - while((de = dictNext(di)) != NULL) { - sds key = (sds)dictGetKey(de); + c->db->iterate([&](const char *key, robj *)->bool { robj *keyobj; if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) { @@ -659,8 +670,8 @@ void keysCommand(client *c) { } decrRefCount(keyobj); } - } - dictReleaseIterator(di); + return true; + }); setDeferredArrayLen(c,replylen,numkeys); fastlock_unlock(&c->db->lock); // we must release the DB lock before acquiring the AE lock to prevent deadlocks @@ -928,7 +939,7 @@ void scanCommand(client *c) { } void dbsizeCommand(client *c) { - addReplyLongLong(c,dictSize(c->db->pdict)); + addReplyLongLong(c,c->db->size()); } void lastsaveCommand(client *c) { @@ -1313,20 +1324,17 @@ void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) /* Return the expire time of the specified key, or null if no expire * is associated with this key (i.e. the key is non volatile) */ expireEntry *getExpire(redisDb *db, robj_roptr key) { - dictEntry *de; - /* No expire? return ASAP */ if (db->setexpire->size() == 0) return nullptr; - de = dictFind(db->pdict, ptrFromObj(key)); - if (de == NULL) + auto pair = db->lookup_tuple(key); + if (pair.first == nullptr) return nullptr; - robj *obj = (robj*)dictGetVal(de); - if (!obj->FExpires()) + if (!pair.second->FExpires()) return nullptr; - auto itr = db->setexpire->find((sds)dictGetKey(de)); + auto itr = db->setexpire->find(pair.first); return itr.operator->(); } @@ -1789,3 +1797,17 @@ unsigned int delKeysInSlot(unsigned int hashslot) { unsigned int countKeysInSlot(unsigned int hashslot) { return g_pserver->cluster->slots_keys_count[hashslot]; } + +void redisDb::initialize(int id) +{ + this->pdict = dictCreate(&dbDictType,NULL); + this->setexpire = new(MALLOC_LOCAL) expireset(); + this->expireitr = this->setexpire->end(); + this->blocking_keys = dictCreate(&keylistDictType,NULL); + this->ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); + this->watched_keys = dictCreate(&keylistDictType,NULL); + this->id = id; + this->avg_ttl = 0; + this->last_expire_set = 0; + this->defrag_later = listCreate(); +} diff --git a/src/debug.cpp b/src/debug.cpp index 3246f9d19..3310e114c 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -266,8 +266,6 @@ void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj * a different digest. */ void computeDatasetDigest(unsigned char *final) { unsigned char digest[20]; - dictIterator *di = NULL; - dictEntry *de; int j; uint32_t aux; @@ -276,8 +274,7 @@ void computeDatasetDigest(unsigned char *final) { for (j = 0; j < cserver.dbnum; j++) { redisDb *db = g_pserver->db+j; - if (dictSize(db->pdict) == 0) continue; - di = dictGetSafeIterator(db->pdict); + if (db->size() == 0) continue; /* hash the DB id, so the same dataset moved in a different * DB will lead to a different digest */ @@ -285,24 +282,21 @@ void computeDatasetDigest(unsigned char *final) { mixDigest(final,&aux,sizeof(aux)); /* Iterate this DB writing every entry */ - while((de = dictNext(di)) != NULL) { - sds key; - robj *keyobj, *o; + db->iterate([&](const char *key, robj *o)->bool { + robj *keyobj; memset(digest,0,20); /* This key-val digest */ - key = (sds)dictGetKey(de); keyobj = createStringObject(key,sdslen(key)); mixDigest(digest,key,sdslen(key)); - o = (robj*)dictGetVal(de); xorObjectDigest(db,keyobj,digest,o); /* We can finally xor the key-val digest to the final digest */ xorDigest(final,digest,20); decrRefCount(keyobj); - } - dictReleaseIterator(di); + return true; + }); } } @@ -394,15 +388,14 @@ NULL serverLog(LL_WARNING,"Append Only File loaded by DEBUG LOADAOF"); addReply(c,shared.ok); } else if (!strcasecmp(szFromObj(c->argv[1]),"object") && c->argc == 3) { - dictEntry *de; robj *val; const char *strenc; - if ((de = dictFind(c->db->pdict,ptrFromObj(c->argv[2]))) == NULL) { + val = c->db->find(c->argv[2]); + if (val == NULL) { addReply(c,shared.nokeyerr); return; } - val = (robj*)dictGetVal(de); strenc = strEncoding(val->encoding); char extra[138] = {0}; @@ -446,16 +439,14 @@ NULL strenc, rdbSavedObjectLen(val), val->lru, estimateObjectIdleTime(val)/1000, extra); } else if (!strcasecmp(szFromObj(c->argv[1]),"sdslen") && c->argc == 3) { - dictEntry *de; - robj *val; - sds key; + auto pair = c->db->lookup_tuple(c->argv[2]); + robj *val = pair.second; + const char *key = pair.first; - if ((de = dictFind(c->db->pdict,ptrFromObj(c->argv[2]))) == NULL) { + if (val == NULL) { addReply(c,shared.nokeyerr); return; } - val = (robj*)dictGetVal(de); - key = (sds)dictGetKey(de); if (val->type != OBJ_STRING || !sdsEncodedObject(val)) { addReplyError(c,"Not an sds encoded string."); @@ -465,16 +456,16 @@ NULL "val_sds_len:%lld, val_sds_avail:%lld, val_zmalloc: %lld", (long long) sdslen(key), (long long) sdsavail(key), - (long long) sdsZmallocSize(key), + (long long) sdsZmallocSize((sds)key), (long long) sdslen(szFromObj(val)), (long long) sdsavail(szFromObj(val)), (long long) getStringObjectSdsUsedMemory(val)); } } else if (!strcasecmp(szFromObj(c->argv[1]),"ziplist") && c->argc == 3) { - robj *o; + robj_roptr o; if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nokeyerr)) - == NULL) return; + == nullptr) return; if (o->encoding != OBJ_ENCODING_ZIPLIST) { addReplyError(c,"Not an sds encoded string."); @@ -490,7 +481,7 @@ NULL if (getLongFromObjectOrReply(c, c->argv[2], &keys, NULL) != C_OK) return; - dictExpand(c->db->pdict,keys); + c->db->expand(keys); for (j = 0; j < keys; j++) { long valsize = 0; snprintf(buf,sizeof(buf),"%s:%lu", @@ -641,7 +632,7 @@ NULL } stats = sdscatprintf(stats,"[Dictionary HT]\n"); - dictGetStats(buf,sizeof(buf),g_pserver->db[dbid].pdict); + g_pserver->db[dbid].getStats(buf,sizeof(buf)); stats = sdscat(stats,buf); stats = sdscatprintf(stats,"[Expires set]\n"); @@ -650,11 +641,11 @@ NULL addReplyBulkSds(c,stats); } else if (!strcasecmp(szFromObj(c->argv[1]),"htstats-key") && c->argc == 3) { - robj *o; + robj_roptr o; dict *ht = NULL; if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nokeyerr)) - == NULL) return; + == nullptr) return; /* Get the hash table reference from the object, if possible. */ switch (o->encoding) { @@ -1254,12 +1245,10 @@ void logCurrentClient(void) { * selected DB, and if so print info about the associated object. */ if (cc->argc >= 1) { robj *val, *key; - dictEntry *de; key = getDecodedObject(cc->argv[1]); - de = dictFind(cc->db->pdict, ptrFromObj(key)); - if (de) { - val = (robj*)dictGetVal(de); + val = cc->db->find(key); + if (val) { serverLog(LL_WARNING,"key '%s' found in DB containing the following object:", (char*)ptrFromObj(key)); serverLogObjectDebugInfo(val); } diff --git a/src/defrag.cpp b/src/defrag.cpp index c49cd2665..59020e2d9 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -902,9 +902,8 @@ long defragOtherGlobals() { /* returns 0 more work may or may not be needed (see non-zero cursor), * and 1 if time is up and more work is needed. */ -int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) { - if (de) { - robj *ob = (robj*)dictGetVal(de); +int defragLaterItem(robj *ob, unsigned long *cursor, long long endtime) { + if (ob) { if (ob->type == OBJ_LIST) { g_pserver->stat_active_defrag_hits += scanLaterList(ob); *cursor = 0; /* list has no scan, we must finish it in one go */ @@ -959,11 +958,11 @@ int defragLaterStep(redisDb *db, long long endtime) { } /* each time we enter this function we need to fetch the key from the dict again (if it still exists) */ - dictEntry *de = dictFind(db->pdict, current_key); + robj *o = db->find(current_key); key_defragged = g_pserver->stat_active_defrag_hits; do { int quit = 0; - if (defragLaterItem(de, &cursor, endtime)) + if (defragLaterItem(o, &cursor, endtime)) quit = 1; /* time is up, we didn't finish all the work */ /* Don't start a new BIG key in this loop, this is because the diff --git a/src/evict.cpp b/src/evict.cpp index 8cf24dd5e..23569597f 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -87,7 +87,7 @@ unsigned int LRU_CLOCK(void) { /* Given an object returns the min number of milliseconds the object was never * requested, using an approximated LRU algorithm. */ -unsigned long long estimateObjectIdleTime(robj *o) { +unsigned long long estimateObjectIdleTime(robj_roptr o) { unsigned long long lruclock = LRU_CLOCK(); if (lruclock >= o->lru) { return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION; @@ -252,17 +252,17 @@ struct visitFunctor return count < g_pserver->maxmemory_samples; } }; -void evictionPoolPopulate(int dbid, dict *dbdict, expireset *setexpire, struct evictionPoolEntry *pool) +void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool) { if (setexpire != nullptr) { - visitFunctor visitor { dbid, dbdict, pool, 0 }; + visitFunctor visitor { dbid, db->pdict, pool, 0 }; setexpire->random_visit(visitor); } else { dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*)); - int count = dictGetSomeKeys(dbdict,samples,g_pserver->maxmemory_samples); + int count = dictGetSomeKeys(db->pdict,samples,g_pserver->maxmemory_samples); for (int j = 0; j < count; j++) { robj *o = (robj*)dictGetVal(samples[j]); processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool); @@ -504,8 +504,8 @@ int freeMemoryIfNeeded(void) { db = g_pserver->db+i; if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { - if ((keys = dictSize(db->pdict)) != 0) { - evictionPoolPopulate(i, db->pdict, nullptr, pool); + if ((keys = db->size()) != 0) { + evictionPoolPopulate(i, db, nullptr, pool); total_keys += keys; } } @@ -513,7 +513,7 @@ int freeMemoryIfNeeded(void) { { keys = db->setexpire->size(); if (keys != 0) - evictionPoolPopulate(i, db->pdict, db->setexpire, pool); + evictionPoolPopulate(i, db, db->setexpire, pool); total_keys += keys; } } @@ -525,9 +525,9 @@ int freeMemoryIfNeeded(void) { bestdbid = pool[k].dbid; sds key = nullptr; - dictEntry *de = dictFind(g_pserver->db[pool[k].dbid].pdict,pool[k].key); - if (de != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || ((robj*)dictGetVal(de))->FExpires())) - key = (sds)dictGetKey(de); + auto pair = g_pserver->db[pool[k].dbid].lookup_tuple(pool[k].key); + if (pair.first != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || pair.second->FExpires())) + key = (sds)pair.first; /* Remove the entry from the pool. */ if (pool[k].key != pool[k].cached) @@ -559,9 +559,9 @@ int freeMemoryIfNeeded(void) { db = g_pserver->db+j; if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) { - if (dictSize(db->pdict) != 0) { - dictEntry *de = dictGetRandomKey(db->pdict); - bestkey = (sds)dictGetKey(de); + if (db->size() != 0) { + auto pair = db->random(); + bestkey = (sds)pair.first; bestdbid = j; break; } @@ -581,16 +581,17 @@ int freeMemoryIfNeeded(void) { /* Finally remove the selected key. */ if (bestkey) { db = g_pserver->db+bestdbid; + robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_eviction); /* We compute the amount of memory freed by db*Delete() alone. - * It is possible that actually the memory needed to propagate - * the DEL in AOF and replication link is greater than the one - * we are freeing removing the key, but we can't account for - * that otherwise we would never exit the loop. - * - * AOF and Output buffer memory will be freed eventually so - * we only care about memory used by the key space. */ + * It is possible that actually the memory needed to propagate + * the DEL in AOF and replication link is greater than the one + * we are freeing removing the key, but we can't account for + * that otherwise we would never exit the loop. + * + * AOF and Output buffer memory will be freed eventually so + * we only care about memory used by the key space. */ delta = (long long) zmalloc_used_memory(); latencyStartMonitor(eviction_latency); if (g_pserver->lazyfree_lazy_eviction) @@ -609,18 +610,18 @@ int freeMemoryIfNeeded(void) { keys_freed++; /* When the memory to free starts to be big enough, we may - * start spending so much time here that is impossible to - * deliver data to the slaves fast enough, so we force the - * transmission here inside the loop. */ + * start spending so much time here that is impossible to + * deliver data to the slaves fast enough, so we force the + * transmission here inside the loop. */ if (slaves) flushSlavesOutputBuffers(); /* Normally our stop condition is the ability to release - * a fixed, pre-computed amount of memory. However when we - * are deleting objects in another thread, it's better to - * check, from time to time, if we already reached our target - * memory, since the "mem_freed" amount is computed only - * across the dbAsyncDelete() call, while the thread can - * release the memory all the time. */ + * a fixed, pre-computed amount of memory. However when we + * are deleting objects in another thread, it's better to + * check, from time to time, if we already reached our target + * memory, since the "mem_freed" amount is computed only + * across the dbAsyncDelete() call, while the thread can + * release the memory all the time. */ if (g_pserver->lazyfree_lazy_eviction && !(keys_freed % 16)) { if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) { /* Let's satisfy our stop condition. */ diff --git a/src/expire.cpp b/src/expire.cpp index ba0b99284..e77ba6c73 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -74,8 +74,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { } expireEntryFat *pfat = e.pfatentry(); - dictEntry *de = dictFind(db->pdict, e.key()); - robj *val = (robj*)dictGetVal(de); + robj *val = db->find(e.key()); int deleted = 0; while (!pfat->FEmpty()) { @@ -140,14 +139,12 @@ void expireMemberCommand(client *c) when += mstime(); /* No key, return zero. */ - dictEntry *de = dictFind(c->db->pdict, szFromObj(c->argv[1])); - if (de == NULL) { + robj *val = c->db->find(c->argv[1]); + if (val == nullptr) { addReply(c,shared.czero); return; } - robj *val = (robj*)dictGetVal(de); - switch (val->type) { case OBJ_SET: @@ -361,10 +358,10 @@ void expireSlaveKeys(void) { redisDb *db = g_pserver->db+dbid; // the expire is hashed based on the key pointer, so we need the point in the main db - dictEntry *deMain = dictFind(db->pdict, keyname); + auto pairMain = db->lookup_tuple(keyname); auto itr = db->setexpire->end(); - if (deMain != nullptr) - itr = db->setexpire->find((sds)dictGetKey(deMain)); + if (pairMain.first != nullptr) + itr = db->setexpire->find((sds)pairMain.first); int expired = 0; if (itr != db->setexpire->end()) diff --git a/src/multi.cpp b/src/multi.cpp index 3383f5a49..3beb973d7 100644 --- a/src/multi.cpp +++ b/src/multi.cpp @@ -333,7 +333,7 @@ void touchWatchedKeysOnFlush(int dbid) { * key exists, mark the client as dirty, as the key will be * removed. */ if (dbid == -1 || wk->db->id == dbid) { - if (dictFind(wk->db->pdict, ptrFromObj(wk->key)) != NULL) + if (wk->db->find(wk->key) != NULL) c->flags |= CLIENT_DIRTY_CAS; } } diff --git a/src/object.cpp b/src/object.cpp index ce6265ad1..a23ebf870 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -789,7 +789,7 @@ size_t streamRadixTreeMemoryUsage(rax *rax) { * case of aggregated data types where only "sample_size" elements * are checked and averaged to estimate the total size. */ #define OBJ_COMPUTE_SIZE_DEF_SAMPLES 5 /* Default sample size. */ -size_t objectComputeSize(robj *o, size_t sample_size) { +size_t objectComputeSize(robj_roptr o, size_t sample_size) { sds ele, ele2; dict *d; dictIterator *di; @@ -800,7 +800,7 @@ size_t objectComputeSize(robj *o, size_t sample_size) { if(o->encoding == OBJ_ENCODING_INT) { asize = sizeof(*o); } else if(o->encoding == OBJ_ENCODING_RAW) { - asize = sdsAllocSize(szFromObj(o))+sizeof(*o); + asize = sdsAllocSize((sds)szFromObj(o))+sizeof(*o); } else if(o->encoding == OBJ_ENCODING_EMBSTR) { asize = sdslen(szFromObj(o))+2+sizeof(*o); } else { @@ -1054,16 +1054,16 @@ struct redisMemOverhead *getMemoryOverheadData(void) { for (j = 0; j < cserver.dbnum; j++) { redisDb *db = g_pserver->db+j; - long long keyscount = dictSize(db->pdict); + long long keyscount = db->size(); if (keyscount==0) continue; mh->total_keys += keyscount; mh->db = (decltype(mh->db))zrealloc(mh->db,sizeof(mh->db[0])*(mh->num_dbs+1), MALLOC_LOCAL); mh->db[mh->num_dbs].dbid = j; - mem = dictSize(db->pdict) * sizeof(dictEntry) + - dictSlots(db->pdict) * sizeof(dictEntry*) + - dictSize(db->pdict) * sizeof(robj); + mem = db->size() * sizeof(dictEntry) + + db->slots() * sizeof(dictEntry*) + + db->size() * sizeof(robj); mh->db[mh->num_dbs].overhead_ht_main = mem; mem_total+=mem; @@ -1246,15 +1246,12 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, /* This is a helper function for the OBJECT command. We need to lookup keys * without any modification of LRU or other parameters. */ -robj *objectCommandLookup(client *c, robj *key) { - dictEntry *de; - - if ((de = dictFind(c->db->pdict,ptrFromObj(key))) == NULL) return NULL; - return (robj*) dictGetVal(de); +robj_roptr objectCommandLookup(client *c, robj *key) { + return c->db->find(key); } -robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply) { - robj *o = objectCommandLookup(c,key); +robj_roptr objectCommandLookupOrReply(client *c, robj *key, robj *reply) { + robj_roptr o = objectCommandLookup(c,key); if (!o) addReply(c, reply); return o; @@ -1263,7 +1260,7 @@ robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply) { /* Object command allows to inspect the internals of an Redis Object. * Usage: OBJECT */ void objectCommand(client *c) { - robj *o; + robj_roptr o; if (c->argc == 2 && !strcasecmp(szFromObj(c->argv[1]),"help")) { const char *help[] = { @@ -1276,15 +1273,15 @@ NULL addReplyHelp(c, help); } else if (!strcasecmp(szFromObj(c->argv[1]),"refcount") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) - == NULL) return; + == nullptr) return; addReplyLongLong(c,o->getrefcount(std::memory_order_relaxed)); } else if (!strcasecmp(szFromObj(c->argv[1]),"encoding") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) - == NULL) return; + == nullptr) return; addReplyBulkCString(c,strEncoding(o->encoding)); } else if (!strcasecmp(szFromObj(c->argv[1]),"idletime") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) - == NULL) return; + == nullptr) return; if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { addReplyError(c,"An LFU maxmemory policy is selected, idle time not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust."); return; @@ -1292,7 +1289,7 @@ NULL addReplyLongLong(c,estimateObjectIdleTime(o)/1000); } else if (!strcasecmp(szFromObj(c->argv[1]),"freq") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) - == NULL) return; + == nullptr) return; if (!(g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU)) { addReplyError(c,"An LFU maxmemory policy is not selected, access frequency not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust."); return; @@ -1301,7 +1298,7 @@ NULL * in case of the key has not been accessed for a long time, * because we update the access time only * when the key is read or overwritten. */ - addReplyLongLong(c,LFUDecrAndReturn(o)); + addReplyLongLong(c,LFUDecrAndReturn(o.unsafe_robjcast())); } else { addReplySubcommandSyntaxError(c); } @@ -1323,7 +1320,6 @@ NULL }; addReplyHelp(c, help); } else if (!strcasecmp(szFromObj(c->argv[1]),"usage") && c->argc >= 3) { - dictEntry *de; long long samples = OBJ_COMPUTE_SIZE_DEF_SAMPLES; for (int j = 3; j < c->argc; j++) { if (!strcasecmp(szFromObj(c->argv[j]),"samples") && @@ -1342,12 +1338,14 @@ NULL return; } } - if ((de = dictFind(c->db->pdict,ptrFromObj(c->argv[2]))) == NULL) { + + auto pair = c->db->lookup_tuple(c->argv[2]); + if (pair.first == NULL) { addReplyNull(c, shared.nullbulk); return; } - size_t usage = objectComputeSize((robj*)dictGetVal(de),samples); - usage += sdsAllocSize((sds)dictGetKey(de)); + size_t usage = objectComputeSize(pair.second,samples); + usage += sdsAllocSize((sds)pair.first); usage += sizeof(dictEntry); addReplyLongLong(c,usage); } else if (!strcasecmp(szFromObj(c->argv[1]),"stats") && c->argc == 2) { diff --git a/src/rdb.cpp b/src/rdb.cpp index 97ade6d1f..1f1a4ee2b 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1143,8 +1143,8 @@ int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *key * integer pointed by 'error' is set to the value of errno just after the I/O * error. */ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { - dictIterator *di = NULL; dictEntry *de; + dictIterator *di = NULL; char magic[10]; int j; uint64_t cksum; @@ -1158,9 +1158,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { for (j = 0; j < cserver.dbnum; j++) { redisDb *db = g_pserver->db+j; - dict *d = db->pdict; - if (dictSize(d) == 0) continue; - di = dictGetSafeIterator(d); + if (db->size() == 0) continue; /* Write the SELECT DB opcode */ if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr; @@ -1171,7 +1169,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { * However this does not limit the actual size of the DB to load since * these sizes are just hints to resize the hash tables. */ uint64_t db_size, expires_size; - db_size = dictSize(db->pdict); + db_size = db->size(); expires_size = db->setexpire->size(); if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr; if (rdbSaveLen(rdb,db_size) == -1) goto werr; @@ -1179,19 +1177,17 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { /* Iterate this DB writing every entry */ size_t ckeysExpired = 0; - while((de = dictNext(di)) != NULL) { - sds keystr = (sds)dictGetKey(de); - robj *o = (robj*)dictGetVal(de); - + bool fSavedAll = db->iterate([&](const char *keystr, robj *o)->bool{ if (o->FExpires()) ++ckeysExpired; if (!saveKey(rdb, db, flags, &processed, keystr, o)) - goto werr; - } + return false; + return true; + }); + if (!fSavedAll) + goto werr; serverAssert(ckeysExpired == db->setexpire->size()); - dictReleaseIterator(di); - di = NULL; /* So that we don't release it again on error. */ } /* If we are storing the replication information on disk, persist @@ -1998,7 +1994,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { goto eoferr; if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr; - dictExpand(db->pdict,db_size); + db->expand(db_size); continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_AUX) { /* AUX: generic string-string fields. Use to add state to RDB diff --git a/src/server.cpp b/src/server.cpp index 74c18df24..8649beaaf 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1885,8 +1885,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { for (j = 0; j < cserver.dbnum; j++) { long long size, used, vkeys; - size = dictSlots(g_pserver->db[j].pdict); - used = dictSize(g_pserver->db[j].pdict); + size = g_pserver->db[j].slots(); + used = g_pserver->db[j].size(); vkeys = g_pserver->db[j].setexpire->size(); if (used || vkeys) { serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); @@ -2924,16 +2924,7 @@ void initServer(void) { /* Create the Redis databases, and initialize other internal state. */ for (int j = 0; j < cserver.dbnum; j++) { new (&g_pserver->db[j]) redisDb; - g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL); - g_pserver->db[j].setexpire = new(MALLOC_LOCAL) expireset(); - g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); - g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL); - g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); - g_pserver->db[j].watched_keys = dictCreate(&keylistDictType,NULL); - g_pserver->db[j].id = j; - g_pserver->db[j].avg_ttl = 0; - g_pserver->db[j].last_expire_set = 0; - g_pserver->db[j].defrag_later = listCreate(); + g_pserver->db[j].initialize(j); } /* Fixup Master Client Database */ @@ -4575,7 +4566,7 @@ sds genRedisInfoString(const char *section) { for (j = 0; j < cserver.dbnum; j++) { long long keys, vkeys; - keys = dictSize(g_pserver->db[j].pdict); + keys = g_pserver->db[j].size(); vkeys = g_pserver->db[j].setexpire->size(); // Adjust TTL by the current time diff --git a/src/server.h b/src/server.h index 94e679dc2..b536cef3f 100644 --- a/src/server.h +++ b/src/server.h @@ -129,6 +129,11 @@ public: return m_ptr; } + const redisObject& operator*() const + { + return *m_ptr; + } + bool operator!() const { return !m_ptr; @@ -1014,10 +1019,74 @@ typedef struct clientReplyBlock { * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ typedef struct redisDb { + // Legacy C API, Do not add more + friend void tryResizeHashTables(int); + friend int incrementallyRehash(int); + friend int dbAddCore(redisDb *db, robj *key, robj *val); + friend void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire); + friend void dbOverwrite(redisDb *db, robj *key, robj *val); + friend int dbMerge(redisDb *db, robj *key, robj *val, int fReplace); + friend void setKey(redisDb *db, robj *key, robj *val); + friend int dbSyncDelete(redisDb *db, robj *key); + friend int dbAsyncDelete(redisDb *db, robj *key); + friend long long emptyDb(int dbnum, int flags, void(callback)(void*)); + friend void emptyDbAsync(redisDb *db); + friend void scanGenericCommand(struct client *c, robj_roptr o, unsigned long cursor); + friend int dbSwapDatabases(int id1, int id2); + friend int removeExpire(redisDb *db, robj *key); + friend void setExpire(struct client *c, redisDb *db, robj *key, robj *subkey, long long when); + friend void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e); + friend void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool); + friend void activeDefragCycle(void); + redisDb() : expireitr(nullptr) {}; + + void initialize(int id); + + size_t slots() const { return dictSlots(pdict); } + size_t size() const { return dictSize(pdict); } + void expand(uint64_t slots) { dictExpand(pdict, slots); } + + robj *find(robj_roptr key) + { + return find(szFromObj(key)); + } + robj *find(const char *key) + { + dictEntry *de = dictFind(pdict, key); + if (de != nullptr) + return (robj*)dictGetVal(de); + return nullptr; + } + + std::pair lookup_tuple(robj_roptr key) + { + return lookup_tuple(szFromObj(key)); + } + std::pair lookup_tuple(const char *key) + { + dictEntry *de = dictFind(pdict, key); + if (de != nullptr) + return std::make_pair((const char*)dictGetKey(de), (robj*)dictGetVal(de)); + return std::make_pair(nullptr, nullptr); + } + + std::pair random() + { + dictEntry *de = dictGetRandomKey(pdict); + if (de != nullptr) + return std::make_pair((const char*)dictGetKey(de), (robj*)dictGetVal(de)); + return std::make_pair(nullptr, nullptr); + } + + bool iterate(std::function fn); + void getStats(char *buf, size_t bufsize) { dictGetStats(buf, bufsize, pdict); } + +private: dict *pdict; /* The keyspace for this DB */ +public: expireset *setexpire; expireset::setiter expireitr; @@ -1907,6 +1976,7 @@ extern dictType dbDictType; extern dictType shaScriptObjectDictType; extern double R_Zero, R_PosInf, R_NegInf, R_Nan; extern dictType hashDictType; +extern dictType keylistDictType; extern dictType replScriptCacheDictType; extern dictType keyptrDictType; extern dictType modulesDictType; @@ -2135,7 +2205,7 @@ const char *strEncoding(int encoding); int compareStringObjects(robj *a, robj *b); int collateStringObjects(robj *a, robj *b); int equalStringObjects(robj *a, robj *b); -unsigned long long estimateObjectIdleTime(robj *o); +unsigned long long estimateObjectIdleTime(robj_roptr o); void trimStringObjectIfNeeded(robj *o); #define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR) @@ -2417,8 +2487,8 @@ robj *lookupKeyWrite(redisDb *db, robj *key); robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply); robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply); robj_roptr lookupKeyReadWithFlags(redisDb *db, robj *key, int flags); -robj *objectCommandLookup(client *c, robj *key); -robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply); +robj_roptr objectCommandLookup(client *c, robj *key); +robj_roptr objectCommandLookupOrReply(client *c, robj *key, robj *reply); void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, long long lru_clock); #define LOOKUP_NONE 0