Initial implementation of async dict rehashing

Former-commit-id: 958f2c00c8efc15dc91fdeec2ff2e2ae2016c124
John Sully 2021-01-20 20:23:56 +00:00
parent aa47e643b0
commit 1c0b603def
3 changed files with 180 additions and 22 deletions

src/dict.cpp View File

@@ -127,6 +127,7 @@ int _dictInit(dict *d, dictType *type,
d->privdata = privDataPtr;
d->rehashidx = -1;
d->iterators = 0;
d->asyncdata = nullptr;
return DICT_OK;
}
@@ -188,13 +189,13 @@ int dictExpand(dict *d, unsigned long size)
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (!dictIsRehashing(d)) return 0;
if (d->asyncdata) return 0;
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
@@ -230,6 +231,95 @@ int dictRehash(dict *d, int n) {
return 1;
}
dictAsyncRehashCtl *dictRehashAsyncStart(dict *d) {
if (!dictIsRehashing(d)) return nullptr;
if (d->asyncdata != nullptr) {
/* An async rehash is already in progress, in the future we could give out an additional block */
return nullptr;
}
d->asyncdata = new dictAsyncRehashCtl(d);
int empty_visits = dictAsyncRehashCtl::c_targetQueueSize * 10;
while (d->asyncdata->queue.size() < dictAsyncRehashCtl::c_targetQueueSize && (d->ht[0].used - d->asyncdata->queue.size()) != 0) {
dictEntry *de;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) goto LDone;
}
de = d->ht[0].table[d->rehashidx];
// We have to queue every node in the bucket, even if we go over our target size
while (de) {
d->asyncdata->queue.emplace_back(de);
de = de->next;
}
d->rehashidx++;
}
LDone:
if (d->asyncdata->queue.empty()) {
// We didn't find any work to do (can happen if there is a large gap in the hash table)
delete d->asyncdata;
d->asyncdata = nullptr;
}
return d->asyncdata;
}
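Note for reviewers: the queue intentionally takes whole buckets even when that overshoots c_targetQueueSize, so rehashidx only ever advances past buckets whose entire chain has been snapshotted. The queued entries stay linked in ht[0] until dictCompleteRehashAsync relinks them, which is why lookups and deletes keep working while a batch is in flight.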
void dictRehashAsync(dictAsyncRehashCtl *ctl) {
for (auto &wi : ctl->queue) {
wi.hash = dictHashKey(ctl->dict, dictGetKey(wi.de));
}
}
void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl) {
dict *d = ctl->dict;
// Was the dictionary freed while we were in flight?
if (ctl->release) {
// Clear asyncdata first so dictRelease below actually frees the dict
// instead of deferring again, and drain the deferred GC list while the
// dict's free callbacks are still reachable.
d->asyncdata = nullptr;
while (ctl->deGCList != nullptr) {
dictEntry *deNext = ctl->deGCList->next;
dictFreeUnlinkedEntry(d, ctl->deGCList);
ctl->deGCList = deNext;
}
dictRelease(d);
delete ctl;
return;
}
for (auto &wi : ctl->queue) {
// We need to remove it from the source hash table, and store it in the dest.
// Note that it may have been deleted in the meantime and therefore not exist.
// In this case it will be in the garbage collection list
dictEntry **pdePrev = &d->ht[0].table[wi.hash & d->ht[0].sizemask];
while (*pdePrev != nullptr && *pdePrev != wi.de) {
pdePrev = &((*pdePrev)->next);
}
if (*pdePrev != nullptr) {
assert(*pdePrev == wi.de);
*pdePrev = wi.de->next;
// Now link it to the dest hash table
wi.de->next = d->ht[1].table[wi.hash & d->ht[1].sizemask];
d->ht[1].table[wi.hash & d->ht[1].sizemask] = wi.de;
d->ht[0].used--;
d->ht[1].used++;
} else {
// In this scenario the entry was deleted while the batch was in flight,
// so it should be parked on the GC list; find it, unlink it, and free it.
for (dictEntry **pdeGCPrev = &ctl->deGCList; *pdeGCPrev != nullptr; pdeGCPrev = &((*pdeGCPrev)->next)) {
if (*pdeGCPrev == wi.de) {
*pdeGCPrev = wi.de->next;
dictFreeUnlinkedEntry(d, wi.de);
break;
}
}
// Note: we may not find it if the entry was just unlinked but reused elsewhere
}
}
// Anything still parked on the GC list was deleted while the batch was in
// flight but was not part of this batch; free it now that no worker thread
// can be reading it.
while (ctl->deGCList != nullptr) {
dictEntry *deNext = ctl->deGCList->next;
dictFreeUnlinkedEntry(d, ctl->deGCList);
ctl->deGCList = deNext;
}
delete ctl;
d->asyncdata = nullptr;
}
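For reviewers, the intended three-phase call pattern, assembled from the serverCron changes below into one hypothetical wrapper (rehashOneBatchAsync is illustrative, not an API added by this commit; aeReleaseLock/aeAcquireLock are the existing lock primitives used the same way there):
void rehashOneBatchAsync(dict *d) {
dictAsyncRehashCtl *ctl = dictRehashAsyncStart(d); // phase 1: snapshot a batch, under the global lock
if (ctl == nullptr) return; // nothing to queue, or a batch already in flight
aeReleaseLock();
dictRehashAsync(ctl); // phase 2: hash the queued entries, no lock needed
aeAcquireLock();
dictCompleteRehashAsync(ctl); // phase 3: relink into ht[1], under the lock
}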
long long timeInMilliseconds(void) {
struct timeval tv;
@@ -385,9 +475,14 @@ static dictEntry *dictGenericDelete(dict *d, const void *key, int nofree) {
else
d->ht[table].table[idx] = he->next;
if (!nofree) {
if (d->asyncdata != nullptr) {
// An async rehash is in flight and a worker thread may still be reading
// this entry, so defer the free by parking it on the GC list.
he->next = d->asyncdata->deGCList;
d->asyncdata->deGCList = he;
} else {
dictFreeKey(d, he);
dictFreeVal(d, he);
zfree(he);
}
}
d->ht[table].used--;
return he;
@@ -470,6 +565,10 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
/* Clear & Release the hash table */
void dictRelease(dict *d)
{
if (d->asyncdata) {
d->asyncdata->release = true;
return;
}
_dictClear(d,&d->ht[0],NULL);
_dictClear(d,&d->ht[1],NULL);
zfree(d);

src/dict.h View File

@@ -36,6 +36,7 @@
#include <stdint.h>
#ifdef __cplusplus
#include <vector>
extern "C" {
#endif
@@ -81,12 +82,37 @@ typedef struct dictht {
unsigned long used;
} dictht;
#ifdef __cplusplus
struct dictAsyncRehashCtl {
struct workItem {
dictEntry *de;
uint64_t hash = 0; // computed later by dictRehashAsync
workItem(dictEntry *de) : de(de) {}
};
static const int c_targetQueueSize = 100;
dictEntry *deGCList = nullptr;
struct dict *dict = nullptr;
std::vector<workItem> queue;
bool release = false;
dictAsyncRehashCtl(struct dict *d) : dict(d) {
queue.reserve(c_targetQueueSize);
}
};
#else
typedef struct dictAsyncRehashCtl dictAsyncRehashCtl; /* opaque to C callers */
#endif
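Sizing note: a workItem is one pointer plus a 64-bit hash, i.e. 16 bytes on a 64-bit build, so the reserve(c_targetQueueSize) in the constructor costs roughly 100 * 16 = 1600 bytes for the lifetime of each in-flight batch.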
typedef struct dict {
dictType *type;
void *privdata;
dictht ht[2];
long rehashidx; /* rehashing not in progress if rehashidx == -1 */
unsigned long iterators; /* number of iterators currently running */
dictAsyncRehashCtl *asyncdata;
} dict;
/* If safe is set to 1 this is a safe iterator, that means, you can call
@@ -190,6 +216,11 @@ unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanB
uint64_t dictGetHash(dict *d, const void *key);
dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);
/* Async Rehash Functions. Intended sequence: start a batch under the global
* lock, hash it from any thread with the lock released, then complete it
* under the lock again. */
dictAsyncRehashCtl *dictRehashAsyncStart(dict *d); /* phase 1: snapshot a batch (locked) */
void dictRehashAsync(dictAsyncRehashCtl *ctl); /* phase 2: compute hashes (no lock) */
void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl); /* phase 3: relink entries (locked) */
/* Hash table types */
extern dictType dictTypeHeapStringCopyKey;
extern dictType dictTypeHeapStrings;

src/server.cpp View File

@@ -1775,19 +1775,23 @@ bool expireOwnKeys()
/* This function handles 'background' operations we are required to do
* incrementally in Redis databases, such as active key expiring, resizing,
* rehashing. */
dictAsyncRehashCtl* databasesCron(bool fMainThread) {
dictAsyncRehashCtl *ctl = nullptr;
if (fMainThread) {
/* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */
if (g_pserver->active_expire_enabled) {
if (expireOwnKeys()) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
expireSlaveKeys();
}
}
/* Defrag keys gradually. */
activeDefragCycle();
}
/* Perform hash tables rehashing if needed, but only if there are no
* other processes saving the DB on disk. Otherwise rehashing is bad
@@ -1804,16 +1808,21 @@ void databasesCron(void) {
/* Don't test more DBs than we have. */
if (dbs_per_call > cserver.dbnum) dbs_per_call = cserver.dbnum;
if (fMainThread) {
/* Resize */
for (j = 0; j < dbs_per_call; j++) {
tryResizeHashTables(resize_db % cserver.dbnum);
resize_db++;
}
}
/* Rehash */
if (g_pserver->activerehashing) {
for (j = 0; j < dbs_per_call; j++) {
ctl = dictRehashAsyncStart(g_pserver->db[rehash_db].pdict);
if (ctl)
break;
int work_done = fMainThread && incrementallyRehash(rehash_db);
if (work_done) {
/* If the function did some work, stop here, we'll do
* more at the next cron loop. */
@@ -1826,6 +1835,8 @@
}
}
}
return ctl;
}
/* We take a cached value of the unix time in the global state because with
@@ -2065,7 +2076,14 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
clientsCron(IDX_EVENT_LOOP_MAIN);
/* Handle background operations on Redis databases. */
auto asyncRehash = databasesCron(true /* fMainThread */);
if (asyncRehash) {
aeReleaseLock();
dictRehashAsync(asyncRehash);
aeAcquireLock();
dictCompleteRehashAsync(asyncRehash);
}
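Note the shape of this block: the global lock is dropped only around the hashing phase, so other threads keep making progress during the expensive part, while the start and complete phases remain serialized under the lock. serverCronLite below repeats the same pattern with fMainThread = false, letting worker threads contribute rehash batches without also running expiry, defrag, or resize.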
/* Start a scheduled AOF rewrite if this was requested by the user while
* a BGSAVE was in progress. */
@@ -2215,6 +2233,16 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
processUnblockedClients(iel);
}
/* Handle background operations on Redis databases. */
auto asyncRehash = databasesCron(false /* fMainThread */);
if (asyncRehash) {
aeReleaseLock();
dictRehashAsync(asyncRehash);
aeAcquireLock();
dictCompleteRehashAsync(asyncRehash);
}
/* Unpause clients if enough time has elapsed */
unpauseClientsIfNecessary();