diff --git a/src/networking.cpp b/src/networking.cpp index 36470c0db..8e5683f65 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -3489,10 +3489,10 @@ void processEventsWhileBlocked(int iel) { // Try to complete any async rehashes (this would normally happen in dbCron, but that won't run here) for (int idb = 0; idb < cserver.dbnum; ++idb) { redisDb *db = &g_pserver->db[idb]; - while (db->pdict->asyncdata != nullptr) { - if (!db->pdict->asyncdata->done) + while (db->dict->asyncdata != nullptr) { + if (!db->dict->asyncdata->done) break; - dictCompleteRehashAsync(db->pdict->asyncdata, false /*fFree*/); + dictCompleteRehashAsync(db->dict->asyncdata, false /*fFree*/); } } diff --git a/src/server.cpp b/src/server.cpp index be140c48c..d95e207dc 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1782,6 +1782,7 @@ int hash_spin_worker() { void databasesCron(bool fMainThread) { serverAssert(GlobalLocksAcquired()); static int rehashes_per_ms = 0; + static int async_rehashes = 0; if (fMainThread) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ @@ -1833,28 +1834,33 @@ void databasesCron(bool fMainThread) { } serverAssert(serverTL->rehashCtl == nullptr); - if (rehashes_per_ms > 0) + /* Are we async rehashing? And if so is it time to re-calibrate? */ + /* The recalibration limit is a prime number to ensure balancing across threads */ + if (rehashes_per_ms > 0 && async_rehashes < 131) { serverTL->rehashCtl = dictRehashAsyncStart(g_pserver->db[rehash_db].dict, rehashes_per_ms); + ++async_rehashes; + } if (serverTL->rehashCtl) break; // Before starting anything new, can we end the rehash of a blocked thread? - if (g_pserver->db[rehash_db].pdict->asyncdata != nullptr) { - auto asyncdata = g_pserver->db[rehash_db].pdict->asyncdata; + if (g_pserver->db[rehash_db].dict->asyncdata != nullptr) { + auto asyncdata = g_pserver->db[rehash_db].dict->asyncdata; if (asyncdata->done) { dictCompleteRehashAsync(asyncdata, false /*fFree*/); // Don't free because we don't own the pointer - serverAssert(g_pserver->db[rehash_db].pdict->asyncdata != asyncdata); + serverAssert(g_pserver->db[rehash_db].dict->asyncdata != asyncdata); break; // completion can be expensive, don't do anything else } } rehashes_per_ms = incrementallyRehash(rehash_db); + async_rehashes = 0; if (rehashes_per_ms > 0) { /* If the function did some work, stop here, we'll do * more at the next cron loop. */ - serverLog(LL_WARNING, "Rehashes per ms: %d", rehashes_per_ms); + serverLog(LL_VERBOSE, "Calibrated rehashes per ms: %d", rehashes_per_ms); break; - } else if (g_pserver->db[rehash_db].pdict->asyncdata == nullptr) { + } else if (g_pserver->db[rehash_db].dict->asyncdata == nullptr) { /* If this db didn't need rehash and we have none in flight, we'll try the next one. */ rehash_db++; rehash_db %= cserver.dbnum;