From 60f08d5f93837b927220bd6361d0eafe9f28ff4c Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 6 Nov 2021 15:49:58 +0000 Subject: [PATCH] Smooth out performance fluctuations caused by variance in the rehash calibration Former-commit-id: 09580dedfef09deace7863bf68ba7e0f9edf3eb3 --- src/dict.cpp | 2 +- src/server.cpp | 55 +++++++++++++++++++++++--------------------------- 2 files changed, 26 insertions(+), 31 deletions(-) diff --git a/src/dict.cpp b/src/dict.cpp index 8f8262751..e10567c8d 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -408,7 +408,7 @@ dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) { d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata); - int empty_visits = buckets; + int empty_visits = buckets*10; while (d->asyncdata->queue.size() < (size_t)buckets && (size_t)d->rehashidx < d->ht[0].size) { dictEntry *de; diff --git a/src/server.cpp b/src/server.cpp index 9fb92939f..0dcae55b0 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1705,12 +1705,12 @@ void tryResizeHashTables(int dbid) { * is returned. */ int redisDbPersistentData::incrementallyRehash() { /* Keys dictionary */ - if (dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone)) { - int result = dictRehashMilliseconds(m_pdict,1); - result += dictRehashMilliseconds(m_pdictTombstone,1); - return result; /* already used our millisecond for this loop... */ - } - return 0; + int result = 0; + if (dictIsRehashing(m_pdict)) + result += dictRehashMilliseconds(m_pdict,1); + if (dictIsRehashing(m_pdictTombstone)) + dictRehashMilliseconds(m_pdictTombstone,1); // don't count this + return result; /* already used our millisecond for this loop... */ } /* This function is called once a background process of some kind terminates, @@ -2112,9 +2112,15 @@ void databasesCron(bool fMainThread) { if (g_pserver->activerehashing) { for (j = 0; j < dbs_per_call; j++) { if (serverTL->rehashCtl != nullptr) { - if (dictRehashSomeAsync(serverTL->rehashCtl, rehashes_per_ms)) { - break; - } + if (!serverTL->rehashCtl->done.load(std::memory_order_relaxed)) { + aeReleaseLock(); + if (dictRehashSomeAsync(serverTL->rehashCtl, rehashes_per_ms)) { + aeAcquireLock(); + break; + } + aeAcquireLock(); + } + dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/); serverTL->rehashCtl = nullptr; } @@ -2124,22 +2130,27 @@ void databasesCron(bool fMainThread) { /* Are we async rehashing? And if so is it time to re-calibrate? */ /* The recalibration limit is a prime number to ensure balancing across threads */ if (rehashes_per_ms > 0 && async_rehashes < 131 && !cserver.active_defrag_enabled && cserver.cthreads > 1 && dictSize(dict) > 2048 && dictIsRehashing(dict) && !g_pserver->loading) { - serverTL->rehashCtl = dictRehashAsyncStart(dict, rehashes_per_ms); - ++async_rehashes; + serverTL->rehashCtl = dictRehashAsyncStart(dict, rehashes_per_ms * ((1000 / g_pserver->hz) / 10)); // Estimate 10% CPU time spent in lock contention + if (serverTL->rehashCtl) + ++async_rehashes; } if (serverTL->rehashCtl) break; - + // Before starting anything new, can we end the rehash of a blocked thread? - if (dict->asyncdata != nullptr) { + while (dict->asyncdata != nullptr) { auto asyncdata = dict->asyncdata; if (asyncdata->done) { dictCompleteRehashAsync(asyncdata, false /*fFree*/); // Don't free because we don't own the pointer serverAssert(dict->asyncdata != asyncdata); - break; // completion can be expensive, don't do anything else + } else { + break; } } + if (dict->asyncdata) + break; + rehashes_per_ms = g_pserver->db[rehash_db]->incrementallyRehash(); async_rehashes = 0; if (rehashes_per_ms > 0) { @@ -2360,14 +2371,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { UNUSED(id); UNUSED(clientData); - if (serverTL->rehashCtl != nullptr && !serverTL->rehashCtl->done) { - aeReleaseLock(); - // If there is not enough lock contention we may not have made enough progress on the async - // rehash. Ensure we finish it outside the lock. - dictRehashSomeAsync(serverTL->rehashCtl, serverTL->rehashCtl->queue.size()); - aeAcquireLock(); - } - if (g_pserver->maxmemory && g_pserver->m_pstorageFactory) performEvictions(false); @@ -2657,14 +2660,6 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData UNUSED(id); UNUSED(clientData); - if (serverTL->rehashCtl != nullptr && !serverTL->rehashCtl->done) { - aeReleaseLock(); - // If there is not enough lock contention we may not have made enough progress on the async - // rehash. Ensure we finish it outside the lock. - dictRehashSomeAsync(serverTL->rehashCtl, serverTL->rehashCtl->queue.size()); - aeAcquireLock(); - } - if (g_pserver->maxmemory && g_pserver->m_pstorageFactory) performEvictions(false);