Ensure rehash completes even when we're in a long-running task

Former-commit-id: f107746e90f7a8ff3c7094145ee1ad438911e8c2
Author: John Sully 2021-01-27 00:47:22 +00:00
parent 495dff1e8c
commit ba006abe02
5 changed files with 103 additions and 68 deletions

src/dict.cpp

@@ -275,6 +275,7 @@ void dictRehashAsync(dictAsyncRehashCtl *ctl) {
        wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de));
    }
    ctl->hashIdx = ctl->queue.size();
+   ctl->done = true;
}

bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes) {
@@ -284,71 +285,88 @@ bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes) {
        wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de));
    }

+   if (ctl->hashIdx == ctl->queue.size()) ctl->done = true;
    return ctl->hashIdx < ctl->queue.size();
}

-void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl) {
+void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) {
    dict *d = ctl->dict;
    assert(ctl->done);
    // Unlink ourselves
    bool fUnlinked = false;
    dictAsyncRehashCtl **pprev = &d->asyncdata;
    while (*pprev != nullptr) {
        if (*pprev == ctl) {
            *pprev = ctl->next;
            fUnlinked = true;
            break;
        }
        pprev = &((*pprev)->next);
    }

-   // Was the dictionary free'd while we were in flight?
-   if (ctl->release) {
-       if (d->asyncdata != nullptr)
-           d->asyncdata->release = true;
-       else
-           dictRelease(d);
-       delete ctl;
-       return;
-   }
-
-   if (d->ht[0].table != nullptr) { // can be null if we're cleared during the rehash
-       for (auto &wi : ctl->queue) {
-           // We need to remove it from the source hash table, and store it in the dest.
-           // Note that it may have been deleted in the meantime and therefore not exist.
-           // In this case it will be in the garbage collection list
-           dictEntry **pdePrev = &d->ht[0].table[wi.hash & d->ht[0].sizemask];
-           while (*pdePrev != nullptr && *pdePrev != wi.de) {
-               pdePrev = &((*pdePrev)->next);
-           }
-           if (*pdePrev != nullptr) { // The element may be NULL if its in the GC list
-               assert(*pdePrev == wi.de);
-               *pdePrev = wi.de->next;
-               // Now link it to the dest hash table
-               wi.de->next = d->ht[1].table[wi.hash & d->ht[1].sizemask];
-               d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de;
-               d->ht[0].used--;
-               d->ht[1].used++;
-           }
+   if (fUnlinked) {
+       if (ctl->next != nullptr && ctl->deGCList != nullptr) {
+           // An older work item may depend on our free list, so pass our free list to them
+           dictEntry **deGC = &ctl->next->deGCList;
+           while (*deGC != nullptr) deGC = &((*deGC)->next);
+           *deGC = ctl->deGCList;
+           ctl->deGCList = nullptr;
        }
    }

-   // Now free anyting in the GC list
-   while (ctl->deGCList != nullptr) {
-       auto next = ctl->deGCList->next;
-       dictFreeKey(d, ctl->deGCList);
-       dictFreeVal(d, ctl->deGCList);
-       zfree(ctl->deGCList);
-       ctl->deGCList = next;
+   if (fUnlinked && !ctl->release) {
+       if (d->ht[0].table != nullptr) { // can be null if we're cleared during the rehash
+           for (auto &wi : ctl->queue) {
+               // We need to remove it from the source hash table, and store it in the dest.
+               // Note that it may have been deleted in the meantime and therefore not exist.
+               // In this case it will be in the garbage collection list
+               dictEntry **pdePrev = &d->ht[0].table[wi.hash & d->ht[0].sizemask];
+               while (*pdePrev != nullptr && *pdePrev != wi.de) {
+                   pdePrev = &((*pdePrev)->next);
+               }
+               if (*pdePrev != nullptr) { // The element may be NULL if its in the GC list
+                   assert(*pdePrev == wi.de);
+                   *pdePrev = wi.de->next;
+                   // Now link it to the dest hash table
+                   wi.de->next = d->ht[1].table[wi.hash & d->ht[1].sizemask];
+                   d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de;
+                   d->ht[0].used--;
+                   d->ht[1].used++;
+               }
+           }
+       }
+
+       /* Check if we already rehashed the whole table... */
+       if (d->ht[0].used == 0 && d->asyncdata == nullptr) {
+           zfree(d->ht[0].table);
+           d->ht[0] = d->ht[1];
+           _dictReset(&d->ht[1]);
+           d->rehashidx = -1;
+       }
    }

-   /* Check if we already rehashed the whole table... */
-   if (d->ht[0].used == 0 && d->asyncdata == nullptr) {
-       zfree(d->ht[0].table);
-       d->ht[0] = d->ht[1];
-       _dictReset(&d->ht[1]);
-       d->rehashidx = -1;
-   }
+   if (fFree) {
+       while (ctl->deGCList != nullptr) {
+           auto next = ctl->deGCList->next;
+           dictFreeKey(d, ctl->deGCList);
+           dictFreeVal(d, ctl->deGCList);
+           zfree(ctl->deGCList);
+           ctl->deGCList = next;
+       }

-   delete ctl;
+       // Was the dictionary free'd while we were in flight?
+       if (ctl->release) {
+           if (d->asyncdata != nullptr)
+               d->asyncdata->release = true;
+           else
+               dictRelease(d);
+       }
+
+       delete ctl;
+   }
}

long long timeInMilliseconds(void) {
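
Note on the new fFree parameter: dictCompleteRehashAsync can now be called by a thread that does not own the control block (it applies the finished rehash but must leave the allocation alone), while the owning thread later calls it with fFree = true to free the GC list and the block itself. A minimal, self-contained model of that contract follows; the types and names here are illustrative stand-ins, not KeyDB's.

#include <atomic>
#include <cassert>

struct AsyncCtl {
    std::atomic<bool> done { false };
    bool applied = false;   // stands in for the unlink + bucket-move work above
};

void completeAsync(AsyncCtl *ctl, bool fFree) {
    assert(ctl->done.load());
    if (!ctl->applied) {
        // ... move the queued entries from ht[0] to ht[1] here ...
        ctl->applied = true;
    }
    if (fFree)
        delete ctl;         // only the thread that owns ctl takes this branch
}

int main() {
    auto *ctl = new AsyncCtl;
    ctl->done = true;
    completeAsync(ctl, false /*fFree*/); // helper thread: apply, but don't free
    completeAsync(ctl, true /*fFree*/);  // owner: nothing left to apply, release it
}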
@@ -506,7 +524,7 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
            else
                d->ht[table].table[idx] = he->next;
            if (!nofree) {
-               if (table == 0 && d->asyncdata != nullptr) {
+               if (table == 0 && d->asyncdata != nullptr && idx < d->rehashidx) {
                    he->next = d->asyncdata->deGCList;
                    d->asyncdata->deGCList = he;
                } else {
@@ -584,7 +602,7 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
        if ((he = ht->table[i]) == NULL) continue;
        while(he) {
            nextHe = he->next;
-           if (d->asyncdata) {
+           if (d->asyncdata && i < d->rehashidx) {
                he->next = d->asyncdata->deGCList;
                d->asyncdata->deGCList = he;
            } else {
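
The two idx < d->rehashidx / i < d->rehashidx guards above encode when a deleted entry must be parked on the GC list: buckets below rehashidx have already been handed to an async rehash worker, whose queue may still reference their entries. A rough self-contained sketch of the rule, with invented names:

#include <cstddef>

struct Entry { Entry *next = nullptr; };
struct GcList { Entry *head = nullptr; };

// Defer the free only when an in-flight async rehash may still read the bucket.
bool mustDefer(size_t bucket, long rehashidx, bool asyncInFlight) {
    return asyncInFlight && (long)bucket < rehashidx;
}

void removeEntry(Entry *e, size_t bucket, long rehashidx, GcList *gc, bool asyncInFlight) {
    if (mustDefer(bucket, rehashidx, asyncInFlight)) {
        e->next = gc->head;  // park it; freed later by the fFree = true pass
        gc->head = e;
    } else {
        delete e;            // no worker can hold a reference, free immediately
    }
}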

src/dict.h

@@ -37,6 +37,7 @@

#ifdef __cplusplus
#include <vector>
+#include <atomic>
extern "C" {
#endif

@@ -99,6 +100,7 @@ struct dictAsyncRehashCtl {
    size_t hashIdx = 0;
    bool release = false;
    dictAsyncRehashCtl *next = nullptr;
+   std::atomic<bool> done { false };

    dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) {
        queue.reserve(c_targetQueueSize);

@@ -222,7 +224,7 @@ dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t h
dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets = dictAsyncRehashCtl::c_targetQueueSize);
void dictRehashAsync(dictAsyncRehashCtl *ctl);
bool dictRehashSomeAsync(dictAsyncRehashCtl *ctl, size_t hashes);
-void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl);
+void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree);

/* Hash table types */
extern dictType dictTypeHeapStringCopyKey;
src/fastlock.cpp

@@ -356,6 +356,7 @@ extern "C" void fastlock_lock(struct fastlock *lock, spin_worker worker)
    __atomic_load(&g_fHighCpuPressure, &fHighPressure, __ATOMIC_RELAXED);
    unsigned loopLimit = fHighPressure ? 0x10000 : 0x100000;

+   // WARNING:::: THIS DOESN"T MATCH ASM
    for (;;)
    {
        __atomic_load(&lock->m_ticket.u, &ticketT.u, __ATOMIC_ACQUIRE);
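
For context: fastlock_lock accepts an optional spin_worker callback, so a thread waiting on a contended lock can make progress on its own pending work instead of spinning idly; that is how hash_spin_worker() in server.cpp below gets invoked. An illustrative toy lock with the same shape (this is not the fastlock implementation; the warning comment added above notes the C++ path has diverged from the hand-written assembly):

#include <atomic>

typedef int (*spin_worker)();   // returns nonzero while it still has work to do

struct tinylock { std::atomic_flag held = ATOMIC_FLAG_INIT; };

void tinylock_lock(tinylock *l, spin_worker worker) {
    while (l->held.test_and_set(std::memory_order_acquire)) {
        if (worker != nullptr)
            worker();           // make the wait productive
    }
}

void tinylock_unlock(tinylock *l) {
    l->held.clear(std::memory_order_release);
}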

src/networking.cpp

@@ -3486,6 +3486,16 @@ void processEventsWhileBlocked(int iel) {
    locker.arm(nullptr);
    locker.release();

+   // Try to complete any async rehashes (this would normally happen in dbCron, but that won't run here)
+   for (int idb = 0; idb < cserver.dbnum; ++idb) {
+       redisDb *db = &g_pserver->db[idb];
+       while (db->pdict->asyncdata != nullptr) {
+           if (!db->pdict->asyncdata->done)
+               break;
+           dictCompleteRehashAsync(db->pdict->asyncdata, false /*fFree*/);
+       }
+   }
+
    // Restore it so the calling code is not confused
    if (fReplBacklog) {
        g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
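
This drain loop terminates because dictCompleteRehashAsync unlinks the block it completes from the dict's intrusive list: each iteration either observes a new head or hits a block whose worker has not finished hashing and breaks. A stripped-down model with invented types:

#include <atomic>

struct Ctl {
    std::atomic<bool> done { false };
    Ctl *next = nullptr;
};

// Apply the finished rehash and unlink it; fFree = false means the blocked
// thread still owns the pointer, so we must not delete it here.
void completeNoFree(Ctl **head) {
    Ctl *c = *head;
    *head = c->next;   // unlinking is what lets the caller's while loop advance
    // ... apply c's queued bucket moves ...
}

void drainFinished(Ctl **head) {
    while (*head != nullptr && (*head)->done.load())
        completeNoFree(head);
}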

src/server.cpp

@@ -1773,19 +1773,14 @@ bool expireOwnKeys()
int hash_spin_worker() {
    auto ctl = serverTL->rehashCtl;
-   if (ctl->hashIdx < ctl->queue.size()) {
-       ctl->queue[ctl->hashIdx].hash = dictHashKey(ctl->dict, dictGetKey(ctl->queue[ctl->hashIdx].de));
-       ctl->hashIdx++;
-       return true;
-   } else {
-       return false;
-   }
+   return dictRehashSomeAsync(ctl, 1);
}

/* This function handles 'background' operations we are required to do
 * incrementally in Redis databases, such as active key expiring, resizing,
 * rehashing. */
void databasesCron(bool fMainThread) {
+   serverAssert(GlobalLocksAcquired());
    static int rehashes_per_ms = 0;

    if (fMainThread) {
        /* Expire keys by random sampling. Not required for slaves
@@ -1832,29 +1827,38 @@ void databasesCron(bool fMainThread) {
                if (dictRehashSomeAsync(serverTL->rehashCtl, 5)) {
                    break;
                } else {
-                   dictCompleteRehashAsync(serverTL->rehashCtl);
+                   dictCompleteRehashAsync(serverTL->rehashCtl, true /*fFree*/);
                    serverTL->rehashCtl = nullptr;
                }
            }

            serverAssert(serverTL->rehashCtl == nullptr);
            if (rehashes_per_ms > 0)
                serverTL->rehashCtl = dictRehashAsyncStart(g_pserver->db[rehash_db].dict, rehashes_per_ms);
            if (serverTL->rehashCtl)
                break;

            if (fMainThread) {
                // We only synchronously rehash on the main thread, otherwise we'll cause too much latency
-               rehashes_per_ms = incrementallyRehash(rehash_db);
-               if (rehashes_per_ms > 0) {
-                   /* If the function did some work, stop here, we'll do
-                    * more at the next cron loop. */
-                   serverLog(LL_WARNING, "Rehashes per ms: %d", rehashes_per_ms);
-                   break;
-               } else {
-                   /* If this db didn't need rehash, we'll try the next one. */
-                   rehash_db++;
-                   rehash_db %= cserver.dbnum;
+               // Before starting anything new, can we end the rehash of a blocked thread?
+               if (g_pserver->db[rehash_db].pdict->asyncdata != nullptr) {
+                   auto asyncdata = g_pserver->db[rehash_db].pdict->asyncdata;
+                   if (asyncdata->done) {
+                       dictCompleteRehashAsync(asyncdata, false /*fFree*/); // Don't free because we don't own the pointer
+                       serverAssert(g_pserver->db[rehash_db].pdict->asyncdata != asyncdata);
+                       break; // completion can be expensive, don't do anything else
+                   }
+               }
+
+               rehashes_per_ms = incrementallyRehash(rehash_db);
+               if (rehashes_per_ms > 0) {
+                   /* If the function did some work, stop here, we'll do
+                    * more at the next cron loop. */
+                   serverLog(LL_WARNING, "Rehashes per ms: %d", rehashes_per_ms);
+                   break;
+               } else if (g_pserver->db[rehash_db].pdict->asyncdata == nullptr) {
+                   /* If this db didn't need rehash and we have none in flight, we'll try the next one. */
+                   rehash_db++;
+                   rehash_db %= cserver.dbnum;
                }
            }
        }
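
Taken together, the pieces above give the commit its titled behavior: a rehash queued by one thread keeps getting hashed while that thread spins on a lock, any other thread may apply the finished work with fFree = false, and only the owner frees the control block with fFree = true. An end-to-end toy of that flow (invented names, heavily simplified):

#include <atomic>

struct Ctl {
    std::atomic<bool> done { false };
    Ctl *next = nullptr;
    bool applied = false;
};

struct Dict { Ctl *asyncdata = nullptr; };

void complete(Dict *d, Ctl *c, bool fFree) {
    if (d->asyncdata == c) d->asyncdata = c->next; // unlink (simplified)
    if (!c->applied) c->applied = true;            // apply bucket moves once
    if (fFree) delete c;                           // only the owner frees
}

int main() {
    Dict d;
    Ctl *owned = new Ctl;     // owner queued async work (dictRehashAsyncStart)
    d.asyncdata = owned;
    owned->done = true;       // some thread finished the hashing

    // Helper (databasesCron or processEventsWhileBlocked): apply, don't free.
    if (d.asyncdata != nullptr && d.asyncdata->done)
        complete(&d, d.asyncdata, false /*fFree*/);

    // Owner later releases its control block.
    complete(&d, owned, true /*fFree*/);
}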
}