Add support for multiple databases with rocksdb

Former-commit-id: cfa3b760b00776876134692d42e25a60fcd8dea9
This commit is contained in:
John Sully 2019-12-09 20:45:58 -05:00
parent cb136d2ac5
commit 38f7cca61a
7 changed files with 58 additions and 20 deletions

View File

@ -1,6 +1,13 @@
#pragma once
#include <functional>
class IStorageFactory
{
public:
virtual ~IStorageFactory() {}
virtual class IStorage *create(int db) = 0;
};
class IStorage
{
public:
@ -17,6 +24,8 @@ public:
virtual void beginWriteBatch() {} // NOP
virtual void endWriteBatch() {} // NOP
virtual void flush();
/* This is permitted to be a shallow clone */
virtual const IStorage *clone() const = 0;
};

View File

@ -208,7 +208,7 @@ endif
REDIS_SERVER_NAME=keydb-server
REDIS_SENTINEL_NAME=keydb-sentinel
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o $(ASM_OBJ)
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o $(ASM_OBJ)
REDIS_CLI_NAME=keydb-cli
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o $(ASM_OBJ)
REDIS_BENCHMARK_NAME=keydb-benchmark
@ -298,6 +298,7 @@ dict-benchmark: dict.cpp zmalloc.cpp sds.c siphash.c
clean:
rm -rf $(REDIS_SERVER_NAME) $(REDIS_SENTINEL_NAME) $(REDIS_CLI_NAME) $(REDIS_BENCHMARK_NAME) $(REDIS_CHECK_RDB_NAME) $(REDIS_CHECK_AOF_NAME) *.o *.gcda *.gcno *.gcov redis.info lcov-html Makefile.dep dict-benchmark
rm -rf storage/*.o
.PHONY: clean

View File

@ -649,20 +649,27 @@ void randomkeyCommand(client *c) {
decrRefCount(key);
}
static bool FEvictedDE(dictEntry *de)
{
return (de != nullptr) && dictGetVal(de) == nullptr;
}
bool redisDbPersistentData::iterate(std::function<bool(const char*, robj*)> fn)
{
dictIterator *di = dictGetSafeIterator(m_pdict);
dictEntry *de = nullptr;
bool fResult = true;
while((de = dictNext(di)) != nullptr)
while(fResult && ((de = dictNext(di)) != nullptr))
{
bool fEvicted = FEvictedDE(de);
ensure((const char*)dictGetKey(de), &de);
if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de)))
{
fResult = false;
break;
}
// re-evict the key so we don't OOM
if (fEvicted)
removeCachedValue((const char*)dictGetKey(de));
}
dictReleaseIterator(di);
@ -679,10 +686,14 @@ bool redisDbPersistentData::iterate(std::function<bool(const char*, robj*)> fn)
return true;
// Alright it's a key in the use keyspace, lets ensure it and then pass it off
bool fEvicted = FEvictedDE(de);
ensure(key);
deCurrent = dictFind(m_pdict, key);
return fn(key, (robj*)dictGetVal(deCurrent));
});
bool fResult = fn(key, (robj*)dictGetVal(deCurrent));
if (fEvicted)
removeCachedValue(key);
return fResult;
}, true /*fKeyOnly*/);
}
return fResult;
@ -713,7 +724,7 @@ void keysCommandCore(client *cIn, const redisDbPersistentDataSnapshot *db, sds p
decrRefCount(keyobj);
}
return !(cIn->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP);
});
}, true /*fKeyOnly*/);
setDeferredArrayLen(c,replylen,numkeys);
@ -1906,7 +1917,6 @@ void redisDbPersistentData::setStorageProvider(IStorage *pstorage)
});
}
IStorage *create_rocksdb_storage(const char *dbfile);
void redisDb::initialize(int id)
{
redisDbPersistentData::initialize();
@ -1918,8 +1928,8 @@ void redisDb::initialize(int id)
this->avg_ttl = 0;
this->last_expire_set = 0;
this->defrag_later = listCreate();
if (id == 0)
this->setStorageProvider(create_rocksdb_storage("/tmp/rocks.db"));
if (g_pserver->m_pstorageFactory != nullptr)
this->setStorageProvider(g_pserver->m_pstorageFactory->create(id));
}
bool redisDbPersistentData::insert(char *key, robj *o)

View File

@ -61,6 +61,7 @@
#include <uuid/uuid.h>
#include <mutex>
#include "aelocker.h"
#include "storage/rocksdbfactory.h"
int g_fTestMode = false;
@ -2187,7 +2188,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
else {
fFirstRun = false;
}
aeReleaseLock();
for (auto &pair : vecchanges)
@ -3015,6 +3016,9 @@ void initServer(void) {
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();
// Create The Storage Factory (if necessary)
g_pserver->m_pstorageFactory = CreateRocksDBStorageFactory("/tmp/rocks.db", cserver.dbnum);
zfree(g_pserver->db); // initServerConfig created a dummy array, free that now
g_pserver->db = (redisDb**)zmalloc(sizeof(redisDb*)*cserver.dbnum, MALLOC_LOCAL);
@ -3891,6 +3895,7 @@ int prepareForShutdown(int flags) {
delete g_pserver->db[idb];
g_pserver->db[idb] = nullptr;
}
delete g_pserver->m_pstorageFactory;
serverLog(LL_WARNING,"%s is now ready to exit, bye bye...",
g_pserver->sentinel_mode ? "Sentinel" : "KeyDB");

View File

@ -1381,7 +1381,7 @@ protected:
public:
bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; }
bool iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn) const;
bool iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly = false) const;
using redisDbPersistentData::createSnapshot;
using redisDbPersistentData::endSnapshot;
using redisDbPersistentData::end;
@ -2274,6 +2274,8 @@ struct redisServer {
GarbageCollector<redisDbPersistentDataSnapshot> garbageCollector;
IStorageFactory *m_pstorageFactory = nullptr;
bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; }
};

View File

@ -261,7 +261,7 @@ dict_iter redisDbPersistentDataSnapshot::find_threadsafe(const char *key) const
return dict_iter(de);
}
bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn) const
bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly) const
{
dictEntry *de = nullptr;
bool fResult = true;
@ -272,14 +272,22 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const
volatile size_t celem = size();
dictIterator *di = dictGetSafeIterator(m_pdict);
while((de = dictNext(di)) != nullptr)
while(fResult && ((de = dictNext(di)) != nullptr))
{
--celem;
if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de)))
robj *o = (robj*)dictGetVal(de);
if (o == nullptr && !fKeyOnly)
{
fResult = false;
break;
m_spstorage->retrieve((sds)dictGetKey(de), sdslen((sds)dictGetKey(de)), [&](const char *, size_t, const void *data, size_t cb){
o = deserializeStoredObject(data, cb);
});
}
if (!fn((const char*)dictGetKey(de), o))
fResult = false;
if (o != nullptr && dictGetVal(de) == nullptr)
decrRefCount(o);
}
dictReleaseIterator(di);
@ -301,7 +309,7 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const
// Alright it's a key in the use keyspace, lets ensure it and then pass it off
--celem;
return fn(key, o);
});
}, fKeyOnly);
}
serverAssert(!fResult || celem == 0);
@ -354,7 +362,7 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *
incrRefCount(o);
dictAdd(spdb->m_pdict, sdsdup(key), o.unsafe_robjcast());
return true;
});
}, true /*fKeyOnly*/);
spdb->m_spstorage = m_pdbSnapshot->m_spstorage;
spdb->m_pdict->iterators++;

View File

@ -0,0 +1,3 @@
#pragma once
class IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum);