Repair async rehash code removed by merge
Former-commit-id: 14a3ab8133c474bf8f1ad636c8699d5d59c5394e
This commit is contained in:
parent
6b4f686f5f
commit
6c510bc5cb
@ -814,7 +814,9 @@ int aeThreadOwnsLock()
|
|||||||
|
|
||||||
int aeLockContested(int threshold)
|
int aeLockContested(int threshold)
|
||||||
{
|
{
|
||||||
return g_lock.m_ticket.m_active < static_cast<uint16_t>(g_lock.m_ticket.m_avail - threshold);
|
ticket ticketT;
|
||||||
|
__atomic_load(&g_lock.m_ticket.u, &ticketT.u, __ATOMIC_RELAXED);
|
||||||
|
return ticketT.m_active < static_cast<uint16_t>(ticketT.m_avail - threshold);
|
||||||
}
|
}
|
||||||
|
|
||||||
int aeLockContention()
|
int aeLockContention()
|
||||||
|
14
src/dict.cpp
14
src/dict.cpp
@ -350,6 +350,7 @@ int dictTryExpand(dict *d, unsigned long size, bool fShrink) {
|
|||||||
int dictRehash(dict *d, int n) {
|
int dictRehash(dict *d, int n) {
|
||||||
int empty_visits = n*10; /* Max number of empty buckets to visit. */
|
int empty_visits = n*10; /* Max number of empty buckets to visit. */
|
||||||
if (!dictIsRehashing(d)) return 0;
|
if (!dictIsRehashing(d)) return 0;
|
||||||
|
if (d->asyncdata) return 0;
|
||||||
|
|
||||||
while(n-- && d->ht[0].used != 0) {
|
while(n-- && d->ht[0].used != 0) {
|
||||||
dictEntry *de, *nextde;
|
dictEntry *de, *nextde;
|
||||||
@ -694,7 +695,7 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
|
|||||||
if (!nofree) {
|
if (!nofree) {
|
||||||
if (table == 0 && d->asyncdata != nullptr && (ssize_t)idx < d->rehashidx) {
|
if (table == 0 && d->asyncdata != nullptr && (ssize_t)idx < d->rehashidx) {
|
||||||
he->next = d->asyncdata->deGCList;
|
he->next = d->asyncdata->deGCList;
|
||||||
d->asyncdata->deGCList = he->next;
|
d->asyncdata->deGCList = he;
|
||||||
} else {
|
} else {
|
||||||
dictFreeKey(d, he);
|
dictFreeKey(d, he);
|
||||||
dictFreeVal(d, he);
|
dictFreeVal(d, he);
|
||||||
@ -750,9 +751,14 @@ dictEntry *dictUnlink(dict *ht, const void *key) {
|
|||||||
void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
|
void dictFreeUnlinkedEntry(dict *d, dictEntry *he) {
|
||||||
if (he == NULL) return;
|
if (he == NULL) return;
|
||||||
|
|
||||||
dictFreeKey(d, he);
|
if (d->asyncdata) {
|
||||||
dictFreeVal(d, he);
|
he->next = d->asyncdata->deGCList;
|
||||||
zfree(he);
|
d->asyncdata->deGCList = he;
|
||||||
|
} else {
|
||||||
|
dictFreeKey(d, he);
|
||||||
|
dictFreeVal(d, he);
|
||||||
|
zfree(he);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Destroy an entire dictionary */
|
/* Destroy an entire dictionary */
|
||||||
|
@ -2059,6 +2059,11 @@ bool expireOwnKeys()
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int hash_spin_worker() {
|
||||||
|
auto ctl = serverTL->rehashCtl;
|
||||||
|
return dictRehashSomeAsync(ctl, 1);
|
||||||
|
}
|
||||||
|
|
||||||
/* This function handles 'background' operations we are required to do
|
/* This function handles 'background' operations we are required to do
|
||||||
* incrementally in Redis databases, such as active key expiring, resizing,
|
* incrementally in Redis databases, such as active key expiring, resizing,
|
||||||
* rehashing. */
|
* rehashing. */
|
||||||
@ -2154,6 +2159,12 @@ void databasesCron(bool fMainThread) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (serverTL->rehashCtl) {
|
||||||
|
setAeLockSetThreadSpinWorker(hash_spin_worker);
|
||||||
|
} else {
|
||||||
|
setAeLockSetThreadSpinWorker(nullptr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* We take a cached value of the unix time in the global state because with
|
/* We take a cached value of the unix time in the global state because with
|
||||||
|
Loading…
x
Reference in New Issue
Block a user