diff --git a/src/dict.cpp b/src/dict.cpp index 86a29db4a..2c254b313 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -200,6 +200,8 @@ int dictMerge(dict *dst, dict *src) std::swap(dst->iterators, src->iterators); } + src->rehashidx = -1; + if (!dictIsRehashing(dst) && !dictIsRehashing(src)) { if (dst->ht[0].size >= src->ht[0].size) diff --git a/src/server.cpp b/src/server.cpp index 94477a886..2d47cab05 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2040,29 +2040,6 @@ void flushStorageWeak() } } -void freeSnapshotLazyFreesAsync() -{ - aeAcquireLock(); - std::vector vecObjs = std::move(g_pserver->vecobjLazyFree); - std::vector vecDicts = std::move(g_pserver->vecdictLazyFree); - std::vector> vecvecde = std::move(g_pserver->vecvecde); - aeReleaseLock(); - - for (auto &vecdeFree : vecvecde) - { - for (auto *de : vecdeFree) - { - dbDictType.keyDestructor(nullptr, dictGetKey(de)); - dbDictType.valDestructor(nullptr, dictGetVal(de)); - zfree(de); - } - } - for (robj *o : vecObjs) - decrRefCount(o); - for (dict *d : vecDicts) - dictRelease(d); -} - /* This is our timer interrupt, called g_pserver->hz times per second. * Here is where we do a number of things that need to be done asynchronously. * For instance: @@ -2243,11 +2220,22 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { CONFIG_BGSAVE_RETRY_DELAY || g_pserver->lastbgsave_status == C_OK)) { - serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...", - sp->changes, (int)sp->seconds); - rdbSaveInfo rsi, *rsiptr; - rsiptr = rdbPopulateSaveInfo(&rsi); - rdbSaveBackground(rsiptr); + // Ensure rehashing is complete + bool fRehashInProgress = false; + if (g_pserver->activerehashing) { + for (int idb = 0; idb < cserver.dbnum && !fRehashInProgress; ++idb) { + if (g_pserver->db[idb]->FRehashing()) + fRehashInProgress = true; + } + } + + if (!fRehashInProgress) { + serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...", + sp->changes, (int)sp->seconds); + rdbSaveInfo rsi, *rsiptr; + rsiptr = rdbPopulateSaveInfo(&rsi); + rdbSaveBackground(rsiptr); + } break; } } @@ -2337,15 +2325,13 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } run_with_period(100) { - bool fAsyncFrees = g_pserver->vecobjLazyFree.size() || g_pserver->vecdictLazyFree.size() || g_pserver->vecvecde.size(); bool fAnySnapshots = false; - for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots && !fAsyncFrees; ++idb) + for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots; ++idb) fAnySnapshots = fAnySnapshots || g_pserver->db[0]->FSnapshot(); - if (fAnySnapshots || fAsyncFrees) + if (fAnySnapshots) { - g_pserver->asyncworkqueue->AddWorkFunction([fAsyncFrees]{ + g_pserver->asyncworkqueue->AddWorkFunction([]{ g_pserver->db[0]->consolidate_snapshot(); - freeSnapshotLazyFreesAsync(); }, true /*HiPri*/); } } @@ -6084,7 +6070,6 @@ int main(int argc, char **argv) { serverAssert(fLockAcquired); g_pserver->garbageCollector.shutdown(); - freeSnapshotLazyFreesAsync(); delete g_pserver->m_pstorageFactory; return 0; diff --git a/src/server.h b/src/server.h index 07f7a4010..193004b52 100644 --- a/src/server.h +++ b/src/server.h @@ -1420,6 +1420,9 @@ private: class redisDbPersistentDataSnapshot : protected redisDbPersistentData { friend class redisDbPersistentData; +private: + bool iterate_threadsafe_core(std::function &fn, bool fKeyOnly, bool fCacheOnly, bool fTop) const; + protected: static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot); void consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce); @@ -2503,10 +2506,6 @@ struct redisServer { char *aof_rewrite_cpulist; /* cpu affinity list of aof rewrite process. */ char *bgsave_cpulist; /* cpu affinity list of bgsave process. */ - std::vector vecdictLazyFree; - std::vector vecobjLazyFree; - std::vector> vecvecde; - bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; } }; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index c0ca378b1..0f84e483e 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -2,6 +2,29 @@ #include "aelocker.h" static const size_t c_elementsSmallLimit = 500000; +static fastlock s_lock {"consolidate_children"}; // this lock ensures only one thread is consolidating at a time + +class LazyFree : public ICollectable +{ +public: + virtual ~LazyFree() + { + for (auto *de : vecde) + { + dbDictType.keyDestructor(nullptr, dictGetKey(de)); + dbDictType.valDestructor(nullptr, dictGetVal(de)); + zfree(de); + } + for (robj *o : vecobjLazyFree) + decrRefCount(o); + for (dict *d : vecdictLazyFree) + dictRelease(d); + } + + std::vector vecdictLazyFree; + std::vector vecobjLazyFree; + std::vector vecde; +}; const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional) { @@ -189,7 +212,18 @@ void redisDbPersistentData::restoreSnapshot(const redisDbPersistentDataSnapshot void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot) { mstime_t latency; - aeAcquireLock(); latencyStartMonitor(latency); + + aeAcquireLock(); + while (dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone)) { + dictRehashMilliseconds(m_pdict, 1); + dictRehashMilliseconds(m_pdictTombstone, 1); + // Give someone else a chance + aeReleaseLock(); + usleep(300); + aeAcquireLock(); + } + + latencyStartMonitor(latency); if (m_pdbSnapshotASYNC && m_pdbSnapshotASYNC->m_mvccCheckpoint <= psnapshot->m_mvccCheckpoint) { // Free a stale async snapshot so consolidate_children can clean it up later @@ -215,11 +249,22 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot auto psnapshotT = createSnapshot(LLONG_MAX, false); endSnapshot(psnapshot); // this will just dec the ref count since our new snapshot has a ref psnapshot = nullptr; - aeReleaseLock(); latencyEndMonitor(latency); + + latencyEndMonitor(latency); latencyAddSampleIfNeeded("end-snapshot-async-phase-1", latency); + aeReleaseLock(); // do the expensive work of merging snapshots outside the ref - const_cast(psnapshotT)->freeTombstoneObjects(1); // depth is one because we just creted it + if (const_cast(psnapshotT)->freeTombstoneObjects(1)) // depth is one because we just creted it + { + aeAcquireLock(); + if (m_pdbSnapshotASYNC != nullptr) + endSnapshot(m_pdbSnapshotASYNC); + m_pdbSnapshotASYNC = nullptr; + endSnapshot(psnapshotT); + aeReleaseLock(); + return; + } const_cast(psnapshotT)->consolidate_children(this, true); // Final Cleanup @@ -228,9 +273,10 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot m_pdbSnapshotASYNC = psnapshotT; else endSnapshot(psnapshotT); // finally clean up our temp snapshot - aeReleaseLock(); latencyEndMonitor(latency); - + + latencyEndMonitor(latency); latencyAddSampleIfNeeded("end-snapshot-async-phase-2", latency); + aeReleaseLock(); } bool redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) @@ -241,45 +287,66 @@ bool redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) return true; } - bool fPrevResult = const_cast(m_pdbSnapshot)->freeTombstoneObjects(depth+1); + if (!const_cast(m_pdbSnapshot)->freeTombstoneObjects(depth+1)) + return false; + + { + AeLocker ae; + ae.arm(nullptr); if (m_pdbSnapshot->m_refCount != depth && (m_pdbSnapshot->m_refCount != (m_refCount+1))) return false; + ae.disarm(); + } + + std::unique_lock lock(s_lock, std::defer_lock); + if (!lock.try_lock()) + return false; // this is a best effort function + std::unique_ptr splazy = std::make_unique(); + + dict *dictTombstoneNew = dictCreate(&dbTombstoneDictType, nullptr); dictIterator *di = dictGetIterator(m_pdictTombstone); dictEntry *de; std::vector vecdeFree; vecdeFree.reserve(dictSize(m_pdictTombstone)); - bool fAllCovered = true; + unsigned rgcremoved[2] = {0}; while ((de = dictNext(di)) != nullptr) { dictEntry **dePrev = nullptr; dictht *ht = nullptr; sds key = (sds)dictGetKey(de); - dictEntry *deObj = dictFindWithPrev(m_pdbSnapshot->m_pdict, key, (uint64_t)dictGetVal(de), &dePrev, &ht, !!sdsisshared(key)); + // BUG BUG: Why can't we do a shallow search here? + dictEntry *deObj = dictFindWithPrev(m_pdbSnapshot->m_pdict, key, (uint64_t)dictGetVal(de), &dePrev, &ht, false); + if (deObj != nullptr) { // Now unlink the DE __atomic_store(dePrev, &deObj->next, __ATOMIC_RELEASE); - ht->used--; - vecdeFree.push_back(deObj); - } - else - { - fAllCovered = fPrevResult; + if (ht == &m_pdbSnapshot->m_pdict->ht[0]) + rgcremoved[0]++; + else + rgcremoved[1]++; + splazy->vecde.push_back(deObj); + } else { + serverAssert(dictFind(m_pdbSnapshot->m_pdict, key) == nullptr); + serverAssert(m_pdbSnapshot->find_cached_threadsafe(key) != nullptr); + dictAdd(dictTombstoneNew, sdsdupshared((sds)dictGetKey(de)), dictGetVal(de)); } } dictReleaseIterator(di); + dictForceRehash(dictTombstoneNew); aeAcquireLock(); - if (fAllCovered) - { - g_pserver->vecdictLazyFree.push_back(m_pdictTombstone); - m_pdictTombstone = dictCreate(&dbTombstoneDictType, nullptr); - } - g_pserver->vecvecde.emplace_back(std::move(vecdeFree)); + dict *dT = m_pdbSnapshot->m_pdict; + splazy->vecdictLazyFree.push_back(m_pdictTombstone); + __atomic_store(&m_pdictTombstone, &dictTombstoneNew, __ATOMIC_RELEASE); + __atomic_fetch_sub(&dT->ht[0].used, rgcremoved[0], __ATOMIC_RELEASE); + __atomic_fetch_sub(&dT->ht[1].used, rgcremoved[1], __ATOMIC_RELEASE); + serverLog(LL_WARNING, "tombstones removed: %u, remain: %lu", rgcremoved[0]+rgcremoved[1], dictSize(m_pdictTombstone)); + g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy)); aeReleaseLock(); - return fAllCovered; + return true; } void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot) @@ -331,16 +398,19 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn dictIterator *di = dictGetIterator(m_pdictTombstone); dictEntry *de; m_spdbSnapshotHOLDER->m_pdict->iterators++; - std::vector vecde; + auto splazy = std::make_unique(); while ((de = dictNext(di)) != NULL) { dictEntry **dePrev; dictht *ht; - dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), (uint64_t)dictGetVal(de), &dePrev, &ht, !!sdsisshared((sds)dictGetKey(de))); + // BUG BUG Why not a shallow search? + dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), (uint64_t)dictGetVal(de), &dePrev, &ht, false /*!!sdsisshared((sds)dictGetKey(de))*/); if (deSnapshot == nullptr && m_spdbSnapshotHOLDER->m_pdbSnapshot) { // The tombstone is for a grand child, propogate it (or possibly in the storage provider - but an extra tombstone won't hurt) +#ifdef CHECKED_BUILD serverAssert(m_spdbSnapshotHOLDER->m_pdbSnapshot->find_cached_threadsafe((const char*)dictGetKey(de)) != nullptr); +#endif dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), nullptr); continue; } @@ -351,14 +421,15 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn } // Delete the object from the source dict, we don't use dictDelete to avoid a second search - vecde.push_back(deSnapshot); + splazy->vecde.push_back(deSnapshot); *dePrev = deSnapshot->next; ht->used--; } - g_pserver->vecvecde.emplace_back(std::move(vecde)); + + m_spdbSnapshotHOLDER->m_pdict->iterators--; dictReleaseIterator(di); - g_pserver->vecdictLazyFree.push_back(m_pdictTombstone); + splazy->vecdictLazyFree.push_back(m_pdictTombstone); m_pdictTombstone = dictCreate(&dbTombstoneDictType, nullptr); // Stage 2 Move all new keys to the snapshot DB @@ -388,8 +459,10 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn auto spsnapshotFree = std::move(m_spdbSnapshotHOLDER); m_spdbSnapshotHOLDER = std::move(spsnapshotFree->m_spdbSnapshotHOLDER); - if (serverTL != nullptr) + if (serverTL != nullptr) { g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(spsnapshotFree)); + g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy)); + } // Sanity Checks serverAssert(m_spdbSnapshotHOLDER != nullptr || m_pdbSnapshot == nullptr); @@ -426,8 +499,10 @@ dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe(bool fPrimaryOn dict_iter redisDbPersistentData::find_cached_threadsafe(const char *key) const { + dict *dictTombstone; + __atomic_load(&m_pdictTombstone, &dictTombstone, __ATOMIC_ACQUIRE); dictEntry *de = dictFind(m_pdict, key); - if (de == nullptr && m_pdbSnapshot != nullptr && dictFind(m_pdictTombstone, key) == nullptr) + if (de == nullptr && m_pdbSnapshot != nullptr && dictFind(dictTombstone, key) == nullptr) { auto itr = m_pdbSnapshot->find_cached_threadsafe(key); if (itr != nullptr) @@ -493,11 +568,20 @@ unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long itera } bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function fn, bool fKeyOnly, bool fCacheOnly) const +{ + return iterate_threadsafe_core(fn, fKeyOnly, fCacheOnly, true); +} + +bool redisDbPersistentDataSnapshot::iterate_threadsafe_core(std::function &fn, bool fKeyOnly, bool fCacheOnly, bool fFirst) const { // Take the size so we can ensure we visited every element exactly once // use volatile to ensure it's not checked too late. This makes it more // likely we'll detect races (but it won't gurantee it) + aeAcquireLock(); + dict *dictTombstone; + __atomic_load(&m_pdictTombstone, &dictTombstone, __ATOMIC_ACQUIRE); volatile ssize_t celem = (ssize_t)size(); + aeReleaseLock(); dictEntry *de = nullptr; bool fResult = true; @@ -543,19 +627,22 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::functioniterate_threadsafe([this, &fn, &celem](const char *key, robj_roptr o) { - dictEntry *deTombstone = dictFind(m_pdictTombstone, key); + std::function fnNew = [this, &fn, &celem, dictTombstone](const char *key, robj_roptr o) { + dictEntry *deTombstone = dictFind(dictTombstone, key); if (deTombstone != nullptr) return true; // Alright it's a key in the use keyspace, lets ensure it and then pass it off --celem; return fn(key, o); - }, fKeyOnly, fCacheOnly); + }; + fResult = psnapshot->iterate_threadsafe_core(fnNew, fKeyOnly, fCacheOnly, false); } // we should have hit all keys or had a good reason not to - serverAssert(!fResult || celem == 0 || (m_spstorage && fCacheOnly)); + if (!(!fResult || celem == 0 || (m_spstorage && fCacheOnly))) + serverLog(LL_WARNING, "celem: %ld", celem); + serverAssert(!fResult || celem == 0 || (m_spstorage && fCacheOnly) || !fFirst); return fResult; } @@ -588,8 +675,6 @@ void redisDbPersistentData::consolidate_snapshot() // only call this on the "real" database to consolidate the first child void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce) { - static fastlock s_lock {"consolidate_children"}; // this lock ensures only one thread is consolidating at a time - std::unique_lock lock(s_lock, std::defer_lock); if (!lock.try_lock()) return; // this is a best effort function