Allow multiple threads to rehash simultaneously
Former-commit-id: 5a2cc90786dfd1bfd341dbf5713bcde01f0cfff3
This commit is contained in:
parent
5d161ca2f2
commit
5ab1095022
53
src/dict.cpp
53
src/dict.cpp
@ -233,11 +233,7 @@ int dictRehash(dict *d, int n) {
|
|||||||
|
|
||||||
dictAsyncRehashCtl *dictRehashAsyncStart(dict *d) {
|
dictAsyncRehashCtl *dictRehashAsyncStart(dict *d) {
|
||||||
if (!dictIsRehashing(d)) return 0;
|
if (!dictIsRehashing(d)) return 0;
|
||||||
if (d->asyncdata != nullptr) {
|
d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata);
|
||||||
/* An async rehash is already in progress, in the future we could give out an additional block */
|
|
||||||
return nullptr;
|
|
||||||
}
|
|
||||||
d->asyncdata = new dictAsyncRehashCtl(d);
|
|
||||||
|
|
||||||
int empty_visits = dictAsyncRehashCtl::c_targetQueueSize * 10;
|
int empty_visits = dictAsyncRehashCtl::c_targetQueueSize * 10;
|
||||||
|
|
||||||
@ -263,8 +259,10 @@ dictAsyncRehashCtl *dictRehashAsyncStart(dict *d) {
|
|||||||
LDone:
|
LDone:
|
||||||
if (d->asyncdata->queue.empty()) {
|
if (d->asyncdata->queue.empty()) {
|
||||||
// We didn't find any work to do (can happen if there is a large gap in the hash table)
|
// We didn't find any work to do (can happen if there is a large gap in the hash table)
|
||||||
delete d->asyncdata;
|
auto asyncT = d->asyncdata;
|
||||||
d->asyncdata = nullptr;
|
d->asyncdata = d->asyncdata->next;
|
||||||
|
delete asyncT;
|
||||||
|
return nullptr;
|
||||||
}
|
}
|
||||||
return d->asyncdata;
|
return d->asyncdata;
|
||||||
}
|
}
|
||||||
@ -277,7 +275,15 @@ void dictRehashAsync(dictAsyncRehashCtl *ctl) {
|
|||||||
|
|
||||||
void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl) {
|
void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl) {
|
||||||
dict *d = ctl->dict;
|
dict *d = ctl->dict;
|
||||||
d->asyncdata = nullptr; // dictFreeUnlinkedEntry won't actually free if we don't NULL this
|
// Unlink ourselves
|
||||||
|
dictAsyncRehashCtl **pprev = &d->asyncdata;
|
||||||
|
while (*pprev != nullptr) {
|
||||||
|
if (*pprev == ctl) {
|
||||||
|
*pprev = ctl->next;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
pprev = &((*pprev)->next);
|
||||||
|
}
|
||||||
|
|
||||||
// Was the dictionary free'd while we were in flight?
|
// Was the dictionary free'd while we were in flight?
|
||||||
if (ctl->release) {
|
if (ctl->release) {
|
||||||
@ -295,7 +301,7 @@ void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl) {
|
|||||||
while (*pdePrev != nullptr && *pdePrev != wi.de) {
|
while (*pdePrev != nullptr && *pdePrev != wi.de) {
|
||||||
pdePrev = &((*pdePrev)->next);
|
pdePrev = &((*pdePrev)->next);
|
||||||
}
|
}
|
||||||
if (*pdePrev != nullptr) {
|
if (*pdePrev != nullptr) { // The element may be NULL if its in the GC list
|
||||||
assert(*pdePrev == wi.de);
|
assert(*pdePrev == wi.de);
|
||||||
*pdePrev = wi.de->next;
|
*pdePrev = wi.de->next;
|
||||||
// Now link it to the dest hash table
|
// Now link it to the dest hash table
|
||||||
@ -303,20 +309,25 @@ void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl) {
|
|||||||
d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de;
|
d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de;
|
||||||
d->ht[0].used--;
|
d->ht[0].used--;
|
||||||
d->ht[1].used++;
|
d->ht[1].used++;
|
||||||
} else {
|
|
||||||
// In this scenario the de should be in the free list, find and delete it
|
|
||||||
bool found = false;
|
|
||||||
for (dictEntry **pdeGCPrev = &ctl->deGCList; *pdeGCPrev != nullptr && !found; pdeGCPrev = &((*pdeGCPrev)->next)) {
|
|
||||||
if (*pdeGCPrev == wi.de) {
|
|
||||||
*pdeGCPrev = wi.de->next;
|
|
||||||
dictFreeUnlinkedEntry(d, wi.de);
|
|
||||||
found = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Note: We may not find it if the entry was just unlinked but reused elsewhere
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assert(ctl->deGCList == nullptr);
|
|
||||||
|
// Now free anyting in the GC list
|
||||||
|
while (ctl->deGCList != nullptr) {
|
||||||
|
auto next = ctl->deGCList->next;
|
||||||
|
dictFreeKey(d, ctl->deGCList);
|
||||||
|
dictFreeVal(d, ctl->deGCList);
|
||||||
|
zfree(ctl->deGCList);
|
||||||
|
ctl->deGCList = next;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Check if we already rehashed the whole table... */
|
||||||
|
if (d->ht[0].used == 0 && d->asyncdata == nullptr) {
|
||||||
|
zfree(d->ht[0].table);
|
||||||
|
d->ht[0] = d->ht[1];
|
||||||
|
_dictReset(&d->ht[1]);
|
||||||
|
d->rehashidx = -1;
|
||||||
|
}
|
||||||
|
|
||||||
delete ctl;
|
delete ctl;
|
||||||
}
|
}
|
||||||
|
@ -92,13 +92,14 @@ struct dictAsyncRehashCtl {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
static const int c_targetQueueSize = 100;
|
static const int c_targetQueueSize = 512;
|
||||||
dictEntry *deGCList = nullptr;
|
dictEntry *deGCList = nullptr;
|
||||||
struct dict *dict = nullptr;
|
struct dict *dict = nullptr;
|
||||||
std::vector<workItem> queue;
|
std::vector<workItem> queue;
|
||||||
bool release = false;
|
bool release = false;
|
||||||
|
dictAsyncRehashCtl *next = nullptr;
|
||||||
|
|
||||||
dictAsyncRehashCtl(struct dict *d) : dict(d) {
|
dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) {
|
||||||
queue.reserve(c_targetQueueSize);
|
queue.reserve(c_targetQueueSize);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
Loading…
x
Reference in New Issue
Block a user