From 61e054f826419a0c672448da9f8baa7f36e19b01 Mon Sep 17 00:00:00 2001
From: John Sully
Date: Fri, 7 May 2021 00:28:10 +0000
Subject: [PATCH] Fix test hang

Back out the asynchronous (multithreaded) rehash machinery so that all
rehashing happens through the synchronous incremental path in
databasesCron.

Former-commit-id: 23647390e628de07759f8e7d8768a7f638edf01d
---
 src/dict.cpp       | 178 +++------------------------------------------
 src/dict.h         |   7 --
 src/networking.cpp |  10 ---
 src/server.cpp     |  59 +--------------
 src/server.h       |   1 -
 5 files changed, 13 insertions(+), 242 deletions(-)

diff --git a/src/dict.cpp b/src/dict.cpp
index c5d89eea9..a2ef900e5 100644
--- a/src/dict.cpp
+++ b/src/dict.cpp
@@ -127,7 +127,6 @@ int _dictInit(dict *d, dictType *type,
     d->privdata = privDataPtr;
     d->rehashidx = -1;
     d->iterators = 0;
-    d->asyncdata = nullptr;
     return DICT_OK;
 }
 
@@ -189,7 +188,6 @@ int dictExpand(dict *d, unsigned long size)
 int dictRehash(dict *d, int n) {
     int empty_visits = n*10; /* Max number of empty buckets to visit. */
     if (!dictIsRehashing(d)) return 0;
-    if (d->asyncdata) return 0;
 
     while(n-- && d->ht[0].used != 0) {
         dictEntry *de, *nextde;
@@ -231,144 +229,6 @@ int dictRehash(dict *d, int n) {
     return 1;
 }
 
-dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) {
-    if (!dictIsRehashing(d)) return 0;
-
-    d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata);
-
-    int empty_visits = buckets * 10;
-
-    while (d->asyncdata->queue.size() < (size_t)buckets && d->rehashidx < d->ht[0].size) {
-        dictEntry *de;
-
-        /* Note that rehashidx can't overflow as we are sure there are more
-         * elements because ht[0].used != 0 */
-        while(d->ht[0].table[d->rehashidx] == NULL) {
-            d->rehashidx++;
-            if (--empty_visits == 0) goto LDone;
-            if (d->rehashidx >= d->ht[0].size) goto LDone;
-        }
-
-        de = d->ht[0].table[d->rehashidx];
-        // We have to queue every node in the bucket, even if we go over our target size
-        while (de != nullptr) {
-            d->asyncdata->queue.emplace_back(de);
-            de = de->next;
-        }
-        d->rehashidx++;
-    }
-
-LDone:
-    if (d->asyncdata->queue.empty()) {
-        // We didn't find any work to do (can happen if there is a large gap in the hash table)
-        auto asyncT = d->asyncdata;
-        d->asyncdata = d->asyncdata->next;
-        delete asyncT;
-        return nullptr;
-    }
-    return d->asyncdata;
-}
-
-void dictRehashAsync(dictAsyncRehashCtl *ctl) {
-    for (size_t idx = ctl->hashIdx; idx < ctl->queue.size(); ++idx) {
-        auto &wi = ctl->queue[idx];
-        wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de));
-    }
-    ctl->hashIdx = ctl->queue.size();
-    ctl->done = true;
-}
-
-bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes) {
-    size_t max = std::min(ctl->hashIdx + hashes, ctl->queue.size());
-    for (; ctl->hashIdx < max; ++ctl->hashIdx) {
-        auto &wi = ctl->queue[ctl->hashIdx];
-        wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de));
-    }
-
-    if (ctl->hashIdx == ctl->queue.size()) ctl->done = true;
-    return ctl->hashIdx < ctl->queue.size();
-}
-
-void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) {
-    dict *d = ctl->dict;
-    assert(ctl->done);
-
-    // Unlink ourselves
-    bool fUnlinked = false;
-    dictAsyncRehashCtl **pprev = &d->asyncdata;
-
-    while (*pprev != nullptr) {
-        if (*pprev == ctl) {
-            *pprev = ctl->next;
-            fUnlinked = true;
-            break;
-        }
-        pprev = &((*pprev)->next);
-    }
-
-    if (fUnlinked) {
-        if (ctl->next != nullptr && ctl->deGCList != nullptr) {
-            // An older work item may depend on our free list, so pass our free list to them
-            dictEntry **deGC = &ctl->next->deGCList;
-            while (*deGC != nullptr) deGC = &((*deGC)->next);
-            *deGC = ctl->deGCList;
-            ctl->deGCList = nullptr;
-        }
-    }
-
-    if (fUnlinked && !ctl->release) {
-        if (d->ht[0].table != nullptr) { // can be null if we're cleared during the rehash
-            for (auto &wi : ctl->queue) {
-                // We need to remove it from the source hash table, and store it in the dest.
-                // Note that it may have been deleted in the meantime and therefore not exist.
-                // In this case it will be in the garbage collection list
-
-                dictEntry **pdePrev = &d->ht[0].table[wi.hash & d->ht[0].sizemask];
-                while (*pdePrev != nullptr && *pdePrev != wi.de) {
-                    pdePrev = &((*pdePrev)->next);
-                }
-                if (*pdePrev != nullptr) { // The element may be NULL if its in the GC list
-                    assert(*pdePrev == wi.de);
-                    *pdePrev = wi.de->next;
-                    // Now link it to the dest hash table
-                    wi.de->next = d->ht[1].table[wi.hash & d->ht[1].sizemask];
-                    d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de;
-                    d->ht[0].used--;
-                    d->ht[1].used++;
-                }
-            }
-        }
-
-        /* Check if we already rehashed the whole table... */
-        if (d->ht[0].used == 0 && d->asyncdata == nullptr) {
-            zfree(d->ht[0].table);
-            d->ht[0] = d->ht[1];
-            _dictReset(&d->ht[1]);
-            d->rehashidx = -1;
-        }
-    }
-
-    if (fFree) {
-        while (ctl->deGCList != nullptr) {
-            auto next = ctl->deGCList->next;
-            dictFreeKey(d, ctl->deGCList);
-            dictFreeVal(d, ctl->deGCList);
-            zfree(ctl->deGCList);
-            ctl->deGCList = next;
-        }
-
-        // Was the dictionary free'd while we were in flight?
-        if (ctl->release) {
-            if (d->asyncdata != nullptr)
-                d->asyncdata->release = true;
-            else
-                dictRelease(d);
-        }
-
-        delete ctl;
-    }
-}
-
 long long timeInMilliseconds(void) {
     struct timeval tv;
 
@@ -524,14 +384,9 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
                 else
                     d->ht[table].table[idx] = he->next;
                 if (!nofree) {
-                    if (table == 0 && d->asyncdata != nullptr && idx < d->rehashidx) {
-                        he->next = d->asyncdata->deGCList;
-                        d->asyncdata->deGCList = he;
-                    } else {
-                        dictFreeKey(d, he);
-                        dictFreeVal(d, he);
-                        zfree(he);
-                    }
+                    dictFreeKey(d, he);
+                    dictFreeVal(d, he);
+                    zfree(he);
                 }
                 d->ht[table].used--;
                 return he;
@@ -579,14 +434,10 @@ dictEntry *dictUnlink(dict *ht, const void *key) {
  * to dictUnlink(). It's safe to call this function with 'he' = NULL. */
 void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
     if (he == NULL) return;
-    if (d->asyncdata) {
-        he->next = d->asyncdata->deGCList;
-        d->asyncdata->deGCList = he;
-    } else {
-        dictFreeKey(d, he);
-        dictFreeVal(d, he);
-        zfree(he);
-    }
+
+    dictFreeKey(d, he);
+    dictFreeVal(d, he);
+    zfree(he);
 }
 
 /* Destroy an entire dictionary */
@@ -602,14 +453,9 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
         if ((he = ht->table[i]) == NULL) continue;
         while(he) {
             nextHe = he->next;
-            if (d->asyncdata && i < d->rehashidx) {
-                he->next = d->asyncdata->deGCList;
-                d->asyncdata->deGCList = he;
-            } else {
-                dictFreeKey(d, he);
-                dictFreeVal(d, he);
-                zfree(he);
-            }
+            dictFreeKey(d, he);
+            dictFreeVal(d, he);
+            zfree(he);
             ht->used--;
             he = nextHe;
         }
@@ -624,10 +470,6 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
 
 /* Clear & Release the hash table */
 void dictRelease(dict *d)
 {
-    if (d->asyncdata) {
-        d->asyncdata->release = true;
-        return;
-    }
     _dictClear(d,&d->ht[0],NULL);
     _dictClear(d,&d->ht[1],NULL);
     zfree(d);
diff --git a/src/dict.h b/src/dict.h
index 9abba00eb..85f709687 100644
--- a/src/dict.h
+++ b/src/dict.h
@@ -116,7 +116,6 @@ typedef struct dict {
     dictht ht[2];
     long rehashidx; /* rehashing not in progress if rehashidx == -1 */
     unsigned long iterators; /* number of iterators currently running */
-    dictAsyncRehashCtl *asyncdata;
 } dict;
 
 /* If safe is set to 1 this is a safe iterator, that means, you can call
@@ -220,12 +219,6 @@ unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanB
 uint64_t dictGetHash(dict *d, const void *key);
 dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);
 
-/* Async Rehash Functions */
-dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets = dictAsyncRehashCtl::c_targetQueueSize);
-void dictRehashAsync(dictAsyncRehashCtl *ctl);
-bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes);
-void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree);
-
 /* Hash table types */
 extern dictType dictTypeHeapStringCopyKey;
 extern dictType dictTypeHeapStrings;
diff --git a/src/networking.cpp b/src/networking.cpp
index adf8946e3..e5e6f0770 100644
--- a/src/networking.cpp
+++ b/src/networking.cpp
@@ -3491,16 +3491,6 @@ void processEventsWhileBlocked(int iel) {
     locker.arm(nullptr);
     locker.release();
 
-    // Try to complete any async rehashes (this would normally happen in dbCron, but that won't run here)
-    for (int idb = 0; idb < cserver.dbnum; ++idb) {
-        redisDb *db = &g_pserver->db[idb];
-        while (db->dict->asyncdata != nullptr) {
-            if (!db->dict->asyncdata->done)
-                break;
-            dictCompleteRehashAsync(db->dict->asyncdata, false /*fFree*/);
-        }
-    }
-
     // Restore it so the calling code is not confused
     if (fReplBacklog) {
         g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
diff --git a/src/server.cpp b/src/server.cpp
index 66ef81abd..c86f39659 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -1771,18 +1771,11 @@ bool expireOwnKeys()
     return false;
 }
 
-int hash_spin_worker() {
-    auto ctl = serverTL->rehashCtl;
-    return dictRehashSomeAsync(ctl, 1);
-}
-
 /* This function handles 'background' operations we are required to do
  * incrementally in Redis databases, such as active key expiring, resizing,
  * rehashing. */
 void databasesCron(bool fMainThread) {
     serverAssert(GlobalLocksAcquired());
-    static int rehashes_per_ms = 0;
-    static int async_rehashes = 0;
     if (fMainThread) {
         /* Expire keys by random sampling. Not required for slaves
          * as master will synthesize DELs for us. */
@@ -1824,58 +1817,12 @@ void databasesCron(bool fMainThread) {
         /* Rehash */
         if (g_pserver->activerehashing) {
             for (j = 0; j < dbs_per_call; j++) {
-                if (serverTL->rehashCtl != nullptr) {
-                    if (dictRehashSomeAsync(serverTL->rehashCtl, 5)) {
-                        break;
-                    } else {
-                        dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/);
-                        serverTL->rehashCtl = nullptr;
-                    }
-                }
-
-                serverAssert(serverTL->rehashCtl == nullptr);
-                /* Are we async rehashing? And if so is it time to re-calibrate? */
-                /* The recalibration limit is a prime number to ensure balancing across threads */
-                if (rehashes_per_ms > 0 && async_rehashes < 131 && !cserver.active_defrag_enabled) {
-                    serverTL->rehashCtl = dictRehashAsyncStart(g_pserver->db[rehash_db].dict, rehashes_per_ms);
-                    ++async_rehashes;
-                }
-                if (serverTL->rehashCtl)
-                    break;
-
-                // Before starting anything new, can we end the rehash of a blocked thread?
-                if (g_pserver->db[rehash_db].dict->asyncdata != nullptr) {
-                    auto asyncdata = g_pserver->db[rehash_db].dict->asyncdata;
-                    if (asyncdata->done) {
-                        dictCompleteRehashAsync(asyncdata, false /*fFree*/); // Don't free because we don't own the pointer
-                        serverAssert(g_pserver->db[rehash_db].dict->asyncdata != asyncdata);
-                        break; // completion can be expensive, don't do anything else
-                    }
-                }
-
-                rehashes_per_ms = incrementallyRehash(rehash_db);
-                async_rehashes = 0;
-                if (rehashes_per_ms > 0) {
-                    /* If the function did some work, stop here, we'll do
-                     * more at the next cron loop. */
-                    if (!cserver.active_defrag_enabled) {
-                        serverLog(LL_VERBOSE, "Calibrated rehashes per ms: %d", rehashes_per_ms);
-                    }
-                    break;
-                } else if (g_pserver->db[rehash_db].dict->asyncdata == nullptr) {
-                    /* If this db didn't need rehash and we have none in flight, we'll try the next one. */
-                    rehash_db++;
-                    rehash_db %= cserver.dbnum;
-                }
+                incrementallyRehash(rehash_db);
+                rehash_db++;
+                rehash_db %= cserver.dbnum;
             }
         }
     }
-
-    if (serverTL->rehashCtl) {
-        setAeLockSetThreadSpinWorker(hash_spin_worker);
-    } else {
-        setAeLockSetThreadSpinWorker(nullptr);
-    }
 }
 
 /* We take a cached value of the unix time in the global state because with
diff --git a/src/server.h b/src/server.h
index 1eae8b0f2..f925c19de 100644
--- a/src/server.h
+++ b/src/server.h
@@ -1390,7 +1390,6 @@ struct redisServerThreadVars {
     bool modulesEnabledThisAeLoop = false; /* In this loop of aeMain, were modules
                                               enabled before the thread went to sleep? */
     std::vector<client*> vecclientsProcess;
-    dictAsyncRehashCtl *rehashCtl = nullptr;
 };
 
 struct redisMaster {
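
Note appended for clarity (not part of the commit): with the async machinery removed,
all rehashing goes through the synchronous incremental path that databasesCron() drives
via incrementallyRehash(). The sketch below is a minimal, self-contained illustration of
that bucket-at-a-time migration style, in the spirit of dictRehash() in src/dict.cpp.
The names (Entry, MiniDict, rehashStep) are hypothetical, and the real function also
caps how many empty buckets it will visit per call.

// Illustrative sketch only, with assumed names; not KeyDB source.
#include <cstddef>
#include <utility>
#include <vector>

struct Entry {
    std::size_t hash;   // precomputed hash of the key
    Entry *next;        // separate chaining within a bucket
};

struct MiniDict {
    std::vector<Entry*> ht0, ht1;       // old and new tables; sizes are powers of two
    std::size_t used0 = 0, used1 = 0;   // live entries per table
    long rehashidx = -1;                // next ht0 bucket to migrate; -1 = not rehashing

    // Move up to n buckets from ht0 to ht1. Returns true while work remains,
    // false once the rehash has finished, mirroring dictRehash()'s contract.
    bool rehashStep(int n) {
        if (rehashidx < 0) return false;
        while (n-- && used0 != 0) {
            // Skip empty buckets (the real code also bounds these visits).
            while ((std::size_t)rehashidx < ht0.size() && ht0[rehashidx] == nullptr)
                rehashidx++;
            if ((std::size_t)rehashidx >= ht0.size()) break;
            // Relink every node of this bucket into the new table.
            for (Entry *de = ht0[rehashidx]; de != nullptr; ) {
                Entry *nextde = de->next;
                std::size_t idx = de->hash & (ht1.size() - 1);
                de->next = ht1[idx];
                ht1[idx] = de;
                used0--; used1++;
                de = nextde;
            }
            ht0[rehashidx++] = nullptr;
        }
        if (used0 == 0) {               // migration complete: promote the new table
            ht0.swap(ht1);
            std::swap(used0, used1);
            ht1.assign(ht1.size(), nullptr);
            rehashidx = -1;
            return false;
        }
        return true;
    }
};

Because each call moves at most n buckets, a large rehash is amortized across many cron
ticks instead of stalling any single command. The removed code pushed the same hashing
work to helper threads via dictAsyncRehashCtl; this patch backs that out in favor of the
simple loop above.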