From c6fc1bcfe3923ba8c84c07cfbf051eac98d49546 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 23 Mar 2021 03:44:20 +0000 Subject: [PATCH 1/4] Perform GET command inline Former-commit-id: 5623936d99e334ab103f3dc1541b145c125d0ee8 --- src/db.cpp | 74 +++++++++++++++++++++++++++------------------- src/networking.cpp | 8 +++-- src/server.cpp | 6 ++++ src/server.h | 2 +- 4 files changed, 55 insertions(+), 35 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 20b70fe02..70fd2d1dc 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2998,10 +2998,10 @@ int dbnumFromDb(redisDb *db) serverPanic("invalid database pointer"); } -void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command) +bool redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command, bool fExecOK) { if (m_spstorage == nullptr) - return; + return false; AeLocker lock; @@ -3010,7 +3010,7 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command getKeysResult result = GETKEYS_RESULT_INIT; auto cmd = lookupCommand(szFromObj(command.argv[0])); if (cmd == nullptr) - return; // Bad command? It's not for us to judge, just bail + return false; // Bad command? It's not for us to judge, just bail int numkeys = getKeysFromCommand(cmd, command.argv, command.argc, &result); for (int ikey = 0; ikey < numkeys; ++ikey) { @@ -3042,41 +3042,53 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command } } - lock.arm(c); - for (auto &tuple : vecInserts) - { - sds sharedKey = std::get<0>(tuple); - robj *o = std::get<1>(tuple); - std::unique_ptr spexpire = std::move(std::get<2>(tuple)); - - if (o != nullptr) + if (!vecInserts.empty()) { + lock.arm(c); + for (auto &tuple : vecInserts) { - if (this->find_cached_threadsafe(sharedKey) != nullptr) + sds sharedKey = std::get<0>(tuple); + robj *o = std::get<1>(tuple); + std::unique_ptr spexpire = std::move(std::get<2>(tuple)); + + if (o != nullptr) { - // While unlocked this was already ensured - decrRefCount(o); - sdsfree(sharedKey); + if (this->find_cached_threadsafe(sharedKey) != nullptr) + { + // While unlocked this was already ensured + decrRefCount(o); + sdsfree(sharedKey); + } + else + { + dictAdd(m_pdict, sharedKey, o); + o->SetFExpires(spexpire != nullptr); + + if (spexpire != nullptr) + { + auto itr = m_setexpire->find(sharedKey); + if (itr != m_setexpire->end()) + m_setexpire->erase(itr); + m_setexpire->insert(std::move(*spexpire)); + serverAssert(m_setexpire->find(sharedKey) != m_setexpire->end()); + } + serverAssert(o->FExpires() == (m_setexpire->find(sharedKey) != m_setexpire->end())); + } } else { - dictAdd(m_pdict, sharedKey, o); - o->SetFExpires(spexpire != nullptr); - - if (spexpire != nullptr) - { - auto itr = m_setexpire->find(sharedKey); - if (itr != m_setexpire->end()) - m_setexpire->erase(itr); - m_setexpire->insert(std::move(*spexpire)); - serverAssert(m_setexpire->find(sharedKey) != m_setexpire->end()); - } - serverAssert(o->FExpires() == (m_setexpire->find(sharedKey) != m_setexpire->end())); + if (sharedKey != nullptr) + sdsfree(sharedKey); // BUG but don't bother crashing } } - else - { - if (sharedKey != nullptr) - sdsfree(sharedKey); // BUG but don't bother crashing + lock.disarm(); + } + + if (fExecOK && cmd->proc == getCommand && !vecInserts.empty()) { + robj *o = std::get<1>(vecInserts[0]); + if (o != nullptr) { + addReplyBulk(c, o); + return true; } } + return false; } \ No newline at end of file diff --git a/src/networking.cpp b/src/networking.cpp index 15aa6f43a..e9959c263 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2343,7 +2343,7 @@ void parseClientCommandBuffer(client *c) { } } - size_t cqueries = c->vecqueuedcmd.size(); + size_t cqueriesStart = c->vecqueuedcmd.size(); if (c->reqtype == PROTO_REQ_INLINE) { if (processInlineBuffer(c) != C_OK) break; } else if (c->reqtype == PROTO_REQ_MULTIBULK) { @@ -2359,10 +2359,12 @@ void parseClientCommandBuffer(client *c) { } /* Prefetch if we have a storage provider and we're not in the global lock */ - if (cqueries < c->vecqueuedcmd.size() && g_pserver->m_pstorageFactory != nullptr && !GlobalLocksAcquired()) { + if (cqueriesStart < c->vecqueuedcmd.size() && g_pserver->m_pstorageFactory != nullptr && !GlobalLocksAcquired()) { auto &query = c->vecqueuedcmd.back(); if (query.argc > 0 && query.argc == query.argcMax) { - c->db->prefetchKeysAsync(c, query); + if (c->db->prefetchKeysAsync(c, query, c->vecqueuedcmd.size() == 1)) { + c->vecqueuedcmd.erase(c->vecqueuedcmd.begin()); + } } } c->reqtype = 0; diff --git a/src/server.cpp b/src/server.cpp index 96d62e4ea..814909bf6 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2168,6 +2168,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { aeAcquireLock(); } + if (g_pserver->maxmemory && g_pserver->m_pstorageFactory) + freeMemoryIfNeededAndSafe(false, false); + /* If another threads unblocked one of our clients, and this thread has been idle then beforeSleep won't have a chance to process the unblocking. So we also process them here in the cron job to ensure they don't starve. @@ -2455,6 +2458,9 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData aeAcquireLock(); } + if (g_pserver->maxmemory && g_pserver->m_pstorageFactory) + freeMemoryIfNeededAndSafe(false, false); + int iel = ielFromEventLoop(eventLoop); serverAssert(iel != IDX_EVENT_LOOP_MAIN); diff --git a/src/server.h b/src/server.h index 07f025c46..9ffccd6a0 100644 --- a/src/server.h +++ b/src/server.h @@ -1134,7 +1134,7 @@ public: bool removeCachedValue(const char *key); void removeAllCachedValues(); - void prefetchKeysAsync(client *c, struct parsed_command &command); + bool prefetchKeysAsync(client *c, struct parsed_command &command, bool fExecOK); bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; } From 18da2dd0919315cb3414d56e7882e9925458010d Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 24 Mar 2021 19:58:51 +0000 Subject: [PATCH 2/4] Fix bug where we skip valid dict elements in dictGetRandomKey Former-commit-id: 291a3610a679cb1d17caadf6ab067cad41885935 --- src/dict.cpp | 11 +++++++++-- src/dict.h | 1 + 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/dict.cpp b/src/dict.cpp index c682e2ec9..831a32f8f 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -373,6 +373,7 @@ int dictRehash(dict *d, int n) { dictAsyncRehashCtl::dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) { queue.reserve(c_targetQueueSize); __atomic_fetch_add(&d->refcount, 1, __ATOMIC_RELEASE); + this->rehashIdxBase = d->rehashidx; } dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) { @@ -931,12 +932,18 @@ dictEntry *dictGetRandomKey(dict *d) if (dictSize(d) == 0) return NULL; if (dictIsRehashing(d)) _dictRehashStep(d); if (dictIsRehashing(d)) { + long rehashidx = d->rehashidx; + auto async = d->asyncdata; + while (async != nullptr) { + rehashidx = std::min((long)async->rehashIdxBase, rehashidx); + async = async->next; + } do { /* We are sure there are no elements in indexes from 0 * to rehashidx-1 */ - h = d->rehashidx + (random() % (d->ht[0].size + + h = rehashidx + (random() % (d->ht[0].size + d->ht[1].size - - d->rehashidx)); + rehashidx)); he = (h >= d->ht[0].size) ? d->ht[1].table[h - d->ht[0].size] : d->ht[0].table[h]; } while(he == NULL); diff --git a/src/dict.h b/src/dict.h index ab57a7d7f..f24108d32 100644 --- a/src/dict.h +++ b/src/dict.h @@ -100,6 +100,7 @@ struct dictAsyncRehashCtl { struct dict *dict = nullptr; std::vector queue; size_t hashIdx = 0; + long rehashIdxBase; dictAsyncRehashCtl *next = nullptr; std::atomic done { false }; std::atomic abondon { false }; From fa244c930e37a949a3d010b88207191760ceb87b Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 24 Mar 2021 20:12:43 +0000 Subject: [PATCH 3/4] Bump RocksDB version for better perf Former-commit-id: ab4ae61b9c54b3c28dc5fd775d0df3d377c4846a From 54fb01e24a6725c3d8832cdd5a440c015df89d5e Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 24 Mar 2021 20:13:42 +0000 Subject: [PATCH 4/4] Don't run code in evict unless we really have to Former-commit-id: b665b1c2b2df96883a6e2237f7bf3f9b1bec2a89 --- src/evict.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/evict.cpp b/src/evict.cpp index 31cadeae5..887b100b9 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -587,7 +587,7 @@ int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot) { /* volatile-random and allkeys-random policy */ if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM || g_pserver->maxmemory_policy == MAXMEMORY_VOLATILE_RANDOM - || fEvictToStorage) + || fFallback) { /* When evicting a random key, we try to evict a key for * each DB, so we use the static 'next_db' variable to