From 1c0b603def4b18b286ca33233b0816416cd460ac Mon Sep 17 00:00:00 2001
From: John Sully
Date: Wed, 20 Jan 2021 20:23:56 +0000
Subject: [PATCH] Initial implementation

Former-commit-id: 958f2c00c8efc15dc91fdeec2ff2e2ae2016c124
---
 src/dict.cpp   | 107 +++++++++++++++++++++++++++++++++++++++++++++++--
 src/dict.h     |  31 ++++++++++++++
 src/server.cpp |  64 ++++++++++++++++++++---------
 3 files changed, 180 insertions(+), 22 deletions(-)

diff --git a/src/dict.cpp b/src/dict.cpp
index 2c16af119..e0b193312 100644
--- a/src/dict.cpp
+++ b/src/dict.cpp
@@ -127,6 +127,7 @@ int _dictInit(dict *d, dictType *type,
     d->privdata = privDataPtr;
     d->rehashidx = -1;
     d->iterators = 0;
+    d->asyncdata = nullptr;
     return DICT_OK;
 }
 
@@ -188,13 +189,13 @@ int dictExpand(dict *d, unsigned long size)
 int dictRehash(dict *d, int n) {
     int empty_visits = n*10; /* Max number of empty buckets to visit. */
     if (!dictIsRehashing(d)) return 0;
+    if (d->asyncdata) return 0;
 
     while(n-- && d->ht[0].used != 0) {
         dictEntry *de, *nextde;
 
         /* Note that rehashidx can't overflow as we are sure there are more
          * elements because ht[0].used != 0 */
-        assert(d->ht[0].size > (unsigned long)d->rehashidx);
         while(d->ht[0].table[d->rehashidx] == NULL) {
             d->rehashidx++;
             if (--empty_visits == 0) return 1;
@@ -230,6 +231,95 @@ int dictRehash(dict *d, int n) {
     return 1;
 }
 
+dictAsyncRehashCtl *dictRehashAsyncStart(dict *d) {
+    if (!dictIsRehashing(d)) return nullptr;
+    if (d->asyncdata != nullptr) {
+        /* An async rehash is already in progress; in the future we could hand out an additional block */
+        return nullptr;
+    }
+    d->asyncdata = new dictAsyncRehashCtl(d);
+
+    int empty_visits = dictAsyncRehashCtl::c_targetQueueSize * 10;
+
+    while (d->asyncdata->queue.size() < dictAsyncRehashCtl::c_targetQueueSize && (d->ht[0].used - d->asyncdata->queue.size()) != 0) {
+        dictEntry *de;
+
+        /* Note that rehashidx can't overflow as we are sure there are more
+         * elements because ht[0].used != 0 */
+        while(d->ht[0].table[d->rehashidx] == NULL) {
+            d->rehashidx++;
+            if (--empty_visits == 0) goto LDone;
+        }
+
+        de = d->ht[0].table[d->rehashidx];
+        // We have to queue every node in the bucket, even if we go over our target size
+        while (de) {
+            d->asyncdata->queue.emplace_back(de);
+            de = de->next;
+        }
+        d->rehashidx++;
+    }
+
+LDone:
+    if (d->asyncdata->queue.empty()) {
+        // We didn't find any work to do (can happen if there is a large gap in the hash table)
+        delete d->asyncdata;
+        d->asyncdata = nullptr;
+    }
+    return d->asyncdata;
+}
+
+void dictRehashAsync(dictAsyncRehashCtl *ctl) {
+    for (auto &wi : ctl->queue) {
+        wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de));
+    }
+}
+
+void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl) {
+    dict *d = ctl->dict;
+
+    // Was the dictionary freed while we were in flight?
+    if (ctl->release) {
+        dictRelease(d);
+        delete ctl;
+        return;
+    }
+
+    for (auto &wi : ctl->queue) {
+        // We need to remove it from the source hash table and store it in the dest.
+        // Note that it may have been deleted in the meantime and therefore not exist.
+        // In this case it will be in the garbage collection list.
+
+        dictEntry **pdePrev = &d->ht[0].table[wi.hash & d->ht[0].sizemask];
+        while (*pdePrev != nullptr && *pdePrev != wi.de) {
+            pdePrev = &((*pdePrev)->next);
+        }
+        if (*pdePrev != nullptr) {
+            assert(*pdePrev == wi.de);
+            *pdePrev = wi.de->next;
+            // Now link it into the dest hash table
+            wi.de->next = d->ht[1].table[wi.hash & d->ht[1].sizemask];
+            d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de;
+            d->ht[0].used--;
+            d->ht[1].used++;
+        } else {
+            // In this scenario the entry should be in the GC list;
+            // find it, unlink it, and free it.
+            // Note: we may not find it if the entry was just unlinked but reused elsewhere.
+            for (dictEntry **pdeGCPrev = &ctl->deGCList; *pdeGCPrev != nullptr; pdeGCPrev = &((*pdeGCPrev)->next)) {
+                if (*pdeGCPrev == wi.de) {
+                    *pdeGCPrev = wi.de->next;
+                    dictFreeUnlinkedEntry(d, wi.de);
+                    break;
+                }
+            }
+        }
+    }
+
+    delete ctl;
+    d->asyncdata = nullptr;
+}
+
 long long timeInMilliseconds(void) {
     struct timeval tv;
 
@@ -385,9 +475,14 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
                 else
                     d->ht[table].table[idx] = he->next;
                 if (!nofree) {
-                    dictFreeKey(d, he);
-                    dictFreeVal(d, he);
-                    zfree(he);
+                    if (d->asyncdata != nullptr) {
+                        he->next = d->asyncdata->deGCList;
+                        d->asyncdata->deGCList = he;
+                    } else {
+                        dictFreeKey(d, he);
+                        dictFreeVal(d, he);
+                        zfree(he);
+                    }
                 }
                 d->ht[table].used--;
                 return he;
@@ -470,6 +565,10 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
 /* Clear & Release the hash table */
 void dictRelease(dict *d)
 {
+    if (d->asyncdata) {
+        d->asyncdata->release = true;
+        return;
+    }
     _dictClear(d,&d->ht[0],NULL);
     _dictClear(d,&d->ht[1],NULL);
     zfree(d);
diff --git a/src/dict.h b/src/dict.h
index 03c39c7ff..42d097f9b 100644
--- a/src/dict.h
+++ b/src/dict.h
@@ -36,6 +36,7 @@
 #include <stdint.h>
 
 #ifdef __cplusplus
+#include <vector>
 extern "C" {
 #endif
 
@@ -81,12 +82,37 @@ typedef struct dictht {
     unsigned long used;
 } dictht;
 
+#ifdef __cplusplus
+struct dictAsyncRehashCtl {
+    struct workItem {
+        dictEntry *de;
+        uint64_t hash;
+        workItem(dictEntry *de) {
+            this->de = de;
+        }
+    };
+
+    static const int c_targetQueueSize = 100;
+    dictEntry *deGCList = nullptr;
+    struct dict *dict = nullptr;
+    std::vector<workItem> queue;
+    bool release = false;
+
+    dictAsyncRehashCtl(struct dict *d) : dict(d) {
+        queue.reserve(c_targetQueueSize);
+    }
+};
+#else
+typedef struct dictAsyncRehashCtl dictAsyncRehashCtl;
+#endif
+
 typedef struct dict {
     dictType *type;
     void *privdata;
     dictht ht[2];
     long rehashidx; /* rehashing not in progress if rehashidx == -1 */
     unsigned long iterators; /* number of iterators currently running */
+    dictAsyncRehashCtl *asyncdata;
 } dict;
 
 /* If safe is set to 1 this is a safe iterator, that means, you can call
@@ -190,6 +216,11 @@ unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanB
 uint64_t dictGetHash(dict *d, const void *key);
 dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);
 
+/* Async Rehash Functions */
+dictAsyncRehashCtl *dictRehashAsyncStart(dict *d);
+void dictRehashAsync(dictAsyncRehashCtl *ctl);
+void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl);
+
 /* Hash table types */
 extern dictType dictTypeHeapStringCopyKey;
 extern dictType dictTypeHeapStrings;
diff --git a/src/server.cpp b/src/server.cpp
index 848b28bcf..fdb8885b1 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -1775,19 +1775,23 @@ bool expireOwnKeys()
 
 /* This function handles 'background' operations we are required to do
  * incrementally in Redis databases, such as active key expiring, resizing,
  * rehashing. */
-void databasesCron(void) {
-    /* Expire keys by random sampling. Not required for slaves
-     * as master will synthesize DELs for us. */
-    if (g_pserver->active_expire_enabled) {
-        if (expireOwnKeys()) {
-            activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
-        } else {
-            expireSlaveKeys();
-        }
-    }
+dictAsyncRehashCtl* databasesCron(bool fMainThread) {
+    dictAsyncRehashCtl *ctl = nullptr;
 
-    /* Defrag keys gradually. */
-    activeDefragCycle();
+    if (fMainThread) {
+        /* Expire keys by random sampling. Not required for slaves
+         * as master will synthesize DELs for us. */
+        if (g_pserver->active_expire_enabled) {
+            if (expireOwnKeys()) {
+                activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
+            } else {
+                expireSlaveKeys();
+            }
+        }
+
+        /* Defrag keys gradually. */
+        activeDefragCycle();
+    }
 
     /* Perform hash tables rehashing if needed, but only if there are no
      * other processes saving the DB on disk. Otherwise rehashing is bad
@@ -1804,16 +1808,21 @@ void databasesCron(void) {
         /* Don't test more DBs than we have. */
         if (dbs_per_call > cserver.dbnum) dbs_per_call = cserver.dbnum;
 
-        /* Resize */
-        for (j = 0; j < dbs_per_call; j++) {
-            tryResizeHashTables(resize_db % cserver.dbnum);
-            resize_db++;
+        if (fMainThread) {
+            /* Resize */
+            for (j = 0; j < dbs_per_call; j++) {
+                tryResizeHashTables(resize_db % cserver.dbnum);
+                resize_db++;
+            }
         }
 
         /* Rehash */
         if (g_pserver->activerehashing) {
             for (j = 0; j < dbs_per_call; j++) {
-                int work_done = incrementallyRehash(rehash_db);
+                ctl = dictRehashAsyncStart(g_pserver->db[rehash_db].pdict);
+                if (ctl)
+                    break;
+                int work_done = fMainThread && incrementallyRehash(rehash_db);
                 if (work_done) {
                     /* If the function did some work, stop here, we'll do
                      * more at the next cron loop. */
@@ -1826,6 +1835,8 @@
             }
         }
     }
+
+    return ctl;
 }
 
 /* We take a cached value of the unix time in the global state because with
@@ -2065,7 +2076,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
         clientsCron(IDX_EVENT_LOOP_MAIN);
 
     /* Handle background operations on Redis databases. */
-    databasesCron();
+    auto asyncRehash = databasesCron(true /* fMainThread */);
+
+    if (asyncRehash) {
+        aeReleaseLock();
+        dictRehashAsync(asyncRehash);
+        aeAcquireLock();
+        dictCompleteRehashAsync(asyncRehash);
+    }
 
     /* Start a scheduled AOF rewrite if this was requested by the user while
      * a BGSAVE was in progress. */
@@ -2215,6 +2233,16 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
         processUnblockedClients(iel);
     }
 
+    /* Handle background operations on Redis databases. */
+    auto asyncRehash = databasesCron(false /* fMainThread */);
+
+    if (asyncRehash) {
+        aeReleaseLock();
+        dictRehashAsync(asyncRehash);
+        aeAcquireLock();
+        dictCompleteRehashAsync(asyncRehash);
+    }
+
     /* Unpause clients if enough time has elapsed */
     unpauseClientsIfNecessary();
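
--
Note for reviewers: the serverCron and serverCronLite hunks above both drive the
same three-phase lifecycle. The sketch below restates it in one place as an
illustration only; it is not part of the patch. cronRehashStep is a hypothetical
helper name, assumed to be called while holding the global lock, with dict.h and
the ae lock functions in scope.

    void cronRehashStep(dict *d) {
        /* Phase 1 (locked): snapshot up to c_targetQueueSize entries from
         * ht[0] into a control block. Returns nullptr when there is nothing
         * to do or an async rehash is already in flight. */
        dictAsyncRehashCtl *ctl = dictRehashAsyncStart(d);
        if (ctl == nullptr) return;

        /* Phase 2 (unlocked): compute the hashes only. This reads no dict
         * state beyond the snapshotted queue, and concurrent deletes park
         * their entries on ctl->deGCList instead of freeing them. */
        aeReleaseLock();
        dictRehashAsync(ctl);

        /* Phase 3 (locked again): splice the queued entries into ht[1] and
         * free anything that was unlinked while the hashes were computed. */
        aeAcquireLock();
        dictCompleteRehashAsync(ctl);
    }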