From 6a77ceaa4a586ad849695d63b5c91e31e1924a2f Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 20 Dec 2019 17:45:07 -0500 Subject: [PATCH] Evicted flash no longer keeps the key around Former-commit-id: 908b303d1a8dd032c092f0bc035361a09c0291a5 --- src/IStorage.h | 10 +++-- src/db.cpp | 84 +++++++++++++++++++++-------------------- src/evict.cpp | 4 +- src/lazyfree.cpp | 4 ++ src/server.h | 22 ++--------- src/snapshot.cpp | 40 ++++++++++++++++---- src/storage/rocksdb.cpp | 14 ++++--- src/storage/rocksdb.h | 6 +-- 8 files changed, 101 insertions(+), 83 deletions(-) diff --git a/src/IStorage.h b/src/IStorage.h index 516f3e0e9..cdb8336f4 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -12,15 +12,17 @@ public: class IStorage { public: - typedef std::function callback; + typedef std::function callback; + typedef std::function callbackSingle; virtual ~IStorage(); virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) = 0; - virtual void erase(const char *key, size_t cchKey) = 0; - virtual void retrieve(const char *key, size_t cchKey, callback fn) const = 0; + virtual bool erase(const char *key, size_t cchKey) = 0; + virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const = 0; virtual size_t clear() = 0; - virtual void enumerate(callback fn) const = 0; + virtual bool enumerate(callback fn) const = 0; + virtual size_t count() const = 0; virtual void beginWriteBatch() {} // NOP virtual void endWriteBatch() {} // NOP diff --git a/src/db.cpp b/src/db.cpp index 531e98fab..dc83cff91 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -100,7 +100,7 @@ static robj* lookupKey(redisDb *db, robj *key, int flags) { } static robj_roptr lookupKeyConst(redisDb *db, robj *key, int flags) { serverAssert((flags & LOOKUP_UPDATEMVCC) == 0); - robj_roptr val = db->find_threadsafe(szFromObj(key)); + robj_roptr val = db->find(szFromObj(key)); if (val != nullptr) { lookupKeyUpdateObj(val.unsafe_robjcast(), flags); return val; @@ -315,7 +315,7 @@ int 
dbMerge(redisDb *db, robj *key, robj *val, int fReplace) * * All the new keys in the database should be created via this interface. */ void setKey(redisDb *db, robj *key, robj *val) { - auto itr = db->find(key); + auto itr = db->find_cached_threadsafe(szFromObj(key)); if (itr == NULL) { dbAdd(db,key,val); } else { @@ -380,13 +380,18 @@ bool redisDbPersistentData::syncDelete(robj *key) * the key, because it is shared with the main dictionary. */ auto itr = find(szFromObj(key)); - trackkey(szFromObj(key)); if (itr != nullptr && itr.val()->FExpires()) removeExpire(key, itr); - if (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) { + + bool fDeleted = false; + if (m_spstorage != nullptr) + fDeleted = m_spstorage->erase(szFromObj(key), sdslen(szFromObj(key))); + fDeleted = (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) || fDeleted; + + if (fDeleted) { if (m_pdbSnapshot != nullptr) { - auto itr = m_pdbSnapshot->find_threadsafe(szFromObj(key)); + auto itr = m_pdbSnapshot->find_cached_threadsafe(szFromObj(key)); if (itr != nullptr) { dictAdd(m_pdictTombstone, sdsdup(szFromObj(key)), nullptr); @@ -649,27 +654,32 @@ void randomkeyCommand(client *c) { decrRefCount(key); } -static bool FEvictedDE(dictEntry *de) -{ - return (de != nullptr) && dictGetVal(de) == nullptr; -} - bool redisDbPersistentData::iterate(std::function fn) { + if (m_spstorage != nullptr) + { + bool fSawAll = m_spstorage->enumerate([&](const char *key, size_t cchKey, const void *, size_t )->bool{ + sds sdsKey = sdsnewlen(key, cchKey); + dictEntry *de = dictFind(m_pdict, sdsKey); + bool fEvict = (de == nullptr); + ensure(sdsKey, &de); + bool fContinue = fn((const char*)dictGetKey(de), (robj*)dictGetVal(de)); + if (fEvict) + removeCachedValue(sdsKey); + sdsfree(sdsKey); + return fContinue; + }); + return fSawAll; + } + dictIterator *di = dictGetSafeIterator(m_pdict); dictEntry *de = nullptr; bool fResult = true; while(fResult && ((de = dictNext(di)) != nullptr)) { - bool fEvicted = FEvictedDE(de); - 
ensure((const char*)dictGetKey(de), &de); if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de))) fResult = false; - - // re-evict the key so we don't OOM - if (fEvicted) - removeCachedValue((const char*)dictGetKey(de)); } dictReleaseIterator(di); @@ -686,13 +696,9 @@ bool redisDbPersistentData::iterate(std::function fn) return true; // Alright it's a key in the use keyspace, lets ensure it and then pass it off - bool fEvicted = FEvictedDE(de); ensure(key); deCurrent = dictFind(m_pdict, key); - bool fResult = fn(key, (robj*)dictGetVal(deCurrent)); - if (fEvicted) - removeCachedValue(key); - return fResult; + return fn(key, (robj*)dictGetVal(deCurrent)); }, true /*fKeyOnly*/); } @@ -1419,13 +1425,9 @@ expireEntry *redisDbPersistentDataSnapshot::getExpire(const char *key) { if (expireSize() == 0) return nullptr; - auto itr = find_threadsafe(key); - if (itr == nullptr) + auto itrExpire = m_setexpire->find(key); + if (itrExpire == m_setexpire->end()) return nullptr; - if (!itr.val()->FExpires()) - return nullptr; - - auto itrExpire = findExpire(itr.key()); return itrExpire.operator->(); } @@ -1933,7 +1935,7 @@ bool redisDbPersistentData::insert(char *key, robj *o) int res = dictAdd(m_pdict, key, o); if (res == DICT_OK) { - if (m_pdbSnapshot != nullptr && m_pdbSnapshot->find_threadsafe(key) != nullptr) + if (m_pdbSnapshot != nullptr && m_pdbSnapshot->find_cached_threadsafe(key) != nullptr) { serverAssert(dictFind(m_pdictTombstone, key) != nullptr); } @@ -2036,7 +2038,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) dictEntry *deTombstone = dictFind(m_pdictTombstone, sdsKey); if (deTombstone == nullptr) { - auto itr = m_pdbSnapshot->find_threadsafe(sdsKey); + auto itr = m_pdbSnapshot->find_cached_threadsafe(sdsKey); if (itr == m_pdbSnapshot->end()) return; // not found @@ -2128,16 +2130,12 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges() { for (unique_sds_ptr &key : m_vecchanged) { - robj *o = find(key.get()); 
- if (o != nullptr) - { - sds temp = serializeStoredObject(o); - vecRet.emplace_back(std::move(key), unique_sds_ptr(temp)); - } - else - { - m_spstorage->erase(key.get(), sdslen(key.get())); - } + dictEntry *de = dictFind(m_pdict, key.get()); + if (de == nullptr) + continue; + robj *o = (robj*)dictGetVal(de); + sds temp = serializeStoredObject(o); + vecRet.emplace_back(std::move(key), unique_sds_ptr(temp)); } } m_vecchanged.clear(); @@ -2182,7 +2180,7 @@ dict_iter redisDbPersistentData::random() double randval = (double)rand()/RAND_MAX; if (randval <= pctInSnapshot) { - iter = m_pdbSnapshot->random_threadsafe(); + iter = m_pdbSnapshot->random_cache_threadsafe(); // BUG: RANDOM doesn't consider keys not in RAM ensure(iter.key()); dictEntry *de = dictFind(m_pdict, iter.key()); return dict_iter(de); @@ -2196,7 +2194,11 @@ dict_iter redisDbPersistentData::random() size_t redisDbPersistentData::size() const { - return dictSize(m_pdict) + (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0); + if (m_spstorage != nullptr) + return m_spstorage->count(); + + return dictSize(m_pdict) + + (m_pdbSnapshot ? 
(m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0); } bool redisDbPersistentData::removeCachedValue(const char *key) diff --git a/src/evict.cpp b/src/evict.cpp index 2c293d78c..10952c73f 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -248,7 +248,7 @@ struct visitFunctor bool operator()(const expireEntry &e) { dictEntry *de = dictFind(dbdict, e.key()); - if (dictGetVal(de) != nullptr) + if (de != nullptr) { processEvictionCandidate(dbid, (sds)dictGetKey(de), (robj*)dictGetVal(de), &e, pool); ++count; @@ -580,7 +580,7 @@ int freeMemoryIfNeeded(void) { if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM) { if (db->size() != 0) { - auto itr = db->random_threadsafe(); + auto itr = db->random_cache_threadsafe(); bestkey = itr.key(); bestdbid = j; break; diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index b08eec542..1ddc62a58 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -55,6 +55,10 @@ bool redisDbPersistentData::asyncDelete(robj *key) { /* If the value is composed of a few allocations, to free in a lazy way * is actually just slower... So under a certain limit we just free * the object synchronously. 
*/ + + if (m_spstorage != nullptr) + return syncDelete(key); // async delete never makes sense with a storage provider + dictEntry *de = dictUnlink(m_pdict,ptrFromObj(key)); if (m_pdbSnapshot != nullptr) dictAdd(m_pdictTombstone, sdsdup((sds)dictGetKey(de)), nullptr); diff --git a/src/server.h b/src/server.h index 2e58877fb..c5576b30b 100644 --- a/src/server.h +++ b/src/server.h @@ -1271,7 +1271,7 @@ public: void trackkey(const char *key) { if (m_fTrackingChanges && !m_fAllChanged && m_spstorage) - m_vecchanged.push_back(unique_sds_ptr(sdsdup(key))); + m_vecchanged.push_back(unique_sds_ptr(sdsdupshared(key))); } dict_iter find(const char *key) @@ -1293,11 +1293,6 @@ public: return m_setexpire->random_value(); } - auto findExpire(const char *key) - { - return m_setexpire->find(key); - } - dict_iter end() { return dict_iter(nullptr); } dict_const_iter end() const { return dict_const_iter(nullptr); } @@ -1393,8 +1388,8 @@ public: using redisDbPersistentData::endSnapshot; using redisDbPersistentData::end; - dict_iter random_threadsafe() const; - dict_iter find_threadsafe(const char *key) const; + dict_iter random_cache_threadsafe() const; + dict_iter find_cached_threadsafe(const char *key) const; expireEntry *getExpire(robj_roptr key) { return getExpire(szFromObj(key)); } expireEntry *getExpire(const char *key); @@ -1449,7 +1444,6 @@ typedef struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::find; using redisDbPersistentData::random; using redisDbPersistentData::random_expire; - using redisDbPersistentData::findExpire; using redisDbPersistentData::end; using redisDbPersistentData::getStats; using redisDbPersistentData::getExpireStats; @@ -1475,16 +1469,6 @@ typedef struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::endSnapshot; using redisDbPersistentData::consolidate_snapshot; - dict_iter find_threadsafe(const char *key) const - { - dict_iter itr = redisDbPersistentDataSnapshot::find_threadsafe(key); - if 
(itr.key() != nullptr && itr.val() == nullptr) - { - return const_cast(this)->redisDbPersistentData::find(key); - } - return itr; - } - public: expireset::setiter expireitr; dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 7c0a653a5..1fc9c6c01 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -161,7 +161,7 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn if (deSnapshot == nullptr) { // The tombstone is for a grand child, propogate it - serverAssert(m_spdbSnapshotHOLDER->m_pdbSnapshot->find_threadsafe((const char*)dictGetKey(de)) != nullptr); + serverAssert(m_spdbSnapshotHOLDER->m_pdbSnapshot->find_cached_threadsafe((const char*)dictGetKey(de)) != nullptr); dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), nullptr); continue; } @@ -222,7 +222,7 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn serverAssert(m_spdbSnapshotHOLDER != nullptr || dictSize(m_pdictTombstone) == 0); } -dict_iter redisDbPersistentDataSnapshot::random_threadsafe() const +dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe() const { if (size() == 0) return dict_iter(nullptr); @@ -233,20 +233,21 @@ dict_iter redisDbPersistentDataSnapshot::random_threadsafe() const double randval = (double)rand()/RAND_MAX; if (randval <= pctInSnapshot) { - return m_pdbSnapshot->random_threadsafe(); + return m_pdbSnapshot->random_cache_threadsafe(); } } - serverAssert(dictSize(m_pdict) > 0); + if (dictSize(m_pdict) == 0) + return dict_iter(nullptr); dictEntry *de = dictGetRandomKey(m_pdict); return dict_iter(de); } -dict_iter redisDbPersistentDataSnapshot::find_threadsafe(const char *key) const +dict_iter redisDbPersistentDataSnapshot::find_cached_threadsafe(const char *key) const { dictEntry *de = dictFind(m_pdict, key); if (de == nullptr && m_pdbSnapshot != nullptr) { - auto itr = m_pdbSnapshot->find_threadsafe(key); + 
auto itr = m_pdbSnapshot->find_cached_threadsafe(key); if (itr != nullptr && dictFind(m_pdictTombstone, itr.key()) == nullptr) return itr; } @@ -255,6 +256,30 @@ dict_iter redisDbPersistentDataSnapshot::find_threadsafe(const char *key) const bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function fn, bool fKeyOnly) const { + if (m_spstorage != nullptr) + { + bool fSawAll = m_spstorage->enumerate([&](const char *key, size_t cchKey, const void *data, size_t cbData){ + sds sdsKey = sdsnewlen(key, cchKey); + dictEntry *de = dictFind(m_pdict, sdsKey); + bool fContinue = false; + if (de != nullptr) + { + fContinue = fn((const char*)dictGetKey(de), (robj*)dictGetVal(de)); + } + else + { + robj *o = fKeyOnly ? nullptr : deserializeStoredObject(this, sdsKey, data, cbData); + fContinue = fn(sdsKey, o); + if (o != nullptr) + decrRefCount(o); + } + + sdsfree(sdsKey); + return fContinue; + }); + return fSawAll; + } + dictEntry *de = nullptr; bool fResult = true; @@ -351,8 +376,7 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData * m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o){ if (o != nullptr) - incrRefCount(o); - dictAdd(spdb->m_pdict, sdsdupshared(key), o.unsafe_robjcast()); + dictAdd(spdb->m_pdict, sdsdupshared(key), o.unsafe_robjcast()); return true; }, true /*fKeyOnly*/); spdb->m_spstorage = m_pdbSnapshot->m_spstorage; diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index 981f5548c..4addc26b1 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -20,18 +20,17 @@ void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data, throw status.ToString(); } -void RocksDBStorageProvider::erase(const char *key, size_t cchKey) +bool RocksDBStorageProvider::erase(const char *key, size_t cchKey) { rocksdb::Status status; if (m_spbatch != nullptr) status = m_spbatch->Delete(m_spcolfamily.get(), rocksdb::Slice(key, cchKey)); else status = m_spdb->Delete(WriteOptions(), 
m_spcolfamily.get(), rocksdb::Slice(key, cchKey)); - if (!status.ok()) - throw status.ToString(); + return status.ok(); } -void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callback fn) const +void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callbackSingle fn) const { std::string value; auto status = m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey), &value); @@ -65,13 +64,16 @@ size_t RocksDBStorageProvider::count() const return count; } -void RocksDBStorageProvider::enumerate(callback fn) const +bool RocksDBStorageProvider::enumerate(callback fn) const { std::unique_ptr it = std::unique_ptr(m_spdb->NewIterator(ReadOptions(), m_spcolfamily.get())); for (it->SeekToFirst(); it->Valid(); it->Next()) { - fn(it->key().data(), it->key().size(), it->value().data(), it->value().size()); + bool fContinue = fn(it->key().data(), it->key().size(), it->value().data(), it->value().size()); + if (!fContinue) + break; } assert(it->status().ok()); // Check for any errors found during the scan + return !it->Valid(); } const IStorage *RocksDBStorageProvider::clone() const diff --git a/src/storage/rocksdb.h b/src/storage/rocksdb.h index 66e63c0b0..84b0bbf31 100644 --- a/src/storage/rocksdb.h +++ b/src/storage/rocksdb.h @@ -17,10 +17,10 @@ public: ~RocksDBStorageProvider(); virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) override; - virtual void erase(const char *key, size_t cchKey) override; - virtual void retrieve(const char *key, size_t cchKey, callback fn) const override; + virtual bool erase(const char *key, size_t cchKey) override; + virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const override; virtual size_t clear() override; - virtual void enumerate(callback fn) const override; + virtual bool enumerate(callback fn) const override; virtual const IStorage *clone() const override;