From 2c9c4a98ea643bc9809baa60b2000934cd3873ac Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 4 Aug 2020 04:37:16 +0000 Subject: [PATCH 1/8] Latency normalization with SCAN Former-commit-id: 237f25d854e70d4d7a3095fdf56aaa80770e492e --- src/db.cpp | 16 +++++++++++++--- src/snapshot.cpp | 20 +++++++++++++------- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 3865ce9b2..46138f677 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -896,7 +896,10 @@ void keysCommand(client *c) { locker.arm(c); unblockClient(c); - db->endSnapshot(snapshot); + + locker.disarm(); + lock.unlock(); + db->endSnapshotAsync(snapshot); aeAcquireLock(); }); }); @@ -1044,7 +1047,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { // Do an async version const redisDbPersistentDataSnapshot *snapshot = nullptr; if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED))) - snapshot = c->db->createSnapshot(c->mvccCheckpoint, true /* fOptional */); + snapshot = c->db->createSnapshot(c->mvccCheckpoint, false /* fOptional */); if (snapshot != nullptr) { aeEventLoop *el = serverTL->el; @@ -1082,9 +1085,16 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { locker.arm(c); unblockClient(c); + mstime_t timeScanFilter; + latencyStartMonitor(timeScanFilter); scanFilterAndReply(c, keys, nullptr, nullptr, false, nullptr, cursorResult); + latencyEndMonitor(timeScanFilter); + latencyAddSampleIfNeeded("scan-async-filter", timeScanFilter); - db->endSnapshot(snapshot); + locker.disarm(); + lock.unlock(); + + db->endSnapshotAsync(snapshot); listSetFreeMethod(keys,decrRefCountVoid); listRelease(keys); aeAcquireLock(); diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 79df2a662..4734b84b3 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -182,7 +182,8 @@ void redisDbPersistentData::restoreSnapshot(const redisDbPersistentDataSnapshot // have some internal heuristics to do a synchronous endSnapshot if it makes sense void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot) { - aeAcquireLock(); + mstime_t latency; + aeAcquireLock(); latencyStartMonitor(latency); if (m_pdbSnapshotASYNC && m_pdbSnapshotASYNC->m_mvccCheckpoint <= psnapshot->m_mvccCheckpoint) { // Free a stale async snapshot so consolidate_children can clean it up later @@ -198,6 +199,8 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot { // For small snapshots it makes more sense just to merge it directly endSnapshot(psnapshot); + latencyEndMonitor(latency); + latencyAddSampleIfNeeded("end-snapshot-async-synchronous-path", latency); aeReleaseLock(); return; } @@ -206,19 +209,22 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot auto psnapshotT = createSnapshot(LLONG_MAX, false); endSnapshot(psnapshot); // this will just dec the ref count since our new snapshot has a ref psnapshot = nullptr; - aeReleaseLock(); + aeReleaseLock(); latencyEndMonitor(latency); + latencyAddSampleIfNeeded("end-snapshot-async-phase-1", latency); // do the expensive work of merging snapshots outside the ref const_cast(psnapshotT)->freeTombstoneObjects(1); // depth is one because we just creted it const_cast(psnapshotT)->consolidate_children(this, true); // Final Cleanup - aeAcquireLock(); + aeAcquireLock(); latencyStartMonitor(latency); if (m_pdbSnapshotASYNC == nullptr) m_pdbSnapshotASYNC = psnapshotT; else endSnapshot(psnapshotT); // finally clean up our temp snapshot - aeReleaseLock(); + aeReleaseLock(); latencyEndMonitor(latency); + + latencyAddSampleIfNeeded("end-snapshot-async-phase-2", latency); } void redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) @@ -262,6 +268,9 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn return; } + mstime_t latency_endsnapshot; + latencyStartMonitor(latency_endsnapshot); + // Alright we're ready to be free'd, but first dump all the refs on our child snapshots if (m_spdbSnapshotHOLDER->m_refCount == 1) recursiveFreeSnapshots(m_spdbSnapshotHOLDER.get()); @@ -288,9 +297,6 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn return; } - mstime_t latency_endsnapshot; - latencyStartMonitor(latency_endsnapshot); - // Stage 1 Loop through all the tracked deletes and remove them from the snapshot DB dictIterator *di = dictGetIterator(m_pdictTombstone); dictEntry *de; From ef7198e24d7ecddcf7137a3ef1215317aa68c497 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 6 Aug 2020 02:13:23 +0000 Subject: [PATCH 2/8] keydbpro unstable version should be 0.0.0 Former-commit-id: 4a6e0772f36d5dd721ad2e9b83f9fccce2e38ff0 --- src/version.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/version.h b/src/version.h index 92689b67a..befbfff3f 100644 --- a/src/version.h +++ b/src/version.h @@ -1,4 +1,4 @@ -#define KEYDB_REAL_VERSION "6.0.12" +#define KEYDB_REAL_VERSION "0.0.0" extern const char *KEYDB_SET_VERSION; // Unlike real version, this can be overriden by the config enum VersionCompareResult From 3c13a4bc255ba7d0254db53a25112e334cbc9726 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 6 Aug 2020 22:30:56 +0000 Subject: [PATCH 3/8] Disable compression it destroys read perf Former-commit-id: d3fffc12ae5339886f54c064127497f277393b00 From 80dc68e497912434d238bc6cfbb3d8e6d1c24576 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 9 Aug 2020 00:57:56 +0000 Subject: [PATCH 4/8] Start off storage cache with a larger size Former-commit-id: 5f6fb970a81cc73586ba595b35564e7865e7262d --- src/StorageCache.h | 2 +- src/semiorderedset.h | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/StorageCache.h b/src/StorageCache.h index a430572af..429795eb8 100644 --- a/src/StorageCache.h +++ b/src/StorageCache.h @@ -32,7 +32,7 @@ public: StorageCache *cache = new StorageCache(nullptr); if (pfactory->FSlow()) { - cache->m_setkeys = std::make_unique>(); + cache->m_setkeys = std::make_unique>(20); } load_iter_data data = {cache, fn, privdata}; cache->m_spstorage = std::shared_ptr(pfactory->create(db, key_load_itr, (void*)&data)); diff --git a/src/semiorderedset.h b/src/semiorderedset.h index 1b79e725b..c0281c35c 100644 --- a/src/semiorderedset.h +++ b/src/semiorderedset.h @@ -49,8 +49,11 @@ class semiorderedset } public: - semiorderedset() + semiorderedset(size_t bitsStart = 0) { + if (bitsStart < bits_min) + bitsStart = bits_min; + bits = bitsStart; m_data.resize((1ULL << bits)); } From 94020160dc66e2de5372018aba2b84ef5b0ef201 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 9 Aug 2020 01:47:53 +0000 Subject: [PATCH 5/8] Ensure we use the SSE CRC32 instruction Former-commit-id: 78ef8976e86d07d4c1fa7c4631a610babf2f4776 --- deps/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/deps/Makefile b/deps/Makefile index bc62c594f..b2c021227 100644 --- a/deps/Makefile +++ b/deps/Makefile @@ -94,6 +94,6 @@ jemalloc: .make-prerequisites rocksdb: .make-prerequisites @printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR) - cd rocksdb && PORTABLE=1 FORCE_SSE42=1 $(MAKE) static_lib + cd rocksdb && PORTABLE=1 USE_SSE=1 FORCE_SSE42=1 $(MAKE) static_lib .PHONY: rocksdb From 848057ff19c829ece71c385e522b11b96c68b2ee Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 9 Aug 2020 23:36:20 +0000 Subject: [PATCH 6/8] RocksDB Read Performance Improvements Former-commit-id: 80cca4869888e048e10e11f1f20796c482c3e5b3 --- src/StorageCache.cpp | 8 +++++ src/StorageCache.h | 1 + src/aof.cpp | 4 +-- src/config.cpp | 2 +- src/db.cpp | 86 ++++++++++++++++++++++++++++++++++++++++++-- src/evict.cpp | 17 ++++----- src/rdb.cpp | 10 +++--- src/scripting.cpp | 2 +- src/server.cpp | 17 +++++---- src/server.h | 13 ++++--- src/snapshot.cpp | 4 +-- 11 files changed, 130 insertions(+), 34 deletions(-) diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp index 19818eb97..23b9af90a 100644 --- a/src/StorageCache.cpp +++ b/src/StorageCache.cpp @@ -2,6 +2,7 @@ void StorageCache::clear() { + std::unique_lock ul(m_lock); if (m_setkeys != nullptr) m_setkeys->clear(); m_spstorage->clear(); @@ -24,6 +25,7 @@ void StorageCache::cacheKey(const char *rgch, size_t cch) bool StorageCache::erase(sds key) { bool result = m_spstorage->erase(key, sdslen(key)); + std::unique_lock ul(m_lock); if (result && m_setkeys != nullptr) { auto itr = m_setkeys->find(sdsview(key)); @@ -35,15 +37,18 @@ bool StorageCache::erase(sds key) void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwrite) { + std::unique_lock ul(m_lock); if (!fOverwrite && m_setkeys != nullptr) { cacheKey(key); } + ul.unlock(); m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite); } const StorageCache *StorageCache::clone() { + std::unique_lock ul(m_lock); // Clones never clone the cache StorageCache *cacheNew = new StorageCache(const_cast(m_spstorage->clone())); return cacheNew; @@ -51,6 +56,7 @@ const StorageCache *StorageCache::clone() void StorageCache::retrieve(sds key, IStorage::callbackSingle fn, sds *cachedKey) const { + std::unique_lock ul(m_lock); if (m_setkeys != nullptr) { auto itr = m_setkeys->find(sdsview(key)); @@ -59,11 +65,13 @@ void StorageCache::retrieve(sds key, IStorage::callbackSingle fn, sds *cachedKey if (cachedKey != nullptr) *cachedKey = sdsdupshared(itr->get()); } + ul.unlock(); m_spstorage->retrieve(key, sdslen(key), fn); } size_t StorageCache::count() const { + std::unique_lock ul(m_lock); size_t count = m_spstorage->count(); if (m_setkeys != nullptr) serverAssert(count == m_setkeys->size()); diff --git a/src/StorageCache.h b/src/StorageCache.h index 429795eb8..4fa3c3a08 100644 --- a/src/StorageCache.h +++ b/src/StorageCache.h @@ -5,6 +5,7 @@ class StorageCache { std::shared_ptr m_spstorage; std::unique_ptr> m_setkeys; + mutable fastlock m_lock {"StorageCache"}; StorageCache(IStorage *storage) : m_spstorage(storage) diff --git a/src/aof.cpp b/src/aof.cpp index d7094e54f..edb008399 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -972,8 +972,8 @@ int loadAppendOnlyFile(char *filename) { loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */ for (int idb = 0; idb < cserver.dbnum; ++idb) { - g_pserver->db[idb]->processChanges(false); - g_pserver->db[idb]->commitChanges(); + if (g_pserver->db[idb]->processChanges(false)) + g_pserver->db[idb]->commitChanges(); } fclose(fp); freeFakeClient(fakeClient); diff --git a/src/config.cpp b/src/config.cpp index bf9117247..c4b42d1f4 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2181,7 +2181,7 @@ static int updateMaxmemory(long long val, long long prev, const char **err) { if ((unsigned long long)val < used) { serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET (%llu) is smaller than the current memory usage (%zu). This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.", g_pserver->maxmemory, used); } - freeMemoryIfNeededAndSafe(false /*fPreSnapshot*/); + freeMemoryIfNeededAndSafe(false /*fQuickCycle*/, false /*fPreSnapshot*/); } return 1; } diff --git a/src/db.cpp b/src/db.cpp index 46138f677..2d88ceb73 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2556,7 +2556,7 @@ void redisDbPersistentData::storeDatabase() sdsfree(temp); } -void redisDbPersistentData::processChanges(bool fSnapshot) +bool redisDbPersistentData::processChanges(bool fSnapshot) { serverAssert(GlobalLocksAcquired()); @@ -2565,6 +2565,8 @@ void redisDbPersistentData::processChanges(bool fSnapshot) if (m_spstorage != nullptr) { + if (!m_fAllChanged && m_setchanged.empty() && m_cnewKeysPending == 0) + return false; m_spstorage->beginWriteBatch(); serverAssert(m_pdbSnapshotStorageFlush == nullptr); if (fSnapshot && !m_fAllChanged && m_setchanged.size() > 100) @@ -2596,6 +2598,7 @@ void redisDbPersistentData::processChanges(bool fSnapshot) m_setchanged.clear(); m_cnewKeysPending = 0; } + return (m_spstorage != nullptr); } void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree) @@ -2698,8 +2701,8 @@ void redisDbPersistentData::removeAllCachedValues() // First we have to flush the tracked changes if (m_fTrackingChanges) { - processChanges(false); - commitChanges(); + if (processChanges(false)) + commitChanges(); trackChanges(false); } @@ -2812,3 +2815,80 @@ int dbnumFromDb(redisDb *db) } serverPanic("invalid database pointer"); } + +void redisDbPersistentData::prefetchKeysAsync(AeLocker &lock, client *c) +{ + if (m_spstorage == nullptr) + return; + + std::vector veckeys; + lock.arm(c); + int numkeys = 0; + int *keys = getKeysFromCommand(c->cmd, c->argv, c->argc, &numkeys); + for (int ikey = 0; ikey < numkeys; ++ikey) + { + robj *objKey = c->argv[keys[ikey]]; + if (this->find_cached_threadsafe(szFromObj(objKey)) == nullptr) + veckeys.push_back(objKey); + } + lock.disarm(); + + getKeysFreeResult(keys); + + std::vector>> vecInserts; + for (robj *objKey : veckeys) + { + sds sharedKey = nullptr; + std::unique_ptr spexpire; + robj *o = nullptr; + m_spstorage->retrieve((sds)szFromObj(objKey), [&](const char *, size_t, const void *data, size_t cb){ + size_t offset = 0; + spexpire = deserializeExpire((sds)szFromObj(objKey), (const char*)data, cb, &offset); + o = deserializeStoredObject(this, szFromObj(objKey), reinterpret_cast(data) + offset, cb - offset); + serverAssert(o != nullptr); + }, &sharedKey); + + if (sharedKey == nullptr) + sharedKey = sdsdupshared(szFromObj(objKey)); + + vecInserts.emplace_back(sharedKey, o, std::move(spexpire)); + } + + lock.arm(c); + for (auto &tuple : vecInserts) + { + sds sharedKey = std::get<0>(tuple); + robj *o = std::get<1>(tuple); + std::unique_ptr spexpire = std::move(std::get<2>(tuple)); + + if (o != nullptr) + { + if (this->find_cached_threadsafe(sharedKey) != nullptr) + { + // While unlocked this was already ensured + decrRefCount(o); + sdsfree(sharedKey); + } + else + { + dictAdd(m_pdict, sharedKey, o); + o->SetFExpires(spexpire != nullptr); + + if (spexpire != nullptr) + { + auto itr = m_setexpire->find(sharedKey); + if (itr != m_setexpire->end()) + m_setexpire->erase(itr); + m_setexpire->insert(std::move(*spexpire)); + serverAssert(m_setexpire->find(sharedKey) != m_setexpire->end()); + } + serverAssert(o->FExpires() == (m_setexpire->find(sharedKey) != m_setexpire->end())); + } + } + else + { + if (sharedKey != nullptr) + sdsfree(sharedKey); // BUG but don't bother crashing + } + } +} \ No newline at end of file diff --git a/src/evict.cpp b/src/evict.cpp index ce9c420c4..21df570d2 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -425,7 +425,7 @@ size_t freeMemoryGetNotCountedMemory(void) { * limit. * (Populated both for C_ERR and C_OK) */ -int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level, bool fPreSnapshot) { +int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level, bool fQuickCycle, bool fPreSnapshot) { size_t mem_reported, mem_used, mem_tofree; /* Check if we are over the memory usage limit. If we are not, no need @@ -464,7 +464,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev /* Compute how much memory we need to free. */ mem_tofree = mem_used - maxmemory; - if (g_pserver->m_pstorageFactory) + if (g_pserver->m_pstorageFactory && !fQuickCycle) { mem_tofree += static_cast(maxmemory * 0.05); // if we have a storage provider be much more aggressive } @@ -484,7 +484,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev * were over the limit, but the attempt to free memory was successful. * Otehrwise if we are over the memory limit, but not enough memory * was freed to return back under the limit, the function returns C_ERR. */ -int freeMemoryIfNeeded(bool fPreSnapshot) { +int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot) { serverAssert(GlobalLocksAcquired()); /* By default replicas should ignore maxmemory @@ -498,12 +498,13 @@ int freeMemoryIfNeeded(bool fPreSnapshot) { const bool fEvictToStorage = !cserver.delete_on_evict && g_pserver->db[0]->FStorageProvider(); int result = C_ERR; int ckeysFailed = 0; + int keys_freed = 0; /* When clients are paused the dataset should be static not just from the * POV of clients not being able to write, but also from the POV of * expires and evictions of keys not being performed. */ if (clientsArePaused()) return C_OK; - if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL,fPreSnapshot) == C_OK) + if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL,fQuickCycle,fPreSnapshot) == C_OK) return C_OK; mem_freed = 0; @@ -513,13 +514,13 @@ int freeMemoryIfNeeded(bool fPreSnapshot) { goto cant_free; /* We need to free memory, but policy forbids. */ while (mem_freed < mem_tofree) { - int j, k, i, keys_freed = 0; + int j, k, i; static unsigned int next_db = 0; sds bestkey = NULL; int bestdbid; redisDb *db; bool fFallback = false; - + if (g_pserver->maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) || g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { @@ -737,7 +738,7 @@ cant_free: * - Nor we are loading data right now. * */ -int freeMemoryIfNeededAndSafe(bool fPreSnapshot) { +int freeMemoryIfNeededAndSafe(bool fQuickCycle, bool fPreSnapshot) { if (g_pserver->shutdown_asap || g_pserver->lua_timedout || g_pserver->loading) return C_OK; - return freeMemoryIfNeeded(fPreSnapshot); + return freeMemoryIfNeeded(fQuickCycle, fPreSnapshot); } diff --git a/src/rdb.cpp b/src/rdb.cpp index 2bee255d1..729fd0f59 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2520,11 +2520,11 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { { for (int idb = 0; idb < cserver.dbnum; ++idb) { - g_pserver->db[idb]->processChanges(false); - g_pserver->db[idb]->commitChanges(); + if (g_pserver->db[idb]->processChanges(false)) + g_pserver->db[idb]->commitChanges(); g_pserver->db[idb]->trackChanges(false); } - freeMemoryIfNeeded(false /* fPreSnapshot*/); + freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/); } } @@ -2596,8 +2596,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { for (int idb = 0; idb < cserver.dbnum; ++idb) { - g_pserver->db[idb]->processChanges(false); - g_pserver->db[idb]->commitChanges(); + if (g_pserver->db[idb]->processChanges(false)) + g_pserver->db[idb]->commitChanges(); } return C_OK; diff --git a/src/scripting.cpp b/src/scripting.cpp index ca56d2718..d517810ba 100644 --- a/src/scripting.cpp +++ b/src/scripting.cpp @@ -1473,7 +1473,7 @@ void evalGenericCommand(client *c, int evalsha) { int delhook = 0, err; if (g_pserver->m_pstorageFactory != nullptr) - freeMemoryIfNeededAndSafe(true); + freeMemoryIfNeededAndSafe(false /*fQuickCycle*/, true); /* When we replicate whole scripts, we want the same PRNG sequence at * every call so that our PRNG is not affected by external state. */ diff --git a/src/server.cpp b/src/server.cpp index 08f8a178e..52648e7bd 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2014,8 +2014,8 @@ void flushStorageWeak() latencyStartMonitor(storage_process_latency); std::vector vecdb; for (int idb = 0; idb < cserver.dbnum; ++idb) { - vecdb.push_back(g_pserver->db[idb]); - g_pserver->db[idb]->processChanges(true); + if (g_pserver->db[idb]->processChanges(true)) + vecdb.push_back(g_pserver->db[idb]); } latencyEndMonitor(storage_process_latency); latencyAddSampleIfNeeded("storage-process-changes", storage_process_latency); @@ -2453,8 +2453,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) { mstime_t storage_process_latency; latencyStartMonitor(storage_process_latency); for (int idb = 0; idb < cserver.dbnum; ++idb) { - vecdb.push_back(g_pserver->db[idb]); - g_pserver->db[idb]->processChanges(false); + if (g_pserver->db[idb]->processChanges(false)) + vecdb.push_back(g_pserver->db[idb]); } latencyEndMonitor(storage_process_latency); latencyAddSampleIfNeeded("storage-process-changes", storage_process_latency); @@ -3936,6 +3936,9 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { return C_OK; } + if (!locker.isArmed()) + c->db->prefetchKeysAsync(locker, c); + /* Check if the user is authenticated. This check is skipped in case * the default user is flagged as "nopass" and is active. */ int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) || @@ -4007,7 +4010,7 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { * propagation of DELs due to eviction. */ if (g_pserver->maxmemory && !g_pserver->lua_timedout) { locker.arm(c); - int out_of_memory = freeMemoryIfNeededAndSafe(false /*fPreSnapshot*/) == C_ERR; + int out_of_memory = freeMemoryIfNeededAndSafe(true /*fQuickCycle*/, false /*fPreSnapshot*/) == C_ERR; /* freeMemoryIfNeeded may flush replica output buffers. This may result * into a replica, that may be the active client, to be freed. */ if (serverTL->current_client == NULL) return C_ERR; @@ -4268,8 +4271,8 @@ int prepareForShutdown(int flags) { // Also Dump To FLASH if Applicable for (int idb = 0; idb < cserver.dbnum; ++idb) { - g_pserver->db[idb]->processChanges(false); - g_pserver->db[idb]->commitChanges(); + if (g_pserver->db[idb]->processChanges(false)) + g_pserver->db[idb]->commitChanges(); } } diff --git a/src/server.h b/src/server.h index 4a5073461..86dcb872f 100644 --- a/src/server.h +++ b/src/server.h @@ -1329,7 +1329,7 @@ public: // to allow you to release the global lock before commiting. To prevent deadlocks you *must* // either release the global lock or keep the same global lock between the two functions as // a second look is kept to ensure writes to secondary storage are ordered - void processChanges(bool fSnapshot); + bool processChanges(bool fSnapshot); void commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree = nullptr); // This should only be used if you look at the key, we do not fixup @@ -1350,6 +1350,8 @@ public: bool removeCachedValue(const char *key); void removeAllCachedValues(); + void prefetchKeysAsync(class AeLocker &locker, client *c); + bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; } dict_iter find_cached_threadsafe(const char *key) const; @@ -1462,7 +1464,7 @@ struct redisDb : public redisDbPersistentDataSnapshot friend void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e); friend int evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool); friend void activeDefragCycle(void); - friend int freeMemoryIfNeeded(bool); + friend int freeMemoryIfNeeded(bool, bool); friend void activeExpireCycle(int); friend void expireSlaveKeys(void); @@ -1518,6 +1520,7 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::removeAllCachedValues; using redisDbPersistentData::dictUnsafeKeyOnly; using redisDbPersistentData::resortExpire; + using redisDbPersistentData::prefetchKeysAsync; public: expireset::setiter expireitr; @@ -2997,10 +3000,10 @@ int zslLexValueGteMin(sds value, zlexrangespec *spec); int zslLexValueLteMax(sds value, zlexrangespec *spec); /* Core functions */ -int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level, bool fPreSnapshot=false); +int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level, bool fQuickCycle = false, bool fPreSnapshot=false); size_t freeMemoryGetNotCountedMemory(); -int freeMemoryIfNeeded(bool fPreSnapshot); -int freeMemoryIfNeededAndSafe(bool fPreSnapshot); +int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot); +int freeMemoryIfNeededAndSafe(bool fQuickCycle, bool fPreSnapshot); int processCommand(client *c, int callFlags, class AeLocker &locker); void setupSignalHandlers(void); struct redisCommand *lookupCommand(sds name); diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 4734b84b3..e436a608d 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -8,7 +8,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 serverAssert(GlobalLocksAcquired()); serverAssert(m_refCount == 0); // do not call this on a snapshot - if (freeMemoryIfNeededAndSafe(true /*fPreSnapshot*/) != C_OK && fOptional) + if (freeMemoryIfNeededAndSafe(false /*fQuickCycle*/, true /*fPreSnapshot*/) != C_OK && fOptional) return nullptr; // can't create snapshot due to OOM int levels = 1; @@ -369,7 +369,7 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn latencyEndMonitor(latency_endsnapshot); latencyAddSampleIfNeeded("end-mvcc-snapshot", latency_endsnapshot); - freeMemoryIfNeededAndSafe(false); + freeMemoryIfNeededAndSafe(false /*fQuickCycle*/, false); } dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe(bool fPrimaryOnly) const From 4037ee98a4f5a27599aab2b7375b33da0e34fab6 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 10 Aug 2020 05:01:36 +0000 Subject: [PATCH 7/8] Fix assert caused by freeTombstoneObjects and null check in consolidate_children Former-commit-id: 8565a02b331cd2bba2a1c7c6693dfb3f6e61c845 --- src/snapshot.cpp | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/src/snapshot.cpp b/src/snapshot.cpp index e436a608d..0ad1b5282 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -238,15 +238,14 @@ void redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) dictIterator *di = dictGetIterator(m_pdictTombstone); dictEntry *de; - size_t freed = 0; while ((de = dictNext(di)) != nullptr) { dictEntry *deObj = dictFind(m_pdbSnapshot->m_pdict, dictGetKey(de)); if (deObj != nullptr && dictGetVal(deObj) != nullptr) { decrRefCount((robj*)dictGetVal(deObj)); - deObj->v.val = nullptr; - ++freed; + void *ptrSet = nullptr; + __atomic_store(&deObj->v.val, &ptrSet, __ATOMIC_RELAXED); } } dictReleaseIterator(di); @@ -465,7 +464,7 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::functioniterate_threadsafe([&](const char *key, robj_roptr o) { - if (o != nullptr) { + if (o != nullptr || !m_spstorage) { dictAdd(spdb->m_pdict, sdsdupshared(key), o.unsafe_robjcast()); - incrRefCount(o); + if (o != nullptr) { + incrRefCount(o); + } } else { ++skipped; } From 0cb8d4ca633200afc15f2c568b21ad1ab2a8fbda Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 10 Aug 2020 05:45:56 +0000 Subject: [PATCH 8/8] MVCC Perf fixes Former-commit-id: 5a4afe5fb4231bec34d434f9e3214a7320842091 --- src/db.cpp | 9 ++++++++- src/storage.h | 2 -- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 2d88ceb73..b7b36b518 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -101,7 +101,12 @@ static robj* lookupKey(redisDb *db, robj *key, int flags) { } static robj_roptr lookupKeyConst(redisDb *db, robj *key, int flags) { serverAssert((flags & LOOKUP_UPDATEMVCC) == 0); - robj_roptr val = db->find(szFromObj(key)); + robj_roptr val; + if (g_pserver->m_pstorageFactory) + val = db->find(szFromObj(key)).val(); + else + val = db->find_cached_threadsafe(szFromObj(key)).val(); + if (val != nullptr) { lookupKeyUpdateObj(val.unsafe_robjcast(), flags); return val; @@ -2333,10 +2338,12 @@ bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew) serverAssert(FImplies(fAssumeNew, res == DICT_OK)); if (res == DICT_OK) { +#ifdef CHECKED_BUILD if (m_pdbSnapshot != nullptr && m_pdbSnapshot->find_cached_threadsafe(key) != nullptr) { serverAssert(dictFind(m_pdictTombstone, key) != nullptr); } +#endif trackkey(key, false /* fUpdate */); } return (res == DICT_OK); diff --git a/src/storage.h b/src/storage.h index e9106aca2..2d46c0223 100644 --- a/src/storage.h +++ b/src/storage.h @@ -1,8 +1,6 @@ #ifndef __STORAGE_H__ #define __STORAGE_H__ -#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48 // Note: also defined in object.c - should always match - #ifdef __cplusplus extern "C" { #endif