From 848057ff19c829ece71c385e522b11b96c68b2ee Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 9 Aug 2020 23:36:20 +0000 Subject: [PATCH] RocksDB Read Performance Improvements Former-commit-id: 80cca4869888e048e10e11f1f20796c482c3e5b3 --- src/StorageCache.cpp | 8 +++++ src/StorageCache.h | 1 + src/aof.cpp | 4 +-- src/config.cpp | 2 +- src/db.cpp | 86 ++++++++++++++++++++++++++++++++++++++++++-- src/evict.cpp | 17 ++++----- src/rdb.cpp | 10 +++--- src/scripting.cpp | 2 +- src/server.cpp | 17 +++++---- src/server.h | 13 ++++--- src/snapshot.cpp | 4 +-- 11 files changed, 130 insertions(+), 34 deletions(-) diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp index 19818eb97..23b9af90a 100644 --- a/src/StorageCache.cpp +++ b/src/StorageCache.cpp @@ -2,6 +2,7 @@ void StorageCache::clear() { + std::unique_lock ul(m_lock); if (m_setkeys != nullptr) m_setkeys->clear(); m_spstorage->clear(); @@ -24,6 +25,7 @@ void StorageCache::cacheKey(const char *rgch, size_t cch) bool StorageCache::erase(sds key) { bool result = m_spstorage->erase(key, sdslen(key)); + std::unique_lock ul(m_lock); if (result && m_setkeys != nullptr) { auto itr = m_setkeys->find(sdsview(key)); @@ -35,15 +37,18 @@ bool StorageCache::erase(sds key) void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwrite) { + std::unique_lock ul(m_lock); if (!fOverwrite && m_setkeys != nullptr) { cacheKey(key); } + ul.unlock(); m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite); } const StorageCache *StorageCache::clone() { + std::unique_lock ul(m_lock); // Clones never clone the cache StorageCache *cacheNew = new StorageCache(const_cast(m_spstorage->clone())); return cacheNew; @@ -51,6 +56,7 @@ const StorageCache *StorageCache::clone() void StorageCache::retrieve(sds key, IStorage::callbackSingle fn, sds *cachedKey) const { + std::unique_lock ul(m_lock); if (m_setkeys != nullptr) { auto itr = m_setkeys->find(sdsview(key)); @@ -59,11 +65,13 @@ void StorageCache::retrieve(sds key, IStorage::callbackSingle fn, sds *cachedKey if (cachedKey != nullptr) *cachedKey = sdsdupshared(itr->get()); } + ul.unlock(); m_spstorage->retrieve(key, sdslen(key), fn); } size_t StorageCache::count() const { + std::unique_lock ul(m_lock); size_t count = m_spstorage->count(); if (m_setkeys != nullptr) serverAssert(count == m_setkeys->size()); diff --git a/src/StorageCache.h b/src/StorageCache.h index 429795eb8..4fa3c3a08 100644 --- a/src/StorageCache.h +++ b/src/StorageCache.h @@ -5,6 +5,7 @@ class StorageCache { std::shared_ptr m_spstorage; std::unique_ptr> m_setkeys; + mutable fastlock m_lock {"StorageCache"}; StorageCache(IStorage *storage) : m_spstorage(storage) diff --git a/src/aof.cpp b/src/aof.cpp index d7094e54f..edb008399 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -972,8 +972,8 @@ int loadAppendOnlyFile(char *filename) { loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */ for (int idb = 0; idb < cserver.dbnum; ++idb) { - g_pserver->db[idb]->processChanges(false); - g_pserver->db[idb]->commitChanges(); + if (g_pserver->db[idb]->processChanges(false)) + g_pserver->db[idb]->commitChanges(); } fclose(fp); freeFakeClient(fakeClient); diff --git a/src/config.cpp b/src/config.cpp index bf9117247..c4b42d1f4 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2181,7 +2181,7 @@ static int updateMaxmemory(long long val, long long prev, const char **err) { if ((unsigned long long)val < used) { serverLog(LL_WARNING,"WARNING: the new maxmemory value set via CONFIG SET (%llu) is smaller than the current memory usage (%zu). This will result in key eviction and/or the inability to accept new write commands depending on the maxmemory-policy.", g_pserver->maxmemory, used); } - freeMemoryIfNeededAndSafe(false /*fPreSnapshot*/); + freeMemoryIfNeededAndSafe(false /*fQuickCycle*/, false /*fPreSnapshot*/); } return 1; } diff --git a/src/db.cpp b/src/db.cpp index 46138f677..2d88ceb73 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2556,7 +2556,7 @@ void redisDbPersistentData::storeDatabase() sdsfree(temp); } -void redisDbPersistentData::processChanges(bool fSnapshot) +bool redisDbPersistentData::processChanges(bool fSnapshot) { serverAssert(GlobalLocksAcquired()); @@ -2565,6 +2565,8 @@ void redisDbPersistentData::processChanges(bool fSnapshot) if (m_spstorage != nullptr) { + if (!m_fAllChanged && m_setchanged.empty() && m_cnewKeysPending == 0) + return false; m_spstorage->beginWriteBatch(); serverAssert(m_pdbSnapshotStorageFlush == nullptr); if (fSnapshot && !m_fAllChanged && m_setchanged.size() > 100) @@ -2596,6 +2598,7 @@ void redisDbPersistentData::processChanges(bool fSnapshot) m_setchanged.clear(); m_cnewKeysPending = 0; } + return (m_spstorage != nullptr); } void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree) @@ -2698,8 +2701,8 @@ void redisDbPersistentData::removeAllCachedValues() // First we have to flush the tracked changes if (m_fTrackingChanges) { - processChanges(false); - commitChanges(); + if (processChanges(false)) + commitChanges(); trackChanges(false); } @@ -2812,3 +2815,80 @@ int dbnumFromDb(redisDb *db) } serverPanic("invalid database pointer"); } + +void redisDbPersistentData::prefetchKeysAsync(AeLocker &lock, client *c) +{ + if (m_spstorage == nullptr) + return; + + std::vector veckeys; + lock.arm(c); + int numkeys = 0; + int *keys = getKeysFromCommand(c->cmd, c->argv, c->argc, &numkeys); + for (int ikey = 0; ikey < numkeys; ++ikey) + { + robj *objKey = c->argv[keys[ikey]]; + if (this->find_cached_threadsafe(szFromObj(objKey)) == nullptr) + veckeys.push_back(objKey); + } + lock.disarm(); + + getKeysFreeResult(keys); + + std::vector>> vecInserts; + for (robj *objKey : veckeys) + { + sds sharedKey = nullptr; + std::unique_ptr spexpire; + robj *o = nullptr; + m_spstorage->retrieve((sds)szFromObj(objKey), [&](const char *, size_t, const void *data, size_t cb){ + size_t offset = 0; + spexpire = deserializeExpire((sds)szFromObj(objKey), (const char*)data, cb, &offset); + o = deserializeStoredObject(this, szFromObj(objKey), reinterpret_cast(data) + offset, cb - offset); + serverAssert(o != nullptr); + }, &sharedKey); + + if (sharedKey == nullptr) + sharedKey = sdsdupshared(szFromObj(objKey)); + + vecInserts.emplace_back(sharedKey, o, std::move(spexpire)); + } + + lock.arm(c); + for (auto &tuple : vecInserts) + { + sds sharedKey = std::get<0>(tuple); + robj *o = std::get<1>(tuple); + std::unique_ptr spexpire = std::move(std::get<2>(tuple)); + + if (o != nullptr) + { + if (this->find_cached_threadsafe(sharedKey) != nullptr) + { + // While unlocked this was already ensured + decrRefCount(o); + sdsfree(sharedKey); + } + else + { + dictAdd(m_pdict, sharedKey, o); + o->SetFExpires(spexpire != nullptr); + + if (spexpire != nullptr) + { + auto itr = m_setexpire->find(sharedKey); + if (itr != m_setexpire->end()) + m_setexpire->erase(itr); + m_setexpire->insert(std::move(*spexpire)); + serverAssert(m_setexpire->find(sharedKey) != m_setexpire->end()); + } + serverAssert(o->FExpires() == (m_setexpire->find(sharedKey) != m_setexpire->end())); + } + } + else + { + if (sharedKey != nullptr) + sdsfree(sharedKey); // BUG but don't bother crashing + } + } +} \ No newline at end of file diff --git a/src/evict.cpp b/src/evict.cpp index ce9c420c4..21df570d2 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -425,7 +425,7 @@ size_t freeMemoryGetNotCountedMemory(void) { * limit. * (Populated both for C_ERR and C_OK) */ -int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level, bool fPreSnapshot) { +int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level, bool fQuickCycle, bool fPreSnapshot) { size_t mem_reported, mem_used, mem_tofree; /* Check if we are over the memory usage limit. If we are not, no need @@ -464,7 +464,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev /* Compute how much memory we need to free. */ mem_tofree = mem_used - maxmemory; - if (g_pserver->m_pstorageFactory) + if (g_pserver->m_pstorageFactory && !fQuickCycle) { mem_tofree += static_cast(maxmemory * 0.05); // if we have a storage provider be much more aggressive } @@ -484,7 +484,7 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev * were over the limit, but the attempt to free memory was successful. * Otehrwise if we are over the memory limit, but not enough memory * was freed to return back under the limit, the function returns C_ERR. */ -int freeMemoryIfNeeded(bool fPreSnapshot) { +int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot) { serverAssert(GlobalLocksAcquired()); /* By default replicas should ignore maxmemory @@ -498,12 +498,13 @@ int freeMemoryIfNeeded(bool fPreSnapshot) { const bool fEvictToStorage = !cserver.delete_on_evict && g_pserver->db[0]->FStorageProvider(); int result = C_ERR; int ckeysFailed = 0; + int keys_freed = 0; /* When clients are paused the dataset should be static not just from the * POV of clients not being able to write, but also from the POV of * expires and evictions of keys not being performed. */ if (clientsArePaused()) return C_OK; - if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL,fPreSnapshot) == C_OK) + if (getMaxmemoryState(&mem_reported,NULL,&mem_tofree,NULL,fQuickCycle,fPreSnapshot) == C_OK) return C_OK; mem_freed = 0; @@ -513,13 +514,13 @@ int freeMemoryIfNeeded(bool fPreSnapshot) { goto cant_free; /* We need to free memory, but policy forbids. */ while (mem_freed < mem_tofree) { - int j, k, i, keys_freed = 0; + int j, k, i; static unsigned int next_db = 0; sds bestkey = NULL; int bestdbid; redisDb *db; bool fFallback = false; - + if (g_pserver->maxmemory_policy & (MAXMEMORY_FLAG_LRU|MAXMEMORY_FLAG_LFU) || g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_TTL) { @@ -737,7 +738,7 @@ cant_free: * - Nor we are loading data right now. * */ -int freeMemoryIfNeededAndSafe(bool fPreSnapshot) { +int freeMemoryIfNeededAndSafe(bool fQuickCycle, bool fPreSnapshot) { if (g_pserver->shutdown_asap || g_pserver->lua_timedout || g_pserver->loading) return C_OK; - return freeMemoryIfNeeded(fPreSnapshot); + return freeMemoryIfNeeded(fQuickCycle, fPreSnapshot); } diff --git a/src/rdb.cpp b/src/rdb.cpp index 2bee255d1..729fd0f59 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2520,11 +2520,11 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { { for (int idb = 0; idb < cserver.dbnum; ++idb) { - g_pserver->db[idb]->processChanges(false); - g_pserver->db[idb]->commitChanges(); + if (g_pserver->db[idb]->processChanges(false)) + g_pserver->db[idb]->commitChanges(); g_pserver->db[idb]->trackChanges(false); } - freeMemoryIfNeeded(false /* fPreSnapshot*/); + freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/); } } @@ -2596,8 +2596,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { for (int idb = 0; idb < cserver.dbnum; ++idb) { - g_pserver->db[idb]->processChanges(false); - g_pserver->db[idb]->commitChanges(); + if (g_pserver->db[idb]->processChanges(false)) + g_pserver->db[idb]->commitChanges(); } return C_OK; diff --git a/src/scripting.cpp b/src/scripting.cpp index ca56d2718..d517810ba 100644 --- a/src/scripting.cpp +++ b/src/scripting.cpp @@ -1473,7 +1473,7 @@ void evalGenericCommand(client *c, int evalsha) { int delhook = 0, err; if (g_pserver->m_pstorageFactory != nullptr) - freeMemoryIfNeededAndSafe(true); + freeMemoryIfNeededAndSafe(false /*fQuickCycle*/, true); /* When we replicate whole scripts, we want the same PRNG sequence at * every call so that our PRNG is not affected by external state. */ diff --git a/src/server.cpp b/src/server.cpp index 08f8a178e..52648e7bd 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2014,8 +2014,8 @@ void flushStorageWeak() latencyStartMonitor(storage_process_latency); std::vector vecdb; for (int idb = 0; idb < cserver.dbnum; ++idb) { - vecdb.push_back(g_pserver->db[idb]); - g_pserver->db[idb]->processChanges(true); + if (g_pserver->db[idb]->processChanges(true)) + vecdb.push_back(g_pserver->db[idb]); } latencyEndMonitor(storage_process_latency); latencyAddSampleIfNeeded("storage-process-changes", storage_process_latency); @@ -2453,8 +2453,8 @@ void beforeSleep(struct aeEventLoop *eventLoop) { mstime_t storage_process_latency; latencyStartMonitor(storage_process_latency); for (int idb = 0; idb < cserver.dbnum; ++idb) { - vecdb.push_back(g_pserver->db[idb]); - g_pserver->db[idb]->processChanges(false); + if (g_pserver->db[idb]->processChanges(false)) + vecdb.push_back(g_pserver->db[idb]); } latencyEndMonitor(storage_process_latency); latencyAddSampleIfNeeded("storage-process-changes", storage_process_latency); @@ -3936,6 +3936,9 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { return C_OK; } + if (!locker.isArmed()) + c->db->prefetchKeysAsync(locker, c); + /* Check if the user is authenticated. This check is skipped in case * the default user is flagged as "nopass" and is active. */ int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) || @@ -4007,7 +4010,7 @@ int processCommand(client *c, int callFlags, AeLocker &locker) { * propagation of DELs due to eviction. */ if (g_pserver->maxmemory && !g_pserver->lua_timedout) { locker.arm(c); - int out_of_memory = freeMemoryIfNeededAndSafe(false /*fPreSnapshot*/) == C_ERR; + int out_of_memory = freeMemoryIfNeededAndSafe(true /*fQuickCycle*/, false /*fPreSnapshot*/) == C_ERR; /* freeMemoryIfNeeded may flush replica output buffers. This may result * into a replica, that may be the active client, to be freed. */ if (serverTL->current_client == NULL) return C_ERR; @@ -4268,8 +4271,8 @@ int prepareForShutdown(int flags) { // Also Dump To FLASH if Applicable for (int idb = 0; idb < cserver.dbnum; ++idb) { - g_pserver->db[idb]->processChanges(false); - g_pserver->db[idb]->commitChanges(); + if (g_pserver->db[idb]->processChanges(false)) + g_pserver->db[idb]->commitChanges(); } } diff --git a/src/server.h b/src/server.h index 4a5073461..86dcb872f 100644 --- a/src/server.h +++ b/src/server.h @@ -1329,7 +1329,7 @@ public: // to allow you to release the global lock before commiting. To prevent deadlocks you *must* // either release the global lock or keep the same global lock between the two functions as // a second look is kept to ensure writes to secondary storage are ordered - void processChanges(bool fSnapshot); + bool processChanges(bool fSnapshot); void commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree = nullptr); // This should only be used if you look at the key, we do not fixup @@ -1350,6 +1350,8 @@ public: bool removeCachedValue(const char *key); void removeAllCachedValues(); + void prefetchKeysAsync(class AeLocker &locker, client *c); + bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; } dict_iter find_cached_threadsafe(const char *key) const; @@ -1462,7 +1464,7 @@ struct redisDb : public redisDbPersistentDataSnapshot friend void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e); friend int evictionPoolPopulate(int dbid, redisDb *db, expireset *setexpire, struct evictionPoolEntry *pool); friend void activeDefragCycle(void); - friend int freeMemoryIfNeeded(bool); + friend int freeMemoryIfNeeded(bool, bool); friend void activeExpireCycle(int); friend void expireSlaveKeys(void); @@ -1518,6 +1520,7 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::removeAllCachedValues; using redisDbPersistentData::dictUnsafeKeyOnly; using redisDbPersistentData::resortExpire; + using redisDbPersistentData::prefetchKeysAsync; public: expireset::setiter expireitr; @@ -2997,10 +3000,10 @@ int zslLexValueGteMin(sds value, zlexrangespec *spec); int zslLexValueLteMax(sds value, zlexrangespec *spec); /* Core functions */ -int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level, bool fPreSnapshot=false); +int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *level, bool fQuickCycle = false, bool fPreSnapshot=false); size_t freeMemoryGetNotCountedMemory(); -int freeMemoryIfNeeded(bool fPreSnapshot); -int freeMemoryIfNeededAndSafe(bool fPreSnapshot); +int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot); +int freeMemoryIfNeededAndSafe(bool fQuickCycle, bool fPreSnapshot); int processCommand(client *c, int callFlags, class AeLocker &locker); void setupSignalHandlers(void); struct redisCommand *lookupCommand(sds name); diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 4734b84b3..e436a608d 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -8,7 +8,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 serverAssert(GlobalLocksAcquired()); serverAssert(m_refCount == 0); // do not call this on a snapshot - if (freeMemoryIfNeededAndSafe(true /*fPreSnapshot*/) != C_OK && fOptional) + if (freeMemoryIfNeededAndSafe(false /*fQuickCycle*/, true /*fPreSnapshot*/) != C_OK && fOptional) return nullptr; // can't create snapshot due to OOM int levels = 1; @@ -369,7 +369,7 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn latencyEndMonitor(latency_endsnapshot); latencyAddSampleIfNeeded("end-mvcc-snapshot", latency_endsnapshot); - freeMemoryIfNeededAndSafe(false); + freeMemoryIfNeededAndSafe(false /*fQuickCycle*/, false); } dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe(bool fPrimaryOnly) const