From ba006abe029c6766fd9dbcccbe03808134f294ce Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 27 Jan 2021 00:47:22 +0000 Subject: [PATCH] Ensure rehash completes even when we're in a long running task Former-commit-id: f107746e90f7a8ff3c7094145ee1ad438911e8c2 --- src/dict.cpp | 112 ++++++++++++++++++++++++++------------------- src/dict.h | 4 +- src/fastlock.cpp | 1 + src/networking.cpp | 10 ++++ src/server.cpp | 44 ++++++++++-------- 5 files changed, 103 insertions(+), 68 deletions(-) diff --git a/src/dict.cpp b/src/dict.cpp index e882fff2a..c5d89eea9 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -275,6 +275,7 @@ void dictRehashAsync(dictAsyncRehashCtl *ctl) { wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de)); } ctl->hashIdx = ctl->queue.size(); + ctl->done = true; } bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes) { @@ -284,71 +285,88 @@ bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes) { wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de)); } + if (ctl->hashIdx == ctl->queue.size()) ctl->done = true; return ctl->hashIdx < ctl->queue.size(); } -void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl) { +void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) { dict *d = ctl->dict; + assert(ctl->done); + // Unlink ourselves + bool fUnlinked = false; dictAsyncRehashCtl **pprev = &d->asyncdata; + while (*pprev != nullptr) { if (*pprev == ctl) { *pprev = ctl->next; + fUnlinked = true; break; } pprev = &((*pprev)->next); } - // Was the dictionary free'd while we were in flight? - if (ctl->release) { - if (d->asyncdata != nullptr) - d->asyncdata->release = true; - else - dictRelease(d); - delete ctl; - return; - } - - if (d->ht[0].table != nullptr) { // can be null if we're cleared during the rehash - for (auto &wi : ctl->queue) { - // We need to remove it from the source hash table, and store it in the dest. - // Note that it may have been deleted in the meantime and therefore not exist. - // In this case it will be in the garbage collection list - - dictEntry **pdePrev = &d->ht[0].table[wi.hash & d->ht[0].sizemask]; - while (*pdePrev != nullptr && *pdePrev != wi.de) { - pdePrev = &((*pdePrev)->next); - } - if (*pdePrev != nullptr) { // The element may be NULL if its in the GC list - assert(*pdePrev == wi.de); - *pdePrev = wi.de->next; - // Now link it to the dest hash table - wi.de->next = d->ht[1].table[wi.hash & d->ht[1].sizemask]; - d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de; - d->ht[0].used--; - d->ht[1].used++; - } + if (fUnlinked) { + if (ctl->next != nullptr && ctl->deGCList != nullptr) { + // An older work item may depend on our free list, so pass our free list to them + dictEntry **deGC = &ctl->next->deGCList; + while (*deGC != nullptr) deGC = &((*deGC)->next); + *deGC = ctl->deGCList; + ctl->deGCList = nullptr; } } - // Now free anyting in the GC list - while (ctl->deGCList != nullptr) { - auto next = ctl->deGCList->next; - dictFreeKey(d, ctl->deGCList); - dictFreeVal(d, ctl->deGCList); - zfree(ctl->deGCList); - ctl->deGCList = next; + if (fUnlinked && !ctl->release) { + if (d->ht[0].table != nullptr) { // can be null if we're cleared during the rehash + for (auto &wi : ctl->queue) { + // We need to remove it from the source hash table, and store it in the dest. + // Note that it may have been deleted in the meantime and therefore not exist. + // In this case it will be in the garbage collection list + + dictEntry **pdePrev = &d->ht[0].table[wi.hash & d->ht[0].sizemask]; + while (*pdePrev != nullptr && *pdePrev != wi.de) { + pdePrev = &((*pdePrev)->next); + } + if (*pdePrev != nullptr) { // The element may be NULL if its in the GC list + assert(*pdePrev == wi.de); + *pdePrev = wi.de->next; + // Now link it to the dest hash table + wi.de->next = d->ht[1].table[wi.hash & d->ht[1].sizemask]; + d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de; + d->ht[0].used--; + d->ht[1].used++; + } + } + } + + /* Check if we already rehashed the whole table... */ + if (d->ht[0].used == 0 && d->asyncdata == nullptr) { + zfree(d->ht[0].table); + d->ht[0] = d->ht[1]; + _dictReset(&d->ht[1]); + d->rehashidx = -1; + } } - /* Check if we already rehashed the whole table... */ - if (d->ht[0].used == 0 && d->asyncdata == nullptr) { - zfree(d->ht[0].table); - d->ht[0] = d->ht[1]; - _dictReset(&d->ht[1]); - d->rehashidx = -1; - } + if (fFree) { + while (ctl->deGCList != nullptr) { + auto next = ctl->deGCList->next; + dictFreeKey(d, ctl->deGCList); + dictFreeVal(d, ctl->deGCList); + zfree(ctl->deGCList); + ctl->deGCList = next; + } - delete ctl; + // Was the dictionary free'd while we were in flight? + if (ctl->release) { + if (d->asyncdata != nullptr) + d->asyncdata->release = true; + else + dictRelease(d); + } + + delete ctl; + } } long long timeInMilliseconds(void) { @@ -506,7 +524,7 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) { else d->ht[table].table[idx] = he->next; if (!nofree) { - if (table == 0 && d->asyncdata != nullptr) { + if (table == 0 && d->asyncdata != nullptr && idx < d->rehashidx) { he->next = d->asyncdata->deGCList; d->asyncdata->deGCList = he->next; } else { @@ -584,7 +602,7 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) { if ((he = ht->table[i]) == NULL) continue; while(he) { nextHe = he->next; - if (d->asyncdata) { + if (d->asyncdata && i < d->rehashidx) { he->next = d->asyncdata->deGCList; d->asyncdata->deGCList = he; } else { diff --git a/src/dict.h b/src/dict.h index 2f6c9677a..9abba00eb 100644 --- a/src/dict.h +++ b/src/dict.h @@ -37,6 +37,7 @@ #ifdef __cplusplus #include +#include extern "C" { #endif @@ -99,6 +100,7 @@ struct dictAsyncRehashCtl { size_t hashIdx = 0; bool release = false; dictAsyncRehashCtl *next = nullptr; + std::atomic done { false }; dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) { queue.reserve(c_targetQueueSize); @@ -222,7 +224,7 @@ dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t h dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets = dictAsyncRehashCtl::c_targetQueueSize); void dictRehashAsync(dictAsyncRehashCtl *ctl); bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes); -void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl); +void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree); /* Hash table types */ extern dictType dictTypeHeapStringCopyKey; diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 61cd1752e..8c1c5f6e4 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -356,6 +356,7 @@ extern "C" void fastlock_lock(struct fastlock *lock, spin_worker worker) __atomic_load(&g_fHighCpuPressure, &fHighPressure, __ATOMIC_RELAXED); unsigned loopLimit = fHighPressure ? 0x10000 : 0x100000; + // WARNING:::: THIS DOESN"T MATCH ASM for (;;) { __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE); diff --git a/src/networking.cpp b/src/networking.cpp index dc4e56e92..36470c0db 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -3486,6 +3486,16 @@ void processEventsWhileBlocked(int iel) { locker.arm(nullptr); locker.release(); + // Try to complete any async rehashes (this would normally happen in dbCron, but that won't run here) + for (int idb = 0; idb < cserver.dbnum; ++idb) { + redisDb *db = &g_pserver->db[idb]; + while (db->pdict->asyncdata != nullptr) { + if (!db->pdict->asyncdata->done) + break; + dictCompleteRehashAsync(db->pdict->asyncdata, false /*fFree*/); + } + } + // Restore it so the calling code is not confused if (fReplBacklog) { g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; diff --git a/src/server.cpp b/src/server.cpp index f1dd30d0d..be140c48c 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1773,19 +1773,14 @@ bool expireOwnKeys() int hash_spin_worker() { auto ctl = serverTL->rehashCtl; - if (ctl->hashIdx < ctl->queue.size()) { - ctl->queue[ctl->hashIdx].hash = dictHashKey(ctl->dict, dictGetKey(ctl->queue[ctl->hashIdx].de)); - ctl->hashIdx++; - return true; - } else { - return false; - } + return dictRehashSomeAsync(ctl, 1); } /* This function handles 'background' operations we are required to do * incrementally in Redis databases, such as active key expiring, resizing, * rehashing. */ void databasesCron(bool fMainThread) { + serverAssert(GlobalLocksAcquired()); static int rehashes_per_ms = 0; if (fMainThread) { /* Expire keys by random sampling. Not required for slaves @@ -1832,29 +1827,38 @@ void databasesCron(bool fMainThread) { if (dictRehashSomeAsync(serverTL->rehashCtl, 5)) { break; } else { - dictCompleteRehashAsync(serverTL->rehashCtl); + dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/); serverTL->rehashCtl = nullptr; } } + + serverAssert(serverTL->rehashCtl == nullptr); if (rehashes_per_ms > 0) serverTL->rehashCtl = dictRehashAsyncStart(g_pserver->db[rehash_db].dict, rehashes_per_ms); if (serverTL->rehashCtl) break; - if (fMainThread) { - // We only synchronously rehash on the main thread, otherwise we'll cause too much latency - rehashes_per_ms = incrementallyRehash(rehash_db); - if (rehashes_per_ms > 0) { - /* If the function did some work, stop here, we'll do - * more at the next cron loop. */ - serverLog(LL_WARNING, "Rehashes per ms: %d", rehashes_per_ms); - break; - } else { - /* If this db didn't need rehash, we'll try the next one. */ - rehash_db++; - rehash_db %= cserver.dbnum; + // Before starting anything new, can we end the rehash of a blocked thread? + if (g_pserver->db[rehash_db].pdict->asyncdata != nullptr) { + auto asyncdata = g_pserver->db[rehash_db].pdict->asyncdata; + if (asyncdata->done) { + dictCompleteRehashAsync(asyncdata, false /*fFree*/); // Don't free because we don't own the pointer + serverAssert(g_pserver->db[rehash_db].pdict->asyncdata != asyncdata); + break; // completion can be expensive, don't do anything else } } + + rehashes_per_ms = incrementallyRehash(rehash_db); + if (rehashes_per_ms > 0) { + /* If the function did some work, stop here, we'll do + * more at the next cron loop. */ + serverLog(LL_WARNING, "Rehashes per ms: %d", rehashes_per_ms); + break; + } else if (g_pserver->db[rehash_db].pdict->asyncdata == nullptr) { + /* If this db didn't need rehash and we have none in flight, we'll try the next one. */ + rehash_db++; + rehash_db %= cserver.dbnum; + } } } }