From ceaaf39c96194779eb7fdb96d1711e166f3b1543 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 19 Sep 2019 17:44:42 -0400 Subject: [PATCH 1/3] Hide the database dict Former-commit-id: 065846a9c32aeb55901c194d824828fb70805350 --- src/aof.cpp | 51 +++++++++++++++----------------- src/cluster.cpp | 8 ++--- src/db.cpp | 78 +++++++++++++++++++++++++++++++------------------ src/debug.cpp | 51 +++++++++++++------------------- src/defrag.cpp | 9 +++--- src/evict.cpp | 59 +++++++++++++++++++------------------ src/expire.cpp | 15 ++++------ src/multi.cpp | 2 +- src/object.cpp | 44 +++++++++++++--------------- src/rdb.cpp | 24 +++++++-------- src/server.cpp | 17 +++-------- src/server.h | 76 +++++++++++++++++++++++++++++++++++++++++++++-- 12 files changed, 246 insertions(+), 188 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index 637b2ce34..2907ee52d 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -1300,29 +1300,23 @@ ssize_t aofReadDiffFromParent(void) { int rewriteAppendOnlyFileRio(rio *aof) { dictIterator *di = NULL; - dictEntry *de; size_t processed = 0; int j; for (j = 0; j < cserver.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; redisDb *db = g_pserver->db+j; - dict *d = db->pdict; - if (dictSize(d) == 0) continue; - di = dictGetSafeIterator(d); + if (db->size() == 0) continue; /* SELECT the new DB */ if (rioWrite(aof,selectcmd,sizeof(selectcmd)-1) == 0) goto werr; if (rioWriteBulkLongLong(aof,j) == 0) goto werr; /* Iterate this DB writing every entry */ - while((de = dictNext(di)) != NULL) { - sds keystr; - robj key, *o; + bool fComplete = db->iterate([&](const char *keystr, robj *o)->bool{ + robj key; - keystr = (sds)dictGetKey(de); - o = (robj*)dictGetVal(de); - initStaticStringObject(key,keystr); + initStaticStringObject(key,(sds)keystr); expireEntry *pexpire = getExpire(db,&key); @@ -1330,22 +1324,22 @@ int rewriteAppendOnlyFileRio(rio *aof) { if (o->type == OBJ_STRING) { /* Emit a SET command */ char cmd[]="*3\r\n$3\r\nSET\r\n"; - if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; + if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) return false; /* Key and value */ - if (rioWriteBulkObject(aof,&key) == 0) goto werr; - if (rioWriteBulkObject(aof,o) == 0) goto werr; + if (rioWriteBulkObject(aof,&key) == 0) return false; + if (rioWriteBulkObject(aof,o) == 0) return false; } else if (o->type == OBJ_LIST) { - if (rewriteListObject(aof,&key,o) == 0) goto werr; + if (rewriteListObject(aof,&key,o) == 0) return false; } else if (o->type == OBJ_SET) { - if (rewriteSetObject(aof,&key,o) == 0) goto werr; + if (rewriteSetObject(aof,&key,o) == 0) return false; } else if (o->type == OBJ_ZSET) { - if (rewriteSortedSetObject(aof,&key,o) == 0) goto werr; + if (rewriteSortedSetObject(aof,&key,o) == 0) return false; } else if (o->type == OBJ_HASH) { - if (rewriteHashObject(aof,&key,o) == 0) goto werr; + if (rewriteHashObject(aof,&key,o) == 0) return false; } else if (o->type == OBJ_STREAM) { - if (rewriteStreamObject(aof,&key,o) == 0) goto werr; + if (rewriteStreamObject(aof,&key,o) == 0) return false; } else if (o->type == OBJ_MODULE) { - if (rewriteModuleObject(aof,&key,o) == 0) goto werr; + if (rewriteModuleObject(aof,&key,o) == 0) return false; } else { serverPanic("Unknown object type"); } @@ -1355,17 +1349,17 @@ int rewriteAppendOnlyFileRio(rio *aof) { if (subExpire.subkey() == nullptr) { char cmd[]="*3\r\n$9\r\nPEXPIREAT\r\n"; - if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(aof,&key) == 0) goto werr; + if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) return false; + if (rioWriteBulkObject(aof,&key) == 0) return false; } else { char cmd[]="*4\r\n$12\r\nEXPIREMEMBER\r\n"; - if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) goto werr; - if (rioWriteBulkObject(aof,&key) == 0) goto werr; - if (rioWrite(aof,subExpire.subkey(),sdslen(subExpire.subkey())) == 0) goto werr; + if (rioWrite(aof,cmd,sizeof(cmd)-1) == 0) return false; + if (rioWriteBulkObject(aof,&key) == 0) return false; + if (rioWrite(aof,subExpire.subkey(),sdslen(subExpire.subkey())) == 0) return false; } - if (rioWriteBulkLongLong(aof,subExpire.when()) == 0) goto werr; // common + if (rioWriteBulkLongLong(aof,subExpire.when()) == 0) return false; // common } } /* Read some diff from the parent process from time to time. */ @@ -1373,9 +1367,10 @@ int rewriteAppendOnlyFileRio(rio *aof) { processed = aof->processed_bytes; aofReadDiffFromParent(); } - } - dictReleaseIterator(di); - di = NULL; + return true; + }); + if (!fComplete) + goto werr; } return C_OK; diff --git a/src/cluster.cpp b/src/cluster.cpp index 619ce3b3a..c58cdeb5f 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -3910,7 +3910,7 @@ int verifyClusterConfigWithData(void) { /* Make sure we only have keys in DB0. */ for (j = 1; j < cserver.dbnum; j++) { - if (dictSize(g_pserver->db[j].pdict)) return C_ERR; + if (g_pserver->db[j].size()) return C_ERR; } /* Check that all the slots we see populated memory have a corresponding @@ -4286,7 +4286,7 @@ NULL clusterReplyMultiBulkSlots(c); } else if (!strcasecmp(szFromObj(c->argv[1]),"flushslots") && c->argc == 2) { /* CLUSTER FLUSHSLOTS */ - if (dictSize(g_pserver->db[0].pdict) != 0) { + if (g_pserver->db[0].size() != 0) { addReplyError(c,"DB must be empty to perform CLUSTER FLUSHSLOTS."); return; } @@ -4621,7 +4621,7 @@ NULL * slots nor keys to accept to replicate some other node. * Slaves can switch to another master without issues. */ if (nodeIsMaster(myself) && - (myself->numslots != 0 || dictSize(g_pserver->db[0].pdict) != 0)) { + (myself->numslots != 0 || g_pserver->db[0].size() != 0)) { addReplyError(c, "To set a master the node must be empty and " "without assigned slots."); @@ -4778,7 +4778,7 @@ NULL /* Slaves can be reset while containing data, but not master nodes * that must be empty. */ - if (nodeIsMaster(myself) && dictSize(c->db->pdict) != 0) { + if (nodeIsMaster(myself) && c->db->size() != 0) { addReplyError(c,"CLUSTER RESET can't be called with " "master nodes containing keys"); return; diff --git a/src/db.cpp b/src/db.cpp index b4ac46a2a..a87cedb29 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -70,10 +70,8 @@ void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew) * implementations that should instead rely on lookupKeyRead(), * lookupKeyWrite() and lookupKeyReadWithFlags(). */ static robj *lookupKey(redisDb *db, robj *key, int flags) { - dictEntry *de = dictFind(db->pdict,ptrFromObj(key)); - if (de) { - robj *val = (robj*)dictGetVal(de); - + robj *val = db->find(key); + if (val) { /* Update the access time for the ageing algorithm. * Don't do it if we have a saving child, as this will trigger * a copy on write madness. */ @@ -317,7 +315,7 @@ void setKey(redisDb *db, robj *key, robj *val) { } int dbExists(redisDb *db, robj *key) { - return dictFind(db->pdict,ptrFromObj(key)) != NULL; + return (db->find(key) != nullptr); } /* Return a random key, in form of a Redis object. @@ -325,21 +323,20 @@ int dbExists(redisDb *db, robj *key) { * * The function makes sure to return keys not already expired. */ robj *dbRandomKey(redisDb *db) { - dictEntry *de; int maxtries = 100; - int allvolatile = dictSize(db->pdict) == db->setexpire->size(); + bool allvolatile = db->size() == db->setexpire->size(); while(1) { sds key; robj *keyobj; - de = dictGetRandomKey(db->pdict); - if (de == NULL) return NULL; + auto pair = db->random(); + if (pair.first == NULL) return NULL; - key = (sds)dictGetKey(de); + key = (sds)pair.first; keyobj = createStringObject(key,sdslen(key)); - if (((robj*)dictGetVal(de))->FExpires()) + if (pair.second->FExpires()) { if (allvolatile && listLength(g_pserver->masters) && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, @@ -354,7 +351,7 @@ robj *dbRandomKey(redisDb *db) { } } - if (((robj*)dictGetVal(de))->FExpires()) + if (pair.second->FExpires()) { if (expireIfNeeded(db,keyobj)) { decrRefCount(keyobj); @@ -427,7 +424,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { return o; } -/* Remove all keys from all the databases in a Redis g_pserver-> +/* Remove all keys from all the databases in a Redis DB * If callback is given the function is called from time to time to * signal that work is in progress. * @@ -635,9 +632,25 @@ void randomkeyCommand(client *c) { decrRefCount(key); } + +bool redisDb::iterate(std::function fn) +{ + dictIterator *di = dictGetSafeIterator(pdict); + dictEntry *de = nullptr; + bool fResult = true; + while((de = dictNext(di)) != nullptr) + { + if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de))) + { + fResult = false; + break; + } + } + dictReleaseIterator(di); + return fResult; +} + void keysCommand(client *c) { - dictIterator *di; - dictEntry *de; sds pattern = szFromObj(c->argv[1]); int plen = sdslen(pattern), allkeys; unsigned long numkeys = 0; @@ -645,10 +658,8 @@ void keysCommand(client *c) { aeReleaseLock(); - di = dictGetSafeIterator(c->db->pdict); allkeys = (pattern[0] == '*' && pattern[1] == '\0'); - while((de = dictNext(di)) != NULL) { - sds key = (sds)dictGetKey(de); + c->db->iterate([&](const char *key, robj *)->bool { robj *keyobj; if (allkeys || stringmatchlen(pattern,plen,key,sdslen(key),0)) { @@ -659,8 +670,8 @@ void keysCommand(client *c) { } decrRefCount(keyobj); } - } - dictReleaseIterator(di); + return true; + }); setDeferredArrayLen(c,replylen,numkeys); fastlock_unlock(&c->db->lock); // we must release the DB lock before acquiring the AE lock to prevent deadlocks @@ -928,7 +939,7 @@ void scanCommand(client *c) { } void dbsizeCommand(client *c) { - addReplyLongLong(c,dictSize(c->db->pdict)); + addReplyLongLong(c,c->db->size()); } void lastsaveCommand(client *c) { @@ -1313,20 +1324,17 @@ void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) /* Return the expire time of the specified key, or null if no expire * is associated with this key (i.e. the key is non volatile) */ expireEntry *getExpire(redisDb *db, robj_roptr key) { - dictEntry *de; - /* No expire? return ASAP */ if (db->setexpire->size() == 0) return nullptr; - de = dictFind(db->pdict, ptrFromObj(key)); - if (de == NULL) + auto pair = db->lookup_tuple(key); + if (pair.first == nullptr) return nullptr; - robj *obj = (robj*)dictGetVal(de); - if (!obj->FExpires()) + if (!pair.second->FExpires()) return nullptr; - auto itr = db->setexpire->find((sds)dictGetKey(de)); + auto itr = db->setexpire->find(pair.first); return itr.operator->(); } @@ -1789,3 +1797,17 @@ unsigned int delKeysInSlot(unsigned int hashslot) { unsigned int countKeysInSlot(unsigned int hashslot) { return g_pserver->cluster->slots_keys_count[hashslot]; } + +void redisDb::initialize(int id) +{ + this->pdict = dictCreate(&dbDictType,NULL); + this->setexpire = new(MALLOC_LOCAL) expireset(); + this->expireitr = this->setexpire->end(); + this->blocking_keys = dictCreate(&keylistDictType,NULL); + this->ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); + this->watched_keys = dictCreate(&keylistDictType,NULL); + this->id = id; + this->avg_ttl = 0; + this->last_expire_set = 0; + this->defrag_later = listCreate(); +} diff --git a/src/debug.cpp b/src/debug.cpp index 3246f9d19..3310e114c 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -266,8 +266,6 @@ void xorObjectDigest(redisDb *db, robj_roptr keyobj, unsigned char *digest, robj * a different digest. */ void computeDatasetDigest(unsigned char *final) { unsigned char digest[20]; - dictIterator *di = NULL; - dictEntry *de; int j; uint32_t aux; @@ -276,8 +274,7 @@ void computeDatasetDigest(unsigned char *final) { for (j = 0; j < cserver.dbnum; j++) { redisDb *db = g_pserver->db+j; - if (dictSize(db->pdict) == 0) continue; - di = dictGetSafeIterator(db->pdict); + if (db->size() == 0) continue; /* hash the DB id, so the same dataset moved in a different * DB will lead to a different digest */ @@ -285,24 +282,21 @@ void computeDatasetDigest(unsigned char *final) { mixDigest(final,&aux,sizeof(aux)); /* Iterate this DB writing every entry */ - while((de = dictNext(di)) != NULL) { - sds key; - robj *keyobj, *o; + db->iterate([&](const char *key, robj *o)->bool { + robj *keyobj; memset(digest,0,20); /* This key-val digest */ - key = (sds)dictGetKey(de); keyobj = createStringObject(key,sdslen(key)); mixDigest(digest,key,sdslen(key)); - o = (robj*)dictGetVal(de); xorObjectDigest(db,keyobj,digest,o); /* We can finally xor the key-val digest to the final digest */ xorDigest(final,digest,20); decrRefCount(keyobj); - } - dictReleaseIterator(di); + return true; + }); } } @@ -394,15 +388,14 @@ NULL serverLog(LL_WARNING,"Append Only File loaded by DEBUG LOADAOF"); addReply(c,shared.ok); } else if (!strcasecmp(szFromObj(c->argv[1]),"object") && c->argc == 3) { - dictEntry *de; robj *val; const char *strenc; - if ((de = dictFind(c->db->pdict,ptrFromObj(c->argv[2]))) == NULL) { + val = c->db->find(c->argv[2]); + if (val == NULL) { addReply(c,shared.nokeyerr); return; } - val = (robj*)dictGetVal(de); strenc = strEncoding(val->encoding); char extra[138] = {0}; @@ -446,16 +439,14 @@ NULL strenc, rdbSavedObjectLen(val), val->lru, estimateObjectIdleTime(val)/1000, extra); } else if (!strcasecmp(szFromObj(c->argv[1]),"sdslen") && c->argc == 3) { - dictEntry *de; - robj *val; - sds key; + auto pair = c->db->lookup_tuple(c->argv[2]); + robj *val = pair.second; + const char *key = pair.first; - if ((de = dictFind(c->db->pdict,ptrFromObj(c->argv[2]))) == NULL) { + if (val == NULL) { addReply(c,shared.nokeyerr); return; } - val = (robj*)dictGetVal(de); - key = (sds)dictGetKey(de); if (val->type != OBJ_STRING || !sdsEncodedObject(val)) { addReplyError(c,"Not an sds encoded string."); @@ -465,16 +456,16 @@ NULL "val_sds_len:%lld, val_sds_avail:%lld, val_zmalloc: %lld", (long long) sdslen(key), (long long) sdsavail(key), - (long long) sdsZmallocSize(key), + (long long) sdsZmallocSize((sds)key), (long long) sdslen(szFromObj(val)), (long long) sdsavail(szFromObj(val)), (long long) getStringObjectSdsUsedMemory(val)); } } else if (!strcasecmp(szFromObj(c->argv[1]),"ziplist") && c->argc == 3) { - robj *o; + robj_roptr o; if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nokeyerr)) - == NULL) return; + == nullptr) return; if (o->encoding != OBJ_ENCODING_ZIPLIST) { addReplyError(c,"Not an sds encoded string."); @@ -490,7 +481,7 @@ NULL if (getLongFromObjectOrReply(c, c->argv[2], &keys, NULL) != C_OK) return; - dictExpand(c->db->pdict,keys); + c->db->expand(keys); for (j = 0; j < keys; j++) { long valsize = 0; snprintf(buf,sizeof(buf),"%s:%lu", @@ -641,7 +632,7 @@ NULL } stats = sdscatprintf(stats,"[Dictionary HT]\n"); - dictGetStats(buf,sizeof(buf),g_pserver->db[dbid].pdict); + g_pserver->db[dbid].getStats(buf,sizeof(buf)); stats = sdscat(stats,buf); stats = sdscatprintf(stats,"[Expires set]\n"); @@ -650,11 +641,11 @@ NULL addReplyBulkSds(c,stats); } else if (!strcasecmp(szFromObj(c->argv[1]),"htstats-key") && c->argc == 3) { - robj *o; + robj_roptr o; dict *ht = NULL; if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.nokeyerr)) - == NULL) return; + == nullptr) return; /* Get the hash table reference from the object, if possible. */ switch (o->encoding) { @@ -1254,12 +1245,10 @@ void logCurrentClient(void) { * selected DB, and if so print info about the associated object. */ if (cc->argc >= 1) { robj *val, *key; - dictEntry *de; key = getDecodedObject(cc->argv[1]); - de = dictFind(cc->db->pdict, ptrFromObj(key)); - if (de) { - val = (robj*)dictGetVal(de); + val = cc->db->find(key); + if (val) { serverLog(LL_WARNING,"key '%s' found in DB containing the following object:", (char*)ptrFromObj(key)); serverLogObjectDebugInfo(val); } diff --git a/src/defrag.cpp b/src/defrag.cpp index c49cd2665..59020e2d9 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -902,9 +902,8 @@ long defragOtherGlobals() { /* returns 0 more work may or may not be needed (see non-zero cursor), * and 1 if time is up and more work is needed. */ -int defragLaterItem(dictEntry *de, unsigned long *cursor, long long endtime) { - if (de) { - robj *ob = (robj*)dictGetVal(de); +int defragLaterItem(robj *ob, unsigned long *cursor, long long endtime) { + if (ob) { if (ob->type == OBJ_LIST) { g_pserver->stat_active_defrag_hits += scanLaterList(ob); *cursor = 0; /* list has no scan, we must finish it in one go */ @@ -959,11 +958,11 @@ int defragLaterStep(redisDb *db, long long endtime) { } /* each time we enter this function we need to fetch the key from the dict again (if it still exists) */ - dictEntry *de = dictFind(db->pdict, current_key); + robj *o = db->find(current_key); key_defragged = g_pserver->stat_active_defrag_hits; do { int quit = 0; - if (defragLaterItem(de, &cursor, endtime)) + if (defragLaterItem(o, &cursor, endtime)) quit = 1; /* time is up, we didn't finish all the work */ /* Don't start a new BIG key in this loop, this is because the diff --git a/src/evict.cpp b/src/evict.cpp index 8cf24dd5e..23569597f 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -87,7 +87,7 @@ unsigned int LRU_CLOCK(void) { /* Given an object returns the min number of milliseconds the object was never * requested, using an approximated LRU algorithm. */ -unsigned long long estimateObjectIdleTime(robj *o) { +unsigned long long estimateObjectIdleTime(robj_roptr o) { unsigned long long lruclock = LRU_CLOCK(); if (lruclock >= o->lru) { return (lruclock - o->lru) * LRU_CLOCK_RESOLUTION; @@ -252,17 +252,17 @@ struct visitFunctor return count < g_pserver->maxmemory_samples; } }; -void evictionPoolPopulate(int dbid, dict *dbdict, expireset *setexpire, struct evictionPoolEntry *pool) +void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool) { if (setexpire != nullptr) { - visitFunctor visitor { dbid, dbdict, pool, 0 }; + visitFunctor visitor { dbid, db->pdict, pool, 0 }; setexpire->random_visit(visitor); } else { dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*)); - int count = dictGetSomeKeys(dbdict,samples,g_pserver->maxmemory_samples); + int count = dictGetSomeKeys(db->pdict,samples,g_pserver->maxmemory_samples); for (int j = 0; j < count; j++) { robj *o = (robj*)dictGetVal(samples[j]); processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool); @@ -504,8 +504,8 @@ int freeMemoryIfNeeded(void) { db = g_pserver->db+i; if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS) { - if ((keys = dictSize(db->pdict)) != 0) { - evictionPoolPopulate(i, db->pdict, nullptr, pool); + if ((keys = db->size()) != 0) { + evictionPoolPopulate(i, db, nullptr, pool); total_keys += keys; } } @@ -513,7 +513,7 @@ int freeMemoryIfNeeded(void) { { keys = db->setexpire->size(); if (keys != 0) - evictionPoolPopulate(i, db->pdict, db->setexpire, pool); + evictionPoolPopulate(i, db, db->setexpire, pool); total_keys += keys; } } @@ -525,9 +525,9 @@ int freeMemoryIfNeeded(void) { bestdbid = pool[k].dbid; sds key = nullptr; - dictEntry *de = dictFind(g_pserver->db[pool[k].dbid].pdict,pool[k].key); - if (de != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || ((robj*)dictGetVal(de))->FExpires())) - key = (sds)dictGetKey(de); + auto pair = g_pserver->db[pool[k].dbid].lookup_tuple(pool[k].key); + if (pair.first != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || pair.second->FExpires())) + key = (sds)pair.first; /* Remove the entry from the pool. */ if (pool[k].key != pool[k].cached) @@ -559,9 +559,9 @@ int freeMemoryIfNeeded(void) { db = g_pserver->db+j; if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) { - if (dictSize(db->pdict) != 0) { - dictEntry *de = dictGetRandomKey(db->pdict); - bestkey = (sds)dictGetKey(de); + if (db->size() != 0) { + auto pair = db->random(); + bestkey = (sds)pair.first; bestdbid = j; break; } @@ -581,16 +581,17 @@ int freeMemoryIfNeeded(void) { /* Finally remove the selected key. */ if (bestkey) { db = g_pserver->db+bestdbid; + robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_eviction); /* We compute the amount of memory freed by db*Delete() alone. - * It is possible that actually the memory needed to propagate - * the DEL in AOF and replication link is greater than the one - * we are freeing removing the key, but we can't account for - * that otherwise we would never exit the loop. - * - * AOF and Output buffer memory will be freed eventually so - * we only care about memory used by the key space. */ + * It is possible that actually the memory needed to propagate + * the DEL in AOF and replication link is greater than the one + * we are freeing removing the key, but we can't account for + * that otherwise we would never exit the loop. + * + * AOF and Output buffer memory will be freed eventually so + * we only care about memory used by the key space. */ delta = (long long) zmalloc_used_memory(); latencyStartMonitor(eviction_latency); if (g_pserver->lazyfree_lazy_eviction) @@ -609,18 +610,18 @@ int freeMemoryIfNeeded(void) { keys_freed++; /* When the memory to free starts to be big enough, we may - * start spending so much time here that is impossible to - * deliver data to the slaves fast enough, so we force the - * transmission here inside the loop. */ + * start spending so much time here that is impossible to + * deliver data to the slaves fast enough, so we force the + * transmission here inside the loop. */ if (slaves) flushSlavesOutputBuffers(); /* Normally our stop condition is the ability to release - * a fixed, pre-computed amount of memory. However when we - * are deleting objects in another thread, it's better to - * check, from time to time, if we already reached our target - * memory, since the "mem_freed" amount is computed only - * across the dbAsyncDelete() call, while the thread can - * release the memory all the time. */ + * a fixed, pre-computed amount of memory. However when we + * are deleting objects in another thread, it's better to + * check, from time to time, if we already reached our target + * memory, since the "mem_freed" amount is computed only + * across the dbAsyncDelete() call, while the thread can + * release the memory all the time. */ if (g_pserver->lazyfree_lazy_eviction && !(keys_freed % 16)) { if (getMaxmemoryState(NULL,NULL,NULL,NULL) == C_OK) { /* Let's satisfy our stop condition. */ diff --git a/src/expire.cpp b/src/expire.cpp index ba0b99284..e77ba6c73 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -74,8 +74,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { } expireEntryFat *pfat = e.pfatentry(); - dictEntry *de = dictFind(db->pdict, e.key()); - robj *val = (robj*)dictGetVal(de); + robj *val = db->find(e.key()); int deleted = 0; while (!pfat->FEmpty()) { @@ -140,14 +139,12 @@ void expireMemberCommand(client *c) when += mstime(); /* No key, return zero. */ - dictEntry *de = dictFind(c->db->pdict, szFromObj(c->argv[1])); - if (de == NULL) { + robj *val = c->db->find(c->argv[1]); + if (val == nullptr) { addReply(c,shared.czero); return; } - robj *val = (robj*)dictGetVal(de); - switch (val->type) { case OBJ_SET: @@ -361,10 +358,10 @@ void expireSlaveKeys(void) { redisDb *db = g_pserver->db+dbid; // the expire is hashed based on the key pointer, so we need the point in the main db - dictEntry *deMain = dictFind(db->pdict, keyname); + auto pairMain = db->lookup_tuple(keyname); auto itr = db->setexpire->end(); - if (deMain != nullptr) - itr = db->setexpire->find((sds)dictGetKey(deMain)); + if (pairMain.first != nullptr) + itr = db->setexpire->find((sds)pairMain.first); int expired = 0; if (itr != db->setexpire->end()) diff --git a/src/multi.cpp b/src/multi.cpp index 3383f5a49..3beb973d7 100644 --- a/src/multi.cpp +++ b/src/multi.cpp @@ -333,7 +333,7 @@ void touchWatchedKeysOnFlush(int dbid) { * key exists, mark the client as dirty, as the key will be * removed. */ if (dbid == -1 || wk->db->id == dbid) { - if (dictFind(wk->db->pdict, ptrFromObj(wk->key)) != NULL) + if (wk->db->find(wk->key) != NULL) c->flags |= CLIENT_DIRTY_CAS; } } diff --git a/src/object.cpp b/src/object.cpp index ce6265ad1..a23ebf870 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -789,7 +789,7 @@ size_t streamRadixTreeMemoryUsage(rax *rax) { * case of aggregated data types where only "sample_size" elements * are checked and averaged to estimate the total size. */ #define OBJ_COMPUTE_SIZE_DEF_SAMPLES 5 /* Default sample size. */ -size_t objectComputeSize(robj *o, size_t sample_size) { +size_t objectComputeSize(robj_roptr o, size_t sample_size) { sds ele, ele2; dict *d; dictIterator *di; @@ -800,7 +800,7 @@ size_t objectComputeSize(robj *o, size_t sample_size) { if(o->encoding == OBJ_ENCODING_INT) { asize = sizeof(*o); } else if(o->encoding == OBJ_ENCODING_RAW) { - asize = sdsAllocSize(szFromObj(o))+sizeof(*o); + asize = sdsAllocSize((sds)szFromObj(o))+sizeof(*o); } else if(o->encoding == OBJ_ENCODING_EMBSTR) { asize = sdslen(szFromObj(o))+2+sizeof(*o); } else { @@ -1054,16 +1054,16 @@ struct redisMemOverhead *getMemoryOverheadData(void) { for (j = 0; j < cserver.dbnum; j++) { redisDb *db = g_pserver->db+j; - long long keyscount = dictSize(db->pdict); + long long keyscount = db->size(); if (keyscount==0) continue; mh->total_keys += keyscount; mh->db = (decltype(mh->db))zrealloc(mh->db,sizeof(mh->db[0])*(mh->num_dbs+1), MALLOC_LOCAL); mh->db[mh->num_dbs].dbid = j; - mem = dictSize(db->pdict) * sizeof(dictEntry) + - dictSlots(db->pdict) * sizeof(dictEntry*) + - dictSize(db->pdict) * sizeof(robj); + mem = db->size() * sizeof(dictEntry) + + db->slots() * sizeof(dictEntry*) + + db->size() * sizeof(robj); mh->db[mh->num_dbs].overhead_ht_main = mem; mem_total+=mem; @@ -1246,15 +1246,12 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, /* This is a helper function for the OBJECT command. We need to lookup keys * without any modification of LRU or other parameters. */ -robj *objectCommandLookup(client *c, robj *key) { - dictEntry *de; - - if ((de = dictFind(c->db->pdict,ptrFromObj(key))) == NULL) return NULL; - return (robj*) dictGetVal(de); +robj_roptr objectCommandLookup(client *c, robj *key) { + return c->db->find(key); } -robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply) { - robj *o = objectCommandLookup(c,key); +robj_roptr objectCommandLookupOrReply(client *c, robj *key, robj *reply) { + robj_roptr o = objectCommandLookup(c,key); if (!o) addReply(c, reply); return o; @@ -1263,7 +1260,7 @@ robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply) { /* Object command allows to inspect the internals of an Redis Object. * Usage: OBJECT */ void objectCommand(client *c) { - robj *o; + robj_roptr o; if (c->argc == 2 && !strcasecmp(szFromObj(c->argv[1]),"help")) { const char *help[] = { @@ -1276,15 +1273,15 @@ NULL addReplyHelp(c, help); } else if (!strcasecmp(szFromObj(c->argv[1]),"refcount") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) - == NULL) return; + == nullptr) return; addReplyLongLong(c,o->getrefcount(std::memory_order_relaxed)); } else if (!strcasecmp(szFromObj(c->argv[1]),"encoding") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) - == NULL) return; + == nullptr) return; addReplyBulkCString(c,strEncoding(o->encoding)); } else if (!strcasecmp(szFromObj(c->argv[1]),"idletime") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) - == NULL) return; + == nullptr) return; if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { addReplyError(c,"An LFU maxmemory policy is selected, idle time not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust."); return; @@ -1292,7 +1289,7 @@ NULL addReplyLongLong(c,estimateObjectIdleTime(o)/1000); } else if (!strcasecmp(szFromObj(c->argv[1]),"freq") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) - == NULL) return; + == nullptr) return; if (!(g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU)) { addReplyError(c,"An LFU maxmemory policy is not selected, access frequency not tracked. Please note that when switching between policies at runtime LRU and LFU data will take some time to adjust."); return; @@ -1301,7 +1298,7 @@ NULL * in case of the key has not been accessed for a long time, * because we update the access time only * when the key is read or overwritten. */ - addReplyLongLong(c,LFUDecrAndReturn(o)); + addReplyLongLong(c,LFUDecrAndReturn(o.unsafe_robjcast())); } else { addReplySubcommandSyntaxError(c); } @@ -1323,7 +1320,6 @@ NULL }; addReplyHelp(c, help); } else if (!strcasecmp(szFromObj(c->argv[1]),"usage") && c->argc >= 3) { - dictEntry *de; long long samples = OBJ_COMPUTE_SIZE_DEF_SAMPLES; for (int j = 3; j < c->argc; j++) { if (!strcasecmp(szFromObj(c->argv[j]),"samples") && @@ -1342,12 +1338,14 @@ NULL return; } } - if ((de = dictFind(c->db->pdict,ptrFromObj(c->argv[2]))) == NULL) { + + auto pair = c->db->lookup_tuple(c->argv[2]); + if (pair.first == NULL) { addReplyNull(c, shared.nullbulk); return; } - size_t usage = objectComputeSize((robj*)dictGetVal(de),samples); - usage += sdsAllocSize((sds)dictGetKey(de)); + size_t usage = objectComputeSize(pair.second,samples); + usage += sdsAllocSize((sds)pair.first); usage += sizeof(dictEntry); addReplyLongLong(c,usage); } else if (!strcasecmp(szFromObj(c->argv[1]),"stats") && c->argc == 2) { diff --git a/src/rdb.cpp b/src/rdb.cpp index 97ade6d1f..1f1a4ee2b 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1143,8 +1143,8 @@ int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *key * integer pointed by 'error' is set to the value of errno just after the I/O * error. */ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { - dictIterator *di = NULL; dictEntry *de; + dictIterator *di = NULL; char magic[10]; int j; uint64_t cksum; @@ -1158,9 +1158,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { for (j = 0; j < cserver.dbnum; j++) { redisDb *db = g_pserver->db+j; - dict *d = db->pdict; - if (dictSize(d) == 0) continue; - di = dictGetSafeIterator(d); + if (db->size() == 0) continue; /* Write the SELECT DB opcode */ if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr; @@ -1171,7 +1169,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { * However this does not limit the actual size of the DB to load since * these sizes are just hints to resize the hash tables. */ uint64_t db_size, expires_size; - db_size = dictSize(db->pdict); + db_size = db->size(); expires_size = db->setexpire->size(); if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr; if (rdbSaveLen(rdb,db_size) == -1) goto werr; @@ -1179,19 +1177,17 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { /* Iterate this DB writing every entry */ size_t ckeysExpired = 0; - while((de = dictNext(di)) != NULL) { - sds keystr = (sds)dictGetKey(de); - robj *o = (robj*)dictGetVal(de); - + bool fSavedAll = db->iterate([&](const char *keystr, robj *o)->bool{ if (o->FExpires()) ++ckeysExpired; if (!saveKey(rdb, db, flags, &processed, keystr, o)) - goto werr; - } + return false; + return true; + }); + if (!fSavedAll) + goto werr; serverAssert(ckeysExpired == db->setexpire->size()); - dictReleaseIterator(di); - di = NULL; /* So that we don't release it again on error. */ } /* If we are storing the replication information on disk, persist @@ -1998,7 +1994,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { goto eoferr; if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) goto eoferr; - dictExpand(db->pdict,db_size); + db->expand(db_size); continue; /* Read next opcode. */ } else if (type == RDB_OPCODE_AUX) { /* AUX: generic string-string fields. Use to add state to RDB diff --git a/src/server.cpp b/src/server.cpp index 74c18df24..8649beaaf 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1885,8 +1885,8 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { for (j = 0; j < cserver.dbnum; j++) { long long size, used, vkeys; - size = dictSlots(g_pserver->db[j].pdict); - used = dictSize(g_pserver->db[j].pdict); + size = g_pserver->db[j].slots(); + used = g_pserver->db[j].size(); vkeys = g_pserver->db[j].setexpire->size(); if (used || vkeys) { serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); @@ -2924,16 +2924,7 @@ void initServer(void) { /* Create the Redis databases, and initialize other internal state. */ for (int j = 0; j < cserver.dbnum; j++) { new (&g_pserver->db[j]) redisDb; - g_pserver->db[j].pdict = dictCreate(&dbDictType,NULL); - g_pserver->db[j].setexpire = new(MALLOC_LOCAL) expireset(); - g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); - g_pserver->db[j].blocking_keys = dictCreate(&keylistDictType,NULL); - g_pserver->db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); - g_pserver->db[j].watched_keys = dictCreate(&keylistDictType,NULL); - g_pserver->db[j].id = j; - g_pserver->db[j].avg_ttl = 0; - g_pserver->db[j].last_expire_set = 0; - g_pserver->db[j].defrag_later = listCreate(); + g_pserver->db[j].initialize(j); } /* Fixup Master Client Database */ @@ -4575,7 +4566,7 @@ sds genRedisInfoString(const char *section) { for (j = 0; j < cserver.dbnum; j++) { long long keys, vkeys; - keys = dictSize(g_pserver->db[j].pdict); + keys = g_pserver->db[j].size(); vkeys = g_pserver->db[j].setexpire->size(); // Adjust TTL by the current time diff --git a/src/server.h b/src/server.h index 94e679dc2..b536cef3f 100644 --- a/src/server.h +++ b/src/server.h @@ -129,6 +129,11 @@ public: return m_ptr; } + const redisObject& operator*() const + { + return *m_ptr; + } + bool operator!() const { return !m_ptr; @@ -1014,10 +1019,74 @@ typedef struct clientReplyBlock { * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ typedef struct redisDb { + // Legacy C API, Do not add more + friend void tryResizeHashTables(int); + friend int incrementallyRehash(int); + friend int dbAddCore(redisDb *db, robj *key, robj *val); + friend void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire); + friend void dbOverwrite(redisDb *db, robj *key, robj *val); + friend int dbMerge(redisDb *db, robj *key, robj *val, int fReplace); + friend void setKey(redisDb *db, robj *key, robj *val); + friend int dbSyncDelete(redisDb *db, robj *key); + friend int dbAsyncDelete(redisDb *db, robj *key); + friend long long emptyDb(int dbnum, int flags, void(callback)(void*)); + friend void emptyDbAsync(redisDb *db); + friend void scanGenericCommand(struct client *c, robj_roptr o, unsigned long cursor); + friend int dbSwapDatabases(int id1, int id2); + friend int removeExpire(redisDb *db, robj *key); + friend void setExpire(struct client *c, redisDb *db, robj *key, robj *subkey, long long when); + friend void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e); + friend void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool); + friend void activeDefragCycle(void); + redisDb() : expireitr(nullptr) {}; + + void initialize(int id); + + size_t slots() const { return dictSlots(pdict); } + size_t size() const { return dictSize(pdict); } + void expand(uint64_t slots) { dictExpand(pdict, slots); } + + robj *find(robj_roptr key) + { + return find(szFromObj(key)); + } + robj *find(const char *key) + { + dictEntry *de = dictFind(pdict, key); + if (de != nullptr) + return (robj*)dictGetVal(de); + return nullptr; + } + + std::pair lookup_tuple(robj_roptr key) + { + return lookup_tuple(szFromObj(key)); + } + std::pair lookup_tuple(const char *key) + { + dictEntry *de = dictFind(pdict, key); + if (de != nullptr) + return std::make_pair((const char*)dictGetKey(de), (robj*)dictGetVal(de)); + return std::make_pair(nullptr, nullptr); + } + + std::pair random() + { + dictEntry *de = dictGetRandomKey(pdict); + if (de != nullptr) + return std::make_pair((const char*)dictGetKey(de), (robj*)dictGetVal(de)); + return std::make_pair(nullptr, nullptr); + } + + bool iterate(std::function fn); + void getStats(char *buf, size_t bufsize) { dictGetStats(buf, bufsize, pdict); } + +private: dict *pdict; /* The keyspace for this DB */ +public: expireset *setexpire; expireset::setiter expireitr; @@ -1907,6 +1976,7 @@ extern dictType dbDictType; extern dictType shaScriptObjectDictType; extern double R_Zero, R_PosInf, R_NegInf, R_Nan; extern dictType hashDictType; +extern dictType keylistDictType; extern dictType replScriptCacheDictType; extern dictType keyptrDictType; extern dictType modulesDictType; @@ -2135,7 +2205,7 @@ const char *strEncoding(int encoding); int compareStringObjects(robj *a, robj *b); int collateStringObjects(robj *a, robj *b); int equalStringObjects(robj *a, robj *b); -unsigned long long estimateObjectIdleTime(robj *o); +unsigned long long estimateObjectIdleTime(robj_roptr o); void trimStringObjectIfNeeded(robj *o); #define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR) @@ -2417,8 +2487,8 @@ robj *lookupKeyWrite(redisDb *db, robj *key); robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply); robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply); robj_roptr lookupKeyReadWithFlags(redisDb *db, robj *key, int flags); -robj *objectCommandLookup(client *c, robj *key); -robj *objectCommandLookupOrReply(client *c, robj *key, robj *reply); +robj_roptr objectCommandLookup(client *c, robj *key); +robj_roptr objectCommandLookupOrReply(client *c, robj *key, robj *reply); void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, long long lru_clock); #define LOOKUP_NONE 0 From f10e05f1bf5a2ec0c3aa38e997f192181f3689ba Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 19 Sep 2019 22:36:29 -0400 Subject: [PATCH 2/3] Isolate the persistent parts of the database from the runtime parts Former-commit-id: 08995b065cb0ed6df16528e73c7acb28bcf3c1f4 --- src/db.cpp | 315 +++++++++++++++++++++++++++++------------------ src/debug.cpp | 8 +- src/defrag.cpp | 6 +- src/evict.cpp | 22 ++-- src/expire.cpp | 20 +-- src/lazyfree.cpp | 31 +++-- src/object.cpp | 10 +- src/rdb.cpp | 4 +- src/server.cpp | 15 ++- src/server.h | 184 +++++++++++++++++++++------ 10 files changed, 397 insertions(+), 218 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index a87cedb29..c66197aaf 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -41,7 +41,6 @@ int keyIsExpired(redisDb *db, robj *key); int expireIfNeeded(redisDb *db, robj *key, robj *o); -void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire); /* Update LFU when an object is accessed. * Firstly, decrement the counter if the decrement time is reached. @@ -57,8 +56,7 @@ void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew) serverAssert(valOld->FExpires()); serverAssert(!valNew->FExpires()); - auto itr = db->setexpire->find(key); - serverAssert(itr != db->setexpire->end()); + serverAssert(db->FKeyExpires((const char*)key)); valNew->SetFExpires(true); valOld->SetFExpires(false); @@ -70,8 +68,9 @@ void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew) * implementations that should instead rely on lookupKeyRead(), * lookupKeyWrite() and lookupKeyReadWithFlags(). */ static robj *lookupKey(redisDb *db, robj *key, int flags) { - robj *val = db->find(key); - if (val) { + auto itr = db->find(key); + if (itr) { + robj *val = itr.val(); /* Update the access time for the ageing algorithm. * Don't do it if we have a saving child, as this will trigger * a copy on write madness. */ @@ -193,13 +192,13 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { return o; } -int dbAddCore(redisDb *db, robj *key, robj *val) { +bool dbAddCore(redisDb *db, robj *key, robj *val) { serverAssert(!val->FExpires()); sds copy = sdsdup(szFromObj(key)); - int retval = dictAdd(db->pdict, copy, val); + bool fInserted = db->insert(copy, val); val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp(); - if (retval == DICT_OK) + if (fInserted) { if (val->type == OBJ_LIST || val->type == OBJ_ZSET) @@ -211,7 +210,7 @@ int dbAddCore(redisDb *db, robj *key, robj *val) { sdsfree(copy); } - return retval; + return fInserted; } /* Add the key to the DB. It's up to the caller to increment the reference @@ -220,23 +219,22 @@ int dbAddCore(redisDb *db, robj *key, robj *val) { * The program is aborted if the key already exists. */ void dbAdd(redisDb *db, robj *key, robj *val) { - int retval = dbAddCore(db, key, val); - serverAssertWithInfo(NULL,key,retval == DICT_OK); + bool fInserted = dbAddCore(db, key, val); + serverAssertWithInfo(NULL,key,fInserted); } -void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire) +void redisDb::dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire) { - dictEntry auxentry = *de; - robj *old = (robj*)dictGetVal(de); + robj *old = itr.val(); if (old->FExpires()) { if (fRemoveExpire) { - removeExpire(db, key); + removeExpire(this, key); } else { if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) val = dupStringObject(val); - updateExpire(db, (sds)dictGetKey(de), old, val); + updateExpire(this, itr.key(), old, val); } } @@ -249,14 +247,12 @@ void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpd val->mvcc_tstamp = getMvccTstamp(); } - dictSetVal(db->pdict, de, val); + if (g_pserver->lazyfree_lazy_server_del) + freeObjAsync(itr.val()); + else + decrRefCount(itr.val()); - if (g_pserver->lazyfree_lazy_server_del) { - freeObjAsync(old); - dictSetVal(db->pdict, &auxentry, NULL); - } - - dictFreeVal(db->pdict, &auxentry); + m_persistentData.updateValue(itr, val); } /* Overwrite an existing key with a new value. Incrementing the reference @@ -265,10 +261,10 @@ void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpd * * The program is aborted if the key was not already present. */ void dbOverwrite(redisDb *db, robj *key, robj *val) { - dictEntry *de = dictFind(db->pdict,ptrFromObj(key)); + auto itr = db->find(key); - serverAssertWithInfo(NULL,key,de != NULL); - dbOverwriteCore(db, de, key, val, !!g_pserver->fActiveReplica, false); + serverAssertWithInfo(NULL,key,itr != nullptr); + db->dbOverwriteCore(itr, key, val, !!g_pserver->fActiveReplica, false); } /* Insert a key, handling duplicate keys according to fReplace */ @@ -276,14 +272,14 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) { if (fReplace) { - dictEntry *de = dictFind(db->pdict, ptrFromObj(key)); - if (de == nullptr) - return (dbAddCore(db, key, val) == DICT_OK); + auto itr = db->find(key); + if (itr == nullptr) + return (dbAddCore(db, key, val) == true); - robj *old = (robj*)dictGetVal(de); + robj *old = itr.val(); if (old->mvcc_tstamp <= val->mvcc_tstamp) { - dbOverwriteCore(db, de, key, val, false, true); + db->dbOverwriteCore(itr, key, val, false, true); return true; } @@ -291,7 +287,7 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) } else { - return (dbAddCore(db, key, val) == DICT_OK); + return (dbAddCore(db, key, val) == true); } } @@ -304,11 +300,11 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) * * All the new keys in the database should be created via this interface. */ void setKey(redisDb *db, robj *key, robj *val) { - dictEntry *de = dictFind(db->pdict, ptrFromObj(key)); - if (de == NULL) { + auto itr = db->find(key); + if (itr == NULL) { dbAdd(db,key,val); } else { - dbOverwriteCore(db,de,key,val,!!g_pserver->fActiveReplica,true); + db->dbOverwriteCore(itr,key,val,!!g_pserver->fActiveReplica,true); } incrRefCount(val); signalModifiedKey(db,key); @@ -324,19 +320,19 @@ int dbExists(redisDb *db, robj *key) { * The function makes sure to return keys not already expired. */ robj *dbRandomKey(redisDb *db) { int maxtries = 100; - bool allvolatile = db->size() == db->setexpire->size(); + bool allvolatile = db->expireSize() == db->size(); while(1) { sds key; robj *keyobj; - auto pair = db->random(); - if (pair.first == NULL) return NULL; + auto itr = db->random(); + if (itr == nullptr) return NULL; - key = (sds)pair.first; + key = itr.key(); keyobj = createStringObject(key,sdslen(key)); - if (pair.second->FExpires()) + if (itr.val()->FExpires()) { if (allvolatile && listLength(g_pserver->masters) && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, @@ -351,7 +347,7 @@ robj *dbRandomKey(redisDb *db) { } } - if (pair.second->FExpires()) + if (itr.val()->FExpires()) { if (expireIfNeeded(db,keyobj)) { decrRefCount(keyobj); @@ -363,15 +359,16 @@ robj *dbRandomKey(redisDb *db) { } } -/* Delete a key, value, and associated expiration entry if any, from the DB */ -int dbSyncDelete(redisDb *db, robj *key) { +bool redisDbPersistentData::syncDelete(robj *key) +{ /* Deleting an entry from the expires dict will not free the sds of * the key, because it is shared with the main dictionary. */ - dictEntry *de = dictFind(db->pdict, szFromObj(key)); - if (de != nullptr && ((robj*)dictGetVal(de))->FExpires()) - removeExpireCore(db, key, de); - if (dictDelete(db->pdict,ptrFromObj(key)) == DICT_OK) { + auto itr = find(szFromObj(key)); + trackkey(szFromObj(key)); + if (itr != nullptr && itr.val()->FExpires()) + removeExpire(key, itr); + if (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) { if (g_pserver->cluster_enabled) slotToKeyDel(key); return 1; } else { @@ -379,6 +376,11 @@ int dbSyncDelete(redisDb *db, robj *key) { } } +/* Delete a key, value, and associated expiration entry if any, from the DB */ +int dbSyncDelete(redisDb *db, robj *key) { + return db->m_persistentData.syncDelete(key); +} + /* This is a wrapper whose behavior depends on the Redis lazy free * configuration. Deletes the key synchronously or asynchronously. */ int dbDelete(redisDb *db, robj *key) { @@ -456,15 +458,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { } for (int j = startdb; j <= enddb; j++) { - removed += dictSize(g_pserver->db[j].pdict); - if (async) { - emptyDbAsync(&g_pserver->db[j]); - } else { - dictEmpty(g_pserver->db[j].pdict,callback); - delete g_pserver->db[j].setexpire; - g_pserver->db[j].setexpire = new (MALLOC_LOCAL) expireset(); - g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); - } + removed += g_pserver->db[j].clear(!!async, callback); } if (g_pserver->cluster_enabled) { if (async) { @@ -633,9 +627,9 @@ void randomkeyCommand(client *c) { } -bool redisDb::iterate(std::function fn) +bool redisDbPersistentData::iterate(std::function &fn) { - dictIterator *di = dictGetSafeIterator(pdict); + dictIterator *di = dictGetSafeIterator(m_pdict); dictEntry *de = nullptr; bool fResult = true; while((de = dictNext(di)) != nullptr) @@ -806,7 +800,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { /* Handle the case of a hash table. */ ht = NULL; if (o == nullptr) { - ht = c->db->pdict; + ht = c->db->dictUnsafe(); } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) { ht = (dict*)ptrFromObj(o); } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) { @@ -1159,17 +1153,14 @@ int dbSwapDatabases(int id1, int id2) { /* Swap hash tables. Note that we don't swap blocking_keys, * ready_keys and watched_keys, since we want clients to * remain in the same DB they were. */ - db1->pdict = db2->pdict; - db1->setexpire = db2->setexpire; - db1->expireitr = db2->expireitr; + redisDbPersistentData::swap(&db1->m_persistentData, &db2->m_persistentData); db1->avg_ttl = db2->avg_ttl; db1->last_expire_set = db2->last_expire_set; + db1->expireitr = db2->expireitr; - db2->pdict = aux.pdict; - db2->setexpire = aux.setexpire; - db2->expireitr = aux.expireitr; db2->avg_ttl = aux.avg_ttl; db2->last_expire_set = aux.last_expire_set; + db2->expireitr = aux.expireitr; /* Now we need to handle clients blocked on lists: as an effect * of swapping the two DBs, a client that was waiting for list @@ -1218,23 +1209,24 @@ void swapdbCommand(client *c) { * Expires API *----------------------------------------------------------------------------*/ int removeExpire(redisDb *db, robj *key) { - dictEntry *de = dictFind(db->pdict,ptrFromObj(key)); - return removeExpireCore(db, key, de); + auto itr = db->find(key); + return db->m_persistentData.removeExpire(key, itr); } -int removeExpireCore(redisDb *db, robj *key, dictEntry *de) { +int redisDbPersistentData::removeExpire(robj *key, dict_iter itr) { /* An expire may only be removed if there is a corresponding entry in the * main dict. Otherwise, the key will never be freed. */ - serverAssertWithInfo(NULL,key,de != NULL); + serverAssertWithInfo(NULL,key,itr != nullptr); - robj *val = (robj*)dictGetVal(de); + robj *val = itr.val(); if (!val->FExpires()) return 0; - auto itr = db->setexpire->find((sds)dictGetKey(de)); - serverAssert(itr != db->setexpire->end()); - serverAssert(itr->key() == (sds)dictGetKey(de)); - db->setexpire->erase(itr); + auto itrExpire = m_setexpire->find(itr.key()); + serverAssert(itrExpire != m_setexpire->end()); + serverAssert(itrExpire->key() == itr.key()); + m_setexpire->erase(itrExpire); val->SetFExpires(false); + trackkey(key); return 1; } @@ -1243,49 +1235,23 @@ int removeExpireCore(redisDb *db, robj *key, dictEntry *de) { * to NULL. The 'when' parameter is the absolute unix time in milliseconds * after which the key will no longer be considered valid. */ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) { - dictEntry *kde; - serverAssert(GlobalLocksAcquired()); - /* Reuse the sds from the main dict in the expire dict */ - kde = dictFind(db->pdict,ptrFromObj(key)); - serverAssertWithInfo(NULL,key,kde != NULL); - - if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) - { - // shared objects cannot have the expire bit set, create a real object - dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde))); - } - /* Update TTL stats (exponential moving average) */ /* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */ long long now = g_pserver->mstime; db->avg_ttl -= (now - db->last_expire_set); // reduce the TTL by the time that has elapsed - if (db->setexpire->empty()) + if (db->expireSize() == 0) db->avg_ttl = 0; else - db->avg_ttl -= db->avg_ttl / db->setexpire->size(); // slide one entry out the window + db->avg_ttl -= db->avg_ttl / db->expireSize(); // slide one entry out the window if (db->avg_ttl < 0) db->avg_ttl = 0; // TTLs are never negative - db->avg_ttl += (double)(when-now) / (db->setexpire->size()+1); // add the new entry + db->avg_ttl += (double)(when-now) / (db->expireSize()+1); // add the new entry db->last_expire_set = now; /* Update the expire set */ - const char *szSubKey = (subkey != nullptr) ? szFromObj(subkey) : nullptr; - if (((robj*)dictGetVal(kde))->FExpires()) { - auto itr = db->setexpire->find((sds)dictGetKey(kde)); - serverAssert(itr != db->setexpire->end()); - expireEntry eNew(std::move(*itr)); - eNew.update(szSubKey, when); - db->setexpire->erase(itr); - db->setexpire->insert(eNew); - } - else - { - expireEntry e((sds)dictGetKey(kde), szSubKey, when); - ((robj*)dictGetVal(kde))->SetFExpires(true); - db->setexpire->insert(e); - } + db->m_persistentData.setExpire(key, subkey, when); int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; if (c && writable_slave && !(c->flags & CLIENT_MASTER)) @@ -1294,26 +1260,24 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) { - dictEntry *kde; - serverAssert(GlobalLocksAcquired()); /* Reuse the sds from the main dict in the expire dict */ - kde = dictFind(db->pdict,ptrFromObj(key)); + auto kde = db->find(key); serverAssertWithInfo(NULL,key,kde != NULL); - if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) + if (kde.val()->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) { // shared objects cannot have the expire bit set, create a real object - dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde))); + db->m_persistentData.updateValue(kde, dupStringObject(kde.val())); } - if (((robj*)dictGetVal(kde))->FExpires()) + if (kde.val()->FExpires()) removeExpire(db, key); - e.setKeyUnsafe((sds)dictGetKey(kde)); - db->setexpire->insert(e); - ((robj*)dictGetVal(kde))->SetFExpires(true); + e.setKeyUnsafe(kde.key()); + db->m_persistentData.setExpire(std::move(e)); + kde.val()->SetFExpires(true); int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; @@ -1323,19 +1287,24 @@ void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) /* Return the expire time of the specified key, or null if no expire * is associated with this key (i.e. the key is non volatile) */ -expireEntry *getExpire(redisDb *db, robj_roptr key) { +expireEntry *redisDb::getExpire(robj_roptr key) { /* No expire? return ASAP */ - if (db->setexpire->size() == 0) + if (expireSize() == 0) return nullptr; - auto pair = db->lookup_tuple(key); - if (pair.first == nullptr) + auto itr = find(key); + if (itr == nullptr) return nullptr; - if (!pair.second->FExpires()) + if (!itr.val()->FExpires()) return nullptr; - auto itr = db->setexpire->find(pair.first); - return itr.operator->(); + auto itrExpire = m_persistentData.findExpire(itr.key()); + return itrExpire.operator->(); +} + +expireEntry *getExpire(redisDb *db, robj_roptr key) +{ + return db->getExpire(key); } /* Propagate expires into slaves and the AOF file. @@ -1798,11 +1767,18 @@ unsigned int countKeysInSlot(unsigned int hashslot) { return g_pserver->cluster->slots_keys_count[hashslot]; } +void redisDbPersistentData::initialize() +{ + m_pdict = dictCreate(&dbDictType,NULL); + m_setexpire = new(MALLOC_LOCAL) expireset(); + m_fAllChanged = false; + m_fTrackingChanges = false; +} + void redisDb::initialize(int id) { - this->pdict = dictCreate(&dbDictType,NULL); - this->setexpire = new(MALLOC_LOCAL) expireset(); - this->expireitr = this->setexpire->end(); + m_persistentData.initialize(); + this->expireitr = m_persistentData.setexpireUnsafe()->end(); this->blocking_keys = dictCreate(&keylistDictType,NULL); this->ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); this->watched_keys = dictCreate(&keylistDictType,NULL); @@ -1811,3 +1787,96 @@ void redisDb::initialize(int id) this->last_expire_set = 0; this->defrag_later = listCreate(); } + +bool redisDbPersistentData::insert(char *key, robj *o) +{ + int res = dictAdd(m_pdict, key, o); + if (res == DICT_OK) + trackkey(key); + return (res == DICT_OK); +} + +void redisDbPersistentData::tryResize() +{ + if (htNeedsResize(m_pdict)) + dictResize(m_pdict); +} + +size_t redisDb::clear(bool fAsync, void(callback)(void*)) +{ + size_t removed = m_persistentData.size(); + if (fAsync) { + m_persistentData.emptyDbAsync(); + } else { + m_persistentData.clear(callback); + } + expireitr = m_persistentData.setexpireUnsafe()->end(); + return removed; +} + +void redisDbPersistentData::clear(void(callback)(void*)) +{ + dictEmpty(m_pdict,callback); + if (m_fTrackingChanges) + m_fAllChanged = true; + delete m_setexpire; + m_setexpire = new (MALLOC_LOCAL) expireset(); +} + +/* static */ void redisDbPersistentData::swap(redisDbPersistentData *db1, redisDbPersistentData *db2) +{ + redisDbPersistentData aux = *db1; + db1->m_pdict = db2->m_pdict; + db1->m_fTrackingChanges = db2->m_fTrackingChanges; + db1->m_fAllChanged = db2->m_fAllChanged; + db1->m_setexpire = db2->m_setexpire; + + db2->m_pdict = aux.m_pdict; + db2->m_fTrackingChanges = aux.m_fTrackingChanges; + db2->m_fAllChanged = aux.m_fAllChanged; + db2->m_setexpire = aux.m_setexpire; +} + +void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) +{ + /* Reuse the sds from the main dict in the expire dict */ + dictEntry *kde = dictFind(m_pdict,ptrFromObj(key)); + serverAssertWithInfo(NULL,key,kde != NULL); + + if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) + { + // shared objects cannot have the expire bit set, create a real object + dictSetVal(m_pdict, kde, dupStringObject((robj*)dictGetVal(kde))); + } + + const char *szSubKey = (subkey != nullptr) ? szFromObj(subkey) : nullptr; + if (((robj*)dictGetVal(kde))->FExpires()) { + auto itr = m_setexpire->find((sds)dictGetKey(kde)); + serverAssert(itr != m_setexpire->end()); + expireEntry eNew(std::move(*itr)); + eNew.update(szSubKey, when); + m_setexpire->erase(itr); + m_setexpire->insert(eNew); + } + else + { + expireEntry e((sds)dictGetKey(kde), szSubKey, when); + ((robj*)dictGetVal(kde))->SetFExpires(true); + m_setexpire->insert(e); + } +} + +void redisDbPersistentData::setExpire(expireEntry &&e) +{ + m_setexpire->insert(e); +} + +bool redisDb::FKeyExpires(const char *key) +{ + return m_persistentData.setexpireUnsafe()->find(key) != m_persistentData.setexpireUnsafe()->end(); +} + +void redisDbPersistentData::updateValue(dict_iter itr, robj *val) +{ + dictSetVal(m_pdict, itr.de, val); +} \ No newline at end of file diff --git a/src/debug.cpp b/src/debug.cpp index 3310e114c..ff794ca09 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -439,9 +439,9 @@ NULL strenc, rdbSavedObjectLen(val), val->lru, estimateObjectIdleTime(val)/1000, extra); } else if (!strcasecmp(szFromObj(c->argv[1]),"sdslen") && c->argc == 3) { - auto pair = c->db->lookup_tuple(c->argv[2]); - robj *val = pair.second; - const char *key = pair.first; + auto itr = c->db->find(c->argv[2]); + robj *val = itr.val(); + const char *key = itr.key(); if (val == NULL) { addReply(c,shared.nokeyerr); @@ -636,7 +636,7 @@ NULL stats = sdscat(stats,buf); stats = sdscatprintf(stats,"[Expires set]\n"); - g_pserver->db[dbid].setexpire->getstats(buf, sizeof(buf)); + g_pserver->db[dbid].getExpireStats(buf, sizeof(buf)); stats = sdscat(stats, buf); addReplyBulkSds(c,stats); diff --git a/src/defrag.cpp b/src/defrag.cpp index 59020e2d9..adb354b3b 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -781,8 +781,8 @@ long defragKey(redisDb *db, dictEntry *de) { newsds = activeDefragSds(keysds); if (newsds) defragged++, de->key = newsds; - if (!db->setexpire->empty()) { - replaceSateliteOSetKeyPtr(*db->setexpire, keysds, newsds); + if (!db->setexpire()->empty()) { + replaceSateliteOSetKeyPtr(*const_cast(db->setexpire()), keysds, newsds); } /* Try to defrag robj and / or string value. */ @@ -1111,7 +1111,7 @@ void activeDefragCycle(void) { break; /* this will exit the function and we'll continue on the next cycle */ } - cursor = dictScan(db->pdict, cursor, defragScanCallback, defragDictBucketCallback, db); + cursor = dictScan(db->dictUnsafe(), cursor, defragScanCallback, defragDictBucketCallback, db); /* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys * (if we have a lot of pointers in one hash bucket or rehasing), diff --git a/src/evict.cpp b/src/evict.cpp index 23569597f..a9a0e4511 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -256,13 +256,13 @@ void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct ev { if (setexpire != nullptr) { - visitFunctor visitor { dbid, db->pdict, pool, 0 }; + visitFunctor visitor { dbid, db->m_persistentData.dictUnsafe(), pool, 0 }; setexpire->random_visit(visitor); } else { dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*)); - int count = dictGetSomeKeys(db->pdict,samples,g_pserver->maxmemory_samples); + int count = dictGetSomeKeys(db->m_persistentData.dictUnsafe(),samples,g_pserver->maxmemory_samples); for (int j = 0; j < count; j++) { robj *o = (robj*)dictGetVal(samples[j]); processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool); @@ -511,9 +511,9 @@ int freeMemoryIfNeeded(void) { } else { - keys = db->setexpire->size(); + keys = db->expireSize(); if (keys != 0) - evictionPoolPopulate(i, db, db->setexpire, pool); + evictionPoolPopulate(i, db, db->m_persistentData.setexpireUnsafe(), pool); total_keys += keys; } } @@ -525,9 +525,9 @@ int freeMemoryIfNeeded(void) { bestdbid = pool[k].dbid; sds key = nullptr; - auto pair = g_pserver->db[pool[k].dbid].lookup_tuple(pool[k].key); - if (pair.first != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || pair.second->FExpires())) - key = (sds)pair.first; + auto itr = g_pserver->db[pool[k].dbid].find(pool[k].key); + if (itr != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || itr.val()->FExpires())) + key = itr.key(); /* Remove the entry from the pool. */ if (pool[k].key != pool[k].cached) @@ -560,17 +560,17 @@ int freeMemoryIfNeeded(void) { if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) { if (db->size() != 0) { - auto pair = db->random(); - bestkey = (sds)pair.first; + auto itr = db->random(); + bestkey = itr.key(); bestdbid = j; break; } } else { - if (!db->setexpire->empty()) + if (db->expireSize()) { - bestkey = (sds)db->setexpire->random_value().key(); + bestkey = (sds)db->random_expire().key(); bestdbid = j; break; } diff --git a/src/expire.cpp b/src/expire.cpp index e77ba6c73..3803d8ac8 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -248,7 +248,7 @@ void activeExpireCycle(int type) { now = mstime(); /* If there is nothing to expire try next DB ASAP. */ - if (db->setexpire->empty()) + if (db->m_persistentData.setexpireUnsafe()->empty()) { db->avg_ttl = 0; db->last_expire_set = now; @@ -258,7 +258,7 @@ void activeExpireCycle(int type) { size_t expired = 0; size_t tried = 0; long long check = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; // assume a check is roughly 1us. It isn't but good enough - db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](expireEntry &e) __attribute__((always_inline)) { + db->expireitr = db->m_persistentData.setexpireUnsafe()->enumerate(db->expireitr, now, [&](expireEntry &e) __attribute__((always_inline)) { if (e.when() < now) { activeExpireCycleExpire(db, e, now); @@ -358,16 +358,16 @@ void expireSlaveKeys(void) { redisDb *db = g_pserver->db+dbid; // the expire is hashed based on the key pointer, so we need the point in the main db - auto pairMain = db->lookup_tuple(keyname); - auto itr = db->setexpire->end(); - if (pairMain.first != nullptr) - itr = db->setexpire->find((sds)pairMain.first); + auto itrDB = db->find(keyname); + auto itrExpire = db->m_persistentData.setexpireUnsafe()->end(); + if (itrDB != nullptr) + itrExpire = db->m_persistentData.setexpireUnsafe()->find(itrDB.key()); int expired = 0; - if (itr != db->setexpire->end()) + if (itrExpire != db->m_persistentData.setexpireUnsafe()->end()) { - if (itr->when() < start) { - activeExpireCycleExpire(g_pserver->db+dbid,*itr,start); + if (itrExpire->when() < start) { + activeExpireCycleExpire(g_pserver->db+dbid,*itrExpire,start); expired = 1; } } @@ -376,7 +376,7 @@ void expireSlaveKeys(void) { * corresponding bit in the new bitmap we set as value. * At the end of the loop if the bitmap is zero, it means we * no longer need to keep track of this key. */ - if (itr != db->setexpire->end() && !expired) { + if (itrExpire != db->m_persistentData.setexpireUnsafe()->end() && !expired) { noexpire++; new_dbids |= (uint64_t)1 << dbid; } diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index 91577cb85..471fb6260 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -51,18 +51,18 @@ size_t lazyfreeGetFreeEffort(robj *obj) { * a lazy free list instead of being freed synchronously. The lazy free list * will be reclaimed in a different bio.c thread. */ #define LAZYFREE_THRESHOLD 64 -int dbAsyncDelete(redisDb *db, robj *key) { +bool redisDbPersistentData::asyncDelete(robj *key) { /* If the value is composed of a few allocations, to free in a lazy way * is actually just slower... So under a certain limit we just free * the object synchronously. */ - dictEntry *de = dictUnlink(db->pdict,ptrFromObj(key)); + dictEntry *de = dictUnlink(m_pdict,ptrFromObj(key)); if (de) { robj *val = (robj*)dictGetVal(de); if (val->FExpires()) { /* Deleting an entry from the expires dict will not free the sds of * the key, because it is shared with the main dictionary. */ - removeExpireCore(db,key,de); + removeExpire(key,dict_iter(de)); } size_t free_effort = lazyfreeGetFreeEffort(val); @@ -78,21 +78,25 @@ int dbAsyncDelete(redisDb *db, robj *key) { if (free_effort > LAZYFREE_THRESHOLD && val->getrefcount(std::memory_order_relaxed) == 1) { atomicIncr(lazyfree_objects,1); bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL); - dictSetVal(db->pdict,de,NULL); + dictSetVal(m_pdict,de,NULL); } } /* Release the key-val pair, or just the key if we set the val * field to NULL in order to lazy free it later. */ if (de) { - dictFreeUnlinkedEntry(db->pdict,de); + dictFreeUnlinkedEntry(m_pdict,de); if (g_pserver->cluster_enabled) slotToKeyDel(key); - return 1; + return true; } else { - return 0; + return false; } } +int dbAsyncDelete(redisDb *db, robj *key) { + return db->m_persistentData.asyncDelete(key); +} + /* Free an object, if the object is huge enough, free it in async way. */ void freeObjAsync(robj *o) { size_t free_effort = lazyfreeGetFreeEffort(o); @@ -107,12 +111,13 @@ void freeObjAsync(robj *o) { /* Empty a Redis DB asynchronously. What the function does actually is to * create a new empty set of hash tables and scheduling the old ones for * lazy freeing. */ -void emptyDbAsync(redisDb *db) { - dict *oldht1 = db->pdict; - auto *set = db->setexpire; - db->setexpire = new (MALLOC_LOCAL) expireset(); - db->expireitr = db->setexpire->end(); - db->pdict = dictCreate(&dbDictType,NULL); +void redisDbPersistentData::emptyDbAsync() { + dict *oldht1 = m_pdict; + auto *set = m_setexpire; + m_setexpire = new (MALLOC_LOCAL) expireset(); + m_pdict = dictCreate(&dbDictType,NULL); + if (m_fTrackingChanges) + m_fAllChanged = true; atomicIncr(lazyfree_objects,dictSize(oldht1)); bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,set); } diff --git a/src/object.cpp b/src/object.cpp index a23ebf870..201e2ca9c 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -1067,7 +1067,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) { mh->db[mh->num_dbs].overhead_ht_main = mem; mem_total+=mem; - mem = db->setexpire->bytes_used(); + mem = db->setexpire()->bytes_used(); mh->db[mh->num_dbs].overhead_ht_expires = mem; mem_total+=mem; @@ -1339,13 +1339,13 @@ NULL } } - auto pair = c->db->lookup_tuple(c->argv[2]); - if (pair.first == NULL) { + auto itr = c->db->find(c->argv[2]); + if (itr == nullptr) { addReplyNull(c, shared.nullbulk); return; } - size_t usage = objectComputeSize(pair.second,samples); - usage += sdsAllocSize((sds)pair.first); + size_t usage = objectComputeSize(itr.val(),samples); + usage += sdsAllocSize(itr.key()); usage += sizeof(dictEntry); addReplyLongLong(c,usage); } else if (!strcasecmp(szFromObj(c->argv[1]),"stats") && c->argc == 2) { diff --git a/src/rdb.cpp b/src/rdb.cpp index 1f1a4ee2b..c4ac55870 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1170,7 +1170,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { * these sizes are just hints to resize the hash tables. */ uint64_t db_size, expires_size; db_size = db->size(); - expires_size = db->setexpire->size(); + expires_size = db->expireSize(); if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr; if (rdbSaveLen(rdb,db_size) == -1) goto werr; if (rdbSaveLen(rdb,expires_size) == -1) goto werr; @@ -1187,7 +1187,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { }); if (!fSavedAll) goto werr; - serverAssert(ckeysExpired == db->setexpire->size()); + serverAssert(ckeysExpired == db->expireSize()); } /* If we are storing the replication information on disk, persist diff --git a/src/server.cpp b/src/server.cpp index 8649beaaf..0a623c28e 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1431,8 +1431,7 @@ int htNeedsResize(dict *dict) { /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL * we resize the hash table to save memory */ void tryResizeHashTables(int dbid) { - if (htNeedsResize(g_pserver->db[dbid].pdict)) - dictResize(g_pserver->db[dbid].pdict); + g_pserver->db[dbid].tryResize(); } /* Our hash table implementation performs rehashing incrementally while @@ -1442,10 +1441,10 @@ void tryResizeHashTables(int dbid) { * * The function returns 1 if some rehashing was performed, otherwise 0 * is returned. */ -int incrementallyRehash(int dbid) { +int redisDbPersistentData::incrementallyRehash() { /* Keys dictionary */ - if (dictIsRehashing(g_pserver->db[dbid].pdict)) { - dictRehashMilliseconds(g_pserver->db[dbid].pdict,1); + if (dictIsRehashing(m_pdict)) { + dictRehashMilliseconds(m_pdict,1); return 1; /* already used our millisecond for this loop... */ } return 0; @@ -1726,7 +1725,7 @@ void databasesCron(void) { /* Rehash */ if (g_pserver->activerehashing) { for (j = 0; j < dbs_per_call; j++) { - int work_done = incrementallyRehash(rehash_db); + int work_done = g_pserver->db[rehash_db].incrementallyRehash(); if (work_done) { /* If the function did some work, stop here, we'll do * more at the next cron loop. */ @@ -1887,7 +1886,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { size = g_pserver->db[j].slots(); used = g_pserver->db[j].size(); - vkeys = g_pserver->db[j].setexpire->size(); + vkeys = g_pserver->db[j].expireSize(); if (used || vkeys) { serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); /* dictPrintStats(g_pserver->dict); */ @@ -4567,7 +4566,7 @@ sds genRedisInfoString(const char *section) { long long keys, vkeys; keys = g_pserver->db[j].size(); - vkeys = g_pserver->db[j].setexpire->size(); + vkeys = g_pserver->db[j].expireSize(); // Adjust TTL by the current time g_pserver->db[j].avg_ttl -= (g_pserver->mstime - g_pserver->db[j].last_expire_set); diff --git a/src/server.h b/src/server.h index b536cef3f..731f40a38 100644 --- a/src/server.h +++ b/src/server.h @@ -54,6 +54,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { #include @@ -1015,22 +1016,123 @@ typedef struct clientReplyBlock { #endif } clientReplyBlock; +struct dictEntry; +class dict_const_iter +{ + friend class redisDb; + friend class redisDbPersistentData; +protected: + dictEntry *de; +public: + explicit dict_const_iter(dictEntry *de) + : de(de) + {} + + const char *key() const { return de ? (const char*)dictGetKey(de) : nullptr; } + robj_roptr val() const { return de ? (robj*)dictGetVal(de) : nullptr; } + const robj* operator->() const { return de ? (robj*)dictGetVal(de) : nullptr; } + operator robj_roptr() const { return de ? (robj*)dictGetVal(de) : nullptr; } + + bool operator==(std::nullptr_t) const { return de == nullptr; } + bool operator!=(std::nullptr_t) const { return de != nullptr; } + bool operator==(const dict_const_iter &other) { return de == other.de; } +}; +class dict_iter : public dict_const_iter +{ +public: + explicit dict_iter(dictEntry *de) + : dict_const_iter(de) + {} + sds key() { return de ? (sds)dictGetKey(de) : nullptr; } + robj *val() { return de ? (robj*)dictGetVal(de) : nullptr; } + robj *operator->() { return de ? (robj*)dictGetVal(de) : nullptr; } + operator robj*() const { return de ? (robj*)dictGetVal(de) : nullptr; } +}; + +class redisDbPersistentData +{ +public: + static void swap(redisDbPersistentData *db1, redisDbPersistentData *db2); + + size_t slots() const { return dictSlots(m_pdict); } + size_t size() const { return dictSize(m_pdict); } + void expand(uint64_t slots) { dictExpand(m_pdict, slots); } + + void trackkey(robj_roptr o) + { + trackkey(szFromObj(o)); + } + + void trackkey(const char *key) + { + if (m_fTrackingChanges) + m_setchanged.insert(key); + } + + dict_iter find(const char *key) + { + dictEntry *de = dictFind(m_pdict, key); + return dict_iter(de); + } + + dict_iter random() + { + dictEntry *de = dictGetRandomKey(m_pdict); + return dict_iter(de); + } + + const expireEntry &random_expire() + { + return m_setexpire->random_value(); + } + + auto findExpire(const char *key) + { + return m_setexpire->find(key); + } + + void getStats(char *buf, size_t bufsize) { dictGetStats(buf, bufsize, m_pdict); } + void getExpireStats(char *buf, size_t bufsize) { m_setexpire->getstats(buf, bufsize); } + + bool insert(char *k, robj *o); + void tryResize(); + int incrementallyRehash(); + void updateValue(dict_iter itr, robj *val); + bool syncDelete(robj *key); + bool asyncDelete(robj *key); + size_t expireSize() const { return m_setexpire->size(); } + int removeExpire(robj *key, dict_iter itr); + void clear(void(callback)(void*)); + void emptyDbAsync(); + bool iterate(std::function &fn); + void setExpire(robj *key, robj *subkey, long long when); + void setExpire(expireEntry &&e); + void initialize(); + + dict *dictUnsafe() { return m_pdict; } + expireset *setexpireUnsafe() { return m_setexpire; } + const expireset *setexpire() { return m_setexpire; } + +private: + // Keyspace + dict *m_pdict; /* The keyspace for this DB */ + bool m_fTrackingChanges = false; + bool m_fAllChanged = false; + std::set m_setchanged; + + // Expire + expireset *m_setexpire; +}; + /* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ typedef struct redisDb { // Legacy C API, Do not add more friend void tryResizeHashTables(int); - friend int incrementallyRehash(int); - friend int dbAddCore(redisDb *db, robj *key, robj *val); - friend void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire); - friend void dbOverwrite(redisDb *db, robj *key, robj *val); - friend int dbMerge(redisDb *db, robj *key, robj *val, int fReplace); - friend void setKey(redisDb *db, robj *key, robj *val); friend int dbSyncDelete(redisDb *db, robj *key); friend int dbAsyncDelete(redisDb *db, robj *key); friend long long emptyDb(int dbnum, int flags, void(callback)(void*)); - friend void emptyDbAsync(redisDb *db); friend void scanGenericCommand(struct client *c, robj_roptr o, unsigned long cursor); friend int dbSwapDatabases(int id1, int id2); friend int removeExpire(redisDb *db, robj *key); @@ -1038,58 +1140,64 @@ typedef struct redisDb { friend void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e); friend void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool); friend void activeDefragCycle(void); + friend int freeMemoryIfNeeded(void); + friend void activeExpireCycle(int); + friend void expireSlaveKeys(void); - redisDb() + typedef ::dict_const_iter const_iter; + typedef ::dict_iter iter; + + redisDb() : expireitr(nullptr) - {}; + {} void initialize(int id); - size_t slots() const { return dictSlots(pdict); } - size_t size() const { return dictSize(pdict); } - void expand(uint64_t slots) { dictExpand(pdict, slots); } + size_t slots() const { return m_persistentData.slots(); } + size_t size() const { return m_persistentData.size(); } + size_t expireSize() const { return m_persistentData.expireSize(); } + void expand(uint64_t slots) { m_persistentData.expand(slots); } + void tryResize() { m_persistentData.tryResize(); } + const expireset *setexpire() { return m_persistentData.setexpire(); } - robj *find(robj_roptr key) + iter find(robj_roptr key) { return find(szFromObj(key)); } - robj *find(const char *key) + iter find(const char *key) { - dictEntry *de = dictFind(pdict, key); - if (de != nullptr) - return (robj*)dictGetVal(de); - return nullptr; + return m_persistentData.find(key); } - std::pair lookup_tuple(robj_roptr key) + iter random() { - return lookup_tuple(szFromObj(key)); - } - std::pair lookup_tuple(const char *key) - { - dictEntry *de = dictFind(pdict, key); - if (de != nullptr) - return std::make_pair((const char*)dictGetKey(de), (robj*)dictGetVal(de)); - return std::make_pair(nullptr, nullptr); + return m_persistentData.random(); } - std::pair random() + const expireEntry &random_expire() { - dictEntry *de = dictGetRandomKey(pdict); - if (de != nullptr) - return std::make_pair((const char*)dictGetKey(de), (robj*)dictGetVal(de)); - return std::make_pair(nullptr, nullptr); + return m_persistentData.random_expire(); } - bool iterate(std::function fn); - void getStats(char *buf, size_t bufsize) { dictGetStats(buf, bufsize, pdict); } + const_iter end() { return const_iter(nullptr); } + bool iterate(std::function fn) { return m_persistentData.iterate(fn); } + void getStats(char *buf, size_t bufsize) { m_persistentData.getStats(buf, bufsize); } + void getExpireStats(char *buf, size_t bufsize) { m_persistentData.getExpireStats(buf, bufsize); } + + bool insert(char *key, robj *o) { return m_persistentData.insert(key, o); } + void dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire); + + int incrementallyRehash() { return m_persistentData.incrementallyRehash(); }; + + bool FKeyExpires(const char *key); + size_t clear(bool fAsync, void(callback)(void*)); + dict *dictUnsafe() { return m_persistentData.dictUnsafe(); } + expireEntry *getExpire(robj_roptr key); private: - dict *pdict; /* The keyspace for this DB */ + redisDbPersistentData m_persistentData; public: - expireset *setexpire; expireset::setiter expireitr; - dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ @@ -2476,7 +2584,6 @@ int rewriteConfig(char *path); /* db.c -- Keyspace access API */ int removeExpire(redisDb *db, robj *key); -int removeExpireCore(redisDb *db, robj *key, dictEntry *de); void propagateExpire(redisDb *db, robj *key, int lazy); int expireIfNeeded(redisDb *db, robj *key); expireEntry *getExpire(redisDb *db, robj_roptr key); @@ -2521,7 +2628,6 @@ void slotToKeyAdd(robj *key); void slotToKeyDel(robj *key); void slotToKeyFlush(void); int dbAsyncDelete(redisDb *db, robj *key); -void emptyDbAsync(redisDb *db); void slotToKeyFlushAsync(void); size_t lazyfreeGetPendingObjectsCount(void); void freeObjAsync(robj *o); From 4325aae6d3cf2a43ddd3228b082180dcbe5330b4 Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 20 Sep 2019 14:52:22 -0400 Subject: [PATCH 3/3] Add the IStorage interface and wire it up Former-commit-id: cbd4b4df1919f97f0d176f1ddc6b49b8afe7e001 --- src/IStorage.h | 14 +++++++ src/db.cpp | 89 +++++++++++++++++++++++++++++++++++++++++--- src/defrag.cpp | 7 +++- src/evict.cpp | 5 ++- src/expire.cpp | 6 +-- src/lazyfree.cpp | 2 + src/object.cpp | 51 +++++++++++++++++++++++++ src/semiorderedset.h | 4 +- src/server.cpp | 4 ++ src/server.h | 32 +++++++++++++--- 10 files changed, 194 insertions(+), 20 deletions(-) create mode 100644 src/IStorage.h diff --git a/src/IStorage.h b/src/IStorage.h new file mode 100644 index 000000000..b12b98260 --- /dev/null +++ b/src/IStorage.h @@ -0,0 +1,14 @@ +#pragma once +#include + +class IStorage +{ +public: + typedef std::function callback; + + virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) = 0; + virtual void erase(const char *key, size_t cchKey) = 0; + virtual void retrieve(const char *key, size_t cchKey, bool fDelete, callback fn) = 0; + virtual size_t clear() = 0; + virtual void enumerate(callback fn) = 0; +}; diff --git a/src/db.cpp b/src/db.cpp index c66197aaf..2f8f5d8a9 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -87,6 +87,7 @@ static robj *lookupKey(redisDb *db, robj *key, int flags) { if (flags & LOOKUP_UPDATEMVCC) { val->mvcc_tstamp = getMvccTstamp(); + db->trackkey(key); } return val; } else { @@ -800,7 +801,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { /* Handle the case of a hash table. */ ht = NULL; if (o == nullptr) { - ht = c->db->dictUnsafe(); + ht = c->db->dictUnsafeKeyOnly(); } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) { ht = (dict*)ptrFromObj(o); } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) { @@ -1221,12 +1222,12 @@ int redisDbPersistentData::removeExpire(robj *key, dict_iter itr) { if (!val->FExpires()) return 0; + trackkey(key); auto itrExpire = m_setexpire->find(itr.key()); serverAssert(itrExpire != m_setexpire->end()); serverAssert(itrExpire->key() == itr.key()); m_setexpire->erase(itrExpire); val->SetFExpires(false); - trackkey(key); return 1; } @@ -1772,13 +1773,13 @@ void redisDbPersistentData::initialize() m_pdict = dictCreate(&dbDictType,NULL); m_setexpire = new(MALLOC_LOCAL) expireset(); m_fAllChanged = false; - m_fTrackingChanges = false; + m_fTrackingChanges = 0; } void redisDb::initialize(int id) { m_persistentData.initialize(); - this->expireitr = m_persistentData.setexpireUnsafe()->end(); + this->expireitr = m_persistentData.setexpire()->end(); this->blocking_keys = dictCreate(&keylistDictType,NULL); this->ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); this->watched_keys = dictCreate(&keylistDictType,NULL); @@ -1810,7 +1811,7 @@ size_t redisDb::clear(bool fAsync, void(callback)(void*)) } else { m_persistentData.clear(callback); } - expireitr = m_persistentData.setexpireUnsafe()->end(); + expireitr = m_persistentData.setexpire()->end(); return removed; } @@ -1821,6 +1822,8 @@ void redisDbPersistentData::clear(void(callback)(void*)) m_fAllChanged = true; delete m_setexpire; m_setexpire = new (MALLOC_LOCAL) expireset(); + if (m_pstorage != nullptr) + m_pstorage->clear(); } /* static */ void redisDbPersistentData::swap(redisDbPersistentData *db1, redisDbPersistentData *db2) @@ -1830,11 +1833,13 @@ void redisDbPersistentData::clear(void(callback)(void*)) db1->m_fTrackingChanges = db2->m_fTrackingChanges; db1->m_fAllChanged = db2->m_fAllChanged; db1->m_setexpire = db2->m_setexpire; + db1->m_pstorage = db2->m_pstorage; db2->m_pdict = aux.m_pdict; db2->m_fTrackingChanges = aux.m_fTrackingChanges; db2->m_fAllChanged = aux.m_fAllChanged; db2->m_setexpire = aux.m_setexpire; + db2->m_pstorage = aux.m_pstorage; } void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) @@ -1842,6 +1847,7 @@ void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) /* Reuse the sds from the main dict in the expire dict */ dictEntry *kde = dictFind(m_pdict,ptrFromObj(key)); serverAssertWithInfo(NULL,key,kde != NULL); + trackkey(key); if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) { @@ -1868,15 +1874,86 @@ void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) void redisDbPersistentData::setExpire(expireEntry &&e) { + trackkey(e.key()); m_setexpire->insert(e); } bool redisDb::FKeyExpires(const char *key) { - return m_persistentData.setexpireUnsafe()->find(key) != m_persistentData.setexpireUnsafe()->end(); + return m_persistentData.setexpireUnsafe()->find(key) != m_persistentData.setexpire()->end(); } void redisDbPersistentData::updateValue(dict_iter itr, robj *val) { + trackkey(itr.key()); dictSetVal(m_pdict, itr.de, val); +} + +void redisDbPersistentData::ensure(dictEntry *de) +{ + if (de != nullptr && dictGetVal(de) == nullptr) + { + serverAssert(m_pstorage != nullptr); + sds key = (sds)dictGetKey(de); + m_pstorage->retrieve(key, sdslen(key), true, [&](const char *, size_t, const void *data, size_t cb){ + robj *o = deserializeStoredObject(data, cb); + serverAssert(o != nullptr); + dictSetVal(m_pdict, de, o); + }); + } +} + +void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o) +{ + sds temp = serializeStoredObject(o); + m_pstorage->insert(szKey, cchKey, temp, sdslen(temp)); + sdsfree(temp); +} + +void redisDbPersistentData::storeDatabase() +{ + dictIterator *di = dictGetIterator(m_pdict); + dictEntry *de; + while ((de = dictNext(di)) != NULL) { + sds key = (sds)dictGetKey(de); + robj *o = (robj*)dictGetVal(de); + storeKey(key, sdslen(key), o); + } + dictReleaseIterator(di); +} + +void redisDbPersistentData::processChanges() +{ + --m_fTrackingChanges; + serverAssert(m_fTrackingChanges >= 0); + + if (m_pstorage != nullptr) + { + if (m_fTrackingChanges == 0) + { + if (m_fAllChanged) + { + m_pstorage->clear(); + storeDatabase(); + } + else + { + for (auto &str : m_setchanged) + { + sds sdsKey = sdsnewlen(str.data(), str.size()); + robj *o = find(sdsKey); + if (o != nullptr) + { + storeKey(str.data(), str.size(), o); + } + else + { + m_pstorage->erase(str.data(), str.size()); + } + sdsfree(sdsKey); + } + } + } + } + m_setchanged.clear(); } \ No newline at end of file diff --git a/src/defrag.cpp b/src/defrag.cpp index adb354b3b..62a5b6f59 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -787,6 +787,8 @@ long defragKey(redisDb *db, dictEntry *de) { /* Try to defrag robj and / or string value. */ ob = (robj*)dictGetVal(de); + if (ob == nullptr) + return defragged; if ((newob = activeDefragStringOb(ob, &defragged))) { de->v.val = newob; ob = newob; @@ -853,7 +855,7 @@ void defragScanCallback(void *privdata, const dictEntry *de) { g_pserver->stat_active_defrag_scanned++; } -/* Defrag scan callback for each hash table bicket, +/* Defrag scan callback for each hash table bucket, * used in order to defrag the dictEntry allocations. */ void defragDictBucketCallback(void *privdata, dictEntry **bucketref) { UNUSED(privdata); /* NOTE: this function is also used by both activeDefragCycle and scanLaterHash, etc. don't use privdata */ @@ -1111,7 +1113,8 @@ void activeDefragCycle(void) { break; /* this will exit the function and we'll continue on the next cycle */ } - cursor = dictScan(db->dictUnsafe(), cursor, defragScanCallback, defragDictBucketCallback, db); + // we actually look at the objects too but defragScanCallback can handle missing values + cursor = dictScan(db->dictUnsafeKeyOnly(), cursor, defragScanCallback, defragDictBucketCallback, db); /* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys * (if we have a lot of pointers in one hash bucket or rehasing), diff --git a/src/evict.cpp b/src/evict.cpp index a9a0e4511..03049c36f 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -256,15 +256,16 @@ void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct ev { if (setexpire != nullptr) { - visitFunctor visitor { dbid, db->m_persistentData.dictUnsafe(), pool, 0 }; + visitFunctor visitor { dbid, db->m_persistentData.dictUnsafeKeyOnly(), pool, 0 }; setexpire->random_visit(visitor); } else { dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*)); - int count = dictGetSomeKeys(db->m_persistentData.dictUnsafe(),samples,g_pserver->maxmemory_samples); + int count = dictGetSomeKeys(db->m_persistentData.dictUnsafeKeyOnly(),samples,g_pserver->maxmemory_samples); for (int j = 0; j < count; j++) { robj *o = (robj*)dictGetVal(samples[j]); + serverAssert(o != nullptr); // BUG!!! We have to get the info we need here without permanently rehydrating the obj processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool); } } diff --git a/src/expire.cpp b/src/expire.cpp index 3803d8ac8..1be7ca89a 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -359,12 +359,12 @@ void expireSlaveKeys(void) { // the expire is hashed based on the key pointer, so we need the point in the main db auto itrDB = db->find(keyname); - auto itrExpire = db->m_persistentData.setexpireUnsafe()->end(); + auto itrExpire = db->m_persistentData.setexpire()->end(); if (itrDB != nullptr) itrExpire = db->m_persistentData.setexpireUnsafe()->find(itrDB.key()); int expired = 0; - if (itrExpire != db->m_persistentData.setexpireUnsafe()->end()) + if (itrExpire != db->m_persistentData.setexpire()->end()) { if (itrExpire->when() < start) { activeExpireCycleExpire(g_pserver->db+dbid,*itrExpire,start); @@ -376,7 +376,7 @@ void expireSlaveKeys(void) { * corresponding bit in the new bitmap we set as value. * At the end of the loop if the bitmap is zero, it means we * no longer need to keep track of this key. */ - if (itrExpire != db->m_persistentData.setexpireUnsafe()->end() && !expired) { + if (itrExpire != db->m_persistentData.setexpire()->end() && !expired) { noexpire++; new_dbids |= (uint64_t)1 << dbid; } diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index 471fb6260..89cd65540 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -116,6 +116,8 @@ void redisDbPersistentData::emptyDbAsync() { auto *set = m_setexpire; m_setexpire = new (MALLOC_LOCAL) expireset(); m_pdict = dictCreate(&dbDictType,NULL); + if (m_pstorage != nullptr) + m_pstorage->clear(); if (m_fTrackingChanges) m_fAllChanged = true; atomicIncr(lazyfree_objects,dictSize(oldht1)); diff --git a/src/object.cpp b/src/object.cpp index 201e2ca9c..f178b827b 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -1489,3 +1489,54 @@ void redisObject::setrefcount(unsigned ref) serverAssert(!FExpires()); refcount.store(ref, std::memory_order_relaxed); } + +sds serializeStoredStringObject(robj_roptr o) +{ + sds str = sdsempty(); + sdscatlen(str, &(*o), sizeof(robj)); + sdscat(str, szFromObj(o)); + return str; +} + +robj *deserializeStoredStringObject(const char *data, size_t cb) +{ + const robj *oT = (const robj*)data; + robj *newObject = nullptr; + switch (oT->encoding) + { + case OBJ_ENCODING_EMBSTR: + newObject = (robj*)zmalloc(cb, MALLOC_LOCAL); + memcpy(newObject, data, cb); + return newObject; + + case OBJ_ENCODING_RAW: + newObject = (robj*)zmalloc(sizeof(robj), MALLOC_SHARED); + memcpy(newObject, data, sizeof(robj)); + newObject->m_ptr = sdsnewlen(SDS_NOINIT,cb-sizeof(robj)); + memcpy(newObject->m_ptr, data+sizeof(robj), cb-sizeof(robj)); + return newObject; + } + serverPanic("Unknown string object encoding from storage"); + return nullptr; +} + +robj *deserializeStoredObject(const void *data, size_t cb) +{ + const robj *oT = (const robj*)data; + switch (oT->type) + { + case OBJ_STRING: + return deserializeStoredStringObject((char*)data, cb); + } + serverPanic("Unknown object type loading from storage"); +} + +sds serializeStoredObject(robj_roptr o) +{ + switch (o->type) + { + case OBJ_STRING: + return serializeStoredStringObject(o); + } + serverPanic("Attempting to store unknown object type"); +} \ No newline at end of file diff --git a/src/semiorderedset.h b/src/semiorderedset.h index 00a1f1d91..da704c4ce 100644 --- a/src/semiorderedset.h +++ b/src/semiorderedset.h @@ -93,9 +93,9 @@ public: return end(); } - setiter end() + setiter end() const { - setiter itr(this); + setiter itr(const_cast*>(this)); itr.idxPrimary = m_data.size(); return itr; } diff --git a/src/server.cpp b/src/server.cpp index 0a623c28e..b86cce9a0 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3689,7 +3689,11 @@ int processCommand(client *c, int callFlags) { addReply(c,shared.queued); } else { std::unique_lockdb->lock)> ulock(c->db->lock); + for (int idb = 0; idb < cserver.dbnum; ++idb) + g_pserver->db[idb].trackChanges(); call(c,callFlags); + for (int idb = 0; idb < cserver.dbnum; ++idb) + g_pserver->db[idb].processChanges(); c->woff = g_pserver->master_repl_offset; if (listLength(g_pserver->ready_keys)) handleClientsBlockedOnKeys(); diff --git a/src/server.h b/src/server.h index 731f40a38..9f4f65db8 100644 --- a/src/server.h +++ b/src/server.h @@ -90,6 +90,7 @@ typedef long long mstime_t; /* millisecond time type. */ #include "sha1.h" #include "endianconv.h" #include "crc64.h" +#include "IStorage.h" extern int g_fTestMode; @@ -1065,19 +1066,21 @@ public: void trackkey(const char *key) { - if (m_fTrackingChanges) - m_setchanged.insert(key); + if (m_fTrackingChanges && !m_fAllChanged) + m_setchanged.insert(std::string(key, sdslen(key))); } dict_iter find(const char *key) { dictEntry *de = dictFind(m_pdict, key); + ensure(de); return dict_iter(de); } dict_iter random() { dictEntry *de = dictGetRandomKey(m_pdict); + ensure(de); return dict_iter(de); } @@ -1109,16 +1112,27 @@ public: void setExpire(expireEntry &&e); void initialize(); - dict *dictUnsafe() { return m_pdict; } + void trackChanges() { m_fTrackingChanges++; } + void processChanges(); + + // This should only be used if you look at the key, we do not fixup + // objects stored elsewhere + dict *dictUnsafeKeyOnly() { return m_pdict; } + expireset *setexpireUnsafe() { return m_setexpire; } const expireset *setexpire() { return m_setexpire; } private: + void ensure(dictEntry *de); + void storeDatabase(); + void storeKey(const char *key, size_t cchKey, robj *o); + // Keyspace dict *m_pdict; /* The keyspace for this DB */ - bool m_fTrackingChanges = false; + int m_fTrackingChanges = 0; // Note: Stack based bool m_fAllChanged = false; std::set m_setchanged; + IStorage *m_pstorage = nullptr; // Expire expireset *m_setexpire; @@ -1159,6 +1173,10 @@ typedef struct redisDb { void expand(uint64_t slots) { m_persistentData.expand(slots); } void tryResize() { m_persistentData.tryResize(); } const expireset *setexpire() { return m_persistentData.setexpire(); } + + void trackChanges() { m_persistentData.trackChanges(); } + void processChanges() { m_persistentData.processChanges(); } + void trackkey(robj_roptr o) { m_persistentData.trackkey(o); } iter find(robj_roptr key) { @@ -1192,7 +1210,7 @@ typedef struct redisDb { bool FKeyExpires(const char *key); size_t clear(bool fAsync, void(callback)(void*)); - dict *dictUnsafe() { return m_persistentData.dictUnsafe(); } + dict *dictUnsafeKeyOnly() { return m_persistentData.dictUnsafeKeyOnly(); } expireEntry *getExpire(robj_roptr key); private: redisDbPersistentData m_persistentData; @@ -2315,6 +2333,10 @@ int collateStringObjects(robj *a, robj *b); int equalStringObjects(robj *a, robj *b); unsigned long long estimateObjectIdleTime(robj_roptr o); void trimStringObjectIfNeeded(robj *o); + +robj *deserializeStoredObject(const void *data, size_t cb); +sds serializeStoredObject(robj_roptr o); + #define sdsEncodedObject(objptr) (objptr->encoding == OBJ_ENCODING_RAW || objptr->encoding == OBJ_ENCODING_EMBSTR) /* Synchronous I/O with timeout */