diff --git a/src/dict.cpp b/src/dict.cpp index 8f8262751..e10567c8d 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -408,7 +408,7 @@ dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) { d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata); - int empty_visits = buckets; + int empty_visits = buckets*10; while (d->asyncdata->queue.size() < (size_t)buckets && (size_t)d->rehashidx < d->ht[0].size) { dictEntry *de; diff --git a/src/server.cpp b/src/server.cpp index 9fb92939f..0dcae55b0 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1705,12 +1705,12 @@ void tryResizeHashTables(int dbid) { * is returned. */ int redisDbPersistentData::incrementallyRehash() { /* Keys dictionary */ - if (dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone)) { - int result = dictRehashMilliseconds(m_pdict,1); - result += dictRehashMilliseconds(m_pdictTombstone,1); - return result; /* already used our millisecond for this loop... */ - } - return 0; + int result = 0; + if (dictIsRehashing(m_pdict)) + result += dictRehashMilliseconds(m_pdict,1); + if (dictIsRehashing(m_pdictTombstone)) + dictRehashMilliseconds(m_pdictTombstone,1); // don't count this + return result; /* already used our millisecond for this loop... */ } /* This function is called once a background process of some kind terminates, @@ -2112,9 +2112,15 @@ void databasesCron(bool fMainThread) { if (g_pserver->activerehashing) { for (j = 0; j < dbs_per_call; j++) { if (serverTL->rehashCtl != nullptr) { - if (dictRehashSomeAsync(serverTL->rehashCtl, rehashes_per_ms)) { - break; - } + if (!serverTL->rehashCtl->done.load(std::memory_order_relaxed)) { + aeReleaseLock(); + if (dictRehashSomeAsync(serverTL->rehashCtl, rehashes_per_ms)) { + aeAcquireLock(); + break; + } + aeAcquireLock(); + } + dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/); serverTL->rehashCtl = nullptr; } @@ -2124,22 +2130,27 @@ void databasesCron(bool fMainThread) { /* Are we async rehashing? And if so is it time to re-calibrate? */ /* The recalibration limit is a prime number to ensure balancing across threads */ if (rehashes_per_ms > 0 && async_rehashes < 131 && !cserver.active_defrag_enabled && cserver.cthreads > 1 && dictSize(dict) > 2048 && dictIsRehashing(dict) && !g_pserver->loading) { - serverTL->rehashCtl = dictRehashAsyncStart(dict, rehashes_per_ms); - ++async_rehashes; + serverTL->rehashCtl = dictRehashAsyncStart(dict, rehashes_per_ms * ((1000 / g_pserver->hz) / 10)); // Estimate 10% CPU time spent in lock contention + if (serverTL->rehashCtl) + ++async_rehashes; } if (serverTL->rehashCtl) break; - + // Before starting anything new, can we end the rehash of a blocked thread? - if (dict->asyncdata != nullptr) { + while (dict->asyncdata != nullptr) { auto asyncdata = dict->asyncdata; if (asyncdata->done) { dictCompleteRehashAsync(asyncdata, false /*fFree*/); // Don't free because we don't own the pointer serverAssert(dict->asyncdata != asyncdata); - break; // completion can be expensive, don't do anything else + } else { + break; } } + if (dict->asyncdata) + break; + rehashes_per_ms = g_pserver->db[rehash_db]->incrementallyRehash(); async_rehashes = 0; if (rehashes_per_ms > 0) { @@ -2360,14 +2371,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { UNUSED(id); UNUSED(clientData); - if (serverTL->rehashCtl != nullptr && !serverTL->rehashCtl->done) { - aeReleaseLock(); - // If there is not enough lock contention we may not have made enough progress on the async - // rehash. Ensure we finish it outside the lock. - dictRehashSomeAsync(serverTL->rehashCtl, serverTL->rehashCtl->queue.size()); - aeAcquireLock(); - } - if (g_pserver->maxmemory && g_pserver->m_pstorageFactory) performEvictions(false); @@ -2657,14 +2660,6 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData UNUSED(id); UNUSED(clientData); - if (serverTL->rehashCtl != nullptr && !serverTL->rehashCtl->done) { - aeReleaseLock(); - // If there is not enough lock contention we may not have made enough progress on the async - // rehash. Ensure we finish it outside the lock. - dictRehashSomeAsync(serverTL->rehashCtl, serverTL->rehashCtl->queue.size()); - aeAcquireLock(); - } - if (g_pserver->maxmemory && g_pserver->m_pstorageFactory) performEvictions(false);