From 17661f2382ee736e696f0e53b089008474076c39 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 11 Jul 2020 21:23:48 +0000 Subject: [PATCH] Implement storage key cache, and writeback memory model Former-commit-id: 732bd9c153459f1174475ad67de36c399ddbe359 --- src/IStorage.h | 5 +- src/Makefile | 2 +- src/StorageCache.cpp | 76 ++++++++++++++++++++ src/StorageCache.h | 57 +++++++++++++++ src/aof.cpp | 2 +- src/config.cpp | 7 ++ src/db.cpp | 105 ++++++++++++++++++++-------- src/rdb.cpp | 4 +- src/server.cpp | 73 ++++++++++++++++--- src/server.h | 23 ++++-- src/snapshot.cpp | 5 +- src/storage/rocksdb.cpp | 1 + src/storage/rocksdb.h | 2 +- src/storage/rocksdbfactory.cpp | 11 +-- src/storage/teststorageprovider.cpp | 2 +- src/storage/teststorageprovider.h | 3 +- tests/unit/flash.tcl | 13 ++-- tests/unit/introspection.tcl | 1 + 18 files changed, 326 insertions(+), 66 deletions(-) create mode 100644 src/StorageCache.cpp create mode 100644 src/StorageCache.h diff --git a/src/IStorage.h b/src/IStorage.h index 087a0b1dd..69611de13 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -4,12 +4,13 @@ class IStorageFactory { public: - typedef void (*key_load_iterator)(const char *rgchKey, size_t cchKey); + typedef void (*key_load_iterator)(const char *rgchKey, size_t cchKey, void *privdata); virtual ~IStorageFactory() {} - virtual class IStorage *create(int db, key_load_iterator itr) = 0; + virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) = 0; virtual const char *name() const = 0; virtual size_t totalDiskspaceUsed() const = 0; + virtual bool FSlow() const = 0; }; class IStorage diff --git a/src/Makefile b/src/Makefile index 938531b3a..cba5b77be 100644 --- a/src/Makefile +++ b/src/Makefile @@ -277,7 +277,7 @@ endif REDIS_SERVER_NAME=keydb-pro-server REDIS_SENTINEL_NAME=keydb-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o $(ASM_OBJ) +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o acl.o storage.o rdb-s3.o fastlock.o new.o tracking.o cron.o connection.o tls.o sha256.o motd.o timeout.o setcpuaffinity.o AsyncWorkQueue.o snapshot.o storage/rocksdb.o storage/rocksdbfactory.o storage/teststorageprovider.o keydbutils.o StorageCache.o $(ASM_OBJ) REDIS_CLI_NAME=keydb-cli REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o redis-cli-cpphelper.o zmalloc.o release.o anet.o ae.o crcspeed.o crc64.o siphash.o crc16.o storage-lite.o fastlock.o new.o motd.o $(ASM_OBJ) REDIS_BENCHMARK_NAME=keydb-benchmark diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp new file mode 100644 index 000000000..19818eb97 --- /dev/null +++ b/src/StorageCache.cpp @@ -0,0 +1,76 @@ +#include "server.h" + +void StorageCache::clear() +{ + if (m_setkeys != nullptr) + m_setkeys->clear(); + m_spstorage->clear(); +} + +void StorageCache::cacheKey(sds key) +{ + if (m_setkeys == nullptr) + return; + m_setkeys->insert(sdsimmutablestring(sdsdupshared(key))); +} + +void StorageCache::cacheKey(const char *rgch, size_t cch) +{ + if (m_setkeys == nullptr) + return; + m_setkeys->insert(sdsimmutablestring(sdsnewlen(rgch, cch))); +} + +bool StorageCache::erase(sds key) +{ + bool result = m_spstorage->erase(key, sdslen(key)); + if (result && m_setkeys != nullptr) + { + auto itr = m_setkeys->find(sdsview(key)); + serverAssert(itr != m_setkeys->end()); + m_setkeys->erase(itr); + } + return result; +} + +void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwrite) +{ + if (!fOverwrite && m_setkeys != nullptr) + { + cacheKey(key); + } + m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite); +} + +const StorageCache *StorageCache::clone() +{ + // Clones never clone the cache + StorageCache *cacheNew = new StorageCache(const_cast(m_spstorage->clone())); + return cacheNew; +} + +void StorageCache::retrieve(sds key, IStorage::callbackSingle fn, sds *cachedKey) const +{ + if (m_setkeys != nullptr) + { + auto itr = m_setkeys->find(sdsview(key)); + if (itr == m_setkeys->end()) + return; // Not found + if (cachedKey != nullptr) + *cachedKey = sdsdupshared(itr->get()); + } + m_spstorage->retrieve(key, sdslen(key), fn); +} + +size_t StorageCache::count() const +{ + size_t count = m_spstorage->count(); + if (m_setkeys != nullptr) + serverAssert(count == m_setkeys->size()); + return count; +} + +void StorageCache::beginWriteBatch() { + serverAssert(GlobalLocksAcquired()); // Otherwise we deadlock + m_spstorage->beginWriteBatch(); +} \ No newline at end of file diff --git a/src/StorageCache.h b/src/StorageCache.h new file mode 100644 index 000000000..a430572af --- /dev/null +++ b/src/StorageCache.h @@ -0,0 +1,57 @@ +#pragma once +#include "sds.h" + +class StorageCache +{ + std::shared_ptr m_spstorage; + std::unique_ptr> m_setkeys; + + StorageCache(IStorage *storage) + : m_spstorage(storage) + {} + + void cacheKey(sds key); + void cacheKey(const char *rgchKey, size_t cchKey); + + struct load_iter_data + { + StorageCache *cache; + IStorageFactory::key_load_iterator itrParent; + void *privdataParent; + }; + static void key_load_itr(const char *rgchKey, size_t cchKey, void *privdata) + { + load_iter_data *data = (load_iter_data*)privdata; + data->cache->cacheKey(rgchKey, cchKey); + if (data->itrParent) + data->itrParent(rgchKey, cchKey, data->privdataParent); + } + +public: + static StorageCache *create(IStorageFactory *pfactory, int db, IStorageFactory::key_load_iterator fn, void *privdata) { + StorageCache *cache = new StorageCache(nullptr); + if (pfactory->FSlow()) + { + cache->m_setkeys = std::make_unique>(); + } + load_iter_data data = {cache, fn, privdata}; + cache->m_spstorage = std::shared_ptr(pfactory->create(db, key_load_itr, (void*)&data)); + return cache; + } + + void clear(); + void insert(sds key, const void *data, size_t cbdata, bool fOverwrite); + void retrieve(sds key, IStorage::callbackSingle fn, sds *sharedKeyOut) const; + bool erase(sds key); + + bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); } + + void beginWriteBatch(); + void endWriteBatch() { m_spstorage->endWriteBatch(); } + void batch_lock() { return m_spstorage->batch_lock(); } + void batch_unlock() { return m_spstorage->batch_unlock(); } + + size_t count() const; + + const StorageCache *clone(); +}; \ No newline at end of file diff --git a/src/aof.cpp b/src/aof.cpp index 0a15204d0..69f229b31 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -968,7 +968,7 @@ int loadAppendOnlyFile(char *filename) { loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */ for (int idb = 0; idb < cserver.dbnum; ++idb) { - g_pserver->db[idb]->processChanges(); + g_pserver->db[idb]->processChanges(false); g_pserver->db[idb]->commitChanges(); } fclose(fp); diff --git a/src/config.cpp b/src/config.cpp index 3dc81dc5e..1f1387ea0 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -105,6 +105,11 @@ configEnum repl_diskless_load_enum[] = { {NULL, 0} }; +configEnum storage_memory_model_enum[] = { + {"writeback", STORAGE_WRITEBACK}, + {"writethrough", STORAGE_WRITETHROUGH}, +}; + /* Output buffer limits presets. */ clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = { {0, 0, 0}, /* normal */ @@ -2316,6 +2321,7 @@ standardConfig configs[] = { createEnumConfig("loglevel", NULL, MODIFIABLE_CONFIG, loglevel_enum, cserver.verbosity, LL_NOTICE, NULL, NULL), createEnumConfig("maxmemory-policy", NULL, MODIFIABLE_CONFIG, maxmemory_policy_enum, g_pserver->maxmemory_policy, MAXMEMORY_NO_EVICTION, NULL, NULL), createEnumConfig("appendfsync", NULL, MODIFIABLE_CONFIG, aof_fsync_enum, g_pserver->aof_fsync, AOF_FSYNC_EVERYSEC, NULL, NULL), + createEnumConfig("storage-cache-mode", NULL, IMMUTABLE_CONFIG, storage_memory_model_enum, cserver.storage_memory_model, STORAGE_WRITETHROUGH, NULL, NULL), /* Integer configs */ createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, cserver.dbnum, 16, INTEGER_CONFIG, NULL, NULL), @@ -2349,6 +2355,7 @@ standardConfig configs[] = { createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves), createIntConfig("min-replicas-max-lag", "min-slaves-max-lag", MODIFIABLE_CONFIG, 0, INT_MAX, g_pserver->repl_min_slaves_max_lag, 10, INTEGER_CONFIG, NULL, updateGoodSlaves), createIntConfig("min-clients-per-thread", NULL, MODIFIABLE_CONFIG, 0, 400, cserver.thread_min_client_threshold, 50, INTEGER_CONFIG, NULL, NULL), + createIntConfig("storage-flush-period", NULL, MODIFIABLE_CONFIG, 1, 10000, g_pserver->storage_flush_period, 500, INTEGER_CONFIG, NULL, NULL), /* Unsigned int configs */ createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, g_pserver->maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients), diff --git a/src/db.cpp b/src/db.cpp index 39ba16cb9..60a3b4d3e 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -399,7 +399,7 @@ bool redisDbPersistentData::syncDelete(robj *key) bool fDeleted = false; if (m_spstorage != nullptr) - fDeleted = m_spstorage->erase(szFromObj(key), sdslen(szFromObj(key))); + fDeleted = m_spstorage->erase(szFromObj(key)); fDeleted = (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) || fDeleted; if (fDeleted) { @@ -2266,13 +2266,13 @@ void redisDbPersistentData::initialize() m_fTrackingChanges = 0; } -void redisDbPersistentData::setStorageProvider(IStorage *pstorage) +void redisDbPersistentData::setStorageProvider(StorageCache *pstorage) { serverAssert(m_spstorage == nullptr); - m_spstorage = std::unique_ptr(pstorage); + m_spstorage = std::unique_ptr(pstorage); } -void clusterStorageLoadCallback(const char *rgchkey, size_t cch) +void clusterStorageLoadCallback(const char *rgchkey, size_t cch, void *) { slotToKeyUpdateKeyCore(rgchkey, cch, true /*add*/); } @@ -2296,7 +2296,7 @@ void redisDb::storageProviderInitialize() if (g_pserver->m_pstorageFactory != nullptr) { IStorageFactory::key_load_iterator itr = (g_pserver->cluster_enabled) ? clusterStorageLoadCallback : nullptr; - this->setStorageProvider(g_pserver->m_pstorageFactory->create(id, itr)); + this->setStorageProvider(StorageCache::create(g_pserver->m_pstorageFactory, id, itr, nullptr)); } } @@ -2339,7 +2339,11 @@ void redisDbPersistentData::clear(void(callback)(void*)) { dictEmpty(m_pdict,callback); if (m_fTrackingChanges) + { + m_setchanged.clear(); + m_cnewKeysPending = 0; m_fAllChanged++; + } delete m_setexpire; m_setexpire = new (MALLOC_LOCAL) expireset(); if (m_spstorage != nullptr) @@ -2448,14 +2452,21 @@ LNotFound: { if (dictSize(m_pdict) != size()) // if all keys are cached then no point in looking up the database { - m_spstorage->retrieve(sdsKey, sdslen(sdsKey), [&](const char *, size_t, const void *data, size_t cb){ + sds sdsNewKey = nullptr; // the storage cache will give us its cached key if available + robj *o = nullptr; + std::unique_ptr spexpire; + m_spstorage->retrieve((sds)sdsKey, [&](const char *, size_t, const void *data, size_t cb){ size_t offset = 0; - sds sdsNewKey = sdsdupshared(sdsKey); - auto spexpire = deserializeExpire((sds)sdsNewKey, (const char*)data, cb, &offset); - robj *o = deserializeStoredObject(this, sdsKey, reinterpret_cast(data) + offset, cb - offset); + spexpire = deserializeExpire((sds)sdsNewKey, (const char*)data, cb, &offset); + o = deserializeStoredObject(this, sdsKey, reinterpret_cast(data) + offset, cb - offset); serverAssert(o != nullptr); + }, &sdsNewKey); + + if (o != nullptr) + { + if (sdsNewKey == nullptr) + sdsNewKey = sdsdupshared(sdsKey); dictAdd(m_pdict, sdsNewKey, o); - o->SetFExpires(spexpire != nullptr); if (spexpire != nullptr) @@ -2467,7 +2478,13 @@ LNotFound: serverAssert(m_setexpire->find(sdsKey) != m_setexpire->end()); } serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end())); - }); + } + else + { + if (sdsNewKey != nullptr) + sdsfree(sdsNewKey); // BUG but don't bother crashing + } + *pde = dictFind(m_pdict, sdsKey); } } @@ -2479,10 +2496,10 @@ LNotFound: } } -void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o, bool fOverwrite) +void redisDbPersistentData::storeKey(sds key, robj *o, bool fOverwrite) { - sds temp = serializeStoredObjectAndExpire(this, szKey, o); - m_spstorage->insert(szKey, cchKey, temp, sdslen(temp), fOverwrite); + sds temp = serializeStoredObjectAndExpire(this, key, o); + m_spstorage->insert(key, temp, sdslen(temp), fOverwrite); sdsfree(temp); } @@ -2493,12 +2510,24 @@ void redisDbPersistentData::storeDatabase() while ((de = dictNext(di)) != NULL) { sds key = (sds)dictGetKey(de); robj *o = (robj*)dictGetVal(de); - storeKey(key, sdslen(key), o, false); + storeKey(key, o, false); } + serverAssert(dictSize(m_pdict) == m_spstorage->count()); dictReleaseIterator(di); } -void redisDbPersistentData::processChanges() +/* static */ void redisDbPersistentData::serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const redisDbPersistentData::changedesc &change) +{ + auto itr = db->find_cached_threadsafe(change.strkey.get()); + if (itr == nullptr) + return; + robj *o = itr.val(); + sds temp = serializeStoredObjectAndExpire(db, (const char*) itr.key(), o); + storage->insert((sds)change.strkey.get(), temp, sdslen(temp), change.fUpdate); + sdsfree(temp); +} + +void redisDbPersistentData::processChanges(bool fSnapshot) { serverAssert(GlobalLocksAcquired()); @@ -2508,35 +2537,51 @@ void redisDbPersistentData::processChanges() if (m_spstorage != nullptr) { m_spstorage->beginWriteBatch(); - if (m_fTrackingChanges >= 0) + serverAssert(m_pdbSnapshotStorageFlush == nullptr); + if (fSnapshot && !m_fAllChanged && m_setchanged.size() > 100) + { + // Do a snapshot based process if possible + m_pdbSnapshotStorageFlush = createSnapshot(getMvccTstamp(), true /* optional */); + if (m_pdbSnapshotStorageFlush) + { + m_setchangedStorageFlush = std::move(m_setchanged); + } + } + + if (m_pdbSnapshotStorageFlush == nullptr) { if (m_fAllChanged) { m_spstorage->clear(); storeDatabase(); - m_fAllChanged--; + m_fAllChanged = 0; } else { for (auto &change : m_setchanged) { - auto itr = find_cached_threadsafe(change.strkey.get()); - if (itr == nullptr) - continue; - robj *o = itr.val(); - sds temp = serializeStoredObjectAndExpire(this, (const char*) itr.key(), o); - m_spstorage->insert(change.strkey.get(), sdslen(change.strkey.get()), temp, sdslen(temp), change.fUpdate); - sdsfree(temp); + serializeAndStoreChange(m_spstorage.get(), this, change); } } - m_setchanged.clear(); - m_cnewKeysPending = 0; } + m_setchanged.clear(); + m_cnewKeysPending = 0; } } -void redisDbPersistentData::commitChanges() +void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree) { + if (m_pdbSnapshotStorageFlush) + { + + for (auto &change : m_setchangedStorageFlush) + { + serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, change); + } + m_setchangedStorageFlush.clear(); + *psnapshotFree = m_pdbSnapshotStorageFlush; + m_pdbSnapshotStorageFlush = nullptr; + } if (m_spstorage != nullptr) m_spstorage->endWriteBatch(); } @@ -2581,7 +2626,7 @@ dict_iter redisDbPersistentData::random() size_t redisDbPersistentData::size() const { - if (m_spstorage != nullptr) + if (m_spstorage != nullptr && !m_fAllChanged) return m_spstorage->count() + m_cnewKeysPending; return dictSize(m_pdict) @@ -2624,7 +2669,7 @@ void redisDbPersistentData::removeAllCachedValues() // First we have to flush the tracked changes if (m_fTrackingChanges) { - processChanges(); + processChanges(false); commitChanges(); trackChanges(false); } diff --git a/src/rdb.cpp b/src/rdb.cpp index befb43430..efd4f47b6 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2515,7 +2515,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { { for (int idb = 0; idb < cserver.dbnum; ++idb) { - g_pserver->db[idb]->processChanges(); + g_pserver->db[idb]->processChanges(false); g_pserver->db[idb]->commitChanges(); g_pserver->db[idb]->trackChanges(false); } @@ -2591,7 +2591,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { for (int idb = 0; idb < cserver.dbnum; ++idb) { - g_pserver->db[idb]->processChanges(); + g_pserver->db[idb]->processChanges(false); g_pserver->db[idb]->commitChanges(); } return C_OK; diff --git a/src/server.cpp b/src/server.cpp index 4a1b1ff72..52fedc853 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1996,6 +1996,43 @@ void checkChildrenDone(void) { } } +static std::atomic s_fFlushInProgress { false }; +void flushStorageWeak() +{ + bool fExpected = false; + if (s_fFlushInProgress.compare_exchange_strong(fExpected, true /* desired */, std::memory_order_seq_cst, std::memory_order_relaxed)) + { + g_pserver->asyncworkqueue->AddWorkFunction([]{ + aeAcquireLock(); + mstime_t storage_process_latency; + latencyStartMonitor(storage_process_latency); + std::vector vecdb; + for (int idb = 0; idb < cserver.dbnum; ++idb) { + vecdb.push_back(g_pserver->db[idb]); + g_pserver->db[idb]->processChanges(true); + } + latencyEndMonitor(storage_process_latency); + latencyAddSampleIfNeeded("storage-process-changes", storage_process_latency); + aeReleaseLock(); + + std::vector vecsnapshotFree; + vecsnapshotFree.resize(vecdb.size()); + for (size_t idb = 0; idb < vecdb.size(); ++idb) + vecdb[idb]->commitChanges(&vecsnapshotFree[idb]); + + for (size_t idb = 0; idb < vecsnapshotFree.size(); ++idb) { + if (vecsnapshotFree[idb] != nullptr) + vecdb[idb]->endSnapshotAsync(vecsnapshotFree[idb]); + } + s_fFlushInProgress = false; + }, true /* fHiPri */); + } + else + { + serverLog(LOG_INFO, "Missed storage flush due to existing flush still in flight. Consider increasing storage-weak-flush-period"); + } +} + /* This is our timer interrupt, called g_pserver->hz times per second. * Here is where we do a number of things that need to be done asynchronously. * For instance: @@ -2263,6 +2300,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { g_pserver->rdb_bgsave_scheduled = 0; } + if (cserver.storage_memory_model == STORAGE_WRITEBACK && g_pserver->m_pstorageFactory) { + run_with_period(g_pserver->storage_flush_period) { + flushStorageWeak(); + } + } + g_pserver->asyncworkqueue->AddWorkFunction([]{ g_pserver->db[0]->consolidate_snapshot(); }, true /*HiPri*/); @@ -2392,17 +2435,20 @@ void beforeSleep(struct aeEventLoop *eventLoop) { static thread_local bool fFirstRun = true; // note: we also copy the DB pointer in case a DB swap is done while the lock is released std::vector vecdb; // note we cache the database pointer in case a dbswap is done while the lock is released - if (!fFirstRun) { - mstime_t storage_process_latency; - latencyStartMonitor(storage_process_latency); - for (int idb = 0; idb < cserver.dbnum; ++idb) { - vecdb.push_back(g_pserver->db[idb]); - g_pserver->db[idb]->processChanges(); + if (cserver.storage_memory_model == STORAGE_WRITETHROUGH) + { + if (!fFirstRun) { + mstime_t storage_process_latency; + latencyStartMonitor(storage_process_latency); + for (int idb = 0; idb < cserver.dbnum; ++idb) { + vecdb.push_back(g_pserver->db[idb]); + g_pserver->db[idb]->processChanges(false); + } + latencyEndMonitor(storage_process_latency); + latencyAddSampleIfNeeded("storage-process-changes", storage_process_latency); + } else { + fFirstRun = false; } - latencyEndMonitor(storage_process_latency); - latencyAddSampleIfNeeded("storage-process-changes", storage_process_latency); - } else { - fFirstRun = false; } int aof_state = g_pserver->aof_state; @@ -2435,7 +2481,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) { aeReleaseLock(); } - /* This function is called immadiately after the event loop multiplexing * API returned, and the control is going to soon return to Redis by invoking * the different events callbacks. */ @@ -4209,6 +4254,12 @@ int prepareForShutdown(int flags) { redisCommunicateSystemd("STATUS=Error trying to save the DB, can't exit.\n"); return C_ERR; } + + // Also Dump To FLASH if Applicable + for (int idb = 0; idb < cserver.dbnum; ++idb) { + g_pserver->db[idb]->processChanges(false); + g_pserver->db[idb]->commitChanges(); + } } /* Fire the shutdown modules event. */ diff --git a/src/server.h b/src/server.h index 81f47304c..918e1ae72 100644 --- a/src/server.h +++ b/src/server.h @@ -106,6 +106,7 @@ typedef long long ustime_t; /* microsecond time type. */ #include "endianconv.h" #include "crc64.h" #include "IStorage.h" +#include "StorageCache.h" #include "AsyncWorkQueue.h" #include "gc.h" @@ -606,6 +607,10 @@ public: #define REPL_DISKLESS_LOAD_WHEN_DB_EMPTY 1 #define REPL_DISKLESS_LOAD_SWAPDB 2 +/* Storage Memory Model Defines */ +#define STORAGE_WRITEBACK 0 +#define STORAGE_WRITETHROUGH 1 + /* Sets operations codes */ #define SET_OP_UNION 0 #define SET_OP_DIFF 1 @@ -1304,7 +1309,7 @@ public: void setExpire(expireEntry &&e); void initialize(); - void setStorageProvider(IStorage *pstorage); + void setStorageProvider(StorageCache *pstorage); void trackChanges(bool fBulk); @@ -1312,8 +1317,8 @@ public: // to allow you to release the global lock before commiting. To prevent deadlocks you *must* // either release the global lock or keep the same global lock between the two functions as // a second look is kept to ensure writes to secondary storage are ordered - void processChanges(); - void commitChanges(); + void processChanges(bool fSnapshot); + void commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree = nullptr); // This should only be used if you look at the key, we do not fixup // objects stored elsewhere @@ -1354,10 +1359,12 @@ private: bool operator()(const char *key, const changedesc &b) const { return sdsview(key) < b.strkey; } }; + static void serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const changedesc &change); + void ensure(const char *key); void ensure(const char *key, dictEntry **de); void storeDatabase(); - void storeKey(const char *key, size_t cchKey, robj *o, bool fOverwrite); + void storeKey(sds key, robj *o, bool fOverwrite); void recursiveFreeSnapshots(redisDbPersistentDataSnapshot *psnapshot); // Keyspace @@ -1367,7 +1374,7 @@ private: int m_fAllChanged = 0; std::set m_setchanged; size_t m_cnewKeysPending = 0; - std::shared_ptr m_spstorage = nullptr; + std::shared_ptr m_spstorage = nullptr; // Expire expireset *m_setexpire = nullptr; @@ -1378,6 +1385,10 @@ private: const redisDbPersistentDataSnapshot *m_pdbSnapshot = nullptr; std::unique_ptr m_spdbSnapshotHOLDER; const redisDbPersistentDataSnapshot *m_pdbSnapshotASYNC = nullptr; + + const redisDbPersistentDataSnapshot *m_pdbSnapshotStorageFlush = nullptr; + std::set m_setchangedStorageFlush; + int m_refCount = 0; }; @@ -2033,6 +2044,7 @@ struct redisServerConst { int trial_timeout = 120; int delete_on_evict = false; // Only valid when a storage provider is set int thread_min_client_threshold = 50; + int storage_memory_model = STORAGE_WRITETHROUGH; }; struct redisServer { @@ -2395,6 +2407,7 @@ struct redisServer { GarbageCollector garbageCollector; IStorageFactory *m_pstorageFactory = nullptr; + int storage_flush_period; // The time between flushes in the CRON job /* TLS Configuration */ int tls_cluster; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 7cbbf94a2..a4e97020d 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -8,7 +8,8 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 serverAssert(GlobalLocksAcquired()); serverAssert(m_refCount == 0); // do not call this on a snapshot - freeMemoryIfNeededAndSafe(true /*fPreSnapshot*/); + if (freeMemoryIfNeededAndSafe(true /*fPreSnapshot*/) != C_OK && fOptional) + return nullptr; // can't create snapshot due to OOM int levels = 1; redisDbPersistentDataSnapshot *psnapshot = m_spdbSnapshotHOLDER.get(); @@ -76,7 +77,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 dictForceRehash(spdb->m_pdictTombstone); // prevent rehashing by finishing the rehash now spdb->m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER); if (m_spstorage != nullptr) - spdb->m_spstorage = std::shared_ptr(const_cast(m_spstorage->clone())); + spdb->m_spstorage = std::shared_ptr(const_cast(m_spstorage->clone())); spdb->m_pdbSnapshot = m_pdbSnapshot; spdb->m_refCount = 1; spdb->m_mvccCheckpoint = getMvccTstamp(); diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp index 0d3bbc4f3..4f751b29c 100644 --- a/src/storage/rocksdb.cpp +++ b/src/storage/rocksdb.cpp @@ -82,6 +82,7 @@ size_t RocksDBStorageProvider::clear() size_t RocksDBStorageProvider::count() const { + std::unique_lock l(m_lock); return m_count; } diff --git a/src/storage/rocksdb.h b/src/storage/rocksdb.h index 8e03a3471..4a0665b6e 100644 --- a/src/storage/rocksdb.h +++ b/src/storage/rocksdb.h @@ -17,7 +17,7 @@ class RocksDBStorageProvider : public IStorage std::shared_ptr m_spcolfamily; rocksdb::ReadOptions m_readOptionsTemplate; size_t m_count = 0; - fastlock m_lock {"RocksDBStorageProvider"}; + mutable fastlock m_lock {"RocksDBStorageProvider"}; public: RocksDBStorageProvider(std::shared_ptr &spdb, std::shared_ptr &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count); diff --git a/src/storage/rocksdbfactory.cpp b/src/storage/rocksdbfactory.cpp index 354483a8e..17a2982bc 100644 --- a/src/storage/rocksdbfactory.cpp +++ b/src/storage/rocksdbfactory.cpp @@ -15,11 +15,13 @@ public: RocksDBStorageFactory(const char *dbfile, int dbnum); ~RocksDBStorageFactory(); - virtual IStorage *create(int db, key_load_iterator iter) override; + virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override; virtual const char *name() const override; virtual size_t totalDiskspaceUsed() const override; + virtual bool FSlow() const override { return true; } + private: void setVersion(rocksdb::ColumnFamilyHandle*); }; @@ -54,7 +56,8 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum) options.max_background_flushes = 2; options.bytes_per_sync = 1048576; options.compaction_pri = rocksdb::kMinOverlappingRatio; - options.compression = rocksdb::kNoCompression; + options.compression = rocksdb::kLZ4Compression; + options.enable_pipelined_write = true; options.sst_file_manager = m_pfilemanager; rocksdb::BlockBasedTableOptions table_options; table_options.block_size = 16 * 1024; @@ -110,7 +113,7 @@ void RocksDBStorageFactory::setVersion(rocksdb::ColumnFamilyHandle *handle) throw status.ToString(); } -IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter) +IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter, void *privdata) { ++db; // skip default col family std::shared_ptr spcolfamily(m_vecspcols[db].release()); @@ -141,7 +144,7 @@ IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter) if (FInternalKey(it->key().data(), it->key().size())) continue; if (iter != nullptr) - iter(it->key().data(), it->key().size()); + iter(it->key().data(), it->key().size(), privdata); ++count; } } diff --git a/src/storage/teststorageprovider.cpp b/src/storage/teststorageprovider.cpp index 6995a8b10..188a339f3 100644 --- a/src/storage/teststorageprovider.cpp +++ b/src/storage/teststorageprovider.cpp @@ -1,7 +1,7 @@ #include "teststorageprovider.h" #include "../server.h" -IStorage *TestStorageFactory::create(int, key_load_iterator) +IStorage *TestStorageFactory::create(int, key_load_iterator, void *) { return new (MALLOC_LOCAL) TestStorageProvider(); } diff --git a/src/storage/teststorageprovider.h b/src/storage/teststorageprovider.h index f719577e9..d5e956c7e 100644 --- a/src/storage/teststorageprovider.h +++ b/src/storage/teststorageprovider.h @@ -4,9 +4,10 @@ class TestStorageFactory : public IStorageFactory { - virtual class IStorage *create(int db, key_load_iterator itr) override; + virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) override; virtual const char *name() const override; virtual size_t totalDiskspaceUsed() const override { return 0; } + virtual bool FSlow() const { return false; } }; class TestStorageProvider final : public IStorage diff --git a/tests/unit/flash.tcl b/tests/unit/flash.tcl index 374bb648e..1988d307a 100644 --- a/tests/unit/flash.tcl +++ b/tests/unit/flash.tcl @@ -1,4 +1,4 @@ -start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.db} delete-on-evict no]] { +start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.db} delete-on-evict no storage-flush-period 10]] { test { FLASH - GET works after eviction } { r set testkey foo @@ -100,6 +100,8 @@ start_server [list tags {flash} overrides [list storage-provider {flash ./rocks. } r flushall + # If a weak storage memory model is set, wait for any pending snapshot writes to finish + after 500 foreach policy { allkeys-random allkeys-lru allkeys-lfu } { @@ -107,7 +109,7 @@ start_server [list tags {flash} overrides [list storage-provider {flash ./rocks. # Get the current memory limit and calculate a new limit. # Set limit to 100M. set used [s used_memory] - set limit [expr {$used+50000*1024}] + set limit [expr {$used+60*1024*1024}] r config set maxmemory $limit r config set maxmemory-policy $policy # Now add keys equivalent to 1024b until the limit is almost reached. @@ -124,7 +126,8 @@ start_server [list tags {flash} overrides [list storage-provider {flash ./rocks. # should still be under the limit for maxmemory, however all keys set should still exist between flash and memory # check same number of keys exist in addition to values of first and last keys set err 0 - for {set j 0} {$j < 10000} {incr j} { + set extra_keys [expr floor([expr ($limit * 0.4) / 1024])] + for {set j 0} {$j < $extra_keys} {incr j} { catch { r set p2$j xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx } err @@ -135,8 +138,8 @@ start_server [list tags {flash} overrides [list storage-provider {flash ./rocks. } r set last val set dbsize [r dbsize] - assert {[s used_memory] < $limit+4096} - assert {$dbsize == $numkeys+10002} + assert {[s used_memory] < ($limit*1.2)} + assert {$dbsize == $numkeys+$extra_keys+2} assert {[r get first] == {val}} assert {[r get last] == {val}} r flushall diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index c7121e848..457881cf2 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -99,6 +99,7 @@ start_server {tags {"introspection"}} { bio_cpulist aof_rewrite_cpulist bgsave_cpulist + storage-cache-mode } set configs {}