From f10e05f1bf5a2ec0c3aa38e997f192181f3689ba Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 19 Sep 2019 22:36:29 -0400 Subject: [PATCH] Isolate the persistent parts of the database from the runtime parts Former-commit-id: 08995b065cb0ed6df16528e73c7acb28bcf3c1f4 --- src/db.cpp | 315 +++++++++++++++++++++++++++++------------------ src/debug.cpp | 8 +- src/defrag.cpp | 6 +- src/evict.cpp | 22 ++-- src/expire.cpp | 20 +-- src/lazyfree.cpp | 31 +++-- src/object.cpp | 10 +- src/rdb.cpp | 4 +- src/server.cpp | 15 ++- src/server.h | 184 +++++++++++++++++++++------ 10 files changed, 397 insertions(+), 218 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index a87cedb29..c66197aaf 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -41,7 +41,6 @@ int keyIsExpired(redisDb *db, robj *key); int expireIfNeeded(redisDb *db, robj *key, robj *o); -void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire); /* Update LFU when an object is accessed. * Firstly, decrement the counter if the decrement time is reached. @@ -57,8 +56,7 @@ void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew) serverAssert(valOld->FExpires()); serverAssert(!valNew->FExpires()); - auto itr = db->setexpire->find(key); - serverAssert(itr != db->setexpire->end()); + serverAssert(db->FKeyExpires((const char*)key)); valNew->SetFExpires(true); valOld->SetFExpires(false); @@ -70,8 +68,9 @@ void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew) * implementations that should instead rely on lookupKeyRead(), * lookupKeyWrite() and lookupKeyReadWithFlags(). */ static robj *lookupKey(redisDb *db, robj *key, int flags) { - robj *val = db->find(key); - if (val) { + auto itr = db->find(key); + if (itr) { + robj *val = itr.val(); /* Update the access time for the ageing algorithm. * Don't do it if we have a saving child, as this will trigger * a copy on write madness. */ @@ -193,13 +192,13 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { return o; } -int dbAddCore(redisDb *db, robj *key, robj *val) { +bool dbAddCore(redisDb *db, robj *key, robj *val) { serverAssert(!val->FExpires()); sds copy = sdsdup(szFromObj(key)); - int retval = dictAdd(db->pdict, copy, val); + bool fInserted = db->insert(copy, val); val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp(); - if (retval == DICT_OK) + if (fInserted) { if (val->type == OBJ_LIST || val->type == OBJ_ZSET) @@ -211,7 +210,7 @@ int dbAddCore(redisDb *db, robj *key, robj *val) { sdsfree(copy); } - return retval; + return fInserted; } /* Add the key to the DB. It's up to the caller to increment the reference @@ -220,23 +219,22 @@ int dbAddCore(redisDb *db, robj *key, robj *val) { * The program is aborted if the key already exists. */ void dbAdd(redisDb *db, robj *key, robj *val) { - int retval = dbAddCore(db, key, val); - serverAssertWithInfo(NULL,key,retval == DICT_OK); + bool fInserted = dbAddCore(db, key, val); + serverAssertWithInfo(NULL,key,fInserted); } -void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire) +void redisDb::dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire) { - dictEntry auxentry = *de; - robj *old = (robj*)dictGetVal(de); + robj *old = itr.val(); if (old->FExpires()) { if (fRemoveExpire) { - removeExpire(db, key); + removeExpire(this, key); } else { if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) val = dupStringObject(val); - updateExpire(db, (sds)dictGetKey(de), old, val); + updateExpire(this, itr.key(), old, val); } } @@ -249,14 +247,12 @@ void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpd val->mvcc_tstamp = getMvccTstamp(); } - dictSetVal(db->pdict, de, val); + if (g_pserver->lazyfree_lazy_server_del) + freeObjAsync(itr.val()); + else + decrRefCount(itr.val()); - if (g_pserver->lazyfree_lazy_server_del) { - freeObjAsync(old); - dictSetVal(db->pdict, &auxentry, NULL); - } - - dictFreeVal(db->pdict, &auxentry); + m_persistentData.updateValue(itr, val); } /* Overwrite an existing key with a new value. Incrementing the reference @@ -265,10 +261,10 @@ void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpd * * The program is aborted if the key was not already present. */ void dbOverwrite(redisDb *db, robj *key, robj *val) { - dictEntry *de = dictFind(db->pdict,ptrFromObj(key)); + auto itr = db->find(key); - serverAssertWithInfo(NULL,key,de != NULL); - dbOverwriteCore(db, de, key, val, !!g_pserver->fActiveReplica, false); + serverAssertWithInfo(NULL,key,itr != nullptr); + db->dbOverwriteCore(itr, key, val, !!g_pserver->fActiveReplica, false); } /* Insert a key, handling duplicate keys according to fReplace */ @@ -276,14 +272,14 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) { if (fReplace) { - dictEntry *de = dictFind(db->pdict, ptrFromObj(key)); - if (de == nullptr) - return (dbAddCore(db, key, val) == DICT_OK); + auto itr = db->find(key); + if (itr == nullptr) + return (dbAddCore(db, key, val) == true); - robj *old = (robj*)dictGetVal(de); + robj *old = itr.val(); if (old->mvcc_tstamp <= val->mvcc_tstamp) { - dbOverwriteCore(db, de, key, val, false, true); + db->dbOverwriteCore(itr, key, val, false, true); return true; } @@ -291,7 +287,7 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) } else { - return (dbAddCore(db, key, val) == DICT_OK); + return (dbAddCore(db, key, val) == true); } } @@ -304,11 +300,11 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) * * All the new keys in the database should be created via this interface. */ void setKey(redisDb *db, robj *key, robj *val) { - dictEntry *de = dictFind(db->pdict, ptrFromObj(key)); - if (de == NULL) { + auto itr = db->find(key); + if (itr == NULL) { dbAdd(db,key,val); } else { - dbOverwriteCore(db,de,key,val,!!g_pserver->fActiveReplica,true); + db->dbOverwriteCore(itr,key,val,!!g_pserver->fActiveReplica,true); } incrRefCount(val); signalModifiedKey(db,key); @@ -324,19 +320,19 @@ int dbExists(redisDb *db, robj *key) { * The function makes sure to return keys not already expired. */ robj *dbRandomKey(redisDb *db) { int maxtries = 100; - bool allvolatile = db->size() == db->setexpire->size(); + bool allvolatile = db->expireSize() == db->size(); while(1) { sds key; robj *keyobj; - auto pair = db->random(); - if (pair.first == NULL) return NULL; + auto itr = db->random(); + if (itr == nullptr) return NULL; - key = (sds)pair.first; + key = itr.key(); keyobj = createStringObject(key,sdslen(key)); - if (pair.second->FExpires()) + if (itr.val()->FExpires()) { if (allvolatile && listLength(g_pserver->masters) && --maxtries == 0) { /* If the DB is composed only of keys with an expire set, @@ -351,7 +347,7 @@ robj *dbRandomKey(redisDb *db) { } } - if (pair.second->FExpires()) + if (itr.val()->FExpires()) { if (expireIfNeeded(db,keyobj)) { decrRefCount(keyobj); @@ -363,15 +359,16 @@ robj *dbRandomKey(redisDb *db) { } } -/* Delete a key, value, and associated expiration entry if any, from the DB */ -int dbSyncDelete(redisDb *db, robj *key) { +bool redisDbPersistentData::syncDelete(robj *key) +{ /* Deleting an entry from the expires dict will not free the sds of * the key, because it is shared with the main dictionary. */ - dictEntry *de = dictFind(db->pdict, szFromObj(key)); - if (de != nullptr && ((robj*)dictGetVal(de))->FExpires()) - removeExpireCore(db, key, de); - if (dictDelete(db->pdict,ptrFromObj(key)) == DICT_OK) { + auto itr = find(szFromObj(key)); + trackkey(szFromObj(key)); + if (itr != nullptr && itr.val()->FExpires()) + removeExpire(key, itr); + if (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) { if (g_pserver->cluster_enabled) slotToKeyDel(key); return 1; } else { @@ -379,6 +376,11 @@ int dbSyncDelete(redisDb *db, robj *key) { } } +/* Delete a key, value, and associated expiration entry if any, from the DB */ +int dbSyncDelete(redisDb *db, robj *key) { + return db->m_persistentData.syncDelete(key); +} + /* This is a wrapper whose behavior depends on the Redis lazy free * configuration. Deletes the key synchronously or asynchronously. */ int dbDelete(redisDb *db, robj *key) { @@ -456,15 +458,7 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { } for (int j = startdb; j <= enddb; j++) { - removed += dictSize(g_pserver->db[j].pdict); - if (async) { - emptyDbAsync(&g_pserver->db[j]); - } else { - dictEmpty(g_pserver->db[j].pdict,callback); - delete g_pserver->db[j].setexpire; - g_pserver->db[j].setexpire = new (MALLOC_LOCAL) expireset(); - g_pserver->db[j].expireitr = g_pserver->db[j].setexpire->end(); - } + removed += g_pserver->db[j].clear(!!async, callback); } if (g_pserver->cluster_enabled) { if (async) { @@ -633,9 +627,9 @@ void randomkeyCommand(client *c) { } -bool redisDb::iterate(std::function fn) +bool redisDbPersistentData::iterate(std::function &fn) { - dictIterator *di = dictGetSafeIterator(pdict); + dictIterator *di = dictGetSafeIterator(m_pdict); dictEntry *de = nullptr; bool fResult = true; while((de = dictNext(di)) != nullptr) @@ -806,7 +800,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { /* Handle the case of a hash table. */ ht = NULL; if (o == nullptr) { - ht = c->db->pdict; + ht = c->db->dictUnsafe(); } else if (o->type == OBJ_SET && o->encoding == OBJ_ENCODING_HT) { ht = (dict*)ptrFromObj(o); } else if (o->type == OBJ_HASH && o->encoding == OBJ_ENCODING_HT) { @@ -1159,17 +1153,14 @@ int dbSwapDatabases(int id1, int id2) { /* Swap hash tables. Note that we don't swap blocking_keys, * ready_keys and watched_keys, since we want clients to * remain in the same DB they were. */ - db1->pdict = db2->pdict; - db1->setexpire = db2->setexpire; - db1->expireitr = db2->expireitr; + redisDbPersistentData::swap(&db1->m_persistentData, &db2->m_persistentData); db1->avg_ttl = db2->avg_ttl; db1->last_expire_set = db2->last_expire_set; + db1->expireitr = db2->expireitr; - db2->pdict = aux.pdict; - db2->setexpire = aux.setexpire; - db2->expireitr = aux.expireitr; db2->avg_ttl = aux.avg_ttl; db2->last_expire_set = aux.last_expire_set; + db2->expireitr = aux.expireitr; /* Now we need to handle clients blocked on lists: as an effect * of swapping the two DBs, a client that was waiting for list @@ -1218,23 +1209,24 @@ void swapdbCommand(client *c) { * Expires API *----------------------------------------------------------------------------*/ int removeExpire(redisDb *db, robj *key) { - dictEntry *de = dictFind(db->pdict,ptrFromObj(key)); - return removeExpireCore(db, key, de); + auto itr = db->find(key); + return db->m_persistentData.removeExpire(key, itr); } -int removeExpireCore(redisDb *db, robj *key, dictEntry *de) { +int redisDbPersistentData::removeExpire(robj *key, dict_iter itr) { /* An expire may only be removed if there is a corresponding entry in the * main dict. Otherwise, the key will never be freed. */ - serverAssertWithInfo(NULL,key,de != NULL); + serverAssertWithInfo(NULL,key,itr != nullptr); - robj *val = (robj*)dictGetVal(de); + robj *val = itr.val(); if (!val->FExpires()) return 0; - auto itr = db->setexpire->find((sds)dictGetKey(de)); - serverAssert(itr != db->setexpire->end()); - serverAssert(itr->key() == (sds)dictGetKey(de)); - db->setexpire->erase(itr); + auto itrExpire = m_setexpire->find(itr.key()); + serverAssert(itrExpire != m_setexpire->end()); + serverAssert(itrExpire->key() == itr.key()); + m_setexpire->erase(itrExpire); val->SetFExpires(false); + trackkey(key); return 1; } @@ -1243,49 +1235,23 @@ int removeExpireCore(redisDb *db, robj *key, dictEntry *de) { * to NULL. The 'when' parameter is the absolute unix time in milliseconds * after which the key will no longer be considered valid. */ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) { - dictEntry *kde; - serverAssert(GlobalLocksAcquired()); - /* Reuse the sds from the main dict in the expire dict */ - kde = dictFind(db->pdict,ptrFromObj(key)); - serverAssertWithInfo(NULL,key,kde != NULL); - - if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) - { - // shared objects cannot have the expire bit set, create a real object - dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde))); - } - /* Update TTL stats (exponential moving average) */ /* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */ long long now = g_pserver->mstime; db->avg_ttl -= (now - db->last_expire_set); // reduce the TTL by the time that has elapsed - if (db->setexpire->empty()) + if (db->expireSize() == 0) db->avg_ttl = 0; else - db->avg_ttl -= db->avg_ttl / db->setexpire->size(); // slide one entry out the window + db->avg_ttl -= db->avg_ttl / db->expireSize(); // slide one entry out the window if (db->avg_ttl < 0) db->avg_ttl = 0; // TTLs are never negative - db->avg_ttl += (double)(when-now) / (db->setexpire->size()+1); // add the new entry + db->avg_ttl += (double)(when-now) / (db->expireSize()+1); // add the new entry db->last_expire_set = now; /* Update the expire set */ - const char *szSubKey = (subkey != nullptr) ? szFromObj(subkey) : nullptr; - if (((robj*)dictGetVal(kde))->FExpires()) { - auto itr = db->setexpire->find((sds)dictGetKey(kde)); - serverAssert(itr != db->setexpire->end()); - expireEntry eNew(std::move(*itr)); - eNew.update(szSubKey, when); - db->setexpire->erase(itr); - db->setexpire->insert(eNew); - } - else - { - expireEntry e((sds)dictGetKey(kde), szSubKey, when); - ((robj*)dictGetVal(kde))->SetFExpires(true); - db->setexpire->insert(e); - } + db->m_persistentData.setExpire(key, subkey, when); int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; if (c && writable_slave && !(c->flags & CLIENT_MASTER)) @@ -1294,26 +1260,24 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) { - dictEntry *kde; - serverAssert(GlobalLocksAcquired()); /* Reuse the sds from the main dict in the expire dict */ - kde = dictFind(db->pdict,ptrFromObj(key)); + auto kde = db->find(key); serverAssertWithInfo(NULL,key,kde != NULL); - if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) + if (kde.val()->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) { // shared objects cannot have the expire bit set, create a real object - dictSetVal(db->pdict, kde, dupStringObject((robj*)dictGetVal(kde))); + db->m_persistentData.updateValue(kde, dupStringObject(kde.val())); } - if (((robj*)dictGetVal(kde))->FExpires()) + if (kde.val()->FExpires()) removeExpire(db, key); - e.setKeyUnsafe((sds)dictGetKey(kde)); - db->setexpire->insert(e); - ((robj*)dictGetVal(kde))->SetFExpires(true); + e.setKeyUnsafe(kde.key()); + db->m_persistentData.setExpire(std::move(e)); + kde.val()->SetFExpires(true); int writable_slave = listLength(g_pserver->masters) && g_pserver->repl_slave_ro == 0; @@ -1323,19 +1287,24 @@ void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) /* Return the expire time of the specified key, or null if no expire * is associated with this key (i.e. the key is non volatile) */ -expireEntry *getExpire(redisDb *db, robj_roptr key) { +expireEntry *redisDb::getExpire(robj_roptr key) { /* No expire? return ASAP */ - if (db->setexpire->size() == 0) + if (expireSize() == 0) return nullptr; - auto pair = db->lookup_tuple(key); - if (pair.first == nullptr) + auto itr = find(key); + if (itr == nullptr) return nullptr; - if (!pair.second->FExpires()) + if (!itr.val()->FExpires()) return nullptr; - auto itr = db->setexpire->find(pair.first); - return itr.operator->(); + auto itrExpire = m_persistentData.findExpire(itr.key()); + return itrExpire.operator->(); +} + +expireEntry *getExpire(redisDb *db, robj_roptr key) +{ + return db->getExpire(key); } /* Propagate expires into slaves and the AOF file. @@ -1798,11 +1767,18 @@ unsigned int countKeysInSlot(unsigned int hashslot) { return g_pserver->cluster->slots_keys_count[hashslot]; } +void redisDbPersistentData::initialize() +{ + m_pdict = dictCreate(&dbDictType,NULL); + m_setexpire = new(MALLOC_LOCAL) expireset(); + m_fAllChanged = false; + m_fTrackingChanges = false; +} + void redisDb::initialize(int id) { - this->pdict = dictCreate(&dbDictType,NULL); - this->setexpire = new(MALLOC_LOCAL) expireset(); - this->expireitr = this->setexpire->end(); + m_persistentData.initialize(); + this->expireitr = m_persistentData.setexpireUnsafe()->end(); this->blocking_keys = dictCreate(&keylistDictType,NULL); this->ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL); this->watched_keys = dictCreate(&keylistDictType,NULL); @@ -1811,3 +1787,96 @@ void redisDb::initialize(int id) this->last_expire_set = 0; this->defrag_later = listCreate(); } + +bool redisDbPersistentData::insert(char *key, robj *o) +{ + int res = dictAdd(m_pdict, key, o); + if (res == DICT_OK) + trackkey(key); + return (res == DICT_OK); +} + +void redisDbPersistentData::tryResize() +{ + if (htNeedsResize(m_pdict)) + dictResize(m_pdict); +} + +size_t redisDb::clear(bool fAsync, void(callback)(void*)) +{ + size_t removed = m_persistentData.size(); + if (fAsync) { + m_persistentData.emptyDbAsync(); + } else { + m_persistentData.clear(callback); + } + expireitr = m_persistentData.setexpireUnsafe()->end(); + return removed; +} + +void redisDbPersistentData::clear(void(callback)(void*)) +{ + dictEmpty(m_pdict,callback); + if (m_fTrackingChanges) + m_fAllChanged = true; + delete m_setexpire; + m_setexpire = new (MALLOC_LOCAL) expireset(); +} + +/* static */ void redisDbPersistentData::swap(redisDbPersistentData *db1, redisDbPersistentData *db2) +{ + redisDbPersistentData aux = *db1; + db1->m_pdict = db2->m_pdict; + db1->m_fTrackingChanges = db2->m_fTrackingChanges; + db1->m_fAllChanged = db2->m_fAllChanged; + db1->m_setexpire = db2->m_setexpire; + + db2->m_pdict = aux.m_pdict; + db2->m_fTrackingChanges = aux.m_fTrackingChanges; + db2->m_fAllChanged = aux.m_fAllChanged; + db2->m_setexpire = aux.m_setexpire; +} + +void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) +{ + /* Reuse the sds from the main dict in the expire dict */ + dictEntry *kde = dictFind(m_pdict,ptrFromObj(key)); + serverAssertWithInfo(NULL,key,kde != NULL); + + if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) + { + // shared objects cannot have the expire bit set, create a real object + dictSetVal(m_pdict, kde, dupStringObject((robj*)dictGetVal(kde))); + } + + const char *szSubKey = (subkey != nullptr) ? szFromObj(subkey) : nullptr; + if (((robj*)dictGetVal(kde))->FExpires()) { + auto itr = m_setexpire->find((sds)dictGetKey(kde)); + serverAssert(itr != m_setexpire->end()); + expireEntry eNew(std::move(*itr)); + eNew.update(szSubKey, when); + m_setexpire->erase(itr); + m_setexpire->insert(eNew); + } + else + { + expireEntry e((sds)dictGetKey(kde), szSubKey, when); + ((robj*)dictGetVal(kde))->SetFExpires(true); + m_setexpire->insert(e); + } +} + +void redisDbPersistentData::setExpire(expireEntry &&e) +{ + m_setexpire->insert(e); +} + +bool redisDb::FKeyExpires(const char *key) +{ + return m_persistentData.setexpireUnsafe()->find(key) != m_persistentData.setexpireUnsafe()->end(); +} + +void redisDbPersistentData::updateValue(dict_iter itr, robj *val) +{ + dictSetVal(m_pdict, itr.de, val); +} \ No newline at end of file diff --git a/src/debug.cpp b/src/debug.cpp index 3310e114c..ff794ca09 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -439,9 +439,9 @@ NULL strenc, rdbSavedObjectLen(val), val->lru, estimateObjectIdleTime(val)/1000, extra); } else if (!strcasecmp(szFromObj(c->argv[1]),"sdslen") && c->argc == 3) { - auto pair = c->db->lookup_tuple(c->argv[2]); - robj *val = pair.second; - const char *key = pair.first; + auto itr = c->db->find(c->argv[2]); + robj *val = itr.val(); + const char *key = itr.key(); if (val == NULL) { addReply(c,shared.nokeyerr); @@ -636,7 +636,7 @@ NULL stats = sdscat(stats,buf); stats = sdscatprintf(stats,"[Expires set]\n"); - g_pserver->db[dbid].setexpire->getstats(buf, sizeof(buf)); + g_pserver->db[dbid].getExpireStats(buf, sizeof(buf)); stats = sdscat(stats, buf); addReplyBulkSds(c,stats); diff --git a/src/defrag.cpp b/src/defrag.cpp index 59020e2d9..adb354b3b 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -781,8 +781,8 @@ long defragKey(redisDb *db, dictEntry *de) { newsds = activeDefragSds(keysds); if (newsds) defragged++, de->key = newsds; - if (!db->setexpire->empty()) { - replaceSateliteOSetKeyPtr(*db->setexpire, keysds, newsds); + if (!db->setexpire()->empty()) { + replaceSateliteOSetKeyPtr(*const_cast(db->setexpire()), keysds, newsds); } /* Try to defrag robj and / or string value. */ @@ -1111,7 +1111,7 @@ void activeDefragCycle(void) { break; /* this will exit the function and we'll continue on the next cycle */ } - cursor = dictScan(db->pdict, cursor, defragScanCallback, defragDictBucketCallback, db); + cursor = dictScan(db->dictUnsafe(), cursor, defragScanCallback, defragDictBucketCallback, db); /* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys * (if we have a lot of pointers in one hash bucket or rehasing), diff --git a/src/evict.cpp b/src/evict.cpp index 23569597f..a9a0e4511 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -256,13 +256,13 @@ void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct ev { if (setexpire != nullptr) { - visitFunctor visitor { dbid, db->pdict, pool, 0 }; + visitFunctor visitor { dbid, db->m_persistentData.dictUnsafe(), pool, 0 }; setexpire->random_visit(visitor); } else { dictEntry **samples = (dictEntry**)alloca(g_pserver->maxmemory_samples * sizeof(dictEntry*)); - int count = dictGetSomeKeys(db->pdict,samples,g_pserver->maxmemory_samples); + int count = dictGetSomeKeys(db->m_persistentData.dictUnsafe(),samples,g_pserver->maxmemory_samples); for (int j = 0; j < count; j++) { robj *o = (robj*)dictGetVal(samples[j]); processEvictionCandidate(dbid, (sds)dictGetKey(samples[j]), o, nullptr, pool); @@ -511,9 +511,9 @@ int freeMemoryIfNeeded(void) { } else { - keys = db->setexpire->size(); + keys = db->expireSize(); if (keys != 0) - evictionPoolPopulate(i, db, db->setexpire, pool); + evictionPoolPopulate(i, db, db->m_persistentData.setexpireUnsafe(), pool); total_keys += keys; } } @@ -525,9 +525,9 @@ int freeMemoryIfNeeded(void) { bestdbid = pool[k].dbid; sds key = nullptr; - auto pair = g_pserver->db[pool[k].dbid].lookup_tuple(pool[k].key); - if (pair.first != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || pair.second->FExpires())) - key = (sds)pair.first; + auto itr = g_pserver->db[pool[k].dbid].find(pool[k].key); + if (itr != nullptr && (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_ALLKEYS || itr.val()->FExpires())) + key = itr.key(); /* Remove the entry from the pool. */ if (pool[k].key != pool[k].cached) @@ -560,17 +560,17 @@ int freeMemoryIfNeeded(void) { if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) { if (db->size() != 0) { - auto pair = db->random(); - bestkey = (sds)pair.first; + auto itr = db->random(); + bestkey = itr.key(); bestdbid = j; break; } } else { - if (!db->setexpire->empty()) + if (db->expireSize()) { - bestkey = (sds)db->setexpire->random_value().key(); + bestkey = (sds)db->random_expire().key(); bestdbid = j; break; } diff --git a/src/expire.cpp b/src/expire.cpp index e77ba6c73..3803d8ac8 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -248,7 +248,7 @@ void activeExpireCycle(int type) { now = mstime(); /* If there is nothing to expire try next DB ASAP. */ - if (db->setexpire->empty()) + if (db->m_persistentData.setexpireUnsafe()->empty()) { db->avg_ttl = 0; db->last_expire_set = now; @@ -258,7 +258,7 @@ void activeExpireCycle(int type) { size_t expired = 0; size_t tried = 0; long long check = ACTIVE_EXPIRE_CYCLE_FAST_DURATION; // assume a check is roughly 1us. It isn't but good enough - db->expireitr = db->setexpire->enumerate(db->expireitr, now, [&](expireEntry &e) __attribute__((always_inline)) { + db->expireitr = db->m_persistentData.setexpireUnsafe()->enumerate(db->expireitr, now, [&](expireEntry &e) __attribute__((always_inline)) { if (e.when() < now) { activeExpireCycleExpire(db, e, now); @@ -358,16 +358,16 @@ void expireSlaveKeys(void) { redisDb *db = g_pserver->db+dbid; // the expire is hashed based on the key pointer, so we need the point in the main db - auto pairMain = db->lookup_tuple(keyname); - auto itr = db->setexpire->end(); - if (pairMain.first != nullptr) - itr = db->setexpire->find((sds)pairMain.first); + auto itrDB = db->find(keyname); + auto itrExpire = db->m_persistentData.setexpireUnsafe()->end(); + if (itrDB != nullptr) + itrExpire = db->m_persistentData.setexpireUnsafe()->find(itrDB.key()); int expired = 0; - if (itr != db->setexpire->end()) + if (itrExpire != db->m_persistentData.setexpireUnsafe()->end()) { - if (itr->when() < start) { - activeExpireCycleExpire(g_pserver->db+dbid,*itr,start); + if (itrExpire->when() < start) { + activeExpireCycleExpire(g_pserver->db+dbid,*itrExpire,start); expired = 1; } } @@ -376,7 +376,7 @@ void expireSlaveKeys(void) { * corresponding bit in the new bitmap we set as value. * At the end of the loop if the bitmap is zero, it means we * no longer need to keep track of this key. */ - if (itr != db->setexpire->end() && !expired) { + if (itrExpire != db->m_persistentData.setexpireUnsafe()->end() && !expired) { noexpire++; new_dbids |= (uint64_t)1 << dbid; } diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index 91577cb85..471fb6260 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -51,18 +51,18 @@ size_t lazyfreeGetFreeEffort(robj *obj) { * a lazy free list instead of being freed synchronously. The lazy free list * will be reclaimed in a different bio.c thread. */ #define LAZYFREE_THRESHOLD 64 -int dbAsyncDelete(redisDb *db, robj *key) { +bool redisDbPersistentData::asyncDelete(robj *key) { /* If the value is composed of a few allocations, to free in a lazy way * is actually just slower... So under a certain limit we just free * the object synchronously. */ - dictEntry *de = dictUnlink(db->pdict,ptrFromObj(key)); + dictEntry *de = dictUnlink(m_pdict,ptrFromObj(key)); if (de) { robj *val = (robj*)dictGetVal(de); if (val->FExpires()) { /* Deleting an entry from the expires dict will not free the sds of * the key, because it is shared with the main dictionary. */ - removeExpireCore(db,key,de); + removeExpire(key,dict_iter(de)); } size_t free_effort = lazyfreeGetFreeEffort(val); @@ -78,21 +78,25 @@ int dbAsyncDelete(redisDb *db, robj *key) { if (free_effort > LAZYFREE_THRESHOLD && val->getrefcount(std::memory_order_relaxed) == 1) { atomicIncr(lazyfree_objects,1); bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL); - dictSetVal(db->pdict,de,NULL); + dictSetVal(m_pdict,de,NULL); } } /* Release the key-val pair, or just the key if we set the val * field to NULL in order to lazy free it later. */ if (de) { - dictFreeUnlinkedEntry(db->pdict,de); + dictFreeUnlinkedEntry(m_pdict,de); if (g_pserver->cluster_enabled) slotToKeyDel(key); - return 1; + return true; } else { - return 0; + return false; } } +int dbAsyncDelete(redisDb *db, robj *key) { + return db->m_persistentData.asyncDelete(key); +} + /* Free an object, if the object is huge enough, free it in async way. */ void freeObjAsync(robj *o) { size_t free_effort = lazyfreeGetFreeEffort(o); @@ -107,12 +111,13 @@ void freeObjAsync(robj *o) { /* Empty a Redis DB asynchronously. What the function does actually is to * create a new empty set of hash tables and scheduling the old ones for * lazy freeing. */ -void emptyDbAsync(redisDb *db) { - dict *oldht1 = db->pdict; - auto *set = db->setexpire; - db->setexpire = new (MALLOC_LOCAL) expireset(); - db->expireitr = db->setexpire->end(); - db->pdict = dictCreate(&dbDictType,NULL); +void redisDbPersistentData::emptyDbAsync() { + dict *oldht1 = m_pdict; + auto *set = m_setexpire; + m_setexpire = new (MALLOC_LOCAL) expireset(); + m_pdict = dictCreate(&dbDictType,NULL); + if (m_fTrackingChanges) + m_fAllChanged = true; atomicIncr(lazyfree_objects,dictSize(oldht1)); bioCreateBackgroundJob(BIO_LAZY_FREE,NULL,oldht1,set); } diff --git a/src/object.cpp b/src/object.cpp index a23ebf870..201e2ca9c 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -1067,7 +1067,7 @@ struct redisMemOverhead *getMemoryOverheadData(void) { mh->db[mh->num_dbs].overhead_ht_main = mem; mem_total+=mem; - mem = db->setexpire->bytes_used(); + mem = db->setexpire()->bytes_used(); mh->db[mh->num_dbs].overhead_ht_expires = mem; mem_total+=mem; @@ -1339,13 +1339,13 @@ NULL } } - auto pair = c->db->lookup_tuple(c->argv[2]); - if (pair.first == NULL) { + auto itr = c->db->find(c->argv[2]); + if (itr == nullptr) { addReplyNull(c, shared.nullbulk); return; } - size_t usage = objectComputeSize(pair.second,samples); - usage += sdsAllocSize((sds)pair.first); + size_t usage = objectComputeSize(itr.val(),samples); + usage += sdsAllocSize(itr.key()); usage += sizeof(dictEntry); addReplyLongLong(c,usage); } else if (!strcasecmp(szFromObj(c->argv[1]),"stats") && c->argc == 2) { diff --git a/src/rdb.cpp b/src/rdb.cpp index 1f1a4ee2b..c4ac55870 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1170,7 +1170,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { * these sizes are just hints to resize the hash tables. */ uint64_t db_size, expires_size; db_size = db->size(); - expires_size = db->setexpire->size(); + expires_size = db->expireSize(); if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr; if (rdbSaveLen(rdb,db_size) == -1) goto werr; if (rdbSaveLen(rdb,expires_size) == -1) goto werr; @@ -1187,7 +1187,7 @@ int rdbSaveRio(rio *rdb, int *error, int flags, rdbSaveInfo *rsi) { }); if (!fSavedAll) goto werr; - serverAssert(ckeysExpired == db->setexpire->size()); + serverAssert(ckeysExpired == db->expireSize()); } /* If we are storing the replication information on disk, persist diff --git a/src/server.cpp b/src/server.cpp index 8649beaaf..0a623c28e 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1431,8 +1431,7 @@ int htNeedsResize(dict *dict) { /* If the percentage of used slots in the HT reaches HASHTABLE_MIN_FILL * we resize the hash table to save memory */ void tryResizeHashTables(int dbid) { - if (htNeedsResize(g_pserver->db[dbid].pdict)) - dictResize(g_pserver->db[dbid].pdict); + g_pserver->db[dbid].tryResize(); } /* Our hash table implementation performs rehashing incrementally while @@ -1442,10 +1441,10 @@ void tryResizeHashTables(int dbid) { * * The function returns 1 if some rehashing was performed, otherwise 0 * is returned. */ -int incrementallyRehash(int dbid) { +int redisDbPersistentData::incrementallyRehash() { /* Keys dictionary */ - if (dictIsRehashing(g_pserver->db[dbid].pdict)) { - dictRehashMilliseconds(g_pserver->db[dbid].pdict,1); + if (dictIsRehashing(m_pdict)) { + dictRehashMilliseconds(m_pdict,1); return 1; /* already used our millisecond for this loop... */ } return 0; @@ -1726,7 +1725,7 @@ void databasesCron(void) { /* Rehash */ if (g_pserver->activerehashing) { for (j = 0; j < dbs_per_call; j++) { - int work_done = incrementallyRehash(rehash_db); + int work_done = g_pserver->db[rehash_db].incrementallyRehash(); if (work_done) { /* If the function did some work, stop here, we'll do * more at the next cron loop. */ @@ -1887,7 +1886,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { size = g_pserver->db[j].slots(); used = g_pserver->db[j].size(); - vkeys = g_pserver->db[j].setexpire->size(); + vkeys = g_pserver->db[j].expireSize(); if (used || vkeys) { serverLog(LL_VERBOSE,"DB %d: %lld keys (%lld volatile) in %lld slots HT.",j,used,vkeys,size); /* dictPrintStats(g_pserver->dict); */ @@ -4567,7 +4566,7 @@ sds genRedisInfoString(const char *section) { long long keys, vkeys; keys = g_pserver->db[j].size(); - vkeys = g_pserver->db[j].setexpire->size(); + vkeys = g_pserver->db[j].expireSize(); // Adjust TTL by the current time g_pserver->db[j].avg_ttl -= (g_pserver->mstime - g_pserver->db[j].last_expire_set); diff --git a/src/server.h b/src/server.h index b536cef3f..731f40a38 100644 --- a/src/server.h +++ b/src/server.h @@ -54,6 +54,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { #include @@ -1015,22 +1016,123 @@ typedef struct clientReplyBlock { #endif } clientReplyBlock; +struct dictEntry; +class dict_const_iter +{ + friend class redisDb; + friend class redisDbPersistentData; +protected: + dictEntry *de; +public: + explicit dict_const_iter(dictEntry *de) + : de(de) + {} + + const char *key() const { return de ? (const char*)dictGetKey(de) : nullptr; } + robj_roptr val() const { return de ? (robj*)dictGetVal(de) : nullptr; } + const robj* operator->() const { return de ? (robj*)dictGetVal(de) : nullptr; } + operator robj_roptr() const { return de ? (robj*)dictGetVal(de) : nullptr; } + + bool operator==(std::nullptr_t) const { return de == nullptr; } + bool operator!=(std::nullptr_t) const { return de != nullptr; } + bool operator==(const dict_const_iter &other) { return de == other.de; } +}; +class dict_iter : public dict_const_iter +{ +public: + explicit dict_iter(dictEntry *de) + : dict_const_iter(de) + {} + sds key() { return de ? (sds)dictGetKey(de) : nullptr; } + robj *val() { return de ? (robj*)dictGetVal(de) : nullptr; } + robj *operator->() { return de ? (robj*)dictGetVal(de) : nullptr; } + operator robj*() const { return de ? (robj*)dictGetVal(de) : nullptr; } +}; + +class redisDbPersistentData +{ +public: + static void swap(redisDbPersistentData *db1, redisDbPersistentData *db2); + + size_t slots() const { return dictSlots(m_pdict); } + size_t size() const { return dictSize(m_pdict); } + void expand(uint64_t slots) { dictExpand(m_pdict, slots); } + + void trackkey(robj_roptr o) + { + trackkey(szFromObj(o)); + } + + void trackkey(const char *key) + { + if (m_fTrackingChanges) + m_setchanged.insert(key); + } + + dict_iter find(const char *key) + { + dictEntry *de = dictFind(m_pdict, key); + return dict_iter(de); + } + + dict_iter random() + { + dictEntry *de = dictGetRandomKey(m_pdict); + return dict_iter(de); + } + + const expireEntry &random_expire() + { + return m_setexpire->random_value(); + } + + auto findExpire(const char *key) + { + return m_setexpire->find(key); + } + + void getStats(char *buf, size_t bufsize) { dictGetStats(buf, bufsize, m_pdict); } + void getExpireStats(char *buf, size_t bufsize) { m_setexpire->getstats(buf, bufsize); } + + bool insert(char *k, robj *o); + void tryResize(); + int incrementallyRehash(); + void updateValue(dict_iter itr, robj *val); + bool syncDelete(robj *key); + bool asyncDelete(robj *key); + size_t expireSize() const { return m_setexpire->size(); } + int removeExpire(robj *key, dict_iter itr); + void clear(void(callback)(void*)); + void emptyDbAsync(); + bool iterate(std::function &fn); + void setExpire(robj *key, robj *subkey, long long when); + void setExpire(expireEntry &&e); + void initialize(); + + dict *dictUnsafe() { return m_pdict; } + expireset *setexpireUnsafe() { return m_setexpire; } + const expireset *setexpire() { return m_setexpire; } + +private: + // Keyspace + dict *m_pdict; /* The keyspace for this DB */ + bool m_fTrackingChanges = false; + bool m_fAllChanged = false; + std::set m_setchanged; + + // Expire + expireset *m_setexpire; +}; + /* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ typedef struct redisDb { // Legacy C API, Do not add more friend void tryResizeHashTables(int); - friend int incrementallyRehash(int); - friend int dbAddCore(redisDb *db, robj *key, robj *val); - friend void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire); - friend void dbOverwrite(redisDb *db, robj *key, robj *val); - friend int dbMerge(redisDb *db, robj *key, robj *val, int fReplace); - friend void setKey(redisDb *db, robj *key, robj *val); friend int dbSyncDelete(redisDb *db, robj *key); friend int dbAsyncDelete(redisDb *db, robj *key); friend long long emptyDb(int dbnum, int flags, void(callback)(void*)); - friend void emptyDbAsync(redisDb *db); friend void scanGenericCommand(struct client *c, robj_roptr o, unsigned long cursor); friend int dbSwapDatabases(int id1, int id2); friend int removeExpire(redisDb *db, robj *key); @@ -1038,58 +1140,64 @@ typedef struct redisDb { friend void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e); friend void evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool); friend void activeDefragCycle(void); + friend int freeMemoryIfNeeded(void); + friend void activeExpireCycle(int); + friend void expireSlaveKeys(void); - redisDb() + typedef ::dict_const_iter const_iter; + typedef ::dict_iter iter; + + redisDb() : expireitr(nullptr) - {}; + {} void initialize(int id); - size_t slots() const { return dictSlots(pdict); } - size_t size() const { return dictSize(pdict); } - void expand(uint64_t slots) { dictExpand(pdict, slots); } + size_t slots() const { return m_persistentData.slots(); } + size_t size() const { return m_persistentData.size(); } + size_t expireSize() const { return m_persistentData.expireSize(); } + void expand(uint64_t slots) { m_persistentData.expand(slots); } + void tryResize() { m_persistentData.tryResize(); } + const expireset *setexpire() { return m_persistentData.setexpire(); } - robj *find(robj_roptr key) + iter find(robj_roptr key) { return find(szFromObj(key)); } - robj *find(const char *key) + iter find(const char *key) { - dictEntry *de = dictFind(pdict, key); - if (de != nullptr) - return (robj*)dictGetVal(de); - return nullptr; + return m_persistentData.find(key); } - std::pair lookup_tuple(robj_roptr key) + iter random() { - return lookup_tuple(szFromObj(key)); - } - std::pair lookup_tuple(const char *key) - { - dictEntry *de = dictFind(pdict, key); - if (de != nullptr) - return std::make_pair((const char*)dictGetKey(de), (robj*)dictGetVal(de)); - return std::make_pair(nullptr, nullptr); + return m_persistentData.random(); } - std::pair random() + const expireEntry &random_expire() { - dictEntry *de = dictGetRandomKey(pdict); - if (de != nullptr) - return std::make_pair((const char*)dictGetKey(de), (robj*)dictGetVal(de)); - return std::make_pair(nullptr, nullptr); + return m_persistentData.random_expire(); } - bool iterate(std::function fn); - void getStats(char *buf, size_t bufsize) { dictGetStats(buf, bufsize, pdict); } + const_iter end() { return const_iter(nullptr); } + bool iterate(std::function fn) { return m_persistentData.iterate(fn); } + void getStats(char *buf, size_t bufsize) { m_persistentData.getStats(buf, bufsize); } + void getExpireStats(char *buf, size_t bufsize) { m_persistentData.getExpireStats(buf, bufsize); } + + bool insert(char *key, robj *o) { return m_persistentData.insert(key, o); } + void dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpdateMvcc, bool fRemoveExpire); + + int incrementallyRehash() { return m_persistentData.incrementallyRehash(); }; + + bool FKeyExpires(const char *key); + size_t clear(bool fAsync, void(callback)(void*)); + dict *dictUnsafe() { return m_persistentData.dictUnsafe(); } + expireEntry *getExpire(robj_roptr key); private: - dict *pdict; /* The keyspace for this DB */ + redisDbPersistentData m_persistentData; public: - expireset *setexpire; expireset::setiter expireitr; - dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ @@ -2476,7 +2584,6 @@ int rewriteConfig(char *path); /* db.c -- Keyspace access API */ int removeExpire(redisDb *db, robj *key); -int removeExpireCore(redisDb *db, robj *key, dictEntry *de); void propagateExpire(redisDb *db, robj *key, int lazy); int expireIfNeeded(redisDb *db, robj *key); expireEntry *getExpire(redisDb *db, robj_roptr key); @@ -2521,7 +2628,6 @@ void slotToKeyAdd(robj *key); void slotToKeyDel(robj *key); void slotToKeyFlush(void); int dbAsyncDelete(redisDb *db, robj *key); -void emptyDbAsync(redisDb *db); void slotToKeyFlushAsync(void); size_t lazyfreeGetPendingObjectsCount(void); void freeObjAsync(robj *o);