diff --git a/src/IStorage.h b/src/IStorage.h index cdb8336f4..016960350 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -17,7 +17,7 @@ public: virtual ~IStorage(); - virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) = 0; + virtual void insert(const char *key, size_t cchKey, void *data, size_t cb, bool fOverwrite) = 0; /* fOverwrite: true when the caller expects key to already exist in storage */ virtual bool erase(const char *key, size_t cchKey) = 0; virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const = 0; virtual size_t clear() = 0; diff --git a/src/aof.cpp b/src/aof.cpp index 733d0dc43..d1caa7885 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -764,6 +764,11 @@ int loadAppendOnlyFile(char *filename) { } } + for (int idb = 0; idb < cserver.dbnum; ++idb) + { + g_pserver->db[idb]->trackChanges(true); + } + /* Read the actual AOF file, in REPL format, command by command. */ while(1) { int argc, j; @@ -864,6 +869,11 @@ int loadAppendOnlyFile(char *filename) { } loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. 
*/ + for (int idb = 0; idb < cserver.dbnum; ++idb) + { + auto vec = g_pserver->db[idb]->processChanges(); + g_pserver->db[idb]->commitChanges(vec); + } fclose(fp); freeFakeClient(fakeClient); g_pserver->aof_state = old_aof_state; diff --git a/src/db.cpp b/src/db.cpp index 1e36a50f0..51d5fcec5 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -91,7 +91,7 @@ static robj* lookupKey(redisDb *db, robj *key, int flags) { lookupKeyUpdateObj(val, flags); if (flags & LOOKUP_UPDATEMVCC) { val->mvcc_tstamp = getMvccTstamp(); - db->trackkey(key); + db->trackkey(key, true /* fUpdate */); } return val; } else { @@ -386,6 +386,10 @@ bool redisDbPersistentData::syncDelete(robj *key) fDeleted = (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) || fDeleted; if (fDeleted) { + auto itrChange = m_setchanged.find(szFromObj(key)); + if (itrChange != m_setchanged.end()) + m_setchanged.erase(itrChange); + if (m_pdbSnapshot != nullptr) { auto itr = m_pdbSnapshot->find_cached_threadsafe(szFromObj(key)); @@ -1340,7 +1344,7 @@ int redisDbPersistentData::removeExpire(robj *key, dict_iter itr) { if (!val->FExpires()) return 0; - trackkey(key); + trackkey(key, true /* fUpdate */); auto itrExpire = m_setexpire->find(itr.key()); serverAssert(itrExpire != m_setexpire->end()); m_setexpire->erase(itrExpire); @@ -1957,7 +1961,7 @@ bool redisDbPersistentData::insert(char *key, robj *o) { serverAssert(dictFind(m_pdictTombstone, key) != nullptr); } - trackkey(key); + trackkey(key, false /* fUpdate */); } return (res == DICT_OK); } @@ -1997,7 +2001,7 @@ void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when) /* Reuse the sds from the main dict in the expire dict */ dictEntry *kde = dictFind(m_pdict,ptrFromObj(key)); serverAssertWithInfo(NULL,key,kde != NULL); - trackkey(key); + trackkey(key, true /* fUpdate */); if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) { @@ -2024,7 +2028,7 @@ void redisDbPersistentData::setExpire(robj *key, robj *subkey, 
long long when) void redisDbPersistentData::setExpire(expireEntry &&e) { - trackkey(e.key()); + trackkey(e.key(), true /* fUpdate */); m_setexpire->insert(e); } @@ -2035,7 +2039,7 @@ bool redisDb::FKeyExpires(const char *key) void redisDbPersistentData::updateValue(dict_iter itr, robj *val) { - trackkey(itr.key()); + trackkey(itr.key(), true /* fUpdate */); dictSetVal(m_pdict, itr.de, val); } @@ -2105,10 +2109,10 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) } } -void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o) +void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o, bool fOverwrite) { sds temp = serializeStoredObject(o); - m_spstorage->insert(szKey, cchKey, temp, sdslen(temp)); + m_spstorage->insert(szKey, cchKey, temp, sdslen(temp), fOverwrite); sdsfree(temp); } @@ -2119,7 +2123,7 @@ void redisDbPersistentData::storeDatabase() while ((de = dictNext(di)) != NULL) { sds key = (sds)dictGetKey(de); robj *o = (robj*)dictGetVal(de); - storeKey(key, sdslen(key), o); + storeKey(key, sdslen(key), o, false); } dictReleaseIterator(di); } @@ -2146,14 +2150,14 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges() } else { - for (auto &key : m_setchanged) + for (auto &change : m_setchanged) { - dictEntry *de = dictFind(m_pdict, key.get()); + dictEntry *de = dictFind(m_pdict, change.strkey.get()); if (de == nullptr) continue; robj *o = (robj*)dictGetVal(de); sds temp = serializeStoredObject(o); - vecRet.emplace_back(std::move(key), unique_sds_ptr(temp)); + vecRet.emplace_back(std::move(change), unique_sds_ptr(temp)); } } m_setchanged.clear(); @@ -2167,7 +2171,7 @@ void redisDbPersistentData::commitChanges(const changelist &vec) { for (auto &pair : vec) { - m_spstorage->insert(pair.first.get(), sdslen(pair.first.get()), pair.second.get(), sdslen(pair.second.get())); + m_spstorage->insert(pair.first.strkey.get(), sdslen(pair.first.strkey.get()), pair.second.get(), 
sdslen(pair.second.get()), pair.first.fUpdate); } if (m_spstorage != nullptr) m_spstorage->endWriteBatch(); @@ -2223,9 +2227,9 @@ bool redisDbPersistentData::removeCachedValue(const char *key) { serverAssert(m_spstorage != nullptr); // First ensure its not a pending key - for (auto &strkey : m_setchanged) + for (auto &change : m_setchanged) { - if (sdscmp(strkey.get(), (sds)key) == 0) + if (sdscmp(change.strkey.get(), (sds)key) == 0) return false; // NOP } @@ -2252,4 +2256,13 @@ void redisDbPersistentData::removeAllCachedValues() } dictEmpty(m_pdict, nullptr); +} + +void redisDbPersistentData::trackkey(const char *key, bool fUpdate) +{ + if (m_fTrackingChanges && !m_fAllChanged && m_spstorage) { + auto itr = m_setchanged.find(key); + if (itr == m_setchanged.end()) + m_setchanged.emplace(sdsdupshared(key), fUpdate); + } } \ No newline at end of file diff --git a/src/server.h b/src/server.h index cea3b5808..353de7cd7 100644 --- a/src/server.h +++ b/src/server.h @@ -1263,16 +1263,12 @@ public: size_t size() const; void expand(uint64_t slots) { dictExpand(m_pdict, slots); } - void trackkey(robj_roptr o) + void trackkey(robj_roptr o, bool fUpdate) { - trackkey(szFromObj(o)); + trackkey(szFromObj(o), fUpdate); } - void trackkey(const char *key) - { - if (m_fTrackingChanges && !m_fAllChanged && m_spstorage) - m_setchanged.emplace(sdsdupshared(key)); - } + void trackkey(const char *key, bool fUpdate); dict_iter find(const char *key) { @@ -1324,7 +1320,14 @@ public: // to allow you to release the global lock before commiting. 
To prevent deadlocks you *must* // either release the global lock or keep the same global lock between the two functions as // a second look is kept to ensure writes to secondary storage are ordered - typedef std::vector> changelist; + struct changedesc + { + sdsimmutablestring strkey; + bool fUpdate; + + changedesc(const char *strkey, bool fUpdate) : strkey(strkey), fUpdate(fUpdate) {} + }; + typedef std::vector> changelist; changelist processChanges(); void commitChanges(const changelist &vec); @@ -1345,10 +1348,18 @@ public: void removeAllCachedValues(); private: + struct changedescCmp + { + using is_transparent = void; // C++14 to allow comparisons with different types + bool operator()(const changedesc &a, const changedesc &b) const { return a.strkey < b.strkey; } + bool operator()(const changedesc &a, const char *key) const { return a.strkey < sdsview(key); } + bool operator()(const char *key, const changedesc &b) const { return sdsview(key) < b.strkey; } + }; + void ensure(const char *key); void ensure(const char *key, dictEntry **de); void storeDatabase(); - void storeKey(const char *key, size_t cchKey, robj *o); + void storeKey(const char *key, size_t cchKey, robj *o, bool fOverwrite); void recursiveFreeSnapshots(redisDbPersistentDataSnapshot *psnapshot); // Keyspace @@ -1356,7 +1367,7 @@ private: dict *m_pdictTombstone = nullptr; /* Track deletes when we have a snapshot */ int m_fTrackingChanges = 0; // Note: Stack based int m_fAllChanged = 0; - std::set m_setchanged; + std::set m_setchanged; std::shared_ptr m_spstorage = nullptr; uint64_t mvccCheckpoint = 0; diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index 9ecac8ed8..cf536b2a6 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -9,10 +9,9 @@ RocksDBStorageProvider::RocksDBStorageProvider(std::shared_ptr &spd m_readOptionsTemplate.snapshot = m_psnapshot; } -void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data, size_t cb) +void 
RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data, size_t cb, bool fOverwrite) { rocksdb::Status status; - bool fOverwrite = FKeyExists(key, cchKey); if (m_spbatch != nullptr) status = m_spbatch->Put(m_spcolfamily.get(), rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb)); else diff --git a/src/storage/rocksdb.h b/src/storage/rocksdb.h index 8b7f6d549..03df666fa 100644 --- a/src/storage/rocksdb.h +++ b/src/storage/rocksdb.h @@ -17,7 +17,7 @@ public: RocksDBStorageProvider(std::shared_ptr &spdb, std::shared_ptr &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count); ~RocksDBStorageProvider(); - virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) override; + virtual void insert(const char *key, size_t cchKey, void *data, size_t cb, bool fOverwrite) override; virtual bool erase(const char *key, size_t cchKey) override; virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const override; virtual size_t clear() override; diff --git a/src/storage/teststorageprovider.cpp b/src/storage/teststorageprovider.cpp index 92d7ac547..c0f0b53e8 100644 --- a/src/storage/teststorageprovider.cpp +++ b/src/storage/teststorageprovider.cpp @@ -1,9 +1,9 @@ #include "teststorageprovider.h" -#include +#include "../server.h" IStorage *TestStorageFactory::create(int) { - return new TestStorageProvider(); + return new (MALLOC_LOCAL) TestStorageProvider(); } const char *TestStorageFactory::name() const @@ -19,9 +19,12 @@ TestStorageProvider::~TestStorageProvider() { } -void TestStorageProvider::insert(const char *key, size_t cchKey, void *data, size_t cb) +void TestStorageProvider::insert(const char *key, size_t cchKey, void *data, size_t cb, bool fOverwrite) { - m_map.insert(std::make_pair(std::string(key, cchKey), std::string((char*)data, cb))); + auto strkey = std::string(key, cchKey); + bool fActuallyExists = m_map.find(strkey) != m_map.end(); + serverAssert(fActuallyExists == fOverwrite); + 
m_map[strkey] = std::string((char*)data, cb); /* assign rather than insert(): insert() leaves an existing key's old value in place, so an overwrite would be silently dropped */ } @@ -77,5 +80,5 @@ void TestStorageProvider::flush() /* This is permitted to be a shallow clone */ const IStorage *TestStorageProvider::clone() const { - return new TestStorageProvider(*this); + return new (MALLOC_LOCAL) TestStorageProvider(*this); } \ No newline at end of file diff --git a/src/storage/teststorageprovider.h b/src/storage/teststorageprovider.h index a0522e11c..876571444 100644 --- a/src/storage/teststorageprovider.h +++ b/src/storage/teststorageprovider.h @@ -16,7 +16,7 @@ public: TestStorageProvider(); virtual ~TestStorageProvider(); - virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) override; + virtual void insert(const char *key, size_t cchKey, void *data, size_t cb, bool fOverwrite) override; virtual bool erase(const char *key, size_t cchKey) override; virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const override; virtual size_t clear() override; diff --git a/tests/unit/flash.tcl b/tests/unit/flash.tcl index b3fc2b7ac..75be4d58f 100644 --- a/tests/unit/flash.tcl +++ b/tests/unit/flash.tcl @@ -48,6 +48,25 @@ start_server [list tags {flash} overrides [list storage-provider {flash ./rocks. assert [expr [r ttl testkey] > 0] } + test { CREATE and UPDATE in transaction, key count is accurate } { + r flushall + r multi + r set testkey 2 + r incr testkey + r exec + assert_equal {1} [r dbsize] + assert_equal {3} [r get testkey] + } + + test { EXPIRE key count is accurate } { + r flushall + r set testkey foo ex 1 + r flushall cache + assert_equal {1} [r dbsize] + after 1500 + assert_equal {0} [r dbsize] + } + r flushall foreach policy { allkeys-random allkeys-lru allkeys-lfu