Add support for multiple databases with rocksdb
Former-commit-id: cfa3b760b00776876134692d42e25a60fcd8dea9
This commit is contained in:
parent
8f8f9b7a46
commit
c55904d9f4
@ -1,6 +1,13 @@
|
|||||||
#pragma once
|
#pragma once
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
|
||||||
|
class IStorageFactory
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
virtual ~IStorageFactory() {}
|
||||||
|
virtual class IStorage *create(int db) = 0;
|
||||||
|
};
|
||||||
|
|
||||||
class IStorage
|
class IStorage
|
||||||
{
|
{
|
||||||
public:
|
public:
|
||||||
@ -17,6 +24,8 @@ public:
|
|||||||
virtual void beginWriteBatch() {} // NOP
|
virtual void beginWriteBatch() {} // NOP
|
||||||
virtual void endWriteBatch() {} // NOP
|
virtual void endWriteBatch() {} // NOP
|
||||||
|
|
||||||
|
virtual void flush();
|
||||||
|
|
||||||
/* This is permitted to be a shallow clone */
|
/* This is permitted to be a shallow clone */
|
||||||
virtual const IStorage *clone() const = 0;
|
virtual const IStorage *clone() const = 0;
|
||||||
};
|
};
|
||||||
|
@ -208,7 +208,7 @@ endif
|
|||||||
|
|
||||||
REDIS_SERVER_NAME=keydb-server
|
REDIS_SERVER_NAME=keydb-server
|
||||||
REDIS_SENTINEL_NAME=keydb-sentinel
|
REDIS_SENTINEL_NAME=keydb-sentinel
|
||||||
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o $(ASM_OBJ)
|
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o $(ASM_OBJ)
|
||||||
REDIS_CLI_NAME=keydb-cli
|
REDIS_CLI_NAME=keydb-cli
|
||||||
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o $(ASM_OBJ)
|
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o $(ASM_OBJ)
|
||||||
REDIS_BENCHMARK_NAME=keydb-benchmark
|
REDIS_BENCHMARK_NAME=keydb-benchmark
|
||||||
@ -298,6 +298,7 @@ dict-benchmark: dict.cpp zmalloc.cpp sds.c siphash.c
|
|||||||
|
|
||||||
clean:
|
clean:
|
||||||
rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html Makefile.dep dict-benchmark
|
rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html Makefile.dep dict-benchmark
|
||||||
|
rm -rf storage/*.o
|
||||||
|
|
||||||
.PHONY: clean
|
.PHONY: clean
|
||||||
|
|
||||||
|
30
src/db.cpp
30
src/db.cpp
@ -649,20 +649,27 @@ void randomkeyCommand(client *c) {
|
|||||||
decrRefCount(key);
|
decrRefCount(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool FEvictedDE(dictEntry *de)
|
||||||
|
{
|
||||||
|
return (de != nullptr) && dictGetVal(de) == nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
bool redisDbPersistentData::iterate(std::function<bool(const char*, robj*)> fn)
|
bool redisDbPersistentData::iterate(std::function<bool(const char*, robj*)> fn)
|
||||||
{
|
{
|
||||||
dictIterator *di = dictGetSafeIterator(m_pdict);
|
dictIterator *di = dictGetSafeIterator(m_pdict);
|
||||||
dictEntry *de = nullptr;
|
dictEntry *de = nullptr;
|
||||||
bool fResult = true;
|
bool fResult = true;
|
||||||
while((de = dictNext(di)) != nullptr)
|
while(fResult && ((de = dictNext(di)) != nullptr))
|
||||||
{
|
{
|
||||||
|
bool fEvicted = FEvictedDE(de);
|
||||||
|
|
||||||
ensure((const char*)dictGetKey(de), &de);
|
ensure((const char*)dictGetKey(de), &de);
|
||||||
if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de)))
|
if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de)))
|
||||||
{
|
|
||||||
fResult = false;
|
fResult = false;
|
||||||
break;
|
|
||||||
}
|
// re-evict the key so we don't OOM
|
||||||
|
if (fEvicted)
|
||||||
|
removeCachedValue((const char*)dictGetKey(de));
|
||||||
}
|
}
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
|
|
||||||
@ -679,10 +686,14 @@ bool redisDbPersistentData::iterate(std::function<bool(const char*, robj*)> fn)
|
|||||||
return true;
|
return true;
|
||||||
|
|
||||||
// Alright it's a key in the use keyspace, lets ensure it and then pass it off
|
// Alright it's a key in the use keyspace, lets ensure it and then pass it off
|
||||||
|
bool fEvicted = FEvictedDE(de);
|
||||||
ensure(key);
|
ensure(key);
|
||||||
deCurrent = dictFind(m_pdict, key);
|
deCurrent = dictFind(m_pdict, key);
|
||||||
return fn(key, (robj*)dictGetVal(deCurrent));
|
bool fResult = fn(key, (robj*)dictGetVal(deCurrent));
|
||||||
});
|
if (fEvicted)
|
||||||
|
removeCachedValue(key);
|
||||||
|
return fResult;
|
||||||
|
}, true /*fKeyOnly*/);
|
||||||
}
|
}
|
||||||
|
|
||||||
return fResult;
|
return fResult;
|
||||||
@ -713,7 +724,7 @@ void keysCommandCore(client *cIn, const redisDbPersistentDataSnapshot *db, sds p
|
|||||||
decrRefCount(keyobj);
|
decrRefCount(keyobj);
|
||||||
}
|
}
|
||||||
return !(cIn->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP);
|
return !(cIn->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP);
|
||||||
});
|
}, true /*fKeyOnly*/);
|
||||||
|
|
||||||
setDeferredArrayLen(c,replylen,numkeys);
|
setDeferredArrayLen(c,replylen,numkeys);
|
||||||
|
|
||||||
@ -1906,7 +1917,6 @@ void redisDbPersistentData::setStorageProvider(IStorage *pstorage)
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
IStorage *create_rocksdb_storage(const char *dbfile);
|
|
||||||
void redisDb::initialize(int id)
|
void redisDb::initialize(int id)
|
||||||
{
|
{
|
||||||
redisDbPersistentData::initialize();
|
redisDbPersistentData::initialize();
|
||||||
@ -1918,8 +1928,8 @@ void redisDb::initialize(int id)
|
|||||||
this->avg_ttl = 0;
|
this->avg_ttl = 0;
|
||||||
this->last_expire_set = 0;
|
this->last_expire_set = 0;
|
||||||
this->defrag_later = listCreate();
|
this->defrag_later = listCreate();
|
||||||
if (id == 0)
|
if (g_pserver->m_pstorageFactory != nullptr)
|
||||||
this->setStorageProvider(create_rocksdb_storage("/tmp/rocks.db"));
|
this->setStorageProvider(g_pserver->m_pstorageFactory->create(id));
|
||||||
}
|
}
|
||||||
|
|
||||||
bool redisDbPersistentData::insert(char *key, robj *o)
|
bool redisDbPersistentData::insert(char *key, robj *o)
|
||||||
|
@ -61,6 +61,7 @@
|
|||||||
#include <uuid/uuid.h>
|
#include <uuid/uuid.h>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include "aelocker.h"
|
#include "aelocker.h"
|
||||||
|
#include "storage/rocksdbfactory.h"
|
||||||
|
|
||||||
int g_fTestMode = false;
|
int g_fTestMode = false;
|
||||||
|
|
||||||
@ -2187,7 +2188,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
|
|||||||
else {
|
else {
|
||||||
fFirstRun = false;
|
fFirstRun = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
aeReleaseLock();
|
aeReleaseLock();
|
||||||
|
|
||||||
for (auto &pair : vecchanges)
|
for (auto &pair : vecchanges)
|
||||||
@ -3015,6 +3016,9 @@ void initServer(void) {
|
|||||||
signal(SIGPIPE, SIG_IGN);
|
signal(SIGPIPE, SIG_IGN);
|
||||||
setupSignalHandlers();
|
setupSignalHandlers();
|
||||||
|
|
||||||
|
// Create The Storage Factory (if necessary)
|
||||||
|
g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory("/tmp/rocks.db", cserver.dbnum);
|
||||||
|
|
||||||
zfree(g_pserver->db); // initServerConfig created a dummy array, free that now
|
zfree(g_pserver->db); // initServerConfig created a dummy array, free that now
|
||||||
g_pserver->db = (redisDb**)zmalloc(sizeof(redisDb*)*cserver.dbnum, MALLOC_LOCAL);
|
g_pserver->db = (redisDb**)zmalloc(sizeof(redisDb*)*cserver.dbnum, MALLOC_LOCAL);
|
||||||
|
|
||||||
@ -3891,6 +3895,7 @@ int prepareForShutdown(int flags) {
|
|||||||
delete g_pserver->db[idb];
|
delete g_pserver->db[idb];
|
||||||
g_pserver->db[idb] = nullptr;
|
g_pserver->db[idb] = nullptr;
|
||||||
}
|
}
|
||||||
|
delete g_pserver->m_pstorageFactory;
|
||||||
|
|
||||||
serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
|
serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
|
||||||
g_pserver->sentinel_mode ? "Sentinel" : "KeyDB");
|
g_pserver->sentinel_mode ? "Sentinel" : "KeyDB");
|
||||||
|
@ -1381,7 +1381,7 @@ protected:
|
|||||||
public:
|
public:
|
||||||
bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; }
|
bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; }
|
||||||
|
|
||||||
bool iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn) const;
|
bool iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly = false) const;
|
||||||
using redisDbPersistentData::createSnapshot;
|
using redisDbPersistentData::createSnapshot;
|
||||||
using redisDbPersistentData::endSnapshot;
|
using redisDbPersistentData::endSnapshot;
|
||||||
using redisDbPersistentData::end;
|
using redisDbPersistentData::end;
|
||||||
@ -2274,6 +2274,8 @@ struct redisServer {
|
|||||||
|
|
||||||
GarbageCollector<redisDbPersistentDataSnapshot> garbageCollector;
|
GarbageCollector<redisDbPersistentDataSnapshot> garbageCollector;
|
||||||
|
|
||||||
|
IStorageFactory *m_pstorageFactory = nullptr;
|
||||||
|
|
||||||
bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; }
|
bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; }
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -261,7 +261,7 @@ dict_iter redisDbPersistentDataSnapshot::find_threadsafe(const char *key) const
|
|||||||
return dict_iter(de);
|
return dict_iter(de);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn) const
|
bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly) const
|
||||||
{
|
{
|
||||||
dictEntry *de = nullptr;
|
dictEntry *de = nullptr;
|
||||||
bool fResult = true;
|
bool fResult = true;
|
||||||
@ -272,14 +272,22 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const
|
|||||||
volatile size_t celem = size();
|
volatile size_t celem = size();
|
||||||
|
|
||||||
dictIterator *di = dictGetSafeIterator(m_pdict);
|
dictIterator *di = dictGetSafeIterator(m_pdict);
|
||||||
while((de = dictNext(di)) != nullptr)
|
while(fResult && ((de = dictNext(di)) != nullptr))
|
||||||
{
|
{
|
||||||
--celem;
|
--celem;
|
||||||
if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de)))
|
robj *o = (robj*)dictGetVal(de);
|
||||||
|
if (o == nullptr && !fKeyOnly)
|
||||||
{
|
{
|
||||||
fResult = false;
|
m_spstorage->retrieve((sds)dictGetKey(de), sdslen((sds)dictGetKey(de)), [&](const char *, size_t, const void *data, size_t cb){
|
||||||
break;
|
o = deserializeStoredObject(data, cb);
|
||||||
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!fn((const char*)dictGetKey(de), o))
|
||||||
|
fResult = false;
|
||||||
|
|
||||||
|
if (o != nullptr && dictGetVal(de) == nullptr)
|
||||||
|
decrRefCount(o);
|
||||||
}
|
}
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
|
|
||||||
@ -301,7 +309,7 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const
|
|||||||
// Alright it's a key in the use keyspace, lets ensure it and then pass it off
|
// Alright it's a key in the use keyspace, lets ensure it and then pass it off
|
||||||
--celem;
|
--celem;
|
||||||
return fn(key, o);
|
return fn(key, o);
|
||||||
});
|
}, fKeyOnly);
|
||||||
}
|
}
|
||||||
|
|
||||||
serverAssert(!fResult || celem == 0);
|
serverAssert(!fResult || celem == 0);
|
||||||
@ -354,7 +362,7 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *
|
|||||||
incrRefCount(o);
|
incrRefCount(o);
|
||||||
dictAdd(spdb->m_pdict, sdsdup(key), o.unsafe_robjcast());
|
dictAdd(spdb->m_pdict, sdsdup(key), o.unsafe_robjcast());
|
||||||
return true;
|
return true;
|
||||||
});
|
}, true /*fKeyOnly*/);
|
||||||
spdb->m_spstorage = m_pdbSnapshot->m_spstorage;
|
spdb->m_spstorage = m_pdbSnapshot->m_spstorage;
|
||||||
|
|
||||||
spdb->m_pdict->iterators++;
|
spdb->m_pdict->iterators++;
|
||||||
|
@ -1,62 +1,11 @@
|
|||||||
#include "../IStorage.h"
|
#include "rocksdb.h"
|
||||||
#include <rocksdb/db.h>
|
|
||||||
#include <string>
|
#include <string>
|
||||||
#include <sstream>
|
#include <sstream>
|
||||||
|
|
||||||
class RocksDBStorageProvider : public IStorage
|
RocksDBStorageProvider::RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot)
|
||||||
{
|
: m_spdb(spdb), m_psnapshot(psnapshot), m_spcolfamily(spcolfam)
|
||||||
std::shared_ptr<rocksdb::DB> m_spdb;
|
|
||||||
std::unique_ptr<rocksdb::WriteBatch> m_spbatch;
|
|
||||||
const rocksdb::Snapshot *m_psnapshot = nullptr;
|
|
||||||
rocksdb::ReadOptions m_readOptionsTemplate;
|
|
||||||
|
|
||||||
public:
|
|
||||||
RocksDBStorageProvider(const char *path);
|
|
||||||
~RocksDBStorageProvider();
|
|
||||||
|
|
||||||
virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) override;
|
|
||||||
virtual void erase(const char *key, size_t cchKey) override;
|
|
||||||
virtual void retrieve(const char *key, size_t cchKey, callback fn) const override;
|
|
||||||
virtual size_t clear() override;
|
|
||||||
virtual void enumerate(callback fn) const override;
|
|
||||||
|
|
||||||
virtual const IStorage *clone() const override;
|
|
||||||
|
|
||||||
virtual void beginWriteBatch() override;
|
|
||||||
virtual void endWriteBatch() override;
|
|
||||||
|
|
||||||
size_t count() const;
|
|
||||||
|
|
||||||
protected:
|
|
||||||
RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spdb);
|
|
||||||
|
|
||||||
const rocksdb::ReadOptions &ReadOptions() const { return m_readOptionsTemplate; }
|
|
||||||
rocksdb::WriteOptions WriteOptions() const;
|
|
||||||
};
|
|
||||||
|
|
||||||
IStorage *create_rocksdb_storage(const char *dbfile)
|
|
||||||
{
|
|
||||||
return new RocksDBStorageProvider(dbfile);
|
|
||||||
}
|
|
||||||
|
|
||||||
RocksDBStorageProvider::RocksDBStorageProvider(const char *path)
|
|
||||||
{
|
|
||||||
rocksdb::Options options;
|
|
||||||
options.create_if_missing = true;
|
|
||||||
rocksdb::DB *db = nullptr;
|
|
||||||
auto status = rocksdb::DB::Open(options, path, &db);
|
|
||||||
if (!status.ok())
|
|
||||||
throw status.ToString();
|
|
||||||
m_spdb = std::shared_ptr<rocksdb::DB>(db);
|
|
||||||
|
|
||||||
m_readOptionsTemplate = rocksdb::ReadOptions();
|
|
||||||
}
|
|
||||||
|
|
||||||
RocksDBStorageProvider::RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spdb)
|
|
||||||
: m_spdb(spdb)
|
|
||||||
{
|
{
|
||||||
m_readOptionsTemplate = rocksdb::ReadOptions();
|
m_readOptionsTemplate = rocksdb::ReadOptions();
|
||||||
m_psnapshot = spdb->GetSnapshot();
|
|
||||||
m_readOptionsTemplate.snapshot = m_psnapshot;
|
m_readOptionsTemplate.snapshot = m_psnapshot;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -64,9 +13,9 @@ void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data,
|
|||||||
{
|
{
|
||||||
rocksdb::Status status;
|
rocksdb::Status status;
|
||||||
if (m_spbatch != nullptr)
|
if (m_spbatch != nullptr)
|
||||||
status = m_spbatch->Put(rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb));
|
status = m_spbatch->Put(m_spcolfamily.get(), rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb));
|
||||||
else
|
else
|
||||||
status = m_spdb->Put(WriteOptions(), rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb));
|
status = m_spdb->Put(WriteOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb));
|
||||||
if (!status.ok())
|
if (!status.ok())
|
||||||
throw status.ToString();
|
throw status.ToString();
|
||||||
}
|
}
|
||||||
@ -75,9 +24,9 @@ void RocksDBStorageProvider::erase(const char *key, size_t cchKey)
|
|||||||
{
|
{
|
||||||
rocksdb::Status status;
|
rocksdb::Status status;
|
||||||
if (m_spbatch != nullptr)
|
if (m_spbatch != nullptr)
|
||||||
status = m_spbatch->Delete(rocksdb::Slice(key, cchKey));
|
status = m_spbatch->Delete(m_spcolfamily.get(), rocksdb::Slice(key, cchKey));
|
||||||
else
|
else
|
||||||
status = m_spdb->Delete(WriteOptions(), rocksdb::Slice(key, cchKey));
|
status = m_spdb->Delete(WriteOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey));
|
||||||
if (!status.ok())
|
if (!status.ok())
|
||||||
throw status.ToString();
|
throw status.ToString();
|
||||||
}
|
}
|
||||||
@ -85,7 +34,7 @@ void RocksDBStorageProvider::erase(const char *key, size_t cchKey)
|
|||||||
void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callback fn) const
|
void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callback fn) const
|
||||||
{
|
{
|
||||||
std::string value;
|
std::string value;
|
||||||
auto status = m_spdb->Get(ReadOptions(), rocksdb::Slice(key, cchKey), &value);
|
auto status = m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey), &value);
|
||||||
if (!status.ok())
|
if (!status.ok())
|
||||||
throw status.ToString();
|
throw status.ToString();
|
||||||
fn(key, cchKey, value.data(), value.size());
|
fn(key, cchKey, value.data(), value.size());
|
||||||
@ -94,7 +43,13 @@ void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callback f
|
|||||||
size_t RocksDBStorageProvider::clear()
|
size_t RocksDBStorageProvider::clear()
|
||||||
{
|
{
|
||||||
size_t celem = count();
|
size_t celem = count();
|
||||||
auto status = m_spdb->DropColumnFamily(m_spdb->DefaultColumnFamily());
|
auto status = m_spdb->DropColumnFamily(m_spcolfamily.get());
|
||||||
|
auto strName = m_spcolfamily->GetName();
|
||||||
|
|
||||||
|
rocksdb::ColumnFamilyHandle *handle = nullptr;
|
||||||
|
m_spdb->CreateColumnFamily(rocksdb::ColumnFamilyOptions(), strName, &handle);
|
||||||
|
m_spcolfamily = std::shared_ptr<rocksdb::ColumnFamilyHandle>(handle);
|
||||||
|
|
||||||
if (!status.ok())
|
if (!status.ok())
|
||||||
throw status.ToString();
|
throw status.ToString();
|
||||||
return celem;
|
return celem;
|
||||||
@ -103,7 +58,7 @@ size_t RocksDBStorageProvider::clear()
|
|||||||
size_t RocksDBStorageProvider::count() const
|
size_t RocksDBStorageProvider::count() const
|
||||||
{
|
{
|
||||||
std::string strelem;
|
std::string strelem;
|
||||||
if (!m_spdb->GetProperty(rocksdb::DB::Properties::kEstimateNumKeys, &strelem))
|
if (!m_spdb->GetProperty(m_spcolfamily.get(), rocksdb::DB::Properties::kEstimateNumKeys, &strelem))
|
||||||
throw "Failed to get database size";
|
throw "Failed to get database size";
|
||||||
std::stringstream sstream(strelem);
|
std::stringstream sstream(strelem);
|
||||||
size_t count;
|
size_t count;
|
||||||
@ -113,7 +68,7 @@ size_t RocksDBStorageProvider::count() const
|
|||||||
|
|
||||||
void RocksDBStorageProvider::enumerate(callback fn) const
|
void RocksDBStorageProvider::enumerate(callback fn) const
|
||||||
{
|
{
|
||||||
std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(m_spdb->NewIterator(ReadOptions()));
|
std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(m_spdb->NewIterator(ReadOptions(), m_spcolfamily.get()));
|
||||||
for (it->SeekToFirst(); it->Valid(); it->Next()) {
|
for (it->SeekToFirst(); it->Valid(); it->Next()) {
|
||||||
fn(it->key().data(), it->key().size(), it->value().data(), it->value().size());
|
fn(it->key().data(), it->key().size(), it->value().data(), it->value().size());
|
||||||
}
|
}
|
||||||
@ -122,7 +77,8 @@ void RocksDBStorageProvider::enumerate(callback fn) const
|
|||||||
|
|
||||||
const IStorage *RocksDBStorageProvider::clone() const
|
const IStorage *RocksDBStorageProvider::clone() const
|
||||||
{
|
{
|
||||||
return new RocksDBStorageProvider(const_cast<RocksDBStorageProvider*>(this)->m_spdb);
|
const rocksdb::Snapshot *psnapshot = const_cast<RocksDBStorageProvider*>(this)->m_spdb->GetSnapshot();
|
||||||
|
return new RocksDBStorageProvider(const_cast<RocksDBStorageProvider*>(this)->m_spdb, const_cast<RocksDBStorageProvider*>(this)->m_spcolfamily, psnapshot);
|
||||||
}
|
}
|
||||||
|
|
||||||
RocksDBStorageProvider::~RocksDBStorageProvider()
|
RocksDBStorageProvider::~RocksDBStorageProvider()
|
||||||
@ -150,4 +106,9 @@ void RocksDBStorageProvider::endWriteBatch()
|
|||||||
{
|
{
|
||||||
m_spdb->Write(WriteOptions(), m_spbatch.get());
|
m_spdb->Write(WriteOptions(), m_spbatch.get());
|
||||||
m_spbatch = nullptr;
|
m_spbatch = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
void RocksDBStorageProvider::flush()
|
||||||
|
{
|
||||||
|
m_spdb->SyncWAL();
|
||||||
}
|
}
|
38
src/storage/rocksdb.h
Normal file
38
src/storage/rocksdb.h
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <memory>
|
||||||
|
#include "../IStorage.h"
|
||||||
|
#include <rocksdb/db.h>
|
||||||
|
|
||||||
|
class RocksDBStorageProvider : public IStorage
|
||||||
|
{
|
||||||
|
std::shared_ptr<rocksdb::DB> m_spdb; // Note: This must be first so it is deleted last
|
||||||
|
std::unique_ptr<rocksdb::WriteBatch> m_spbatch;
|
||||||
|
const rocksdb::Snapshot *m_psnapshot = nullptr;
|
||||||
|
std::shared_ptr<rocksdb::ColumnFamilyHandle> m_spcolfamily;
|
||||||
|
rocksdb::ReadOptions m_readOptionsTemplate;
|
||||||
|
|
||||||
|
public:
|
||||||
|
RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot);
|
||||||
|
~RocksDBStorageProvider();
|
||||||
|
|
||||||
|
virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) override;
|
||||||
|
virtual void erase(const char *key, size_t cchKey) override;
|
||||||
|
virtual void retrieve(const char *key, size_t cchKey, callback fn) const override;
|
||||||
|
virtual size_t clear() override;
|
||||||
|
virtual void enumerate(callback fn) const override;
|
||||||
|
|
||||||
|
virtual const IStorage *clone() const override;
|
||||||
|
|
||||||
|
virtual void beginWriteBatch() override;
|
||||||
|
virtual void endWriteBatch() override;
|
||||||
|
|
||||||
|
virtual void flush() override;
|
||||||
|
|
||||||
|
size_t count() const;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
|
||||||
|
const rocksdb::ReadOptions &ReadOptions() const { return m_readOptionsTemplate; }
|
||||||
|
rocksdb::WriteOptions WriteOptions() const;
|
||||||
|
};
|
63
src/storage/rocksdbfactory.cpp
Normal file
63
src/storage/rocksdbfactory.cpp
Normal file
@ -0,0 +1,63 @@
|
|||||||
|
#include "rocksdb.h"
|
||||||
|
|
||||||
|
class RocksDBStorageFactory : public IStorageFactory
|
||||||
|
{
|
||||||
|
std::shared_ptr<rocksdb::DB> m_spdb; // Note: This must be first so it is deleted last
|
||||||
|
std::vector<std::unique_ptr<rocksdb::ColumnFamilyHandle>> m_vecspcols;
|
||||||
|
|
||||||
|
public:
|
||||||
|
RocksDBStorageFactory(const char *dbfile, int dbnum);
|
||||||
|
~RocksDBStorageFactory();
|
||||||
|
|
||||||
|
virtual IStorage *create(int db) override;
|
||||||
|
};
|
||||||
|
|
||||||
|
IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum)
|
||||||
|
{
|
||||||
|
return new RocksDBStorageFactory(path, dbnum);
|
||||||
|
}
|
||||||
|
|
||||||
|
RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum)
|
||||||
|
{
|
||||||
|
// Get the count of column families in the actual database
|
||||||
|
std::vector<std::string> vecT;
|
||||||
|
auto status = rocksdb::DB::ListColumnFamilies(rocksdb::Options(), dbfile, &vecT);
|
||||||
|
// RocksDB requires we know the count of col families before opening, if the user only wants to see less
|
||||||
|
// we still have to make room for all column family handles regardless
|
||||||
|
if (status.ok() && (int)vecT.size() > dbnum)
|
||||||
|
dbnum = (int)vecT.size();
|
||||||
|
|
||||||
|
std::vector<rocksdb::ColumnFamilyDescriptor> veccoldesc;
|
||||||
|
veccoldesc.push_back(rocksdb::ColumnFamilyDescriptor(rocksdb::kDefaultColumnFamilyName, rocksdb::ColumnFamilyOptions())); // ignore default col family
|
||||||
|
|
||||||
|
for (int idb = 0; idb < dbnum; ++idb)
|
||||||
|
{
|
||||||
|
veccoldesc.push_back(rocksdb::ColumnFamilyDescriptor(std::to_string(idb), rocksdb::ColumnFamilyOptions()));
|
||||||
|
}
|
||||||
|
|
||||||
|
rocksdb::Options options;
|
||||||
|
options.create_if_missing = true;
|
||||||
|
options.create_missing_column_families = true;
|
||||||
|
rocksdb::DB *db = nullptr;
|
||||||
|
|
||||||
|
std::vector<rocksdb::ColumnFamilyHandle*> handles;
|
||||||
|
status = rocksdb::DB::Open(options, dbfile, veccoldesc, &handles, &db);
|
||||||
|
if (!status.ok())
|
||||||
|
throw status.ToString();
|
||||||
|
|
||||||
|
for (auto handle : handles)
|
||||||
|
m_vecspcols.emplace_back(handle);
|
||||||
|
m_spdb = std::shared_ptr<rocksdb::DB>(db);
|
||||||
|
}
|
||||||
|
|
||||||
|
RocksDBStorageFactory::~RocksDBStorageFactory()
|
||||||
|
{
|
||||||
|
m_spdb->SyncWAL();
|
||||||
|
}
|
||||||
|
|
||||||
|
IStorage *RocksDBStorageFactory::create(int db)
|
||||||
|
{
|
||||||
|
++db; // skip default col family
|
||||||
|
std::shared_ptr<rocksdb::ColumnFamilyHandle> spcolfamily(m_vecspcols[db].release());
|
||||||
|
return new RocksDBStorageProvider(m_spdb, spcolfamily, nullptr);
|
||||||
|
}
|
3
src/storage/rocksdbfactory.h
Normal file
3
src/storage/rocksdbfactory.h
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
#pragma once
|
||||||
|
|
||||||
|
class IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum);
|
Loading…
x
Reference in New Issue
Block a user