Merge branch 'flash_cache' into keydbpro

commit 91a803815d
Author: John Sully
Date: 2020-07-13 03:53:34 +00:00
Former-commit-id: 2a721ef645921d62b39f1374c0a3f5c92b00fae5

18 changed files with 326 additions and 66 deletions

View File

@ -4,12 +4,13 @@
class IStorageFactory
{
public:
typedef void (*key_load_iterator)(const char *rgchKey, size_t cchKey);
typedef void (*key_load_iterator)(const char *rgchKey, size_t cchKey, void *privdata);
virtual ~IStorageFactory() {}
virtual class IStorage *create(int db, key_load_iterator itr) = 0;
virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) = 0;
virtual const char *name() const = 0;
virtual size_t totalDiskspaceUsed() const = 0;
virtual bool FSlow() const = 0;
};
class IStorage
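
The new void *privdata argument lets a caller thread its own context through the key-load callback instead of relying on globals; the StorageCache class added later in this commit uses it to forward loaded keys to a parent iterator. A minimal caller-side sketch with hypothetical names (only the key_load_iterator signature and IStorageFactory::create() come from this diff):

    // Hypothetical context passed through privdata (illustration only).
    struct KeyLoadContext {
        size_t ckeysLoaded = 0;     // count keys seen during the initial load
    };

    static void countingLoadIter(const char *rgchKey, size_t cchKey, void *privdata)
    {
        (void)rgchKey; (void)cchKey;
        static_cast<KeyLoadContext*>(privdata)->ckeysLoaded++;
    }

    // Usage, assuming an IStorageFactory *pfactory and a database index db:
    //   KeyLoadContext ctx;
    //   IStorage *pstorage = pfactory->create(db, countingLoadIter, &ctx);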

View File

@ -277,7 +277,7 @@ endif
REDIS_SERVER_NAME=keydb-pro-server
REDIS_SENTINEL_NAME=keydb-sentinel
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o $(ASM_OBJ)
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o StorageCache.o $(ASM_OBJ)
REDIS_CLI_NAME=keydb-cli
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o motd.o $(ASM_OBJ)
REDIS_BENCHMARK_NAME=keydb-benchmark

src/StorageCache.cpp (new file, 76 lines added)
View File

@ -0,0 +1,76 @@
#include "server.h"

void StorageCache::clear()
{
    if (m_setkeys != nullptr)
        m_setkeys->clear();
    m_spstorage->clear();
}

void StorageCache::cacheKey(sds key)
{
    if (m_setkeys == nullptr)
        return;
    m_setkeys->insert(sdsimmutablestring(sdsdupshared(key)));
}

void StorageCache::cacheKey(const char *rgch, size_t cch)
{
    if (m_setkeys == nullptr)
        return;
    m_setkeys->insert(sdsimmutablestring(sdsnewlen(rgch, cch)));
}

bool StorageCache::erase(sds key)
{
    bool result = m_spstorage->erase(key, sdslen(key));
    if (result && m_setkeys != nullptr)
    {
        auto itr = m_setkeys->find(sdsview(key));
        serverAssert(itr != m_setkeys->end());
        m_setkeys->erase(itr);
    }
    return result;
}

void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwrite)
{
    if (!fOverwrite && m_setkeys != nullptr)
    {
        cacheKey(key);
    }
    m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite);
}

const StorageCache *StorageCache::clone()
{
    // Clones never clone the cache
    StorageCache *cacheNew = new StorageCache(const_cast<IStorage*>(m_spstorage->clone()));
    return cacheNew;
}

void StorageCache::retrieve(sds key, IStorage::callbackSingle fn, sds *cachedKey) const
{
    if (m_setkeys != nullptr)
    {
        auto itr = m_setkeys->find(sdsview(key));
        if (itr == m_setkeys->end())
            return; // Not found
        if (cachedKey != nullptr)
            *cachedKey = sdsdupshared(itr->get());
    }
    m_spstorage->retrieve(key, sdslen(key), fn);
}

size_t StorageCache::count() const
{
    size_t count = m_spstorage->count();
    if (m_setkeys != nullptr)
        serverAssert(count == m_setkeys->size());
    return count;
}

void StorageCache::beginWriteBatch() {
    serverAssert(GlobalLocksAcquired()); // Otherwise we deadlock
    m_spstorage->beginWriteBatch();
}

src/StorageCache.h (new file, 57 lines added)
View File

@ -0,0 +1,57 @@
#pragma once
#include "sds.h"

class StorageCache
{
    std::shared_ptr<IStorage> m_spstorage;
    std::unique_ptr<semiorderedset<sdsimmutablestring, sdsview, true>> m_setkeys;

    StorageCache(IStorage *storage)
        : m_spstorage(storage)
    {}

    void cacheKey(sds key);
    void cacheKey(const char *rgchKey, size_t cchKey);

    struct load_iter_data
    {
        StorageCache *cache;
        IStorageFactory::key_load_iterator itrParent;
        void *privdataParent;
    };
    static void key_load_itr(const char *rgchKey, size_t cchKey, void *privdata)
    {
        load_iter_data *data = (load_iter_data*)privdata;
        data->cache->cacheKey(rgchKey, cchKey);
        if (data->itrParent)
            data->itrParent(rgchKey, cchKey, data->privdataParent);
    }

public:
    static StorageCache *create(IStorageFactory *pfactory, int db, IStorageFactory::key_load_iterator fn, void *privdata) {
        StorageCache *cache = new StorageCache(nullptr);
        if (pfactory->FSlow())
        {
            cache->m_setkeys = std::make_unique<semiorderedset<sdsimmutablestring, sdsview, true>>();
        }
        load_iter_data data = {cache, fn, privdata};
        cache->m_spstorage = std::shared_ptr<IStorage>(pfactory->create(db, key_load_itr, (void*)&data));
        return cache;
    }

    void clear();
    void insert(sds key, const void *data, size_t cbdata, bool fOverwrite);
    void retrieve(sds key, IStorage::callbackSingle fn, sds *sharedKeyOut) const;
    bool erase(sds key);

    bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); }

    void beginWriteBatch();
    void endWriteBatch() { m_spstorage->endWriteBatch(); }
    void batch_lock() { return m_spstorage->batch_lock(); }
    void batch_unlock() { return m_spstorage->batch_unlock(); }

    size_t count() const;

    const StorageCache *clone();
};
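
Taken together: StorageCache fronts an IStorage provider and, only when the factory reports FSlow(), additionally keeps an in-memory set of keys so misses can be answered without touching the slow provider. A rough usage sketch, assuming a configured IStorageFactory *pfactory and sds values key/val (these names are illustrative, not part of the diff):

    StorageCache *cache = StorageCache::create(pfactory, /*db*/ 0, /*fn*/ nullptr, /*privdata*/ nullptr);

    cache->beginWriteBatch();                    // asserts GlobalLocksAcquired(), see StorageCache.cpp
    cache->insert(key, val, sdslen(val), false /*fOverwrite*/);
    cache->endWriteBatch();

    sds sharedKey = nullptr;
    cache->retrieve(key, [](const char *, size_t, const void *data, size_t cb) {
        // deserialize (data, cb) here; not invoked when the cached key set says the key is absent
    }, &sharedKey);

    cache->erase(key);                           // removes from both the provider and the key set

Because every mutation flows through the cache, count() can assert that the key set and the provider agree, which db.cpp later relies on (dictSize(m_pdict) == m_spstorage->count() after storeDatabase()).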

View File

@ -972,7 +972,7 @@ int loadAppendOnlyFile(char *filename) {
loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
g_pserver->db[idb]->processChanges();
g_pserver->db[idb]->processChanges(false);
g_pserver->db[idb]->commitChanges();
}
fclose(fp);

View File

@ -105,6 +105,11 @@ configEnum repl_diskless_load_enum[] = {
{NULL, 0}
};
configEnum storage_memory_model_enum[] = {
{"writeback", STORAGE_WRITEBACK},
{"writethrough", STORAGE_WRITETHROUGH},
};
/* Output buffer limits presets. */
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
{0, 0, 0}, /* normal */
@ -2327,6 +2332,7 @@ standardConfig configs[] = {
createEnumConfig("loglevel", NULL, MODIFIABLE_CONFIG, loglevel_enum, cserver.verbosity, LL_NOTICE, NULL, NULL),
createEnumConfig("maxmemory-policy", NULL, MODIFIABLE_CONFIG, maxmemory_policy_enum, g_pserver->maxmemory_policy, MAXMEMORY_NO_EVICTION, NULL, NULL),
createEnumConfig("appendfsync", NULL, MODIFIABLE_CONFIG, aof_fsync_enum, g_pserver->aof_fsync, AOF_FSYNC_EVERYSEC, NULL, NULL),
createEnumConfig("storage-cache-mode", NULL, IMMUTABLE_CONFIG, storage_memory_model_enum, cserver.storage_memory_model, STORAGE_WRITETHROUGH, NULL, NULL),
/* Integer configs */
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, cserver.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
@ -2360,6 +2366,7 @@ standardConfig configs[] = {
createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves),
createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodSlaves),
createIntConfig("min-clients-per-thread", NULL, MODIFIABLE_CONFIG, 0, 400, cserver.thread_min_client_threshold, 50, INTEGER_CONFIG, NULL, NULL),
createIntConfig("storage-flush-period", NULL, MODIFIABLE_CONFIG, 1, 10000, g_pserver->storage_flush_period, 500, INTEGER_CONFIG, NULL, NULL),
/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, g_pserver->maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
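
For reference, the two settings registered above would appear roughly as follows in a keydb.conf (illustrative fragment, not part of the diff; the flash provider path is an example and writeback only matters when a storage provider is configured):

    storage-provider flash /path/to/rocks.db   # example path; see the flash tests below
    storage-cache-mode writethrough            # or writeback; immutable once the server starts
    storage-flush-period 500                   # milliseconds between writeback flushes (1-10000)

With writethrough, changes are flushed to the provider from beforeSleep() on every event-loop pass; with writeback they are flushed from serverCron() every storage-flush-period milliseconds via flushStorageWeak(), both of which appear later in this commit.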

View File

@ -399,7 +399,7 @@ bool redisDbPersistentData::syncDelete(robj *key)
bool fDeleted = false;
if (m_spstorage != nullptr)
fDeleted = m_spstorage->erase(szFromObj(key), sdslen(szFromObj(key)));
fDeleted = m_spstorage->erase(szFromObj(key));
fDeleted = (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) || fDeleted;
if (fDeleted) {
@ -2279,13 +2279,13 @@ void redisDbPersistentData::initialize()
m_fTrackingChanges = 0;
}
void redisDbPersistentData::setStorageProvider(IStorage *pstorage)
void redisDbPersistentData::setStorageProvider(StorageCache *pstorage)
{
serverAssert(m_spstorage == nullptr);
m_spstorage = std::unique_ptr<IStorage>(pstorage);
m_spstorage = std::unique_ptr<StorageCache>(pstorage);
}
void clusterStorageLoadCallback(const char *rgchkey, size_t cch)
void clusterStorageLoadCallback(const char *rgchkey, size_t cch, void *)
{
slotToKeyUpdateKeyCore(rgchkey, cch, true /*add*/);
}
@ -2309,7 +2309,7 @@ void redisDb::storageProviderInitialize()
if (g_pserver->m_pstorageFactory != nullptr)
{
IStorageFactory::key_load_iterator itr = (g_pserver->cluster_enabled) ? clusterStorageLoadCallback : nullptr;
this->setStorageProvider(g_pserver->m_pstorageFactory->create(id, itr));
this->setStorageProvider(StorageCache::create(g_pserver->m_pstorageFactory, id, itr, nullptr));
}
}
@ -2352,7 +2352,11 @@ void redisDbPersistentData::clear(void(callback)(void*))
{
dictEmpty(m_pdict,callback);
if (m_fTrackingChanges)
{
m_setchanged.clear();
m_cnewKeysPending = 0;
m_fAllChanged++;
}
delete m_setexpire;
m_setexpire = new (MALLOC_LOCAL) expireset();
if (m_spstorage != nullptr)
@ -2465,14 +2469,21 @@ LNotFound:
{
if (dictSize(m_pdict) != size()) // if all keys are cached then no point in looking up the database
{
m_spstorage->retrieve(sdsKey, sdslen(sdsKey), [&](const char *, size_t, const void *data, size_t cb){
sds sdsNewKey = nullptr; // the storage cache will give us its cached key if available
robj *o = nullptr;
std::unique_ptr<expireEntry> spexpire;
m_spstorage->retrieve((sds)sdsKey, [&](const char *, size_t, const void *data, size_t cb){
size_t offset = 0;
sds sdsNewKey = sdsdupshared(sdsKey);
auto spexpire = deserializeExpire((sds)sdsNewKey, (const char*)data, cb, &offset);
robj *o = deserializeStoredObject(this, sdsKey, reinterpret_cast<const char*>(data) + offset, cb - offset);
spexpire = deserializeExpire((sds)sdsNewKey, (const char*)data, cb, &offset);
o = deserializeStoredObject(this, sdsKey, reinterpret_cast<const char*>(data) + offset, cb - offset);
serverAssert(o != nullptr);
}, &sdsNewKey);
if (o != nullptr)
{
if (sdsNewKey == nullptr)
sdsNewKey = sdsdupshared(sdsKey);
dictAdd(m_pdict, sdsNewKey, o);
o->SetFExpires(spexpire != nullptr);
if (spexpire != nullptr)
@ -2484,7 +2495,13 @@ LNotFound:
serverAssert(m_setexpire->find(sdsKey) != m_setexpire->end());
}
serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end()));
});
}
else
{
if (sdsNewKey != nullptr)
sdsfree(sdsNewKey); // BUG but don't bother crashing
}
*pde = dictFind(m_pdict, sdsKey);
}
}
@ -2496,10 +2513,10 @@ LNotFound:
}
}
void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o, bool fOverwrite)
void redisDbPersistentData::storeKey(sds key, robj *o, bool fOverwrite)
{
sds temp = serializeStoredObjectAndExpire(this, szKey, o);
m_spstorage->insert(szKey, cchKey, temp, sdslen(temp), fOverwrite);
sds temp = serializeStoredObjectAndExpire(this, key, o);
m_spstorage->insert(key, temp, sdslen(temp), fOverwrite);
sdsfree(temp);
}
@ -2510,12 +2527,24 @@ void redisDbPersistentData::storeDatabase()
while ((de = dictNext(di)) != NULL) {
sds key = (sds)dictGetKey(de);
robj *o = (robj*)dictGetVal(de);
storeKey(key, sdslen(key), o, false);
storeKey(key, o, false);
}
serverAssert(dictSize(m_pdict) == m_spstorage->count());
dictReleaseIterator(di);
}
void redisDbPersistentData::processChanges()
/* static */ void redisDbPersistentData::serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const redisDbPersistentData::changedesc &change)
{
auto itr = db->find_cached_threadsafe(change.strkey.get());
if (itr == nullptr)
return;
robj *o = itr.val();
sds temp = serializeStoredObjectAndExpire(db, (const char*) itr.key(), o);
storage->insert((sds)change.strkey.get(), temp, sdslen(temp), change.fUpdate);
sdsfree(temp);
}
void redisDbPersistentData::processChanges(bool fSnapshot)
{
serverAssert(GlobalLocksAcquired());
@ -2525,35 +2554,51 @@ void redisDbPersistentData::processChanges()
if (m_spstorage != nullptr)
{
m_spstorage->beginWriteBatch();
if (m_fTrackingChanges >= 0)
serverAssert(m_pdbSnapshotStorageFlush == nullptr);
if (fSnapshot && !m_fAllChanged && m_setchanged.size() > 100)
{
// Do a snapshot based process if possible
m_pdbSnapshotStorageFlush = createSnapshot(getMvccTstamp(), true /* optional */);
if (m_pdbSnapshotStorageFlush)
{
m_setchangedStorageFlush = std::move(m_setchanged);
}
}
if (m_pdbSnapshotStorageFlush == nullptr)
{
if (m_fAllChanged)
{
m_spstorage->clear();
storeDatabase();
m_fAllChanged--;
m_fAllChanged = 0;
}
else
{
for (auto &change : m_setchanged)
{
auto itr = find_cached_threadsafe(change.strkey.get());
if (itr == nullptr)
continue;
robj *o = itr.val();
sds temp = serializeStoredObjectAndExpire(this, (const char*) itr.key(), o);
m_spstorage->insert(change.strkey.get(), sdslen(change.strkey.get()), temp, sdslen(temp), change.fUpdate);
sdsfree(temp);
serializeAndStoreChange(m_spstorage.get(), this, change);
}
}
m_setchanged.clear();
m_cnewKeysPending = 0;
}
m_setchanged.clear();
m_cnewKeysPending = 0;
}
}
void redisDbPersistentData::commitChanges()
void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree)
{
if (m_pdbSnapshotStorageFlush)
{
for (auto &change : m_setchangedStorageFlush)
{
serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, change);
}
m_setchangedStorageFlush.clear();
*psnapshotFree = m_pdbSnapshotStorageFlush;
m_pdbSnapshotStorageFlush = nullptr;
}
if (m_spstorage != nullptr)
m_spstorage->endWriteBatch();
}
@ -2598,7 +2643,7 @@ dict_iter redisDbPersistentData::random()
size_t redisDbPersistentData::size() const
{
if (m_spstorage != nullptr)
if (m_spstorage != nullptr && !m_fAllChanged)
return m_spstorage->count() + m_cnewKeysPending;
return dictSize(m_pdict)
@ -2641,7 +2686,7 @@ void redisDbPersistentData::removeAllCachedValues()
// First we have to flush the tracked changes
if (m_fTrackingChanges)
{
processChanges();
processChanges(false);
commitChanges();
trackChanges(false);
}

View File

@ -2517,7 +2517,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
{
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
g_pserver->db[idb]->processChanges();
g_pserver->db[idb]->processChanges(false);
g_pserver->db[idb]->commitChanges();
g_pserver->db[idb]->trackChanges(false);
}
@ -2593,7 +2593,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
g_pserver->db[idb]->processChanges();
g_pserver->db[idb]->processChanges(false);
g_pserver->db[idb]->commitChanges();
}
return C_OK;

View File

@ -2002,6 +2002,43 @@ void checkChildrenDone(void) {
}
}
static std::atomic<bool> s_fFlushInProgress { false };

void flushStorageWeak()
{
    bool fExpected = false;
    if (s_fFlushInProgress.compare_exchange_strong(fExpected, true /* desired */, std::memory_order_seq_cst, std::memory_order_relaxed))
    {
        g_pserver->asyncworkqueue->AddWorkFunction([]{
            aeAcquireLock();
            mstime_t storage_process_latency;
            latencyStartMonitor(storage_process_latency);
            std::vector<redisDb*> vecdb;
            for (int idb = 0; idb < cserver.dbnum; ++idb) {
                vecdb.push_back(g_pserver->db[idb]);
                g_pserver->db[idb]->processChanges(true);
            }
            latencyEndMonitor(storage_process_latency);
            latencyAddSampleIfNeeded("storage-process-changes", storage_process_latency);
            aeReleaseLock();

            std::vector<const redisDbPersistentDataSnapshot*> vecsnapshotFree;
            vecsnapshotFree.resize(vecdb.size());
            for (size_t idb = 0; idb < vecdb.size(); ++idb)
                vecdb[idb]->commitChanges(&vecsnapshotFree[idb]);

            for (size_t idb = 0; idb < vecsnapshotFree.size(); ++idb) {
                if (vecsnapshotFree[idb] != nullptr)
                    vecdb[idb]->endSnapshotAsync(vecsnapshotFree[idb]);
            }
            s_fFlushInProgress = false;
        }, true /* fHiPri */);
    }
    else
    {
        serverLog(LOG_INFO, "Missed storage flush due to existing flush still in flight. Consider increasing storage-weak-flush-period");
    }
}
/* This is our timer interrupt, called g_pserver->hz times per second.
* Here is where we do a number of things that need to be done asynchronously.
* For instance:
@ -2269,6 +2306,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
g_pserver->rdb_bgsave_scheduled = 0;
}
if (cserver.storage_memory_model == STORAGE_WRITEBACK && g_pserver->m_pstorageFactory) {
run_with_period(g_pserver->storage_flush_period) {
flushStorageWeak();
}
}
g_pserver->asyncworkqueue->AddWorkFunction([]{
g_pserver->db[0]->consolidate_snapshot();
}, true /*HiPri*/);
@ -2398,17 +2441,20 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
static thread_local bool fFirstRun = true;
// note: we also copy the DB pointer in case a DB swap is done while the lock is released
std::vector<redisDb*> vecdb; // note we cache the database pointer in case a dbswap is done while the lock is released
if (!fFirstRun) {
mstime_t storage_process_latency;
latencyStartMonitor(storage_process_latency);
for (int idb = 0; idb < cserver.dbnum; ++idb) {
vecdb.push_back(g_pserver->db[idb]);
g_pserver->db[idb]->processChanges();
if (cserver.storage_memory_model == STORAGE_WRITETHROUGH)
{
if (!fFirstRun) {
mstime_t storage_process_latency;
latencyStartMonitor(storage_process_latency);
for (int idb = 0; idb < cserver.dbnum; ++idb) {
vecdb.push_back(g_pserver->db[idb]);
g_pserver->db[idb]->processChanges(false);
}
latencyEndMonitor(storage_process_latency);
latencyAddSampleIfNeeded("storage-process-changes", storage_process_latency);
} else {
fFirstRun = false;
}
latencyEndMonitor(storage_process_latency);
latencyAddSampleIfNeeded("storage-process-changes", storage_process_latency);
} else {
fFirstRun = false;
}
int aof_state = g_pserver->aof_state;
@ -2441,7 +2487,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
aeReleaseLock();
}
/* This function is called immadiately after the event loop multiplexing
* API returned, and the control is going to soon return to Redis by invoking
* the different events callbacks. */
@ -4218,6 +4263,12 @@ int prepareForShutdown(int flags) {
redisCommunicateSystemd("STATUS=Error trying to save the DB, can't exit.\n");
return C_ERR;
}
// Also Dump To FLASH if Applicable
for (int idb = 0; idb < cserver.dbnum; ++idb) {
g_pserver->db[idb]->processChanges(false);
g_pserver->db[idb]->commitChanges();
}
}
/* Fire the shutdown modules event. */

View File

@ -106,6 +106,7 @@ typedef long long ustime_t; /* microsecond time type. */
#include "endianconv.h"
#include "crc64.h"
#include "IStorage.h"
#include "StorageCache.h"
#include "AsyncWorkQueue.h"
#include "gc.h"
@ -617,6 +618,10 @@ inline bool operator!=(const void *p, const robj_sharedptr &rhs)
#define REPL_DISKLESS_LOAD_WHEN_DB_EMPTY 1
#define REPL_DISKLESS_LOAD_SWAPDB 2
/* Storage Memory Model Defines */
#define STORAGE_WRITEBACK 0
#define STORAGE_WRITETHROUGH 1
/* Sets operations codes */
#define SET_OP_UNION 0
#define SET_OP_DIFF 1
@ -1316,7 +1321,7 @@ public:
void setExpire(expireEntry &&e);
void initialize();
void setStorageProvider(IStorage *pstorage);
void setStorageProvider(StorageCache *pstorage);
void trackChanges(bool fBulk);
@ -1324,8 +1329,8 @@ public:
// to allow you to release the global lock before commiting. To prevent deadlocks you *must*
// either release the global lock or keep the same global lock between the two functions as
// a second look is kept to ensure writes to secondary storage are ordered
void processChanges();
void commitChanges();
void processChanges(bool fSnapshot);
void commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree = nullptr);
// This should only be used if you look at the key, we do not fixup
// objects stored elsewhere
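
As a condensed illustration of the two-phase contract described above (this mirrors flushStorageWeak() in server.cpp from this same commit; it is not additional new code):

    aeAcquireLock();                                   // phase 1 runs under the global lock
    for (int idb = 0; idb < cserver.dbnum; ++idb)
        g_pserver->db[idb]->processChanges(true /*fSnapshot*/);
    aeReleaseLock();                                   // the lock may be dropped before phase 2

    for (int idb = 0; idb < cserver.dbnum; ++idb) {
        const redisDbPersistentDataSnapshot *psnapshotFree = nullptr;
        g_pserver->db[idb]->commitChanges(&psnapshotFree);
        if (psnapshotFree != nullptr)
            g_pserver->db[idb]->endSnapshotAsync(psnapshotFree);  // release the flush snapshot
    }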
@ -1366,10 +1371,12 @@ private:
bool operator()(const char *key, const changedesc &b) const { return sdsview(key) < b.strkey; }
};
static void serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const changedesc &change);
void ensure(const char *key);
void ensure(const char *key, dictEntry **de);
void storeDatabase();
void storeKey(const char *key, size_t cchKey, robj *o, bool fOverwrite);
void storeKey(sds key, robj *o, bool fOverwrite);
void recursiveFreeSnapshots(redisDbPersistentDataSnapshot *psnapshot);
// Keyspace
@ -1379,7 +1386,7 @@ private:
int m_fAllChanged = 0;
std::set<changedesc, changedescCmp> m_setchanged;
size_t m_cnewKeysPending = 0;
std::shared_ptr<IStorage> m_spstorage = nullptr;
std::shared_ptr<StorageCache> m_spstorage = nullptr;
// Expire
expireset *m_setexpire = nullptr;
@ -1390,6 +1397,10 @@ private:
const redisDbPersistentDataSnapshot *m_pdbSnapshot = nullptr;
std::unique_ptr<redisDbPersistentDataSnapshot> m_spdbSnapshotHOLDER;
const redisDbPersistentDataSnapshot *m_pdbSnapshotASYNC = nullptr;
const redisDbPersistentDataSnapshot *m_pdbSnapshotStorageFlush = nullptr;
std::set<changedesc, changedescCmp> m_setchangedStorageFlush;
int m_refCount = 0;
};
@ -2046,6 +2057,7 @@ struct redisServerConst {
int delete_on_evict = false; // Only valid when a storage provider is set
int thread_min_client_threshold = 50;
int multimaster_no_forward;
int storage_memory_model = STORAGE_WRITETHROUGH;
};
struct redisServer {
@ -2408,6 +2420,7 @@ struct redisServer {
GarbageCollector<redisDbPersistentDataSnapshot> garbageCollector;
IStorageFactory *m_pstorageFactory = nullptr;
int storage_flush_period; // The time between flushes in the CRON job
/* TLS Configuration */
int tls_cluster;

View File

@ -8,7 +8,8 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
serverAssert(GlobalLocksAcquired());
serverAssert(m_refCount == 0); // do not call this on a snapshot
freeMemoryIfNeededAndSafe(true /*fPreSnapshot*/);
if (freeMemoryIfNeededAndSafe(true /*fPreSnapshot*/) != C_OK && fOptional)
return nullptr; // can't create snapshot due to OOM
int levels = 1;
redisDbPersistentDataSnapshot *psnapshot = m_spdbSnapshotHOLDER.get();
@ -78,7 +79,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
dictForceRehash(spdb->m_pdictTombstone); // prevent rehashing by finishing the rehash now
spdb->m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER);
if (m_spstorage != nullptr)
spdb->m_spstorage = std::shared_ptr<IStorage>(const_cast<IStorage*>(m_spstorage->clone()));
spdb->m_spstorage = std::shared_ptr<StorageCache>(const_cast<StorageCache*>(m_spstorage->clone()));
spdb->m_pdbSnapshot = m_pdbSnapshot;
spdb->m_refCount = 1;
spdb->m_mvccCheckpoint = getMvccTstamp();

View File

@ -82,6 +82,7 @@ size_t RocksDBStorageProvider::clear()
size_t RocksDBStorageProvider::count() const
{
std::unique_lock<fastlock> l(m_lock);
return m_count;
}

View File

@ -17,7 +17,7 @@ class RocksDBStorageProvider : public IStorage
std::shared_ptr<rocksdb::ColumnFamilyHandle> m_spcolfamily;
rocksdb::ReadOptions m_readOptionsTemplate;
size_t m_count = 0;
fastlock m_lock {"RocksDBStorageProvider"};
mutable fastlock m_lock {"RocksDBStorageProvider"};
public:
RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count);

View File

@ -15,11 +15,13 @@ public:
RocksDBStorageFactory(const char *dbfile, int dbnum);
~RocksDBStorageFactory();
virtual IStorage *create(int db, key_load_iterator iter) override;
virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override;
virtual const char *name() const override;
virtual size_t totalDiskspaceUsed() const override;
virtual bool FSlow() const override { return true; }
private:
void setVersion(rocksdb::ColumnFamilyHandle*);
};
@ -54,7 +56,8 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum)
options.max_background_flushes = 2;
options.bytes_per_sync = 1048576;
options.compaction_pri = rocksdb::kMinOverlappingRatio;
options.compression = rocksdb::kNoCompression;
options.compression = rocksdb::kLZ4Compression;
options.enable_pipelined_write = true;
options.sst_file_manager = m_pfilemanager;
rocksdb::BlockBasedTableOptions table_options;
table_options.block_size = 16 * 1024;
@ -110,7 +113,7 @@ void RocksDBStorageFactory::setVersion(rocksdb::ColumnFamilyHandle *handle)
throw status.ToString();
}
IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter)
IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter, void *privdata)
{
++db; // skip default col family
std::shared_ptr<rocksdb::ColumnFamilyHandle> spcolfamily(m_vecspcols[db].release());
@ -141,7 +144,7 @@ IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter)
if (FInternalKey(it->key().data(), it->key().size()))
continue;
if (iter != nullptr)
iter(it->key().data(), it->key().size());
iter(it->key().data(), it->key().size(), privdata);
++count;
}
}

View File

@ -1,7 +1,7 @@
#include "teststorageprovider.h"
#include "../server.h"
IStorage *TestStorageFactory::create(int, key_load_iterator)
IStorage *TestStorageFactory::create(int, key_load_iterator, void *)
{
return new (MALLOC_LOCAL) TestStorageProvider();
}

View File

@ -4,9 +4,10 @@
class TestStorageFactory : public IStorageFactory
{
virtual class IStorage *create(int db, key_load_iterator itr) override;
virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) override;
virtual const char *name() const override;
virtual size_t totalDiskspaceUsed() const override { return 0; }
virtual bool FSlow() const { return false; }
};
class TestStorageProvider final : public IStorage

View File

@ -1,4 +1,4 @@
start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.db} delete-on-evict no]] {
start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.db} delete-on-evict no storage-flush-period 10]] {
test { FLASH - GET works after eviction } {
r set testkey foo
@ -100,6 +100,8 @@ start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.
}
r flushall
# If a weak storage memory model is set, wait for any pending snapshot writes to finish
after 500
foreach policy {
allkeys-random allkeys-lru allkeys-lfu
} {
@ -107,7 +109,7 @@ start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.
# Get the current memory limit and calculate a new limit.
# Set limit to 100M.
set used [s used_memory]
set limit [expr {$used+50000*1024}]
set limit [expr {$used+60*1024*1024}]
r config set maxmemory $limit
r config set maxmemory-policy $policy
# Now add keys equivalent to 1024b until the limit is almost reached.
@ -124,7 +126,8 @@ start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.
# should still be under the limit for maxmemory, however all keys set should still exist between flash and memory
# check same number of keys exist in addition to values of first and last keys
set err 0
for {set j 0} {$j < 10000} {incr j} {
set extra_keys [expr floor([expr ($limit * 0.4) / 1024])]
for {set j 0} {$j < $extra_keys} {incr j} {
catch {
r set p2$j xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
} err
@ -135,8 +138,8 @@ start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.
}
r set last val
set dbsize [r dbsize]
assert {[s used_memory] < $limit+4096}
assert {$dbsize == $numkeys+10002}
assert {[s used_memory] < ($limit*1.2)}
assert {$dbsize == $numkeys+$extra_keys+2}
assert {[r get first] == {val}}
assert {[r get last] == {val}}
r flushall

View File

@ -99,6 +99,7 @@ start_server {tags {"introspection"}} {
bio_cpulist
aof_rewrite_cpulist
bgsave_cpulist
storage-cache-mode
}
set configs {}