Fix test hang

Former-commit-id: 23647390e628de07759f8e7d8768a7f638edf01d
This commit is contained in:
John Sully 2021-05-07 00:28:10 +00:00
parent 501a065759
commit 42f98c27e3
5 changed files with 13 additions and 242 deletions

View File

@ -127,7 +127,6 @@ int _dictInit(dict *d, dictType *type,
d->privdata = privDataPtr;
d->rehashidx = -1;
d->iterators = 0;
d->asyncdata = nullptr;
return DICT_OK;
}
@ -189,7 +188,6 @@ int dictExpand(dict *d, unsigned long size)
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (!dictIsRehashing(d)) return 0;
if (d->asyncdata) return 0;
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
@ -231,144 +229,6 @@ int dictRehash(dict *d, int n) {
return 1;
}
/* Begin an asynchronous rehash step: collect up to `buckets` buckets' worth of
 * entries from ht[0] into a new dictAsyncRehashCtl work item, so their hashes
 * can be computed off-thread (dictRehashAsync / dictRehashSomeAsync) and the
 * migration finished later with dictCompleteRehashAsync.
 *
 * Returns the new work item, or nullptr if the dict is not rehashing or no
 * work could be queued (e.g. a large run of empty buckets).
 *
 * Fixes vs. original: return nullptr (not 0) for the pointer return, matching
 * the function's own later `return nullptr`; cast rehashidx (long) before
 * comparing against ht[0].size (unsigned long) to avoid a signed/unsigned
 * comparison. rehashidx is non-negative here since dictIsRehashing() held. */
dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) {
    if (!dictIsRehashing(d)) return nullptr;

    /* Chain a new work item in front of any already-outstanding ones. */
    d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata);

    int empty_visits = buckets * 10; /* budget of empty buckets to scan */
    while (d->asyncdata->queue.size() < (size_t)buckets && (unsigned long)d->rehashidx < d->ht[0].size) {
        dictEntry *de;

        /* Skip runs of empty buckets, bailing out if we exhaust the visit
         * budget or walk off the end of the table. */
        while (d->ht[0].table[d->rehashidx] == NULL) {
            d->rehashidx++;
            if (--empty_visits == 0) goto LDone;
            if ((unsigned long)d->rehashidx >= d->ht[0].size) goto LDone;
        }

        de = d->ht[0].table[d->rehashidx];
        // We have to queue every node in the bucket, even if we go over our
        // target size: a bucket must be migrated as a unit.
        while (de != nullptr) {
            d->asyncdata->queue.emplace_back(de);
            de = de->next;
        }
        d->rehashidx++;
    }

LDone:
    if (d->asyncdata->queue.empty()) {
        // We didn't find any work to do (can happen if there is a large gap
        // in the hash table); unlink and discard the work item we created.
        auto asyncT = d->asyncdata;
        d->asyncdata = d->asyncdata->next;
        delete asyncT;
        return nullptr;
    }
    return d->asyncdata;
}
/* Off-thread worker: compute the hash of every queued entry that has not been
 * hashed yet, then mark the work item done so the owning thread can complete
 * the migration with dictCompleteRehashAsync. */
void dictRehashAsync(dictAsyncRehashCtl *ctl) {
    while (ctl->hashIdx < ctl->queue.size()) {
        auto &workItem = ctl->queue[ctl->hashIdx];
        workItem.hash = dictHashKey(ctl->dict, dictGetKey(workItem.de));
        ++ctl->hashIdx;
    }
    ctl->done = true;
}
/* Off-thread worker: compute at most `hashes` more entry hashes for this work
 * item. Marks the item done when the whole queue has been hashed.
 * Returns true while more hashing work remains, false once finished. */
bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes) {
    const size_t limit = std::min(ctl->hashIdx + hashes, ctl->queue.size());
    while (ctl->hashIdx < limit) {
        auto &workItem = ctl->queue[ctl->hashIdx];
        workItem.hash = dictHashKey(ctl->dict, dictGetKey(workItem.de));
        ++ctl->hashIdx;
    }
    const bool fMoreWork = ctl->hashIdx < ctl->queue.size();
    if (!fMoreWork) ctl->done = true;
    return fMoreWork;
}
/* Finish an async rehash work item on the owning thread: unlink `ctl` from the
 * dict's asyncdata list, migrate its queued entries from ht[0] to ht[1], and,
 * when fFree is set, drain the deferred-free (GC) list and delete the item.
 * Precondition: the off-thread hashing step has set ctl->done (asserted). */
void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) {
dict *d = ctl->dict;
assert(ctl->done);
// Unlink ourselves from the dict's singly-linked list of work items.
bool fUnlinked = false;
dictAsyncRehashCtl **pprev = &d->asyncdata;
while (*pprev != nullptr) {
if (*pprev == ctl) {
*pprev = ctl->next;
fUnlinked = true;
break;
}
pprev = &((*pprev)->next);
}
if (fUnlinked) {
if (ctl->next != nullptr && ctl->deGCList != nullptr) {
// An older work item may depend on our free list, so pass our free list to them:
// append our GC list to the tail of the next (older) item's list.
dictEntry **deGC = &ctl->next->deGCList;
while (*deGC != nullptr) deGC = &((*deGC)->next);
*deGC = ctl->deGCList;
ctl->deGCList = nullptr;
}
}
// Only migrate entries if we actually owned the unlink and the dict was not
// marked for release while this item was in flight.
if (fUnlinked && !ctl->release) {
if (d->ht[0].table != nullptr) { // can be null if we're cleared during the rehash
for (auto &wi : ctl->queue) {
// We need to remove it from the source hash table, and store it in the dest.
// Note that it may have been deleted in the meantime and therefore not exist.
// In this case it will be in the garbage collection list
dictEntry **pdePrev = &d->ht[0].table[wi.hash & d->ht[0].sizemask];
while (*pdePrev != nullptr && *pdePrev != wi.de) {
pdePrev = &((*pdePrev)->next);
}
if (*pdePrev != nullptr) { // The element may be NULL if its in the GC list
assert(*pdePrev == wi.de);
*pdePrev = wi.de->next;
// Now link it to the dest hash table (push onto the bucket's head).
wi.de->next = d->ht[1].table[wi.hash & d->ht[1].sizemask];
d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de;
d->ht[0].used--;
d->ht[1].used++;
}
}
}
/* Check if we already rehashed the whole table... */
// Only swap tables when no other async work items remain outstanding.
if (d->ht[0].used == 0 && d->asyncdata == nullptr) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
}
}
if (fFree) {
// Free entries that were unlinked (deleted) while this item was in flight.
while (ctl->deGCList != nullptr) {
auto next = ctl->deGCList->next;
dictFreeKey(d, ctl->deGCList);
dictFreeVal(d, ctl->deGCList);
zfree(ctl->deGCList);
ctl->deGCList = next;
}
// Was the dictionary free'd while we were in flight?
if (ctl->release) {
// Defer the release again if other async work items are still outstanding.
if (d->asyncdata != nullptr)
d->asyncdata->release = true;
else
dictRelease(d);
}
delete ctl;
}
}
long long timeInMilliseconds(void) {
struct timeval tv;
@ -524,14 +384,9 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
else
d->ht[table].table[idx] = he->next;
if (!nofree) {
if (table == 0 && d->asyncdata != nullptr && idx < d->rehashidx) {
he->next = d->asyncdata->deGCList;
d->asyncdata->deGCList = he->next;
} else {
dictFreeKey(d, he);
dictFreeVal(d, he);
zfree(he);
}
dictFreeKey(d, he);
dictFreeVal(d, he);
zfree(he);
}
d->ht[table].used--;
return he;
@ -579,14 +434,10 @@ dictEntry *dictUnlink(dict *ht, const void *key) {
* to dictUnlink(). It's safe to call this function with 'he' = NULL. */
/* Free an entry previously detached with dictUnlink(). Safe to call with
 * he == NULL. */
void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
if (he == NULL) return;
// NOTE(review): the lines below look like a rendered diff that kept BOTH the
// old asyncdata-aware branch and the new unconditional free; taken literally
// this frees `he` twice. Confirm against the actual post-commit source
// before treating this span as real code.
if (d->asyncdata) {
// (old path) async rehash in flight: defer the free by pushing the entry
// onto the work item's garbage-collection list.
he->next = d->asyncdata->deGCList;
d->asyncdata->deGCList = he;
} else {
dictFreeKey(d, he);
dictFreeVal(d, he);
zfree(he);
}
// (new path) free the key, value and entry immediately.
dictFreeKey(d, he);
dictFreeVal(d, he);
zfree(he);
}
/* Destroy an entire dictionary */
@ -602,14 +453,9 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
if ((he = ht->table[i]) == NULL) continue;
while(he) {
nextHe = he->next;
if (d->asyncdata && i < d->rehashidx) {
he->next = d->asyncdata->deGCList;
d->asyncdata->deGCList = he;
} else {
dictFreeKey(d, he);
dictFreeVal(d, he);
zfree(he);
}
dictFreeKey(d, he);
dictFreeVal(d, he);
zfree(he);
ht->used--;
he = nextHe;
}
@ -624,10 +470,6 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
/* Clear & Release the hash table */
void dictRelease(dict *d)
{
if (d->asyncdata) {
d->asyncdata->release = true;
return;
}
_dictClear(d,&d->ht[0],NULL);
_dictClear(d,&d->ht[1],NULL);
zfree(d);

View File

@ -116,7 +116,6 @@ typedef struct dict {
dictht ht[2];
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
unsigned long iterators; /* number of iterators currently running */
dictAsyncRehashCtl *asyncdata;
} dict;
/* If safe is set to 1 this is a safe iterator, that means, you can call
@ -220,12 +219,6 @@ unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanB
uint64_t dictGetHash(dict *d, const void *key);
dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);
/* Async Rehash Functions */
dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets = dictAsyncRehashCtl::c_targetQueueSize);
void dictRehashAsync(dictAsyncRehashCtl *ctl);
bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes);
void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree);
/* Hash table types */
extern dictType dictTypeHeapStringCopyKey;
extern dictType dictTypeHeapStrings;

View File

@ -3491,16 +3491,6 @@ void processEventsWhileBlocked(int iel) {
locker.arm(nullptr);
locker.release();
// Try to complete any async rehashes (this would normally happen in dbCron, but that won't run here)
for (int idb = 0; idb < cserver.dbnum; ++idb) {
redisDb *db = &g_pserver->db[idb];
while (db->dict->asyncdata != nullptr) {
if (!db->dict->asyncdata->done)
break;
dictCompleteRehashAsync(db->dict->asyncdata, false /*fFree*/);
}
}
// Restore it so the calling code is not confused
if (fReplBacklog) {
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;

View File

@ -1771,18 +1771,11 @@ bool expireOwnKeys()
return false;
}
/* Spin-lock worker: make incremental progress (one hash) on this thread's
 * pending async rehash while waiting to acquire the ae lock.
 * Returns nonzero while more hashing work remains. */
int hash_spin_worker() {
    return dictRehashSomeAsync(serverTL->rehashCtl, 1);
}
/* This function handles 'background' operations we are required to do
* incrementally in Redis databases, such as active key expiring, resizing,
* rehashing. */
void databasesCron(bool fMainThread) {
serverAssert(GlobalLocksAcquired());
static int rehashes_per_ms = 0;
static int async_rehashes = 0;
if (fMainThread) {
/* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */
@ -1824,58 +1817,12 @@ void databasesCron(bool fMainThread) {
/* Rehash */
if (g_pserver->activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
if (serverTL->rehashCtl != nullptr) {
if (dictRehashSomeAsync(serverTL->rehashCtl, 5)) {
break;
} else {
dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/);
serverTL->rehashCtl = nullptr;
}
}
serverAssert(serverTL->rehashCtl == nullptr);
/* Are we async rehashing? And if so is it time to re-calibrate? */
/* The recalibration limit is a prime number to ensure balancing across threads */
if (rehashes_per_ms > 0 && async_rehashes < 131 && !cserver.active_defrag_enabled) {
serverTL->rehashCtl = dictRehashAsyncStart(g_pserver->db[rehash_db].dict, rehashes_per_ms);
++async_rehashes;
}
if (serverTL->rehashCtl)
break;
// Before starting anything new, can we end the rehash of a blocked thread?
if (g_pserver->db[rehash_db].dict->asyncdata != nullptr) {
auto asyncdata = g_pserver->db[rehash_db].dict->asyncdata;
if (asyncdata->done) {
dictCompleteRehashAsync(asyncdata, false /*fFree*/); // Don't free because we don't own the pointer
serverAssert(g_pserver->db[rehash_db].dict->asyncdata != asyncdata);
break; // completion can be expensive, don't do anything else
}
}
rehashes_per_ms = incrementallyRehash(rehash_db);
async_rehashes = 0;
if (rehashes_per_ms > 0) {
/* If the function did some work, stop here, we'll do
* more at the next cron loop. */
if (!cserver.active_defrag_enabled) {
serverLog(LL_VERBOSE, "Calibrated rehashes per ms: %d", rehashes_per_ms);
}
break;
} else if (g_pserver->db[rehash_db].dict->asyncdata == nullptr) {
/* If this db didn't need rehash and we have none in flight, we'll try the next one. */
rehash_db++;
rehash_db %= cserver.dbnum;
}
incrementallyRehash(rehash_db);
rehash_db++;
rehash_db %= cserver.dbnum;
}
}
}
if (serverTL->rehashCtl) {
setAeLockSetThreadSpinWorker(hash_spin_worker);
} else {
setAeLockSetThreadSpinWorker(nullptr);
}
}
/* We take a cached value of the unix time in the global state because with

View File

@ -1390,7 +1390,6 @@ struct redisServerThreadVars {
bool modulesEnabledThisAeLoop = false; /* In this loop of aeMain, were modules enabled before
the thread went to sleep? */
std::vector<client*> vecclientsProcess;
dictAsyncRehashCtl *rehashCtl = nullptr;
};
struct redisMaster {