Smooth out performance fluctuations caused by variance in the rehash calibration

Former-commit-id: 09580dedfef09deace7863bf68ba7e0f9edf3eb3
This commit is contained in:
John Sully 2021-11-06 15:49:58 +00:00
parent 7962e0f5c7
commit 60f08d5f93
2 changed files with 26 additions and 31 deletions

View File

@ -408,7 +408,7 @@ dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) {
d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata);
int empty_visits = buckets;
int empty_visits = buckets*10;
while (d->asyncdata->queue.size() < (size_t)buckets && (size_t)d->rehashidx < d->ht[0].size) {
dictEntry *de;

View File

@ -1705,12 +1705,12 @@ void tryResizeHashTables(int dbid) {
* is returned. */
int redisDbPersistentData::incrementallyRehash() {
/* Keys dictionary */
if (dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone)) {
int result = dictRehashMilliseconds(m_pdict,1);
result += dictRehashMilliseconds(m_pdictTombstone,1);
return result; /* already used our millisecond for this loop... */
}
return 0;
int result = 0;
if (dictIsRehashing(m_pdict))
result += dictRehashMilliseconds(m_pdict,1);
if (dictIsRehashing(m_pdictTombstone))
dictRehashMilliseconds(m_pdictTombstone,1); // don't count this
return result; /* already used our millisecond for this loop... */
}
/* This function is called once a background process of some kind terminates,
@ -2112,9 +2112,15 @@ void databasesCron(bool fMainThread) {
if (g_pserver->activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
if (serverTL->rehashCtl != nullptr) {
if (dictRehashSomeAsync(serverTL->rehashCtl, rehashes_per_ms)) {
break;
if (!serverTL->rehashCtl->done.load(std::memory_order_relaxed)) {
aeReleaseLock();
if (dictRehashSomeAsync(serverTL->rehashCtl, rehashes_per_ms)) {
aeAcquireLock();
break;
}
aeAcquireLock();
}
dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/);
serverTL->rehashCtl = nullptr;
}
@ -2124,22 +2130,27 @@ void databasesCron(bool fMainThread) {
/* Are we async rehashing? And if so is it time to re-calibrate? */
/* The recalibration limit is a prime number to ensure balancing across threads */
if (rehashes_per_ms > 0 && async_rehashes < 131 && !cserver.active_defrag_enabled && cserver.cthreads > 1 && dictSize(dict) > 2048 && dictIsRehashing(dict) && !g_pserver->loading) {
serverTL->rehashCtl = dictRehashAsyncStart(dict, rehashes_per_ms);
++async_rehashes;
serverTL->rehashCtl = dictRehashAsyncStart(dict, rehashes_per_ms * ((1000 / g_pserver->hz) / 10)); // Estimate 10% CPU time spent in lock contention
if (serverTL->rehashCtl)
++async_rehashes;
}
if (serverTL->rehashCtl)
break;
// Before starting anything new, can we end the rehash of a blocked thread?
if (dict->asyncdata != nullptr) {
while (dict->asyncdata != nullptr) {
auto asyncdata = dict->asyncdata;
if (asyncdata->done) {
dictCompleteRehashAsync(asyncdata, false /*fFree*/); // Don't free because we don't own the pointer
serverAssert(dict->asyncdata != asyncdata);
break; // completion can be expensive, don't do anything else
} else {
break;
}
}
if (dict->asyncdata)
break;
rehashes_per_ms = g_pserver->db[rehash_db]->incrementallyRehash();
async_rehashes = 0;
if (rehashes_per_ms > 0) {
@ -2360,14 +2371,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
UNUSED(id);
UNUSED(clientData);
if (serverTL->rehashCtl != nullptr && !serverTL->rehashCtl->done) {
aeReleaseLock();
// If there is not enough lock contention we may not have made enough progress on the async
// rehash. Ensure we finish it outside the lock.
dictRehashSomeAsync(serverTL->rehashCtl, serverTL->rehashCtl->queue.size());
aeAcquireLock();
}
if (g_pserver->maxmemory && g_pserver->m_pstorageFactory)
performEvictions(false);
@ -2657,14 +2660,6 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
UNUSED(id);
UNUSED(clientData);
if (serverTL->rehashCtl != nullptr && !serverTL->rehashCtl->done) {
aeReleaseLock();
// If there is not enough lock contention we may not have made enough progress on the async
// rehash. Ensure we finish it outside the lock.
dictRehashSomeAsync(serverTL->rehashCtl, serverTL->rehashCtl->queue.size());
aeAcquireLock();
}
if (g_pserver->maxmemory && g_pserver->m_pstorageFactory)
performEvictions(false);