From cf4e74006fb6e38cab1e674c527ff6f7b316235a Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Aug 2020 00:13:19 +0000 Subject: [PATCH] Don't free snapshot objects in a critical path (under the AE lock) Former-commit-id: d0da3d3cb74334cc8a2d14f4bdaef7935181700a --- src/server.cpp | 44 ++++++++++++++++++++++++++++++++++++-------- src/server.h | 6 +++++- src/snapshot.cpp | 48 ++++++++++++++++++++++++++++++++++++------------ 3 files changed, 77 insertions(+), 21 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index f72047c31..b98a07cc4 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2040,6 +2040,29 @@ void flushStorageWeak() } } +void freeSnapshotLazyFreesAsync() +{ + aeAcquireLock(); + std::vector vecObjs = std::move(g_pserver->vecobjLazyFree); + std::vector vecDicts = std::move(g_pserver->vecdictLazyFree); + std::vector> vecvecde = std::move(g_pserver->vecvecde); + aeReleaseLock(); + + for (auto &vecdeFree : vecvecde) + { + for (auto *de : vecdeFree) + { + dbDictType.keyDestructor(nullptr, dictGetKey(de)); + dbDictType.valDestructor(nullptr, dictGetVal(de)); + zfree(de); + } + } + for (robj *o : vecObjs) + decrRefCount(o); + for (dict *d : vecDicts) + dictRelease(d); +} + /* This is our timer interrupt, called g_pserver->hz times per second. * Here is where we do a number of things that need to be done asynchronously. * For instance: @@ -2313,14 +2336,18 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } } - bool fAnySnapshots = false; - for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots; ++idb) - fAnySnapshots = fAnySnapshots || g_pserver->db[0]->FSnapshot(); - if (fAnySnapshots) - { - g_pserver->asyncworkqueue->AddWorkFunction([]{ - g_pserver->db[0]->consolidate_snapshot(); - }, true /*HiPri*/); + run_with_period(100) { + bool fAsyncFrees = g_pserver->vecobjLazyFree.size() || g_pserver->vecdictLazyFree.size() || g_pserver->vecvecde.size(); + bool fAnySnapshots = false; + for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots && !fAsyncFrees; ++idb) + fAnySnapshots = fAnySnapshots || g_pserver->db[0]->FSnapshot(); + if (fAnySnapshots || fAsyncFrees) + { + g_pserver->asyncworkqueue->AddWorkFunction([fAsyncFrees]{ + g_pserver->db[0]->consolidate_snapshot(); + freeSnapshotLazyFreesAsync(); + }, true /*HiPri*/); + } } /* Fire the cron loop modules event. */ @@ -6049,6 +6076,7 @@ int main(int argc, char **argv) { serverAssert(fLockAcquired); g_pserver->garbageCollector.shutdown(); + freeSnapshotLazyFreesAsync(); delete g_pserver->m_pstorageFactory; return 0; diff --git a/src/server.h b/src/server.h index ce521b2b6..a84368044 100644 --- a/src/server.h +++ b/src/server.h @@ -1425,7 +1425,7 @@ protected: static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot); int snapshot_depth() const; void consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce); - void freeTombstoneObjects(int depth); + bool freeTombstoneObjects(int depth); public: bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; } @@ -2452,6 +2452,10 @@ struct redisServer { char *aof_rewrite_cpulist; /* cpu affinity list of aof rewrite process. */ char *bgsave_cpulist; /* cpu affinity list of bgsave process. */ + std::vector vecdictLazyFree; + std::vector vecobjLazyFree; + std::vector> vecvecde; + bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; } }; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 1eb8547ba..a0187f1c5 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -233,17 +233,23 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot latencyAddSampleIfNeeded("end-snapshot-async-phase-2", latency); } -void redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) +bool redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) { if (m_pdbSnapshot == nullptr) - return; + { + serverAssert(dictSize(m_pdictTombstone) == 0); + return true; + } - const_cast(m_pdbSnapshot)->freeTombstoneObjects(depth+1); + bool fPrevResult = const_cast(m_pdbSnapshot)->freeTombstoneObjects(depth+1); if (m_pdbSnapshot->m_refCount != depth && (m_pdbSnapshot->m_refCount != (m_refCount+1))) - return; + return false; dictIterator *di = dictGetIterator(m_pdictTombstone); dictEntry *de; + std::vector vecdeFree; + vecdeFree.reserve(dictSize(m_pdictTombstone)); + bool fAllCovered = true; while ((de = dictNext(di)) != nullptr) { dictEntry **dePrev = nullptr; @@ -252,12 +258,28 @@ void redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) dictEntry *deObj = dictFindWithPrev(m_pdbSnapshot->m_pdict, key, (uint64_t)dictGetVal(de), &dePrev, &ht, !!sdsisshared(key)); if (deObj != nullptr) { - decrRefCount((robj*)dictGetVal(deObj)); - void *ptrSet = nullptr; - __atomic_store(&deObj->v.val, &ptrSet, __ATOMIC_RELAXED); + // Now unlink the DE + __atomic_store(dePrev, &deObj->next, __ATOMIC_RELEASE); + ht->used--; + vecdeFree.push_back(deObj); + } + else + { + fAllCovered = fPrevResult; } } dictReleaseIterator(di); + + aeAcquireLock(); + if (fAllCovered) + { + g_pserver->vecdictLazyFree.push_back(m_pdictTombstone); + m_pdictTombstone = dictCreate(&dbTombstoneDictType, nullptr); + } + g_pserver->vecvecde.emplace_back(std::move(vecdeFree)); + aeReleaseLock(); + + return fAllCovered; } void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot) @@ -308,6 +330,8 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn // Stage 1 Loop through all the tracked deletes and remove them from the snapshot DB dictIterator *di = dictGetIterator(m_pdictTombstone); dictEntry *de; + m_spdbSnapshotHOLDER->m_pdict->iterators++; + std::vector vecde; while ((de = dictNext(di)) != NULL) { dictEntry **dePrev; @@ -327,15 +351,15 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn } // Delete the object from the source dict, we don't use dictDelete to avoid a second search - dictFreeKey(m_spdbSnapshotHOLDER->m_pdict, deSnapshot); - dictFreeVal(m_spdbSnapshotHOLDER->m_pdict, deSnapshot); - serverAssert(*dePrev == deSnapshot); + vecde.push_back(deSnapshot); *dePrev = deSnapshot->next; - zfree(deSnapshot); ht->used--; } + g_pserver->vecvecde.emplace_back(std::move(vecde)); + m_spdbSnapshotHOLDER->m_pdict->iterators--; dictReleaseIterator(di); - dictEmpty(m_pdictTombstone, nullptr); + g_pserver->vecdictLazyFree.push_back(m_pdictTombstone); + m_pdictTombstone = dictCreate(&dbTombstoneDictType, nullptr); // Stage 2 Move all new keys to the snapshot DB dictMerge(m_spdbSnapshotHOLDER->m_pdict, m_pdict);