Fix test hang
Former-commit-id: 23647390e628de07759f8e7d8768a7f638edf01d
This commit is contained in:
parent
7b982ae4c5
commit
61e054f826
160
src/dict.cpp
160
src/dict.cpp
@ -127,7 +127,6 @@ int _dictInit(dict *d, dictType *type,
|
|||||||
d->privdata = privDataPtr;
|
d->privdata = privDataPtr;
|
||||||
d->rehashidx = -1;
|
d->rehashidx = -1;
|
||||||
d->iterators = 0;
|
d->iterators = 0;
|
||||||
d->asyncdata = nullptr;
|
|
||||||
return DICT_OK;
|
return DICT_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -189,7 +188,6 @@ int dictExpand(dict *d, unsigned long size)
|
|||||||
int dictRehash(dict *d, int n) {
|
int dictRehash(dict *d, int n) {
|
||||||
int empty_visits = n*10; /* Max number of empty buckets to visit. */
|
int empty_visits = n*10; /* Max number of empty buckets to visit. */
|
||||||
if (!dictIsRehashing(d)) return 0;
|
if (!dictIsRehashing(d)) return 0;
|
||||||
if (d->asyncdata) return 0;
|
|
||||||
|
|
||||||
while(n-- && d->ht[0].used != 0) {
|
while(n-- && d->ht[0].used != 0) {
|
||||||
dictEntry *de, *nextde;
|
dictEntry *de, *nextde;
|
||||||
@ -231,144 +229,6 @@ int dictRehash(dict *d, int n) {
|
|||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) {
|
|
||||||
if (!dictIsRehashing(d)) return 0;
|
|
||||||
|
|
||||||
d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata);
|
|
||||||
|
|
||||||
int empty_visits = buckets * 10;
|
|
||||||
|
|
||||||
while (d->asyncdata->queue.size() < (size_t)buckets && d->rehashidx < d->ht[0].size) {
|
|
||||||
dictEntry *de;
|
|
||||||
|
|
||||||
/* Note that rehashidx can't overflow as we are sure there are more
|
|
||||||
* elements because ht[0].used != 0 */
|
|
||||||
while(d->ht[0].table[d->rehashidx] == NULL) {
|
|
||||||
d->rehashidx++;
|
|
||||||
if (--empty_visits == 0) goto LDone;
|
|
||||||
if (d->rehashidx >= d->ht[0].size) goto LDone;
|
|
||||||
}
|
|
||||||
|
|
||||||
de = d->ht[0].table[d->rehashidx];
|
|
||||||
// We have to queue every node in the bucket, even if we go over our target size
|
|
||||||
while (de != nullptr) {
|
|
||||||
d->asyncdata->queue.emplace_back(de);
|
|
||||||
de = de->next;
|
|
||||||
}
|
|
||||||
d->rehashidx++;
|
|
||||||
}
|
|
||||||
|
|
||||||
LDone:
|
|
||||||
if (d->asyncdata->queue.empty()) {
|
|
||||||
// We didn't find any work to do (can happen if there is a large gap in the hash table)
|
|
||||||
auto asyncT = d->asyncdata;
|
|
||||||
d->asyncdata = d->asyncdata->next;
|
|
||||||
delete asyncT;
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
return d->asyncdata;
|
|
||||||
}
|
|
||||||
|
|
||||||
void dictRehashAsync(dictAsyncRehashCtl *ctl) {
|
|
||||||
for (size_t idx = ctl->hashIdx; idx < ctl->queue.size(); ++idx) {
|
|
||||||
auto &wi = ctl->queue[idx];
|
|
||||||
wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de));
|
|
||||||
}
|
|
||||||
ctl->hashIdx = ctl->queue.size();
|
|
||||||
ctl->done = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes) {
|
|
||||||
size_t max = std::min(ctl->hashIdx + hashes, ctl->queue.size());
|
|
||||||
for (; ctl->hashIdx < max; ++ctl->hashIdx) {
|
|
||||||
auto &wi = ctl->queue[ctl->hashIdx];
|
|
||||||
wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ctl->hashIdx == ctl->queue.size()) ctl->done = true;
|
|
||||||
return ctl->hashIdx < ctl->queue.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) {
|
|
||||||
dict *d = ctl->dict;
|
|
||||||
assert(ctl->done);
|
|
||||||
|
|
||||||
// Unlink ourselves
|
|
||||||
bool fUnlinked = false;
|
|
||||||
dictAsyncRehashCtl **pprev = &d->asyncdata;
|
|
||||||
|
|
||||||
while (*pprev != nullptr) {
|
|
||||||
if (*pprev == ctl) {
|
|
||||||
*pprev = ctl->next;
|
|
||||||
fUnlinked = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
pprev = &((*pprev)->next);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (fUnlinked) {
|
|
||||||
if (ctl->next != nullptr && ctl->deGCList != nullptr) {
|
|
||||||
// An older work item may depend on our free list, so pass our free list to them
|
|
||||||
dictEntry **deGC = &ctl->next->deGCList;
|
|
||||||
while (*deGC != nullptr) deGC = &((*deGC)->next);
|
|
||||||
*deGC = ctl->deGCList;
|
|
||||||
ctl->deGCList = nullptr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (fUnlinked && !ctl->release) {
|
|
||||||
if (d->ht[0].table != nullptr) { // can be null if we're cleared during the rehash
|
|
||||||
for (auto &wi : ctl->queue) {
|
|
||||||
// We need to remove it from the source hash table, and store it in the dest.
|
|
||||||
// Note that it may have been deleted in the meantime and therefore not exist.
|
|
||||||
// In this case it will be in the garbage collection list
|
|
||||||
|
|
||||||
dictEntry **pdePrev = &d->ht[0].table[wi.hash & d->ht[0].sizemask];
|
|
||||||
while (*pdePrev != nullptr && *pdePrev != wi.de) {
|
|
||||||
pdePrev = &((*pdePrev)->next);
|
|
||||||
}
|
|
||||||
if (*pdePrev != nullptr) { // The element may be NULL if its in the GC list
|
|
||||||
assert(*pdePrev == wi.de);
|
|
||||||
*pdePrev = wi.de->next;
|
|
||||||
// Now link it to the dest hash table
|
|
||||||
wi.de->next = d->ht[1].table[wi.hash & d->ht[1].sizemask];
|
|
||||||
d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de;
|
|
||||||
d->ht[0].used--;
|
|
||||||
d->ht[1].used++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Check if we already rehashed the whole table... */
|
|
||||||
if (d->ht[0].used == 0 && d->asyncdata == nullptr) {
|
|
||||||
zfree(d->ht[0].table);
|
|
||||||
d->ht[0] = d->ht[1];
|
|
||||||
_dictReset(&d->ht[1]);
|
|
||||||
d->rehashidx = -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (fFree) {
|
|
||||||
while (ctl->deGCList != nullptr) {
|
|
||||||
auto next = ctl->deGCList->next;
|
|
||||||
dictFreeKey(d, ctl->deGCList);
|
|
||||||
dictFreeVal(d, ctl->deGCList);
|
|
||||||
zfree(ctl->deGCList);
|
|
||||||
ctl->deGCList = next;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Was the dictionary free'd while we were in flight?
|
|
||||||
if (ctl->release) {
|
|
||||||
if (d->asyncdata != nullptr)
|
|
||||||
d->asyncdata->release = true;
|
|
||||||
else
|
|
||||||
dictRelease(d);
|
|
||||||
}
|
|
||||||
|
|
||||||
delete ctl;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
long long timeInMilliseconds(void) {
|
long long timeInMilliseconds(void) {
|
||||||
struct timeval tv;
|
struct timeval tv;
|
||||||
|
|
||||||
@ -524,15 +384,10 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
|
|||||||
else
|
else
|
||||||
d->ht[table].table[idx] = he->next;
|
d->ht[table].table[idx] = he->next;
|
||||||
if (!nofree) {
|
if (!nofree) {
|
||||||
if (table == 0 && d->asyncdata != nullptr && idx < d->rehashidx) {
|
|
||||||
he->next = d->asyncdata->deGCList;
|
|
||||||
d->asyncdata->deGCList = he->next;
|
|
||||||
} else {
|
|
||||||
dictFreeKey(d, he);
|
dictFreeKey(d, he);
|
||||||
dictFreeVal(d, he);
|
dictFreeVal(d, he);
|
||||||
zfree(he);
|
zfree(he);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
d->ht[table].used--;
|
d->ht[table].used--;
|
||||||
return he;
|
return he;
|
||||||
}
|
}
|
||||||
@ -579,15 +434,11 @@ dictEntry *dictUnlink(dict *ht, const void *key) {
|
|||||||
* to dictUnlink(). It's safe to call this function with 'he' = NULL. */
|
* to dictUnlink(). It's safe to call this function with 'he' = NULL. */
|
||||||
void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
|
void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
|
||||||
if (he == NULL) return;
|
if (he == NULL) return;
|
||||||
if (d->asyncdata) {
|
|
||||||
he->next = d->asyncdata->deGCList;
|
|
||||||
d->asyncdata->deGCList = he;
|
|
||||||
} else {
|
|
||||||
dictFreeKey(d, he);
|
dictFreeKey(d, he);
|
||||||
dictFreeVal(d, he);
|
dictFreeVal(d, he);
|
||||||
zfree(he);
|
zfree(he);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
/* Destroy an entire dictionary */
|
/* Destroy an entire dictionary */
|
||||||
int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
|
int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
|
||||||
@ -602,14 +453,9 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
|
|||||||
if ((he = ht->table[i]) == NULL) continue;
|
if ((he = ht->table[i]) == NULL) continue;
|
||||||
while(he) {
|
while(he) {
|
||||||
nextHe = he->next;
|
nextHe = he->next;
|
||||||
if (d->asyncdata && i < d->rehashidx) {
|
|
||||||
he->next = d->asyncdata->deGCList;
|
|
||||||
d->asyncdata->deGCList = he;
|
|
||||||
} else {
|
|
||||||
dictFreeKey(d, he);
|
dictFreeKey(d, he);
|
||||||
dictFreeVal(d, he);
|
dictFreeVal(d, he);
|
||||||
zfree(he);
|
zfree(he);
|
||||||
}
|
|
||||||
ht->used--;
|
ht->used--;
|
||||||
he = nextHe;
|
he = nextHe;
|
||||||
}
|
}
|
||||||
@ -624,10 +470,6 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
|
|||||||
/* Clear & Release the hash table */
|
/* Clear & Release the hash table */
|
||||||
void dictRelease(dict *d)
|
void dictRelease(dict *d)
|
||||||
{
|
{
|
||||||
if (d->asyncdata) {
|
|
||||||
d->asyncdata->release = true;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
_dictClear(d,&d->ht[0],NULL);
|
_dictClear(d,&d->ht[0],NULL);
|
||||||
_dictClear(d,&d->ht[1],NULL);
|
_dictClear(d,&d->ht[1],NULL);
|
||||||
zfree(d);
|
zfree(d);
|
||||||
|
@ -116,7 +116,6 @@ typedef struct dict {
|
|||||||
dictht ht[2];
|
dictht ht[2];
|
||||||
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
|
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
|
||||||
unsigned long iterators; /* number of iterators currently running */
|
unsigned long iterators; /* number of iterators currently running */
|
||||||
dictAsyncRehashCtl *asyncdata;
|
|
||||||
} dict;
|
} dict;
|
||||||
|
|
||||||
/* If safe is set to 1 this is a safe iterator, that means, you can call
|
/* If safe is set to 1 this is a safe iterator, that means, you can call
|
||||||
@ -220,12 +219,6 @@ unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanB
|
|||||||
uint64_t dictGetHash(dict *d, const void *key);
|
uint64_t dictGetHash(dict *d, const void *key);
|
||||||
dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);
|
dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);
|
||||||
|
|
||||||
/* Async Rehash Functions */
|
|
||||||
dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets = dictAsyncRehashCtl::c_targetQueueSize);
|
|
||||||
void dictRehashAsync(dictAsyncRehashCtl *ctl);
|
|
||||||
bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes);
|
|
||||||
void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree);
|
|
||||||
|
|
||||||
/* Hash table types */
|
/* Hash table types */
|
||||||
extern dictType dictTypeHeapStringCopyKey;
|
extern dictType dictTypeHeapStringCopyKey;
|
||||||
extern dictType dictTypeHeapStrings;
|
extern dictType dictTypeHeapStrings;
|
||||||
|
@ -3491,16 +3491,6 @@ void processEventsWhileBlocked(int iel) {
|
|||||||
locker.arm(nullptr);
|
locker.arm(nullptr);
|
||||||
locker.release();
|
locker.release();
|
||||||
|
|
||||||
// Try to complete any async rehashes (this would normally happen in dbCron, but that won't run here)
|
|
||||||
for (int idb = 0; idb < cserver.dbnum; ++idb) {
|
|
||||||
redisDb *db = &g_pserver->db[idb];
|
|
||||||
while (db->dict->asyncdata != nullptr) {
|
|
||||||
if (!db->dict->asyncdata->done)
|
|
||||||
break;
|
|
||||||
dictCompleteRehashAsync(db->dict->asyncdata, false /*fFree*/);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Restore it so the calling code is not confused
|
// Restore it so the calling code is not confused
|
||||||
if (fReplBacklog) {
|
if (fReplBacklog) {
|
||||||
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
|
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
|
||||||
|
@ -1771,18 +1771,11 @@ bool expireOwnKeys()
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int hash_spin_worker() {
|
|
||||||
auto ctl = serverTL->rehashCtl;
|
|
||||||
return dictRehashSomeAsync(ctl, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* This function handles 'background' operations we are required to do
|
/* This function handles 'background' operations we are required to do
|
||||||
* incrementally in Redis databases, such as active key expiring, resizing,
|
* incrementally in Redis databases, such as active key expiring, resizing,
|
||||||
* rehashing. */
|
* rehashing. */
|
||||||
void databasesCron(bool fMainThread) {
|
void databasesCron(bool fMainThread) {
|
||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
static int rehashes_per_ms = 0;
|
|
||||||
static int async_rehashes = 0;
|
|
||||||
if (fMainThread) {
|
if (fMainThread) {
|
||||||
/* Expire keys by random sampling. Not required for slaves
|
/* Expire keys by random sampling. Not required for slaves
|
||||||
* as master will synthesize DELs for us. */
|
* as master will synthesize DELs for us. */
|
||||||
@ -1824,46 +1817,7 @@ void databasesCron(bool fMainThread) {
|
|||||||
/* Rehash */
|
/* Rehash */
|
||||||
if (g_pserver->activerehashing) {
|
if (g_pserver->activerehashing) {
|
||||||
for (j = 0; j < dbs_per_call; j++) {
|
for (j = 0; j < dbs_per_call; j++) {
|
||||||
if (serverTL->rehashCtl != nullptr) {
|
incrementallyRehash(rehash_db);
|
||||||
if (dictRehashSomeAsync(serverTL->rehashCtl, 5)) {
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/);
|
|
||||||
serverTL->rehashCtl = nullptr;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
serverAssert(serverTL->rehashCtl == nullptr);
|
|
||||||
/* Are we async rehashing? And if so is it time to re-calibrate? */
|
|
||||||
/* The recalibration limit is a prime number to ensure balancing across threads */
|
|
||||||
if (rehashes_per_ms > 0 && async_rehashes < 131 && !cserver.active_defrag_enabled) {
|
|
||||||
serverTL->rehashCtl = dictRehashAsyncStart(g_pserver->db[rehash_db].dict, rehashes_per_ms);
|
|
||||||
++async_rehashes;
|
|
||||||
}
|
|
||||||
if (serverTL->rehashCtl)
|
|
||||||
break;
|
|
||||||
|
|
||||||
// Before starting anything new, can we end the rehash of a blocked thread?
|
|
||||||
if (g_pserver->db[rehash_db].dict->asyncdata != nullptr) {
|
|
||||||
auto asyncdata = g_pserver->db[rehash_db].dict->asyncdata;
|
|
||||||
if (asyncdata->done) {
|
|
||||||
dictCompleteRehashAsync(asyncdata, false /*fFree*/); // Don't free because we don't own the pointer
|
|
||||||
serverAssert(g_pserver->db[rehash_db].dict->asyncdata != asyncdata);
|
|
||||||
break; // completion can be expensive, don't do anything else
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
rehashes_per_ms = incrementallyRehash(rehash_db);
|
|
||||||
async_rehashes = 0;
|
|
||||||
if (rehashes_per_ms > 0) {
|
|
||||||
/* If the function did some work, stop here, we'll do
|
|
||||||
* more at the next cron loop. */
|
|
||||||
if (!cserver.active_defrag_enabled) {
|
|
||||||
serverLog(LL_VERBOSE, "Calibrated rehashes per ms: %d", rehashes_per_ms);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
} else if (g_pserver->db[rehash_db].dict->asyncdata == nullptr) {
|
|
||||||
/* If this db didn't need rehash and we have none in flight, we'll try the next one. */
|
|
||||||
rehash_db++;
|
rehash_db++;
|
||||||
rehash_db %= cserver.dbnum;
|
rehash_db %= cserver.dbnum;
|
||||||
}
|
}
|
||||||
@ -1871,13 +1825,6 @@ void databasesCron(bool fMainThread) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (serverTL->rehashCtl) {
|
|
||||||
setAeLockSetThreadSpinWorker(hash_spin_worker);
|
|
||||||
} else {
|
|
||||||
setAeLockSetThreadSpinWorker(nullptr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* We take a cached value of the unix time in the global state because with
|
/* We take a cached value of the unix time in the global state because with
|
||||||
* virtual memory and aging there is to store the current time in objects at
|
* virtual memory and aging there is to store the current time in objects at
|
||||||
* every object access, and accuracy is not needed. To access a global var is
|
* every object access, and accuracy is not needed. To access a global var is
|
||||||
|
@ -1390,7 +1390,6 @@ struct redisServerThreadVars {
|
|||||||
bool modulesEnabledThisAeLoop = false; /* In this loop of aeMain, were modules enabled before
|
bool modulesEnabledThisAeLoop = false; /* In this loop of aeMain, were modules enabled before
|
||||||
the thread went to sleep? */
|
the thread went to sleep? */
|
||||||
std::vector<client*> vecclientsProcess;
|
std::vector<client*> vecclientsProcess;
|
||||||
dictAsyncRehashCtl *rehashCtl = nullptr;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
struct redisMaster {
|
struct redisMaster {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user