From 5f481a206de9f561f4c66b3720c43747742c4d0b Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 6 Dec 2019 20:39:32 -0500 Subject: [PATCH] Initial RocksDB integration Former-commit-id: 0de9e5b692c02e779e538ddd0a56d10215e501bb --- deps/Makefile | 4 +- src/IStorage.h | 2 +- src/Makefile | 10 ++-- src/db.cpp | 15 +++++- src/new.cpp | 7 ++- src/new.h | 1 + src/server.cpp | 21 ++++++++ src/server.h | 13 +++++ src/storage/rocksdb.cpp | 110 +++++++++++++++++++++++++++++++++++++++- 9 files changed, 170 insertions(+), 13 deletions(-) diff --git a/deps/Makefile b/deps/Makefile index 93936f725..c0d2ebe18 100644 --- a/deps/Makefile +++ b/deps/Makefile @@ -83,13 +83,13 @@ JEMALLOC_LDFLAGS= $(LDFLAGS) jemalloc: .make-prerequisites @printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR) - cd jemalloc && ./configure --with-version=5.1.0-0-g0 --with-lg-quantum=3 --with-jemalloc-prefix=je_ --enable-cc-silence CFLAGS="$(JEMALLOC_CFLAGS)" LDFLAGS="$(JEMALLOC_LDFLAGS)" + cd jemalloc && ./configure --with-version=5.1.0-0-g0 --with-lg-quantum=3 --with-jemalloc-prefix=je_ --enable-cc-silence --disable-cxx CFLAGS="$(JEMALLOC_CFLAGS)" LDFLAGS="$(JEMALLOC_LDFLAGS)" cd jemalloc && $(MAKE) CFLAGS="$(JEMALLOC_CFLAGS)" LDFLAGS="$(JEMALLOC_LDFLAGS)" lib/libjemalloc.a .PHONY: jemalloc rocksdb: .make-prerequisites @printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR) - cd rocksdb && $(MAKE) shared_lib + cd rocksdb && $(MAKE) static_lib .PHONY: rocksdb diff --git a/src/IStorage.h b/src/IStorage.h index fa870933c..48aab51d9 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -10,7 +10,7 @@ public: virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) = 0; virtual void erase(const char *key, size_t cchKey) = 0; - virtual void retrieve(const char *key, size_t cchKey, bool fDelete, callback fn) const = 0; + virtual void retrieve(const char *key, size_t cchKey, callback fn) const = 0; virtual size_t clear() = 0; virtual void enumerate(callback fn) const = 0; diff --git a/src/Makefile b/src/Makefile index c2c1e603b..3a6b530d2 100644 --- a/src/Makefile +++ b/src/Makefile @@ -103,7 +103,7 @@ endif FINAL_CFLAGS=$(STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(REDIS_CFLAGS) FINAL_CXXFLAGS=$(CXX_STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(CXXFLAGS) $(REDIS_CFLAGS) FINAL_LDFLAGS=$(LDFLAGS) $(REDIS_LDFLAGS) $(DEBUG) -FINAL_LIBS=-lm +FINAL_LIBS=-lm -lz DEBUG=-g -ggdb ifeq ($(uname_S),SunOS) @@ -258,8 +258,8 @@ endif @touch $@ # keydb-server -$(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ) librocksdb.so - $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a librocksdb.so -Wl,-rpath . $(FINAL_LIBS) +$(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ) + $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/rocksdb/librocksdb.a $(FINAL_LIBS) # keydb-sentinel $(REDIS_SENTINEL_NAME): $(REDIS_SERVER_NAME) @@ -296,12 +296,8 @@ dict-benchmark: dict.cpp zmalloc.cpp sds.c siphash.c %.o: %.asm .make-prerequisites $(KEYDB_AS) $< -o $@ -librocksdb.so: .make-prerequisites - $(QUIET_CP)cp --preserve=mode ../deps/rocksdb/librocksdb.so* ./ - clean: rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html Makefile.dep dict-benchmark - rm -rf librocksdb.so* .PHONY: clean diff --git a/src/db.cpp b/src/db.cpp index c7f75bcab..38f62c7ee 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1903,6 +1903,17 @@ void redisDbPersistentData::initialize() m_fTrackingChanges = 0; } +void redisDbPersistentData::setStorageProvider(IStorage *pstorage) +{ + serverAssert(m_spstorage == nullptr); + m_spstorage = std::unique_ptr(pstorage); + m_spstorage->enumerate([&](const char *key, size_t cchkey, const void *, size_t){ + sds sdsKey = sdsnewlen(key, cchkey); + dictAdd(m_pdict, sdsKey, nullptr); + }); +} + +IStorage *create_rocksdb_storage(const char *dbfile); void redisDb::initialize(int id) { redisDbPersistentData::initialize(); @@ -1914,6 +1925,8 @@ void redisDb::initialize(int id) this->avg_ttl = 0; this->last_expire_set = 0; this->defrag_later = listCreate(); + if (id == 0) + this->setStorageProvider(create_rocksdb_storage("/tmp/rocks.db")); } bool redisDbPersistentData::insert(char *key, robj *o) @@ -2068,7 +2081,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) else if (*pde != nullptr && dictGetVal(*pde) == nullptr) { serverAssert(m_spstorage != nullptr); - m_spstorage->retrieve(sdsKey, sdslen(sdsKey), true, [&](const char *, size_t, const void *data, size_t cb){ + m_spstorage->retrieve(sdsKey, sdslen(sdsKey), [&](const char *, size_t, const void *data, size_t cb){ robj *o = deserializeStoredObject(data, cb); serverAssert(o != nullptr); dictSetVal(m_pdict, *pde, o); diff --git a/src/new.cpp b/src/new.cpp index 33693da7a..809db3e7e 100644 --- a/src/new.cpp +++ b/src/new.cpp @@ -31,4 +31,9 @@ void operator delete(void *p, std::size_t) noexcept zfree(p); } -#endif \ No newline at end of file +#endif + +extern "C" size_t malloc_usable_size(void *pv) +{ + return zmalloc_size(pv); +} \ No newline at end of file diff --git a/src/new.h b/src/new.h index fc7ea926e..d37ac04f8 100644 --- a/src/new.h +++ b/src/new.h @@ -1,5 +1,6 @@ #pragma once #include // std::size_t +#include "storage.h" void *operator new(size_t size, enum MALLOC_CLASS mclass); diff --git a/src/server.cpp b/src/server.cpp index a81a4a3d6..535f26caa 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -5057,6 +5057,27 @@ void OnTerminate() The easiest way to achieve that is to acutally segfault, so we assert here. */ + auto exception = std::current_exception(); + if (exception != nullptr) + { + try + { + std::rethrow_exception(exception); + } + catch (const char *szErr) + { + serverLog(LL_WARNING, "Crashing on uncaught exception: %s", szErr); + } + catch (std::string str) + { + serverLog(LL_WARNING, "Crashing on uncaught exception: %s", str.c_str()); + } + catch (...) + { + // NOP + } + } + serverAssert(false); } diff --git a/src/server.h b/src/server.h index 79c841dc0..fb7de567b 100644 --- a/src/server.h +++ b/src/server.h @@ -1187,6 +1187,7 @@ public: robj *val() { return de ? (robj*)dictGetVal(de) : nullptr; } robj *operator->() { return de ? (robj*)dictGetVal(de) : nullptr; } operator robj*() const { return de ? (robj*)dictGetVal(de) : nullptr; } + }; class redisDbPersistentDataSnapshot; @@ -1266,6 +1267,8 @@ public: const expireEntry *getExpire(robj_roptr key) const; void initialize(); + void setStorageProvider(IStorage *pstorage); + void trackChanges() { m_fTrackingChanges++; } void processChanges(); @@ -1406,6 +1409,16 @@ typedef struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::endSnapshot; using redisDbPersistentData::consolidate_snapshot; + dict_iter find_threadsafe(const char *key) const + { + dict_iter itr = redisDbPersistentDataSnapshot::find_threadsafe(key); + if (itr.key() != nullptr && itr.val() == nullptr) + { + return const_cast(this)->redisDbPersistentData::find(key); + } + return itr; + } + public: expireset::setiter expireitr; dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index 499a257c1..de3174dab 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -1,14 +1,122 @@ #include "../IStorage.h" #include +#include +#include class RocksDBStorageProvider : public IStorage { - std::unique_ptr m_spdb; + std::shared_ptr m_spdb; + const rocksdb::Snapshot *m_psnapshot = nullptr; + rocksdb::ReadOptions m_readOptionsTemplate; public: + RocksDBStorageProvider(const char *path); ~RocksDBStorageProvider(); + + virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) override; + virtual void erase(const char *key, size_t cchKey) override; + virtual void retrieve(const char *key, size_t cchKey, callback fn) const override; + virtual size_t clear() override; + virtual void enumerate(callback fn) const override; + + virtual const IStorage *clone() const override; + + size_t count() const; + +protected: + RocksDBStorageProvider(std::shared_ptr &spdb); + + const rocksdb::ReadOptions &ReadOptions() const { return m_readOptionsTemplate; } }; +IStorage *create_rocksdb_storage(const char *dbfile) +{ + return new RocksDBStorageProvider(dbfile); +} + +RocksDBStorageProvider::RocksDBStorageProvider(const char *path) +{ + rocksdb::Options options; + options.create_if_missing = true; + rocksdb::DB *db = nullptr; + auto status = rocksdb::DB::Open(options, path, &db); + if (!status.ok()) + throw status.ToString(); + m_spdb = std::shared_ptr(db); + + m_readOptionsTemplate = rocksdb::ReadOptions(); +} + +RocksDBStorageProvider::RocksDBStorageProvider(std::shared_ptr &spdb) + : m_spdb(spdb) +{ + m_readOptionsTemplate = rocksdb::ReadOptions(); + m_psnapshot = spdb->GetSnapshot(); + m_readOptionsTemplate.snapshot = m_psnapshot; +} + +void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data, size_t cb) +{ + auto status = m_spdb->Put(rocksdb::WriteOptions(), rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb)); + if (!status.ok()) + throw status; +} + +void RocksDBStorageProvider::erase(const char *key, size_t cchKey) +{ + auto status = m_spdb->Delete(rocksdb::WriteOptions(), rocksdb::Slice(key, cchKey)); + if (!status.ok()) + throw status; +} + +void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callback fn) const +{ + std::string value; + auto status = m_spdb->Get(ReadOptions(), rocksdb::Slice(key, cchKey), &value); + if (!status.ok()) + throw status; + fn(key, cchKey, value.data(), value.size()); +} + +size_t RocksDBStorageProvider::clear() +{ + size_t celem = count(); + auto status = m_spdb->DropColumnFamily(m_spdb->DefaultColumnFamily()); + if (!status.ok()) + throw status; + return celem; +} + +size_t RocksDBStorageProvider::count() const +{ + std::string strelem; + if (!m_spdb->GetProperty(rocksdb::DB::Properties::kEstimateNumKeys, &strelem)) + throw "Failed to get database size"; + std::stringstream sstream(strelem); + size_t count; + sstream >> count; + return count; +} + +void RocksDBStorageProvider::enumerate(callback fn) const +{ + std::unique_ptr it = std::unique_ptr(m_spdb->NewIterator(ReadOptions())); + for (it->SeekToFirst(); it->Valid(); it->Next()) { + fn(it->key().data(), it->key().size(), it->value().data(), it->value().size()); + } + assert(it->status().ok()); // Check for any errors found during the scan +} + +const IStorage *RocksDBStorageProvider::clone() const +{ + return new RocksDBStorageProvider(const_cast(this)->m_spdb); +} + RocksDBStorageProvider::~RocksDBStorageProvider() { + if (m_spdb != nullptr) + { + if (m_psnapshot != nullptr) + m_spdb->ReleaseSnapshot(m_psnapshot); + } } \ No newline at end of file