Implement storage key cache, and writeback memory model

Former-commit-id: 732bd9c153459f1174475ad67de36c399ddbe359
This commit is contained in:
John Sully 2020-07-11 21:23:48 +00:00
parent a5f61b903b
commit 17661f2382
18 changed files with 326 additions and 66 deletions

View File

@ -4,12 +4,13 @@
class IStorageFactory class IStorageFactory
{ {
public: public:
typedef void (*key_load_iterator)(const char *rgchKey, size_t cchKey); typedef void (*key_load_iterator)(const char *rgchKey, size_t cchKey, void *privdata);
virtual ~IStorageFactory() {} virtual ~IStorageFactory() {}
virtual class IStorage *create(int db, key_load_iterator itr) = 0; virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) = 0;
virtual const char *name() const = 0; virtual const char *name() const = 0;
virtual size_t totalDiskspaceUsed() const = 0; virtual size_t totalDiskspaceUsed() const = 0;
virtual bool FSlow() const = 0;
}; };
class IStorage class IStorage

View File

@ -277,7 +277,7 @@ endif
REDIS_SERVER_NAME=keydb-pro-server REDIS_SERVER_NAME=keydb-pro-server
REDIS_SENTINEL_NAME=keydb-sentinel REDIS_SENTINEL_NAME=keydb-sentinel
REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o $(ASM_OBJ) REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o StorageCache.o $(ASM_OBJ)
REDIS_CLI_NAME=keydb-cli REDIS_CLI_NAME=keydb-cli
REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o motd.o $(ASM_OBJ) REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o motd.o $(ASM_OBJ)
REDIS_BENCHMARK_NAME=keydb-benchmark REDIS_BENCHMARK_NAME=keydb-benchmark

76
src/StorageCache.cpp Normal file
View File

@ -0,0 +1,76 @@
#include "server.h"
void StorageCache::clear()
{
if (m_setkeys != nullptr)
m_setkeys->clear();
m_spstorage->clear();
}
void StorageCache::cacheKey(sds key)
{
if (m_setkeys == nullptr)
return;
m_setkeys->insert(sdsimmutablestring(sdsdupshared(key)));
}
void StorageCache::cacheKey(const char *rgch, size_t cch)
{
if (m_setkeys == nullptr)
return;
m_setkeys->insert(sdsimmutablestring(sdsnewlen(rgch, cch)));
}
bool StorageCache::erase(sds key)
{
bool result = m_spstorage->erase(key, sdslen(key));
if (result && m_setkeys != nullptr)
{
auto itr = m_setkeys->find(sdsview(key));
serverAssert(itr != m_setkeys->end());
m_setkeys->erase(itr);
}
return result;
}
void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwrite)
{
if (!fOverwrite && m_setkeys != nullptr)
{
cacheKey(key);
}
m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite);
}
const StorageCache *StorageCache::clone()
{
// Clones never clone the cache
StorageCache *cacheNew = new StorageCache(const_cast<IStorage*>(m_spstorage->clone()));
return cacheNew;
}
void StorageCache::retrieve(sds key, IStorage::callbackSingle fn, sds *cachedKey) const
{
if (m_setkeys != nullptr)
{
auto itr = m_setkeys->find(sdsview(key));
if (itr == m_setkeys->end())
return; // Not found
if (cachedKey != nullptr)
*cachedKey = sdsdupshared(itr->get());
}
m_spstorage->retrieve(key, sdslen(key), fn);
}
size_t StorageCache::count() const
{
size_t count = m_spstorage->count();
if (m_setkeys != nullptr)
serverAssert(count == m_setkeys->size());
return count;
}
void StorageCache::beginWriteBatch() {
serverAssert(GlobalLocksAcquired()); // Otherwise we deadlock
m_spstorage->beginWriteBatch();
}

57
src/StorageCache.h Normal file
View File

@ -0,0 +1,57 @@
#pragma once
#include "sds.h"
class StorageCache
{
std::shared_ptr<IStorage> m_spstorage;
std::unique_ptr<semiorderedset<sdsimmutablestring, sdsview, true>> m_setkeys;
StorageCache(IStorage *storage)
: m_spstorage(storage)
{}
void cacheKey(sds key);
void cacheKey(const char *rgchKey, size_t cchKey);
struct load_iter_data
{
StorageCache *cache;
IStorageFactory::key_load_iterator itrParent;
void *privdataParent;
};
static void key_load_itr(const char *rgchKey, size_t cchKey, void *privdata)
{
load_iter_data *data = (load_iter_data*)privdata;
data->cache->cacheKey(rgchKey, cchKey);
if (data->itrParent)
data->itrParent(rgchKey, cchKey, data->privdataParent);
}
public:
static StorageCache *create(IStorageFactory *pfactory, int db, IStorageFactory::key_load_iterator fn, void *privdata) {
StorageCache *cache = new StorageCache(nullptr);
if (pfactory->FSlow())
{
cache->m_setkeys = std::make_unique<semiorderedset<sdsimmutablestring, sdsview, true>>();
}
load_iter_data data = {cache, fn, privdata};
cache->m_spstorage = std::shared_ptr<IStorage>(pfactory->create(db, key_load_itr, (void*)&data));
return cache;
}
void clear();
void insert(sds key, const void *data, size_t cbdata, bool fOverwrite);
void retrieve(sds key, IStorage::callbackSingle fn, sds *sharedKeyOut) const;
bool erase(sds key);
bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); }
void beginWriteBatch();
void endWriteBatch() { m_spstorage->endWriteBatch(); }
void batch_lock() { return m_spstorage->batch_lock(); }
void batch_unlock() { return m_spstorage->batch_unlock(); }
size_t count() const;
const StorageCache *clone();
};

View File

@ -968,7 +968,7 @@ int loadAppendOnlyFile(char *filename) {
loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */ loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
for (int idb = 0; idb < cserver.dbnum; ++idb) for (int idb = 0; idb < cserver.dbnum; ++idb)
{ {
g_pserver->db[idb]->processChanges(); g_pserver->db[idb]->processChanges(false);
g_pserver->db[idb]->commitChanges(); g_pserver->db[idb]->commitChanges();
} }
fclose(fp); fclose(fp);

View File

@ -105,6 +105,11 @@ configEnum repl_diskless_load_enum[] = {
{NULL, 0} {NULL, 0}
}; };
configEnum storage_memory_model_enum[] = {
{"writeback", STORAGE_WRITEBACK},
{"writethrough", STORAGE_WRITETHROUGH},
};
/* Output buffer limits presets. */ /* Output buffer limits presets. */
clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = { clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = {
{0, 0, 0}, /* normal */ {0, 0, 0}, /* normal */
@ -2316,6 +2321,7 @@ standardConfig configs[] = {
createEnumConfig("loglevel", NULL, MODIFIABLE_CONFIG, loglevel_enum, cserver.verbosity, LL_NOTICE, NULL, NULL), createEnumConfig("loglevel", NULL, MODIFIABLE_CONFIG, loglevel_enum, cserver.verbosity, LL_NOTICE, NULL, NULL),
createEnumConfig("maxmemory-policy", NULL, MODIFIABLE_CONFIG, maxmemory_policy_enum, g_pserver->maxmemory_policy, MAXMEMORY_NO_EVICTION, NULL, NULL), createEnumConfig("maxmemory-policy", NULL, MODIFIABLE_CONFIG, maxmemory_policy_enum, g_pserver->maxmemory_policy, MAXMEMORY_NO_EVICTION, NULL, NULL),
createEnumConfig("appendfsync", NULL, MODIFIABLE_CONFIG, aof_fsync_enum, g_pserver->aof_fsync, AOF_FSYNC_EVERYSEC, NULL, NULL), createEnumConfig("appendfsync", NULL, MODIFIABLE_CONFIG, aof_fsync_enum, g_pserver->aof_fsync, AOF_FSYNC_EVERYSEC, NULL, NULL),
createEnumConfig("storage-cache-mode", NULL, IMMUTABLE_CONFIG, storage_memory_model_enum, cserver.storage_memory_model, STORAGE_WRITETHROUGH, NULL, NULL),
/* Integer configs */ /* Integer configs */
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, cserver.dbnum, 16, INTEGER_CONFIG, NULL, NULL), createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, cserver.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
@ -2349,6 +2355,7 @@ standardConfig configs[] = {
createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves), createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves),
createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodSlaves), createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodSlaves),
createIntConfig("min-clients-per-thread", NULL, MODIFIABLE_CONFIG, 0, 400, cserver.thread_min_client_threshold, 50, INTEGER_CONFIG, NULL, NULL), createIntConfig("min-clients-per-thread", NULL, MODIFIABLE_CONFIG, 0, 400, cserver.thread_min_client_threshold, 50, INTEGER_CONFIG, NULL, NULL),
createIntConfig("storage-flush-period", NULL, MODIFIABLE_CONFIG, 1, 10000, g_pserver->storage_flush_period, 500, INTEGER_CONFIG, NULL, NULL),
/* Unsigned int configs */ /* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, g_pserver->maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, g_pserver->maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),

View File

@ -399,7 +399,7 @@ bool redisDbPersistentData::syncDelete(robj *key)
bool fDeleted = false; bool fDeleted = false;
if (m_spstorage != nullptr) if (m_spstorage != nullptr)
fDeleted = m_spstorage->erase(szFromObj(key), sdslen(szFromObj(key))); fDeleted = m_spstorage->erase(szFromObj(key));
fDeleted = (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) || fDeleted; fDeleted = (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) || fDeleted;
if (fDeleted) { if (fDeleted) {
@ -2266,13 +2266,13 @@ void redisDbPersistentData::initialize()
m_fTrackingChanges = 0; m_fTrackingChanges = 0;
} }
void redisDbPersistentData::setStorageProvider(IStorage *pstorage) void redisDbPersistentData::setStorageProvider(StorageCache *pstorage)
{ {
serverAssert(m_spstorage == nullptr); serverAssert(m_spstorage == nullptr);
m_spstorage = std::unique_ptr<IStorage>(pstorage); m_spstorage = std::unique_ptr<StorageCache>(pstorage);
} }
void clusterStorageLoadCallback(const char *rgchkey, size_t cch) void clusterStorageLoadCallback(const char *rgchkey, size_t cch, void *)
{ {
slotToKeyUpdateKeyCore(rgchkey, cch, true /*add*/); slotToKeyUpdateKeyCore(rgchkey, cch, true /*add*/);
} }
@ -2296,7 +2296,7 @@ void redisDb::storageProviderInitialize()
if (g_pserver->m_pstorageFactory != nullptr) if (g_pserver->m_pstorageFactory != nullptr)
{ {
IStorageFactory::key_load_iterator itr = (g_pserver->cluster_enabled) ? clusterStorageLoadCallback : nullptr; IStorageFactory::key_load_iterator itr = (g_pserver->cluster_enabled) ? clusterStorageLoadCallback : nullptr;
this->setStorageProvider(g_pserver->m_pstorageFactory->create(id, itr)); this->setStorageProvider(StorageCache::create(g_pserver->m_pstorageFactory, id, itr, nullptr));
} }
} }
@ -2339,7 +2339,11 @@ void redisDbPersistentData::clear(void(callback)(void*))
{ {
dictEmpty(m_pdict,callback); dictEmpty(m_pdict,callback);
if (m_fTrackingChanges) if (m_fTrackingChanges)
{
m_setchanged.clear();
m_cnewKeysPending = 0;
m_fAllChanged++; m_fAllChanged++;
}
delete m_setexpire; delete m_setexpire;
m_setexpire = new (MALLOC_LOCAL) expireset(); m_setexpire = new (MALLOC_LOCAL) expireset();
if (m_spstorage != nullptr) if (m_spstorage != nullptr)
@ -2448,14 +2452,21 @@ LNotFound:
{ {
if (dictSize(m_pdict) != size()) // if all keys are cached then no point in looking up the database if (dictSize(m_pdict) != size()) // if all keys are cached then no point in looking up the database
{ {
m_spstorage->retrieve(sdsKey, sdslen(sdsKey), [&](const char *, size_t, const void *data, size_t cb){ sds sdsNewKey = nullptr; // the storage cache will give us its cached key if available
robj *o = nullptr;
std::unique_ptr<expireEntry> spexpire;
m_spstorage->retrieve((sds)sdsKey, [&](const char *, size_t, const void *data, size_t cb){
size_t offset = 0; size_t offset = 0;
sds sdsNewKey = sdsdupshared(sdsKey); spexpire = deserializeExpire((sds)sdsNewKey, (const char*)data, cb, &offset);
auto spexpire = deserializeExpire((sds)sdsNewKey, (const char*)data, cb, &offset); o = deserializeStoredObject(this, sdsKey, reinterpret_cast<const char*>(data) + offset, cb - offset);
robj *o = deserializeStoredObject(this, sdsKey, reinterpret_cast<const char*>(data) + offset, cb - offset);
serverAssert(o != nullptr); serverAssert(o != nullptr);
dictAdd(m_pdict, sdsNewKey, o); }, &sdsNewKey);
if (o != nullptr)
{
if (sdsNewKey == nullptr)
sdsNewKey = sdsdupshared(sdsKey);
dictAdd(m_pdict, sdsNewKey, o);
o->SetFExpires(spexpire != nullptr); o->SetFExpires(spexpire != nullptr);
if (spexpire != nullptr) if (spexpire != nullptr)
@ -2467,7 +2478,13 @@ LNotFound:
serverAssert(m_setexpire->find(sdsKey) != m_setexpire->end()); serverAssert(m_setexpire->find(sdsKey) != m_setexpire->end());
} }
serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end())); serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end()));
}); }
else
{
if (sdsNewKey != nullptr)
sdsfree(sdsNewKey); // BUG but don't bother crashing
}
*pde = dictFind(m_pdict, sdsKey); *pde = dictFind(m_pdict, sdsKey);
} }
} }
@ -2479,10 +2496,10 @@ LNotFound:
} }
} }
void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o, bool fOverwrite) void redisDbPersistentData::storeKey(sds key, robj *o, bool fOverwrite)
{ {
sds temp = serializeStoredObjectAndExpire(this, szKey, o); sds temp = serializeStoredObjectAndExpire(this, key, o);
m_spstorage->insert(szKey, cchKey, temp, sdslen(temp), fOverwrite); m_spstorage->insert(key, temp, sdslen(temp), fOverwrite);
sdsfree(temp); sdsfree(temp);
} }
@ -2493,12 +2510,24 @@ void redisDbPersistentData::storeDatabase()
while ((de = dictNext(di)) != NULL) { while ((de = dictNext(di)) != NULL) {
sds key = (sds)dictGetKey(de); sds key = (sds)dictGetKey(de);
robj *o = (robj*)dictGetVal(de); robj *o = (robj*)dictGetVal(de);
storeKey(key, sdslen(key), o, false); storeKey(key, o, false);
} }
serverAssert(dictSize(m_pdict) == m_spstorage->count());
dictReleaseIterator(di); dictReleaseIterator(di);
} }
void redisDbPersistentData::processChanges() /* static */ void redisDbPersistentData::serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const redisDbPersistentData::changedesc &change)
{
auto itr = db->find_cached_threadsafe(change.strkey.get());
if (itr == nullptr)
return;
robj *o = itr.val();
sds temp = serializeStoredObjectAndExpire(db, (const char*) itr.key(), o);
storage->insert((sds)change.strkey.get(), temp, sdslen(temp), change.fUpdate);
sdsfree(temp);
}
void redisDbPersistentData::processChanges(bool fSnapshot)
{ {
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
@ -2508,35 +2537,51 @@ void redisDbPersistentData::processChanges()
if (m_spstorage != nullptr) if (m_spstorage != nullptr)
{ {
m_spstorage->beginWriteBatch(); m_spstorage->beginWriteBatch();
if (m_fTrackingChanges >= 0) serverAssert(m_pdbSnapshotStorageFlush == nullptr);
if (fSnapshot && !m_fAllChanged && m_setchanged.size() > 100)
{
// Do a snapshot based process if possible
m_pdbSnapshotStorageFlush = createSnapshot(getMvccTstamp(), true /* optional */);
if (m_pdbSnapshotStorageFlush)
{
m_setchangedStorageFlush = std::move(m_setchanged);
}
}
if (m_pdbSnapshotStorageFlush == nullptr)
{ {
if (m_fAllChanged) if (m_fAllChanged)
{ {
m_spstorage->clear(); m_spstorage->clear();
storeDatabase(); storeDatabase();
m_fAllChanged--; m_fAllChanged = 0;
} }
else else
{ {
for (auto &change : m_setchanged) for (auto &change : m_setchanged)
{ {
auto itr = find_cached_threadsafe(change.strkey.get()); serializeAndStoreChange(m_spstorage.get(), this, change);
if (itr == nullptr) }
continue;
robj *o = itr.val();
sds temp = serializeStoredObjectAndExpire(this, (const char*) itr.key(), o);
m_spstorage->insert(change.strkey.get(), sdslen(change.strkey.get()), temp, sdslen(temp), change.fUpdate);
sdsfree(temp);
} }
} }
m_setchanged.clear(); m_setchanged.clear();
m_cnewKeysPending = 0; m_cnewKeysPending = 0;
} }
}
} }
void redisDbPersistentData::commitChanges() void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree)
{ {
if (m_pdbSnapshotStorageFlush)
{
for (auto &change : m_setchangedStorageFlush)
{
serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, change);
}
m_setchangedStorageFlush.clear();
*psnapshotFree = m_pdbSnapshotStorageFlush;
m_pdbSnapshotStorageFlush = nullptr;
}
if (m_spstorage != nullptr) if (m_spstorage != nullptr)
m_spstorage->endWriteBatch(); m_spstorage->endWriteBatch();
} }
@ -2581,7 +2626,7 @@ dict_iter redisDbPersistentData::random()
size_t redisDbPersistentData::size() const size_t redisDbPersistentData::size() const
{ {
if (m_spstorage != nullptr) if (m_spstorage != nullptr && !m_fAllChanged)
return m_spstorage->count() + m_cnewKeysPending; return m_spstorage->count() + m_cnewKeysPending;
return dictSize(m_pdict) return dictSize(m_pdict)
@ -2624,7 +2669,7 @@ void redisDbPersistentData::removeAllCachedValues()
// First we have to flush the tracked changes // First we have to flush the tracked changes
if (m_fTrackingChanges) if (m_fTrackingChanges)
{ {
processChanges(); processChanges(false);
commitChanges(); commitChanges();
trackChanges(false); trackChanges(false);
} }

View File

@ -2515,7 +2515,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
{ {
for (int idb = 0; idb < cserver.dbnum; ++idb) for (int idb = 0; idb < cserver.dbnum; ++idb)
{ {
g_pserver->db[idb]->processChanges(); g_pserver->db[idb]->processChanges(false);
g_pserver->db[idb]->commitChanges(); g_pserver->db[idb]->commitChanges();
g_pserver->db[idb]->trackChanges(false); g_pserver->db[idb]->trackChanges(false);
} }
@ -2591,7 +2591,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
for (int idb = 0; idb < cserver.dbnum; ++idb) for (int idb = 0; idb < cserver.dbnum; ++idb)
{ {
g_pserver->db[idb]->processChanges(); g_pserver->db[idb]->processChanges(false);
g_pserver->db[idb]->commitChanges(); g_pserver->db[idb]->commitChanges();
} }
return C_OK; return C_OK;

View File

@ -1996,6 +1996,43 @@ void checkChildrenDone(void) {
} }
} }
static std::atomic<bool> s_fFlushInProgress { false };
void flushStorageWeak()
{
bool fExpected = false;
if (s_fFlushInProgress.compare_exchange_strong(fExpected, true /* desired */, std::memory_order_seq_cst, std::memory_order_relaxed))
{
g_pserver->asyncworkqueue->AddWorkFunction([]{
aeAcquireLock();
mstime_t storage_process_latency;
latencyStartMonitor(storage_process_latency);
std::vector<redisDb*> vecdb;
for (int idb = 0; idb < cserver.dbnum; ++idb) {
vecdb.push_back(g_pserver->db[idb]);
g_pserver->db[idb]->processChanges(true);
}
latencyEndMonitor(storage_process_latency);
latencyAddSampleIfNeeded("storage-process-changes", storage_process_latency);
aeReleaseLock();
std::vector<const redisDbPersistentDataSnapshot*> vecsnapshotFree;
vecsnapshotFree.resize(vecdb.size());
for (size_t idb = 0; idb < vecdb.size(); ++idb)
vecdb[idb]->commitChanges(&vecsnapshotFree[idb]);
for (size_t idb = 0; idb < vecsnapshotFree.size(); ++idb) {
if (vecsnapshotFree[idb] != nullptr)
vecdb[idb]->endSnapshotAsync(vecsnapshotFree[idb]);
}
s_fFlushInProgress = false;
}, true /* fHiPri */);
}
else
{
serverLog(LOG_INFO, "Missed storage flush due to existing flush still in flight. Consider increasing storage-weak-flush-period");
}
}
/* This is our timer interrupt, called g_pserver->hz times per second. /* This is our timer interrupt, called g_pserver->hz times per second.
* Here is where we do a number of things that need to be done asynchronously. * Here is where we do a number of things that need to be done asynchronously.
* For instance: * For instance:
@ -2263,6 +2300,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
g_pserver->rdb_bgsave_scheduled = 0; g_pserver->rdb_bgsave_scheduled = 0;
} }
if (cserver.storage_memory_model == STORAGE_WRITEBACK && g_pserver->m_pstorageFactory) {
run_with_period(g_pserver->storage_flush_period) {
flushStorageWeak();
}
}
g_pserver->asyncworkqueue->AddWorkFunction([]{ g_pserver->asyncworkqueue->AddWorkFunction([]{
g_pserver->db[0]->consolidate_snapshot(); g_pserver->db[0]->consolidate_snapshot();
}, true /*HiPri*/); }, true /*HiPri*/);
@ -2392,18 +2435,21 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
static thread_local bool fFirstRun = true; static thread_local bool fFirstRun = true;
// note: we also copy the DB pointer in case a DB swap is done while the lock is released // note: we also copy the DB pointer in case a DB swap is done while the lock is released
std::vector<redisDb*> vecdb; // note we cache the database pointer in case a dbswap is done while the lock is released std::vector<redisDb*> vecdb; // note we cache the database pointer in case a dbswap is done while the lock is released
if (cserver.storage_memory_model == STORAGE_WRITETHROUGH)
{
if (!fFirstRun) { if (!fFirstRun) {
mstime_t storage_process_latency; mstime_t storage_process_latency;
latencyStartMonitor(storage_process_latency); latencyStartMonitor(storage_process_latency);
for (int idb = 0; idb < cserver.dbnum; ++idb) { for (int idb = 0; idb < cserver.dbnum; ++idb) {
vecdb.push_back(g_pserver->db[idb]); vecdb.push_back(g_pserver->db[idb]);
g_pserver->db[idb]->processChanges(); g_pserver->db[idb]->processChanges(false);
} }
latencyEndMonitor(storage_process_latency); latencyEndMonitor(storage_process_latency);
latencyAddSampleIfNeeded("storage-process-changes", storage_process_latency); latencyAddSampleIfNeeded("storage-process-changes", storage_process_latency);
} else { } else {
fFirstRun = false; fFirstRun = false;
} }
}
int aof_state = g_pserver->aof_state; int aof_state = g_pserver->aof_state;
aeReleaseLock(); aeReleaseLock();
@ -2435,7 +2481,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
aeReleaseLock(); aeReleaseLock();
} }
/* This function is called immadiately after the event loop multiplexing /* This function is called immadiately after the event loop multiplexing
* API returned, and the control is going to soon return to Redis by invoking * API returned, and the control is going to soon return to Redis by invoking
* the different events callbacks. */ * the different events callbacks. */
@ -4209,6 +4254,12 @@ int prepareForShutdown(int flags) {
redisCommunicateSystemd("STATUS=Error trying to save the DB, can't exit.\n"); redisCommunicateSystemd("STATUS=Error trying to save the DB, can't exit.\n");
return C_ERR; return C_ERR;
} }
// Also Dump To FLASH if Applicable
for (int idb = 0; idb < cserver.dbnum; ++idb) {
g_pserver->db[idb]->processChanges(false);
g_pserver->db[idb]->commitChanges();
}
} }
/* Fire the shutdown modules event. */ /* Fire the shutdown modules event. */

View File

@ -106,6 +106,7 @@ typedef long long ustime_t; /* microsecond time type. */
#include "endianconv.h" #include "endianconv.h"
#include "crc64.h" #include "crc64.h"
#include "IStorage.h" #include "IStorage.h"
#include "StorageCache.h"
#include "AsyncWorkQueue.h" #include "AsyncWorkQueue.h"
#include "gc.h" #include "gc.h"
@ -606,6 +607,10 @@ public:
#define REPL_DISKLESS_LOAD_WHEN_DB_EMPTY 1 #define REPL_DISKLESS_LOAD_WHEN_DB_EMPTY 1
#define REPL_DISKLESS_LOAD_SWAPDB 2 #define REPL_DISKLESS_LOAD_SWAPDB 2
/* Storage Memory Model Defines */
#define STORAGE_WRITEBACK 0
#define STORAGE_WRITETHROUGH 1
/* Sets operations codes */ /* Sets operations codes */
#define SET_OP_UNION 0 #define SET_OP_UNION 0
#define SET_OP_DIFF 1 #define SET_OP_DIFF 1
@ -1304,7 +1309,7 @@ public:
void setExpire(expireEntry &&e); void setExpire(expireEntry &&e);
void initialize(); void initialize();
void setStorageProvider(IStorage *pstorage); void setStorageProvider(StorageCache *pstorage);
void trackChanges(bool fBulk); void trackChanges(bool fBulk);
@ -1312,8 +1317,8 @@ public:
// to allow you to release the global lock before commiting. To prevent deadlocks you *must* // to allow you to release the global lock before commiting. To prevent deadlocks you *must*
// either release the global lock or keep the same global lock between the two functions as // either release the global lock or keep the same global lock between the two functions as
// a second look is kept to ensure writes to secondary storage are ordered // a second look is kept to ensure writes to secondary storage are ordered
void processChanges(); void processChanges(bool fSnapshot);
void commitChanges(); void commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree = nullptr);
// This should only be used if you look at the key, we do not fixup // This should only be used if you look at the key, we do not fixup
// objects stored elsewhere // objects stored elsewhere
@ -1354,10 +1359,12 @@ private:
bool operator()(const char *key, const changedesc &b) const { return sdsview(key) < b.strkey; } bool operator()(const char *key, const changedesc &b) const { return sdsview(key) < b.strkey; }
}; };
static void serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const changedesc &change);
void ensure(const char *key); void ensure(const char *key);
void ensure(const char *key, dictEntry **de); void ensure(const char *key, dictEntry **de);
void storeDatabase(); void storeDatabase();
void storeKey(const char *key, size_t cchKey, robj *o, bool fOverwrite); void storeKey(sds key, robj *o, bool fOverwrite);
void recursiveFreeSnapshots(redisDbPersistentDataSnapshot *psnapshot); void recursiveFreeSnapshots(redisDbPersistentDataSnapshot *psnapshot);
// Keyspace // Keyspace
@ -1367,7 +1374,7 @@ private:
int m_fAllChanged = 0; int m_fAllChanged = 0;
std::set<changedesc, changedescCmp> m_setchanged; std::set<changedesc, changedescCmp> m_setchanged;
size_t m_cnewKeysPending = 0; size_t m_cnewKeysPending = 0;
std::shared_ptr<IStorage> m_spstorage = nullptr; std::shared_ptr<StorageCache> m_spstorage = nullptr;
// Expire // Expire
expireset *m_setexpire = nullptr; expireset *m_setexpire = nullptr;
@ -1378,6 +1385,10 @@ private:
const redisDbPersistentDataSnapshot *m_pdbSnapshot = nullptr; const redisDbPersistentDataSnapshot *m_pdbSnapshot = nullptr;
std::unique_ptr<redisDbPersistentDataSnapshot> m_spdbSnapshotHOLDER; std::unique_ptr<redisDbPersistentDataSnapshot> m_spdbSnapshotHOLDER;
const redisDbPersistentDataSnapshot *m_pdbSnapshotASYNC = nullptr; const redisDbPersistentDataSnapshot *m_pdbSnapshotASYNC = nullptr;
const redisDbPersistentDataSnapshot *m_pdbSnapshotStorageFlush = nullptr;
std::set<changedesc, changedescCmp> m_setchangedStorageFlush;
int m_refCount = 0; int m_refCount = 0;
}; };
@ -2033,6 +2044,7 @@ struct redisServerConst {
int trial_timeout = 120; int trial_timeout = 120;
int delete_on_evict = false; // Only valid when a storage provider is set int delete_on_evict = false; // Only valid when a storage provider is set
int thread_min_client_threshold = 50; int thread_min_client_threshold = 50;
int storage_memory_model = STORAGE_WRITETHROUGH;
}; };
struct redisServer { struct redisServer {
@ -2395,6 +2407,7 @@ struct redisServer {
GarbageCollector<redisDbPersistentDataSnapshot> garbageCollector; GarbageCollector<redisDbPersistentDataSnapshot> garbageCollector;
IStorageFactory *m_pstorageFactory = nullptr; IStorageFactory *m_pstorageFactory = nullptr;
int storage_flush_period; // The time between flushes in the CRON job
/* TLS Configuration */ /* TLS Configuration */
int tls_cluster; int tls_cluster;

View File

@ -8,7 +8,8 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
serverAssert(GlobalLocksAcquired()); serverAssert(GlobalLocksAcquired());
serverAssert(m_refCount == 0); // do not call this on a snapshot serverAssert(m_refCount == 0); // do not call this on a snapshot
freeMemoryIfNeededAndSafe(true /*fPreSnapshot*/); if (freeMemoryIfNeededAndSafe(true /*fPreSnapshot*/) != C_OK && fOptional)
return nullptr; // can't create snapshot due to OOM
int levels = 1; int levels = 1;
redisDbPersistentDataSnapshot *psnapshot = m_spdbSnapshotHOLDER.get(); redisDbPersistentDataSnapshot *psnapshot = m_spdbSnapshotHOLDER.get();
@ -76,7 +77,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
dictForceRehash(spdb->m_pdictTombstone); // prevent rehashing by finishing the rehash now dictForceRehash(spdb->m_pdictTombstone); // prevent rehashing by finishing the rehash now
spdb->m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER); spdb->m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER);
if (m_spstorage != nullptr) if (m_spstorage != nullptr)
spdb->m_spstorage = std::shared_ptr<IStorage>(const_cast<IStorage*>(m_spstorage->clone())); spdb->m_spstorage = std::shared_ptr<StorageCache>(const_cast<StorageCache*>(m_spstorage->clone()));
spdb->m_pdbSnapshot = m_pdbSnapshot; spdb->m_pdbSnapshot = m_pdbSnapshot;
spdb->m_refCount = 1; spdb->m_refCount = 1;
spdb->m_mvccCheckpoint = getMvccTstamp(); spdb->m_mvccCheckpoint = getMvccTstamp();

View File

@ -82,6 +82,7 @@ size_t RocksDBStorageProvider::clear()
size_t RocksDBStorageProvider::count() const size_t RocksDBStorageProvider::count() const
{ {
std::unique_lock<fastlock> l(m_lock);
return m_count; return m_count;
} }

View File

@ -17,7 +17,7 @@ class RocksDBStorageProvider : public IStorage
std::shared_ptr<rocksdb::ColumnFamilyHandle> m_spcolfamily; std::shared_ptr<rocksdb::ColumnFamilyHandle> m_spcolfamily;
rocksdb::ReadOptions m_readOptionsTemplate; rocksdb::ReadOptions m_readOptionsTemplate;
size_t m_count = 0; size_t m_count = 0;
fastlock m_lock {"RocksDBStorageProvider"}; mutable fastlock m_lock {"RocksDBStorageProvider"};
public: public:
RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count); RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count);

View File

@ -15,11 +15,13 @@ public:
RocksDBStorageFactory(const char *dbfile, int dbnum); RocksDBStorageFactory(const char *dbfile, int dbnum);
~RocksDBStorageFactory(); ~RocksDBStorageFactory();
virtual IStorage *create(int db, key_load_iterator iter) override; virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override;
virtual const char *name() const override; virtual const char *name() const override;
virtual size_t totalDiskspaceUsed() const override; virtual size_t totalDiskspaceUsed() const override;
virtual bool FSlow() const override { return true; }
private: private:
void setVersion(rocksdb::ColumnFamilyHandle*); void setVersion(rocksdb::ColumnFamilyHandle*);
}; };
@ -54,7 +56,8 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum)
options.max_background_flushes = 2; options.max_background_flushes = 2;
options.bytes_per_sync = 1048576; options.bytes_per_sync = 1048576;
options.compaction_pri = rocksdb::kMinOverlappingRatio; options.compaction_pri = rocksdb::kMinOverlappingRatio;
options.compression = rocksdb::kNoCompression; options.compression = rocksdb::kLZ4Compression;
options.enable_pipelined_write = true;
options.sst_file_manager = m_pfilemanager; options.sst_file_manager = m_pfilemanager;
rocksdb::BlockBasedTableOptions table_options; rocksdb::BlockBasedTableOptions table_options;
table_options.block_size = 16 * 1024; table_options.block_size = 16 * 1024;
@ -110,7 +113,7 @@ void RocksDBStorageFactory::setVersion(rocksdb::ColumnFamilyHandle *handle)
throw status.ToString(); throw status.ToString();
} }
IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter) IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter, void *privdata)
{ {
++db; // skip default col family ++db; // skip default col family
std::shared_ptr<rocksdb::ColumnFamilyHandle> spcolfamily(m_vecspcols[db].release()); std::shared_ptr<rocksdb::ColumnFamilyHandle> spcolfamily(m_vecspcols[db].release());
@ -141,7 +144,7 @@ IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter)
if (FInternalKey(it->key().data(), it->key().size())) if (FInternalKey(it->key().data(), it->key().size()))
continue; continue;
if (iter != nullptr) if (iter != nullptr)
iter(it->key().data(), it->key().size()); iter(it->key().data(), it->key().size(), privdata);
++count; ++count;
} }
} }

View File

@ -1,7 +1,7 @@
#include "teststorageprovider.h" #include "teststorageprovider.h"
#include "../server.h" #include "../server.h"
IStorage *TestStorageFactory::create(int, key_load_iterator) IStorage *TestStorageFactory::create(int, key_load_iterator, void *)
{ {
return new (MALLOC_LOCAL) TestStorageProvider(); return new (MALLOC_LOCAL) TestStorageProvider();
} }

View File

@ -4,9 +4,10 @@
class TestStorageFactory : public IStorageFactory class TestStorageFactory : public IStorageFactory
{ {
virtual class IStorage *create(int db, key_load_iterator itr) override; virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) override;
virtual const char *name() const override; virtual const char *name() const override;
virtual size_t totalDiskspaceUsed() const override { return 0; } virtual size_t totalDiskspaceUsed() const override { return 0; }
virtual bool FSlow() const { return false; }
}; };
class TestStorageProvider final : public IStorage class TestStorageProvider final : public IStorage

View File

@ -1,4 +1,4 @@
start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.db} delete-on-evict no]] { start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.db} delete-on-evict no storage-flush-period 10]] {
test { FLASH - GET works after eviction } { test { FLASH - GET works after eviction } {
r set testkey foo r set testkey foo
@ -100,6 +100,8 @@ start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.
} }
r flushall r flushall
# If a weak storage memory model is set, wait for any pending snapshot writes to finish
after 500
foreach policy { foreach policy {
allkeys-random allkeys-lru allkeys-lfu allkeys-random allkeys-lru allkeys-lfu
} { } {
@ -107,7 +109,7 @@ start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.
# Get the current memory limit and calculate a new limit. # Get the current memory limit and calculate a new limit.
# Set limit to 100M. # Set limit to 100M.
set used [s used_memory] set used [s used_memory]
set limit [expr {$used+50000*1024}] set limit [expr {$used+60*1024*1024}]
r config set maxmemory $limit r config set maxmemory $limit
r config set maxmemory-policy $policy r config set maxmemory-policy $policy
# Now add keys equivalent to 1024b until the limit is almost reached. # Now add keys equivalent to 1024b until the limit is almost reached.
@ -124,7 +126,8 @@ start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.
# should still be under the limit for maxmemory, however all keys set should still exist between flash and memory # should still be under the limit for maxmemory, however all keys set should still exist between flash and memory
# check same number of keys exist in addition to values of first and last keys # check same number of keys exist in addition to values of first and last keys
set err 0 set err 0
for {set j 0} {$j < 10000} {incr j} { set extra_keys [expr floor([expr ($limit * 0.4) / 1024])]
for {set j 0} {$j < $extra_keys} {incr j} {
catch { catch {
r set p2$j xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx r set p2$j xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
} err } err
@ -135,8 +138,8 @@ start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.
} }
r set last val r set last val
set dbsize [r dbsize] set dbsize [r dbsize]
assert {[s used_memory] < $limit+4096} assert {[s used_memory] < ($limit*1.2)}
assert {$dbsize == $numkeys+10002} assert {$dbsize == $numkeys+$extra_keys+2}
assert {[r get first] == {val}} assert {[r get first] == {val}}
assert {[r get last] == {val}} assert {[r get last] == {val}}
r flushall r flushall

View File

@ -99,6 +99,7 @@ start_server {tags {"introspection"}} {
bio_cpulist bio_cpulist
aof_rewrite_cpulist aof_rewrite_cpulist
bgsave_cpulist bgsave_cpulist
storage-cache-mode
} }
set configs {} set configs {}