diff --git a/src/IStorage.h b/src/IStorage.h index 975e7f0cf..ebe94e19b 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -1,6 +1,13 @@ #pragma once #include +class IStorageFactory +{ +public: + virtual ~IStorageFactory() {} + virtual class IStorage *create(int db) = 0; +}; + class IStorage { public: @@ -17,6 +24,8 @@ public: virtual void beginWriteBatch() {} // NOP virtual void endWriteBatch() {} // NOP + virtual void flush(); + /* This is permitted to be a shallow clone */ virtual const IStorage *clone() const = 0; }; diff --git a/src/Makefile b/src/Makefile index 3a6b530d2..edd0476f6 100644 --- a/src/Makefile +++ b/src/Makefile @@ -208,7 +208,7 @@ endif REDIS_SERVER_NAME=keydb-server REDIS_SENTINEL_NAME=keydb-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o $(ASM_OBJ) +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o $(ASM_OBJ) REDIS_CLI_NAME=keydb-cli REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o $(ASM_OBJ) REDIS_BENCHMARK_NAME=keydb-benchmark @@ -298,6 +298,7 @@ dict-benchmark: dict.cpp zmalloc.cpp sds.c siphash.c clean: rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html Makefile.dep dict-benchmark + rm -rf storage/*.o .PHONY: clean diff --git a/src/db.cpp b/src/db.cpp index c9d43bda4..d2e7e9631 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -649,20 +649,27 @@ void randomkeyCommand(client *c) { decrRefCount(key); } +static bool FEvictedDE(dictEntry *de) +{ + return (de != nullptr) && dictGetVal(de) == nullptr; +} bool redisDbPersistentData::iterate(std::function fn) { dictIterator *di = dictGetSafeIterator(m_pdict); dictEntry *de = nullptr; bool fResult = true; - while((de = dictNext(di)) != nullptr) + while(fResult && ((de = dictNext(di)) != nullptr)) { + bool fEvicted = FEvictedDE(de); + ensure((const char*)dictGetKey(de), &de); if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de))) - { fResult = false; - break; - } + + // re-evict the key so we don't OOM + if (fEvicted) + removeCachedValue((const char*)dictGetKey(de)); } dictReleaseIterator(di); @@ -679,10 +686,14 @@ bool redisDbPersistentData::iterate(std::function fn) return true; // Alright it's a key in the use keyspace, lets ensure it and then pass it off + bool fEvicted = FEvictedDE(de); ensure(key); deCurrent = dictFind(m_pdict, key); - return fn(key, (robj*)dictGetVal(deCurrent)); - }); + bool fResult = fn(key, (robj*)dictGetVal(deCurrent)); + if (fEvicted) + removeCachedValue(key); + return fResult; + }, true /*fKeyOnly*/); } return fResult; @@ -713,7 +724,7 @@ void keysCommandCore(client *cIn, const redisDbPersistentDataSnapshot *db, sds p decrRefCount(keyobj); } return !(cIn->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP); - }); + }, true /*fKeyOnly*/); setDeferredArrayLen(c,replylen,numkeys); @@ -1906,7 +1917,6 @@ void redisDbPersistentData::setStorageProvider(IStorage *pstorage) }); } -IStorage *create_rocksdb_storage(const char *dbfile); void redisDb::initialize(int id) { redisDbPersistentData::initialize(); @@ -1918,8 +1928,8 @@ void redisDb::initialize(int id) this->avg_ttl = 0; this->last_expire_set = 0; this->defrag_later = listCreate(); - if (id == 0) - this->setStorageProvider(create_rocksdb_storage("/tmp/rocks.db")); + if (g_pserver->m_pstorageFactory != nullptr) + this->setStorageProvider(g_pserver->m_pstorageFactory->create(id)); } bool redisDbPersistentData::insert(char *key, robj *o) diff --git a/src/server.cpp b/src/server.cpp index 7b6ebaf47..cdb639efa 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -61,6 +61,7 @@ #include #include #include "aelocker.h" +#include "storage/rocksdbfactory.h" int g_fTestMode = false; @@ -2187,7 +2188,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { else { fFirstRun = false; } - + aeReleaseLock(); for (auto &pair : vecchanges) @@ -3015,6 +3016,9 @@ void initServer(void) { signal(SIGPIPE, SIG_IGN); setupSignalHandlers(); + // Create The Storage Factory (if necessary) + g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory("/tmp/rocks.db", cserver.dbnum); + zfree(g_pserver->db); // initServerConfig created a dummy array, free that now g_pserver->db = (redisDb**)zmalloc(sizeof(redisDb*)*cserver.dbnum, MALLOC_LOCAL); @@ -3891,6 +3895,7 @@ int prepareForShutdown(int flags) { delete g_pserver->db[idb]; g_pserver->db[idb] = nullptr; } + delete g_pserver->m_pstorageFactory; serverLog(LL_WARNING,"%s is now ready to exit, bye bye...", g_pserver->sentinel_mode ? "Sentinel" : "KeyDB"); diff --git a/src/server.h b/src/server.h index 35e1fe094..5b81babc8 100644 --- a/src/server.h +++ b/src/server.h @@ -1381,7 +1381,7 @@ protected: public: bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; } - bool iterate_threadsafe(std::function fn) const; + bool iterate_threadsafe(std::function fn, bool fKeyOnly = false) const; using redisDbPersistentData::createSnapshot; using redisDbPersistentData::endSnapshot; using redisDbPersistentData::end; @@ -2274,6 +2274,8 @@ struct redisServer { GarbageCollector garbageCollector; + IStorageFactory *m_pstorageFactory = nullptr; + bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; } }; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index f5499bbea..b299c62ef 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -261,7 +261,7 @@ dict_iter redisDbPersistentDataSnapshot::find_threadsafe(const char *key) const return dict_iter(de); } -bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function fn) const +bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function fn, bool fKeyOnly) const { dictEntry *de = nullptr; bool fResult = true; @@ -272,14 +272,22 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::functionretrieve((sds)dictGetKey(de), sdslen((sds)dictGetKey(de)), [&](const char *, size_t, const void *data, size_t cb){ + o = deserializeStoredObject(data, cb); + }); } + + if (!fn((const char*)dictGetKey(de), o)) + fResult = false; + + if (o != nullptr && dictGetVal(de) == nullptr) + decrRefCount(o); } dictReleaseIterator(di); @@ -301,7 +309,7 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::functionm_pdict, sdsdup(key), o.unsafe_robjcast()); return true; - }); + }, true /*fKeyOnly*/); spdb->m_spstorage = m_pdbSnapshot->m_spstorage; spdb->m_pdict->iterators++; diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index 4d352eef2..c809ee88c 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -1,62 +1,11 @@ -#include "../IStorage.h" -#include +#include "rocksdb.h" #include #include -class RocksDBStorageProvider : public IStorage -{ - std::shared_ptr m_spdb; - std::unique_ptr m_spbatch; - const rocksdb::Snapshot *m_psnapshot = nullptr; - rocksdb::ReadOptions m_readOptionsTemplate; - -public: - RocksDBStorageProvider(const char *path); - ~RocksDBStorageProvider(); - - virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) override; - virtual void erase(const char *key, size_t cchKey) override; - virtual void retrieve(const char *key, size_t cchKey, callback fn) const override; - virtual size_t clear() override; - virtual void enumerate(callback fn) const override; - - virtual const IStorage *clone() const override; - - virtual void beginWriteBatch() override; - virtual void endWriteBatch() override; - - size_t count() const; - -protected: - RocksDBStorageProvider(std::shared_ptr &spdb); - - const rocksdb::ReadOptions &ReadOptions() const { return m_readOptionsTemplate; } - rocksdb::WriteOptions WriteOptions() const; -}; - -IStorage *create_rocksdb_storage(const char *dbfile) -{ - return new RocksDBStorageProvider(dbfile); -} - -RocksDBStorageProvider::RocksDBStorageProvider(const char *path) -{ - rocksdb::Options options; - options.create_if_missing = true; - rocksdb::DB *db = nullptr; - auto status = rocksdb::DB::Open(options, path, &db); - if (!status.ok()) - throw status.ToString(); - m_spdb = std::shared_ptr(db); - - m_readOptionsTemplate = rocksdb::ReadOptions(); -} - -RocksDBStorageProvider::RocksDBStorageProvider(std::shared_ptr &spdb) - : m_spdb(spdb) +RocksDBStorageProvider::RocksDBStorageProvider(std::shared_ptr &spdb, std::shared_ptr &spcolfam, const rocksdb::Snapshot *psnapshot) + : m_spdb(spdb), m_psnapshot(psnapshot), m_spcolfamily(spcolfam) { m_readOptionsTemplate = rocksdb::ReadOptions(); - m_psnapshot = spdb->GetSnapshot(); m_readOptionsTemplate.snapshot = m_psnapshot; } @@ -64,9 +13,9 @@ void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data, { rocksdb::Status status; if (m_spbatch != nullptr) - status = m_spbatch->Put(rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb)); + status = m_spbatch->Put(m_spcolfamily.get(), rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb)); else - status = m_spdb->Put(WriteOptions(), rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb)); + status = m_spdb->Put(WriteOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb)); if (!status.ok()) throw status.ToString(); } @@ -75,9 +24,9 @@ void RocksDBStorageProvider::erase(const char *key, size_t cchKey) { rocksdb::Status status; if (m_spbatch != nullptr) - status = m_spbatch->Delete(rocksdb::Slice(key, cchKey)); + status = m_spbatch->Delete(m_spcolfamily.get(), rocksdb::Slice(key, cchKey)); else - status = m_spdb->Delete(WriteOptions(), rocksdb::Slice(key, cchKey)); + status = m_spdb->Delete(WriteOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey)); if (!status.ok()) throw status.ToString(); } @@ -85,7 +34,7 @@ void RocksDBStorageProvider::erase(const char *key, size_t cchKey) void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callback fn) const { std::string value; - auto status = m_spdb->Get(ReadOptions(), rocksdb::Slice(key, cchKey), &value); + auto status = m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey), &value); if (!status.ok()) throw status.ToString(); fn(key, cchKey, value.data(), value.size()); @@ -94,7 +43,13 @@ void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callback f size_t RocksDBStorageProvider::clear() { size_t celem = count(); - auto status = m_spdb->DropColumnFamily(m_spdb->DefaultColumnFamily()); + auto status = m_spdb->DropColumnFamily(m_spcolfamily.get()); + auto strName = m_spcolfamily->GetName(); + + rocksdb::ColumnFamilyHandle *handle = nullptr; + m_spdb->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), strName, &handle); + m_spcolfamily = std::shared_ptr(handle); + if (!status.ok()) throw status.ToString(); return celem; @@ -103,7 +58,7 @@ size_t RocksDBStorageProvider::clear() size_t RocksDBStorageProvider::count() const { std::string strelem; - if (!m_spdb->GetProperty(rocksdb::DB::Properties::kEstimateNumKeys, &strelem)) + if (!m_spdb->GetProperty(m_spcolfamily.get(), rocksdb::DB::Properties::kEstimateNumKeys, &strelem)) throw "Failed to get database size"; std::stringstream sstream(strelem); size_t count; @@ -113,7 +68,7 @@ size_t RocksDBStorageProvider::count() const void RocksDBStorageProvider::enumerate(callback fn) const { - std::unique_ptr it = std::unique_ptr(m_spdb->NewIterator(ReadOptions())); + std::unique_ptr it = std::unique_ptr(m_spdb->NewIterator(ReadOptions(), m_spcolfamily.get())); for (it->SeekToFirst(); it->Valid(); it->Next()) { fn(it->key().data(), it->key().size(), it->value().data(), it->value().size()); } @@ -122,7 +77,8 @@ void RocksDBStorageProvider::enumerate(callback fn) const const IStorage *RocksDBStorageProvider::clone() const { - return new RocksDBStorageProvider(const_cast(this)->m_spdb); + const rocksdb::Snapshot *psnapshot = const_cast(this)->m_spdb->GetSnapshot(); + return new RocksDBStorageProvider(const_cast(this)->m_spdb, const_cast(this)->m_spcolfamily, psnapshot); } RocksDBStorageProvider::~RocksDBStorageProvider() @@ -150,4 +106,9 @@ void RocksDBStorageProvider::endWriteBatch() { m_spdb->Write(WriteOptions(), m_spbatch.get()); m_spbatch = nullptr; +} + +void RocksDBStorageProvider::flush() +{ + m_spdb->SyncWAL(); } \ No newline at end of file diff --git a/src/storage/rocksdb.h b/src/storage/rocksdb.h new file mode 100644 index 000000000..66e63c0b0 --- /dev/null +++ b/src/storage/rocksdb.h @@ -0,0 +1,38 @@ +#pragma once + +#include +#include "../IStorage.h" +#include + +class RocksDBStorageProvider : public IStorage +{ + std::shared_ptr m_spdb; // Note: This must be first so it is deleted last + std::unique_ptr m_spbatch; + const rocksdb::Snapshot *m_psnapshot = nullptr; + std::shared_ptr m_spcolfamily; + rocksdb::ReadOptions m_readOptionsTemplate; + +public: + RocksDBStorageProvider(std::shared_ptr &spdb, std::shared_ptr &spcolfam, const rocksdb::Snapshot *psnapshot); + ~RocksDBStorageProvider(); + + virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) override; + virtual void erase(const char *key, size_t cchKey) override; + virtual void retrieve(const char *key, size_t cchKey, callback fn) const override; + virtual size_t clear() override; + virtual void enumerate(callback fn) const override; + + virtual const IStorage *clone() const override; + + virtual void beginWriteBatch() override; + virtual void endWriteBatch() override; + + virtual void flush() override; + + size_t count() const; + +protected: + + const rocksdb::ReadOptions &ReadOptions() const { return m_readOptionsTemplate; } + rocksdb::WriteOptions WriteOptions() const; +}; \ No newline at end of file diff --git a/src/storage/rocksdbfactory.cpp b/src/storage/rocksdbfactory.cpp new file mode 100644 index 000000000..3774d217d --- /dev/null +++ b/src/storage/rocksdbfactory.cpp @@ -0,0 +1,63 @@ +#include "rocksdb.h" + +class RocksDBStorageFactory : public IStorageFactory +{ + std::shared_ptr m_spdb; // Note: This must be first so it is deleted last + std::vector> m_vecspcols; + +public: + RocksDBStorageFactory(const char *dbfile, int dbnum); + ~RocksDBStorageFactory(); + + virtual IStorage *create(int db) override; +}; + +IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum) +{ + return new RocksDBStorageFactory(path, dbnum); +} + +RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum) +{ + // Get the count of column families in the actual database + std::vector vecT; + auto status = rocksdb::DB::ListColumnFamilies(rocksdb::Options(), dbfile, &vecT); + // RocksDB requires we know the count of col families before opening, if the user only wants to see less + // we still have to make room for all column family handles regardless + if (status.ok() && (int)vecT.size() > dbnum) + dbnum = (int)vecT.size(); + + std::vector veccoldesc; + veccoldesc.push_back(rocksdb::ColumnFamilyDescriptor(rocksdb::kDefaultColumnFamilyName, rocksdb::ColumnFamilyOptions())); // ignore default col family + + for (int idb = 0; idb < dbnum; ++idb) + { + veccoldesc.push_back(rocksdb::ColumnFamilyDescriptor(std::to_string(idb), rocksdb::ColumnFamilyOptions())); + } + + rocksdb::Options options; + options.create_if_missing = true; + options.create_missing_column_families = true; + rocksdb::DB *db = nullptr; + + std::vector handles; + status = rocksdb::DB::Open(options, dbfile, veccoldesc, &handles, &db); + if (!status.ok()) + throw status.ToString(); + + for (auto handle : handles) + m_vecspcols.emplace_back(handle); + m_spdb = std::shared_ptr(db); +} + +RocksDBStorageFactory::~RocksDBStorageFactory() +{ + m_spdb->SyncWAL(); +} + +IStorage *RocksDBStorageFactory::create(int db) +{ + ++db; // skip default col family + std::shared_ptr spcolfamily(m_vecspcols[db].release()); + return new RocksDBStorageProvider(m_spdb, spcolfamily, nullptr); +} \ No newline at end of file diff --git a/src/storage/rocksdbfactory.h b/src/storage/rocksdbfactory.h new file mode 100644 index 000000000..3171bbd16 --- /dev/null +++ b/src/storage/rocksdbfactory.h @@ -0,0 +1,3 @@ +#pragma once + +class IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum); \ No newline at end of file