diff --git a/src/Makefile b/src/Makefile index a77939533..862fcd1af 100644 --- a/src/Makefile +++ b/src/Makefile @@ -214,7 +214,7 @@ endif REDIS_SERVER_NAME=keydb-pro-server REDIS_SENTINEL_NAME=keydb-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o keydbutils.o $(ASM_OBJ) +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o $(ASM_OBJ) REDIS_CLI_NAME=keydb-cli REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o 
crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o $(ASM_OBJ) REDIS_BENCHMARK_NAME=keydb-benchmark diff --git a/src/config.cpp b/src/config.cpp index b84fb4d8a..72d833fdf 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -30,6 +30,7 @@ #include "server.h" #include "storage/rocksdbfactory.h" +#include "storage/teststorageprovider.h" #include "cluster.h" #include @@ -142,6 +143,7 @@ configYesNo configs_yesno[] = { {"replica-read-only","slave-read-only",&g_pserver->repl_slave_ro,1,CONFIG_DEFAULT_SLAVE_READ_ONLY}, {"replica-ignore-maxmemory","slave-ignore-maxmemory",&g_pserver->repl_slave_ignore_maxmemory,1,CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY}, {"multi-master",NULL,&g_pserver->enable_multimaster,false,CONFIG_DEFAULT_ENABLE_MULTIMASTER}, + {"delete-on-evict",NULL,&cserver.delete_on_evict,true,false}, {NULL, NULL, 0, 0} }; @@ -218,15 +220,19 @@ void queueLoadModule(sds path, sds *argv, int argc) { static bool initializeStorageProvider(sds *argv, int argc, const char **err) { - bool fResult = false; + bool fTest = false; if (!strcasecmp(argv[0], "flash") && argc == 2) { // Create The Storage Factory (if necessary) g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory(argv[1], cserver.dbnum); - fResult = true; + } + else if (!strcasecmp(argv[0], "test") && argc == 1) + { + g_pserver->m_pstorageFactory = new (MALLOC_LOCAL) TestStorageFactory(); + fTest = true; } - if (fResult) + if (g_pserver->m_pstorageFactory != nullptr && !fTest) { // We need to set max memory to a sane default so keys are actually evicted properly if (g_pserver->maxmemory == 0 && g_pserver->maxmemory_policy == MAXMEMORY_NO_EVICTION) @@ -244,7 +250,7 @@ static bool initializeStorageProvider(sds *argv, int argc, const char **err) { *err = "Unknown storage provider"; } - return fResult; + return g_pserver->m_pstorageFactory != nullptr; } void loadServerConfigFromString(char *config) { diff --git a/src/db.cpp b/src/db.cpp index bd48275a4..64547a530 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ 
-677,33 +677,33 @@ void randomkeyCommand(client *c) { bool redisDbPersistentData::iterate(std::function fn) { - if (m_spstorage != nullptr) - { - bool fSawAll = m_spstorage->enumerate([&](const char *key, size_t cchKey, const void *, size_t )->bool{ - sds sdsKey = sdsnewlen(key, cchKey); - dictEntry *de = dictFind(m_pdict, sdsKey); - bool fEvict = (de == nullptr); - ensure(sdsKey, &de); - bool fContinue = fn((const char*)dictGetKey(de), (robj*)dictGetVal(de)); - if (fEvict) - removeCachedValue(sdsKey); - sdsfree(sdsKey); - return fContinue; - }); - return fSawAll; - } - dictIterator *di = dictGetSafeIterator(m_pdict); dictEntry *de = nullptr; bool fResult = true; while(fResult && ((de = dictNext(di)) != nullptr)) { - ensure((const char*)dictGetKey(de), &de); if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de))) fResult = false; } dictReleaseIterator(di); + if (m_spstorage != nullptr) + { + bool fSawAll = fResult && m_spstorage->enumerate([&](const char *key, size_t cchKey, const void *, size_t )->bool{ + sds sdsKey = sdsnewlen(key, cchKey); + bool fContinue = true; + if (dictFind(m_pdict, sdsKey) == nullptr) + { + ensure(sdsKey, &de); + fContinue = fn((const char*)dictGetKey(de), (robj*)dictGetVal(de)); + removeCachedValue(sdsKey); + } + sdsfree(sdsKey); + return fContinue; + }); + return fSawAll; + } + if (fResult && m_pdbSnapshot != nullptr) { fResult = m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr){ @@ -1925,7 +1925,7 @@ void redisDbPersistentData::initialize() m_pdict = dictCreate(&dbDictType,this); m_pdictTombstone = dictCreate(&dbDictType,this); m_setexpire = new(MALLOC_LOCAL) expireset(); - m_fAllChanged = false; + m_fAllChanged = 0; m_fTrackingChanges = 0; } @@ -1987,7 +1987,7 @@ void redisDbPersistentData::clear(void(callback)(void*)) { dictEmpty(m_pdict,callback); if (m_fTrackingChanges) - m_fAllChanged = true; + m_fAllChanged++; delete m_setexpire; m_setexpire = new (MALLOC_LOCAL) expireset(); if (m_spstorage != nullptr) @@ 
-2145,7 +2145,7 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges() { m_spstorage->clear(); storeDatabase(); - m_fAllChanged = false; + m_fAllChanged--; } else { @@ -2237,9 +2237,11 @@ bool redisDbPersistentData::removeCachedValue(const char *key) return true; } -void redisDbPersistentData::trackChanges() +void redisDbPersistentData::trackChanges(bool fBulk) { m_fTrackingChanges++; + if (fBulk) + m_fAllChanged++; } void redisDbPersistentData::removeAllCachedValues() @@ -2249,7 +2251,7 @@ void redisDbPersistentData::removeAllCachedValues() { auto vec = processChanges(); commitChanges(vec); - trackChanges(); + trackChanges(false); } dictEmpty(m_pdict, nullptr); diff --git a/src/evict.cpp b/src/evict.cpp index 038542365..492b9736f 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -594,7 +594,7 @@ int freeMemoryIfNeeded(void) { if (bestkey) { db = g_pserver->db[bestdbid]; - if (db->FStorageProvider()) + if (!cserver.delete_on_evict && db->FStorageProvider()) { // This key is in the storage so we only need to free the object delta = (long long) zmalloc_used_memory(); diff --git a/src/rdb.cpp b/src/rdb.cpp index fbb011dfe..5faf4958a 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1993,6 +1993,11 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { robj *subexpireKey = nullptr; robj *key = nullptr; + for (int idb = 0; idb < cserver.dbnum; ++idb) + { + g_pserver->db[idb]->trackChanges(true); + } + rdb->update_cksum = rdbLoadProgressCallback; rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes; if (rioRead(rdb,buf,9) == 0) goto eoferr; @@ -2252,9 +2257,16 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { } } } + + for (int idb = 0; idb < cserver.dbnum; ++idb) + { + auto vec = g_pserver->db[idb]->processChanges(); + g_pserver->db[idb]->commitChanges(vec); + } return C_OK; eoferr: /* unexpected end of file is handled here with a fatal exit */ + // we don't need to commit changes, because we're about 
to exit() serverLog(LL_WARNING,"Short read or OOM loading DB. Unrecoverable error, aborting now."); rdbExitReportCorruptRDB("Unexpected EOF reading RDB file"); return C_ERR; /* Just to avoid warning */ diff --git a/src/server.cpp b/src/server.cpp index cbf4a7d70..ebcd2eb49 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2290,7 +2290,7 @@ void afterSleep(struct aeEventLoop *eventLoop) { serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); aeAcquireLock(); for (int idb = 0; idb < cserver.dbnum; ++idb) - g_pserver->db[idb]->trackChanges(); + g_pserver->db[idb]->trackChanges(false); aeReleaseLock(); } diff --git a/src/server.h b/src/server.h index 44b18f317..e8f98ed46 100644 --- a/src/server.h +++ b/src/server.h @@ -1318,7 +1318,7 @@ public: void setStorageProvider(IStorage *pstorage); - void trackChanges(); + void trackChanges(bool fBulk); // Process and commit changes for secondary storage. Note that process and commit are seperated // to allow you to release the global lock before commiting. 
To prevent deadlocks you *must* @@ -1355,7 +1355,7 @@ private: dict *m_pdict = nullptr; /* The keyspace for this DB */ dict *m_pdictTombstone = nullptr; /* Track deletes when we have a snapshot */ int m_fTrackingChanges = 0; // Note: Stack based - bool m_fAllChanged = false; + int m_fAllChanged = 0; std::vector m_vecchanged; std::shared_ptr m_spstorage = nullptr; uint64_t mvccCheckpoint = 0; @@ -1949,6 +1949,7 @@ struct redisServerConst { sds license_key = nullptr; int trial_timeout = 20; + int delete_on_evict = false; // Only valid when a storage provider is set }; struct redisServer { diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 1fc9c6c01..5b2503b82 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -256,17 +256,32 @@ dict_iter redisDbPersistentDataSnapshot::find_cached_threadsafe(const char *key) bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function fn, bool fKeyOnly) const { + // Take the size so we can ensure we visited every element exactly once + // use volatile to ensure it's not checked too late. This makes it more + // likely we'll detect races (but it won't guarantee it) + volatile size_t celem = size(); + + dictEntry *de = nullptr; + bool fResult = true; + + dictIterator *di = dictGetSafeIterator(m_pdict); + while(fResult && ((de = dictNext(di)) != nullptr)) + { + --celem; + robj *o = fKeyOnly ? 
nullptr : (robj*)dictGetVal(de); + if (!fn((const char*)dictGetKey(de), o)) + fResult = false; + } + dictReleaseIterator(di); + + if (m_spstorage != nullptr) { - bool fSawAll = m_spstorage->enumerate([&](const char *key, size_t cchKey, const void *data, size_t cbData){ + bool fSawAll = fResult && m_spstorage->enumerate([&](const char *key, size_t cchKey, const void *data, size_t cbData){ sds sdsKey = sdsnewlen(key, cchKey); dictEntry *de = dictFind(m_pdict, sdsKey); - bool fContinue = false; - if (de != nullptr) - { - fContinue = fn((const char*)dictGetKey(de), (robj*)dictGetVal(de)); - } - else + bool fContinue = true; + if (de == nullptr) { robj *o = fKeyOnly ? nullptr : deserializeStoredObject(this, sdsKey, data, cbData); fContinue = fn(sdsKey, o); @@ -280,34 +295,6 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::functionretrieve((sds)dictGetKey(de), sdslen((sds)dictGetKey(de)), [&](const char *, size_t, const void *data, size_t cb){ - o = deserializeStoredObject(this, (const char*)dictGetKey(de), data, cb); - }); - } - - if (!fn((const char*)dictGetKey(de), o)) - fResult = false; - - if (o != nullptr && dictGetVal(de) == nullptr) - decrRefCount(o); - } - dictReleaseIterator(di); - const redisDbPersistentDataSnapshot *psnapshot; __atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE); if (fResult && psnapshot != nullptr) diff --git a/tests/unit/flash.tcl b/tests/unit/flash.tcl index b8ed35a32..d7b604cf5 100644 --- a/tests/unit/flash.tcl +++ b/tests/unit/flash.tcl @@ -1,4 +1,4 @@ -start_server {tags {"flash"} overrides {"storage-provider flash ./rocks.db"}} { +start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.db} delete-on-evict no]] { test { FLASH - GET works after eviction } { r set testkey foo