diff --git a/src/IStorage.h b/src/IStorage.h index 975e7f0cf..ebe94e19b 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -1,6 +1,13 @@ #pragma once #include +class IStorageFactory +{ +public: + virtual ~IStorageFactory() {} + virtual class IStorage *create(int db) = 0; +}; + class IStorage { public: @@ -17,6 +24,8 @@ public: virtual void beginWriteBatch() {} // NOP virtual void endWriteBatch() {} // NOP + virtual void flush(); + /* This is permitted to be a shallow clone */ virtual const IStorage *clone() const = 0; }; diff --git a/src/Makefile b/src/Makefile index 3a6b530d2..edd0476f6 100644 --- a/src/Makefile +++ b/src/Makefile @@ -208,7 +208,7 @@ endif REDIS_SERVER_NAME=keydb-server REDIS_SENTINEL_NAME=keydb-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o $(ASM_OBJ) +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o $(ASM_OBJ) REDIS_CLI_NAME=keydb-cli REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o $(ASM_OBJ) REDIS_BENCHMARK_NAME=keydb-benchmark @@ -298,6 +298,7 @@ dict-benchmark: dict.cpp zmalloc.cpp sds.c siphash.c clean: rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html Makefile.dep dict-benchmark + rm -rf storage/*.o .PHONY: clean diff --git a/src/db.cpp b/src/db.cpp index c9d43bda4..d2e7e9631 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -649,20 +649,27 @@ void randomkeyCommand(client *c) { decrRefCount(key); } +static bool FEvictedDE(dictEntry *de) +{ + return (de != nullptr) && dictGetVal(de) == nullptr; +} bool redisDbPersistentData::iterate(std::function fn) { dictIterator *di = dictGetSafeIterator(m_pdict); dictEntry *de = nullptr; bool fResult = true; - while((de = dictNext(di)) != nullptr) + while(fResult && ((de = dictNext(di)) != nullptr)) { + bool fEvicted = FEvictedDE(de); + ensure((const char*)dictGetKey(de), &de); if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de))) - { fResult = false; - break; - } + + // re-evict the key so we don't OOM + if (fEvicted) + removeCachedValue((const char*)dictGetKey(de)); } dictReleaseIterator(di); @@ -679,10 +686,14 @@ bool redisDbPersistentData::iterate(std::function fn) return true; // Alright it's a key in the use keyspace, lets ensure it and then pass it off + bool fEvicted = FEvictedDE(de); ensure(key); deCurrent = dictFind(m_pdict, key); - return fn(key, (robj*)dictGetVal(deCurrent)); - }); + bool fResult = fn(key, (robj*)dictGetVal(deCurrent)); + if (fEvicted) + removeCachedValue(key); + return fResult; + }, true /*fKeyOnly*/); } return fResult; @@ -713,7 +724,7 @@ void keysCommandCore(client *cIn, const redisDbPersistentDataSnapshot *db, sds p decrRefCount(keyobj); } return !(cIn->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP); - }); + }, true /*fKeyOnly*/); setDeferredArrayLen(c,replylen,numkeys); @@ -1906,7 +1917,6 @@ void redisDbPersistentData::setStorageProvider(IStorage *pstorage) }); } -IStorage *create_rocksdb_storage(const char *dbfile); void redisDb::initialize(int id) { redisDbPersistentData::initialize(); @@ -1918,8 +1928,8 @@ void redisDb::initialize(int id) this->avg_ttl = 0; this->last_expire_set = 0; this->defrag_later = listCreate(); - if (id == 0) - this->setStorageProvider(create_rocksdb_storage("/tmp/rocks.db")); + if (g_pserver->m_pstorageFactory != nullptr) + this->setStorageProvider(g_pserver->m_pstorageFactory->create(id)); } bool redisDbPersistentData::insert(char *key, robj *o) diff --git a/src/server.cpp b/src/server.cpp index 7b6ebaf47..cdb639efa 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -61,6 +61,7 @@ #include #include #include "aelocker.h" +#include "storage/rocksdbfactory.h" int g_fTestMode = false; @@ -2187,7 +2188,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { else { fFirstRun = false; } - + aeReleaseLock(); for (auto &pair : vecchanges) @@ -3015,6 +3016,9 @@ void initServer(void) { signal(SIGPIPE, SIG_IGN); setupSignalHandlers(); + // Create The Storage Factory (if necessary) + g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory("/tmp/rocks.db", cserver.dbnum); + zfree(g_pserver->db); // initServerConfig created a dummy array, free that now g_pserver->db = (redisDb**)zmalloc(sizeof(redisDb*)*cserver.dbnum, MALLOC_LOCAL); @@ -3891,6 +3895,7 @@ int prepareForShutdown(int flags) { delete g_pserver->db[idb]; g_pserver->db[idb] = nullptr; } + delete g_pserver->m_pstorageFactory; serverLog(LL_WARNING,"%s is now ready to exit, bye bye...", g_pserver->sentinel_mode ? "Sentinel" : "KeyDB"); diff --git a/src/server.h b/src/server.h index 35e1fe094..5b81babc8 100644 --- a/src/server.h +++ b/src/server.h @@ -1381,7 +1381,7 @@ protected: public: bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; } - bool iterate_threadsafe(std::function fn) const; + bool iterate_threadsafe(std::function fn, bool fKeyOnly = false) const; using redisDbPersistentData::createSnapshot; using redisDbPersistentData::endSnapshot; using redisDbPersistentData::end; @@ -2274,6 +2274,8 @@ struct redisServer { GarbageCollector garbageCollector; + IStorageFactory *m_pstorageFactory = nullptr; + bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; } }; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index f5499bbea..b299c62ef 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -261,7 +261,7 @@ dict_iter redisDbPersistentDataSnapshot::find_threadsafe(const char *key) const return dict_iter(de); } -bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function fn) const +bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function fn, bool fKeyOnly) const { dictEntry *de = nullptr; bool fResult = true; @@ -272,14 +272,22 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::functionretrieve((sds)dictGetKey(de), sdslen((sds)dictGetKey(de)), [&](const char *, size_t, const void *data, size_t cb){ + o = deserializeStoredObject(data, cb); + }); } + + if (!fn((const char*)dictGetKey(de), o)) + fResult = false; + + if (o != nullptr && dictGetVal(de) == nullptr) + decrRefCount(o); } dictReleaseIterator(di); @@ -301,7 +309,7 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::functionm_pdict, sdsdup(key), o.unsafe_robjcast()); return true; - }); + }, true /*fKeyOnly*/); spdb->m_spstorage = m_pdbSnapshot->m_spstorage; spdb->m_pdict->iterators++; diff --git a/src/storage/rocksdbfactory.h b/src/storage/rocksdbfactory.h new file mode 100644 index 000000000..3171bbd16 --- /dev/null +++ b/src/storage/rocksdbfactory.h @@ -0,0 +1,3 @@ +#pragma once + +class IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum); \ No newline at end of file