From 4f06fb2b4fe2636f370f86fb0f82d0dde8e127aa Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 16 Mar 2021 02:38:41 +0000 Subject: [PATCH] Make async rehash behave with snapshots (thread safety issues) Former-commit-id: 372adf39a80252b8035e3c948fcaf7d5ef6f928f --- src/dict.cpp | 54 +++++++++++++++++++++++++----------------------- src/dict.h | 12 ++++++----- src/server.cpp | 7 +++++-- src/snapshot.cpp | 53 +++++++++++++++++++++++++++++++++++++---------- 4 files changed, 82 insertions(+), 44 deletions(-) diff --git a/src/dict.cpp b/src/dict.cpp index 4afcb3bf7..c682e2ec9 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -129,6 +129,7 @@ int _dictInit(dict *d, dictType *type, d->rehashidx = -1; d->iterators = 0; d->asyncdata = nullptr; + d->refcount = 1; return DICT_OK; } @@ -369,8 +370,14 @@ int dictRehash(dict *d, int n) { return 1; } +dictAsyncRehashCtl::dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) { + queue.reserve(c_targetQueueSize); + __atomic_fetch_add(&d->refcount, 1, __ATOMIC_RELEASE); +} + dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) { - if (!dictIsRehashing(d)) return 0; + assert(d->type->asyncfree != nullptr); + if (!dictIsRehashing(d) || d->iterators != 0) return nullptr; d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata); @@ -454,7 +461,7 @@ void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) { } } - if (fUnlinked && !ctl->release) { + if (fUnlinked && !ctl->abondon) { if (d->ht[0].table != nullptr) { // can be null if we're cleared during the rehash for (auto &wi : ctl->queue) { // We need to remove it from the source hash table, and store it in the dest. @@ -487,23 +494,10 @@ void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) { } if (fFree) { - while (ctl->deGCList != nullptr) { - auto next = ctl->deGCList->next; - dictFreeKey(d, ctl->deGCList); - dictFreeVal(d, ctl->deGCList); - zfree(ctl->deGCList); - ctl->deGCList = next; - } + d->type->asyncfree(ctl); - // Was the dictionary free'd while we were in flight? - if (ctl->release) { - if (d->asyncdata != nullptr) - d->asyncdata->release = true; - else - dictRelease(d); - } - - delete ctl; + // Remove our reference + dictRelease(d); } } @@ -514,6 +508,16 @@ long long timeInMilliseconds(void) { return (((long long)tv.tv_sec)*1000)+(tv.tv_usec/1000); } +dictAsyncRehashCtl::~dictAsyncRehashCtl() { + while (deGCList != nullptr) { + auto next = deGCList->next; + dictFreeKey(dict, deGCList); + dictFreeVal(dict, deGCList); + zfree(deGCList); + deGCList = next; + } +} + /* Rehash in ms+"delta" milliseconds. The value of "delta" is larger * than 0, and is smaller than 1 in most cases. The exact upper bound * depends on the running time of dictRehash(d,100).*/ @@ -537,7 +541,7 @@ int dictRehashMilliseconds(dict *d, int ms) { * dictionary so that the hash table automatically migrates from H1 to H2 * while it is actively used. */ static void _dictRehashStep(dict *d) { - unsigned long iterators; + unsigned iterators; __atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED); if (iterators == 0) dictRehash(d,2); } @@ -766,13 +770,11 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) { /* Clear & Release the hash table */ void dictRelease(dict *d) { - if (d->asyncdata) { - d->asyncdata->release = true; - return; + if (__atomic_sub_fetch(&d->refcount, 1, __ATOMIC_ACQ_REL) == 0) { + _dictClear(d,&d->ht[0],NULL); + _dictClear(d,&d->ht[1],NULL); + zfree(d); } - _dictClear(d,&d->ht[0],NULL); - _dictClear(d,&d->ht[1],NULL); - zfree(d); } dictEntry *dictFindWithPrev(dict *d, const void *key, uint64_t h, dictEntry ***dePrevPtr, dictht **pht, bool fShallowCompare) @@ -1460,7 +1462,7 @@ void dictGetStats(char *buf, size_t bufsize, dict *d) { void dictForceRehash(dict *d) { - unsigned long iterators; + unsigned iterators; __atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED); while (iterators == 0 && dictIsRehashing(d)) _dictRehashStep(d); } diff --git a/src/dict.h b/src/dict.h index 54519996b..ab57a7d7f 100644 --- a/src/dict.h +++ b/src/dict.h @@ -53,6 +53,7 @@ extern "C" { /* Unused arguments generate annoying warnings... */ #define DICT_NOTUSED(V) ((void) V) +struct dictAsyncRehashCtl; typedef struct dictEntry { void *key; @@ -72,6 +73,7 @@ typedef struct dictType { int (*keyCompare)(void *privdata, const void *key1, const void *key2); void (*keyDestructor)(void *privdata, void *key); void (*valDestructor)(void *privdata, void *obj); + void (*asyncfree)(dictAsyncRehashCtl *); } dictType; /* This is our hash table structure. Every dictionary has two of this as we @@ -98,13 +100,12 @@ struct dictAsyncRehashCtl { struct dict *dict = nullptr; std::vector queue; size_t hashIdx = 0; - bool release = false; dictAsyncRehashCtl *next = nullptr; std::atomic done { false }; + std::atomic abondon { false }; - dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) { - queue.reserve(c_targetQueueSize); - } + dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next); + ~dictAsyncRehashCtl(); }; #else struct dictAsyncRehashCtl; @@ -115,7 +116,8 @@ typedef struct dict { void *privdata; dictht ht[2]; long rehashidx; /* rehashing not in progress if rehashidx == -1 */ - unsigned long iterators; /* number of iterators currently running */ + unsigned iterators; /* number of iterators currently running */ + unsigned refcount; dictAsyncRehashCtl *asyncdata; } dict; diff --git a/src/server.cpp b/src/server.cpp index bba6d997e..96d62e4ea 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1359,6 +1359,8 @@ uint64_t dictEncObjHash(const void *key) { } } +void dictGCAsyncFree(dictAsyncRehashCtl *async); + /* Generic hash table type where keys are Redis Objects, Values * dummy pointers. */ dictType objectKeyPointerValueDictType = { @@ -1407,8 +1409,9 @@ dictType dbDictType = { NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ - dictDbKeyDestructor, /* key destructor */ - dictObjectDestructor /* val destructor */ + dictDbKeyDestructor, /* key destructor */ + dictObjectDestructor, /* val destructor */ + dictGCAsyncFree /* async free destructor */ }; /* db->pdict, keys are sds strings, vals are Redis objects. */ diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 2c08088e1..29fa4a663 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -26,6 +26,17 @@ public: std::vector vecde; }; +void discontinueAsyncRehash(dict *d) { + if (d->asyncdata != nullptr) { + auto adata = d->asyncdata; + while (adata != nullptr) { + adata->abondon = true; + adata = adata->next; + } + d->rehashidx = 0; + } +} + const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional) { serverAssert(GlobalLocksAcquired()); @@ -67,14 +78,8 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 // We can't have async rehash modifying these. Setting the asyncdata list to null // will cause us to throw away the async work rather than modify the tables in flight - if (m_pdict->asyncdata != nullptr) { - m_pdict->asyncdata = nullptr; - m_pdict->rehashidx = 0; - } - if (m_pdictTombstone->asyncdata != nullptr) { - m_pdictTombstone->rehashidx = 0; - m_pdictTombstone->asyncdata = nullptr; - } + discontinueAsyncRehash(m_pdict); + discontinueAsyncRehash(m_pdictTombstone); spdb->m_fAllChanged = false; spdb->m_fTrackingChanges = 0; @@ -124,6 +129,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 m_pdbSnapshotASYNC = nullptr; } + std::atomic_thread_fence(std::memory_order_seq_cst); return m_pdbSnapshot; } @@ -318,6 +324,12 @@ bool redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) dictForceRehash(dictTombstoneNew); aeAcquireLock(); + if (m_pdbSnapshot->m_pdict->asyncdata != nullptr) { + // In this case we use the asyncdata to free us, not our own lazy free + for (auto de : splazy->vecde) + dictFreeUnlinkedEntry(m_pdbSnapshot->m_pdict, de); + splazy->vecde.clear(); + } dict *dT = m_pdbSnapshot->m_pdict; splazy->vecdictLazyFree.push_back(m_pdictTombstone); __atomic_store(&m_pdictTombstone, &dictTombstoneNew, __ATOMIC_RELEASE); @@ -402,9 +414,14 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn } // Delete the object from the source dict, we don't use dictDelete to avoid a second search - if (deSnapshot != nullptr) - splazy->vecde.push_back(deSnapshot); - *dePrev = deSnapshot->next; + *dePrev = deSnapshot->next; // Unlink it first + if (deSnapshot != nullptr) { + if (m_spdbSnapshotHOLDER->m_pdict->asyncdata != nullptr) { + dictFreeUnlinkedEntry(m_spdbSnapshotHOLDER->m_pdict, deSnapshot); + } else { + splazy->vecde.push_back(deSnapshot); + } + } ht->used--; } @@ -640,4 +657,18 @@ bool redisDbPersistentDataSnapshot::FStale() const // 0.5 seconds considered stale; static const uint64_t msStale = 500; return ((getMvccTstamp() - m_mvccCheckpoint) >> MVCC_MS_SHIFT) >= msStale; +} + +void dictGCAsyncFree(dictAsyncRehashCtl *async) { + if (async->deGCList != nullptr && serverTL != nullptr && !serverTL->gcEpoch.isReset()) { + auto splazy = std::make_unique(); + auto *de = async->deGCList; + while (de != nullptr) { + splazy->vecde.push_back(de); + de = de->next; + } + async->deGCList = nullptr; + g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy)); + } + delete async; } \ No newline at end of file