From d8dcb94e37b72ad8e9683cc45527a6f561af1d20 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 1 Feb 2020 22:28:24 -0500 Subject: [PATCH] endSnapshotAsync implementation (fails tests w/ multithreading) Former-commit-id: b401e9fcae40cf1d4e4c1584f57e760d9adf36a9 --- src/rdb.cpp | 8 ++---- src/server.h | 7 ++++- src/snapshot.cpp | 75 ++++++++++++++++++++++++++++++++++++++++++------ 3 files changed, 74 insertions(+), 16 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index 6e8826df6..e9263f25f 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1464,10 +1464,8 @@ void *rdbSaveThread(void *vargs) sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB"); // If we were told to cancel the requesting thread holds the lock for us - aeAcquireLock(); for (int idb = 0; idb < cserver.dbnum; ++idb) - g_pserver->db[idb]->endSnapshot(args->rgpdb[idb]); - aeReleaseLock(); + g_pserver->db[idb]->endSnapshotAsync(args->rgpdb[idb]); zfree(args); g_pserver->garbageCollector.endEpoch(vars.gcEpoch); @@ -2674,10 +2672,8 @@ void *rdbSaveToSlavesSocketsThread(void *vargs) close(g_pserver->rdb_pipe_write); /* wake up the reader, tell it we're done. */ // If we were told to cancel the requesting thread is holding the lock for us - aeAcquireLock(); for (int idb = 0; idb < cserver.dbnum; ++idb) - g_pserver->db[idb]->endSnapshot(args->rgpdb[idb]); - aeReleaseLock(); + g_pserver->db[idb]->endSnapshotAsync(args->rgpdb[idb]); zfree(args); return (retval == C_OK) ? (void*)0 : (void*)1; diff --git a/src/server.h b/src/server.h index 1af8ef3f8..b6caaebad 100644 --- a/src/server.h +++ b/src/server.h @@ -1224,6 +1224,7 @@ class redisDbPersistentData { friend void dictDbKeyDestructor(void *privdata, void *key); friend class redisDbPersistentDataSnapshot; + public: ~redisDbPersistentData(); @@ -1311,6 +1312,7 @@ public: const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional); void endSnapshot(const redisDbPersistentDataSnapshot *psnapshot); + void endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot); void restoreSnapshot(const redisDbPersistentDataSnapshot *psnapshot); void consolidate_snapshot(); @@ -1354,6 +1356,7 @@ private: // in a snapshot const redisDbPersistentDataSnapshot *m_pdbSnapshot = nullptr; std::unique_ptr m_spdbSnapshotHOLDER; + const redisDbPersistentDataSnapshot *m_pdbSnapshotASYNC = nullptr; int m_refCount = 0; fastlock m_lockStorage { "storage" }; }; @@ -1365,7 +1368,7 @@ protected: bool m_fConsolidated = false; static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot); int snapshot_depth() const; - void consolidate_children(redisDbPersistentData *pdbPrimary); + void consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce); public: bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; } @@ -1373,6 +1376,7 @@ public: bool iterate_threadsafe(std::function fn, bool fKeyOnly = false) const; using redisDbPersistentData::createSnapshot; using redisDbPersistentData::endSnapshot; + using redisDbPersistentData::endSnapshotAsync; using redisDbPersistentData::end; dict_iter random_cache_threadsafe() const; @@ -2443,6 +2447,7 @@ extern dictType zsetDictType; extern dictType clusterNodesDictType; extern dictType clusterNodesBlackListDictType; extern dictType dbDictType; +extern dictType dbDictTypeTombstone; extern dictType dbSnapshotDictType; extern dictType shaScriptObjectDictType; extern double R_Zero, R_PosInf, R_NegInf, R_Nan; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 5e66850ec..64dc5d4a0 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -54,7 +54,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 } m_pdict = dictCreate(&dbDictType,this); - m_pdictTombstone = dictCreate(&dbDictType, this); + m_pdictTombstone = dictCreate(&dbDictTypeTombstone, this); serverAssert(spdb->m_pdict->iterators == 1); @@ -69,6 +69,13 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 pdbSnapshotNext = pdbSnapshotNext->m_spdbSnapshotHOLDER.get(); } + if (m_pdbSnapshotASYNC != nullptr) + { + // free the async snapshot, it's done its job + endSnapshot(m_pdbSnapshotASYNC); // should be just a dec ref (FAST) + m_pdbSnapshotASYNC = nullptr; + } + return m_pdbSnapshot; } @@ -127,11 +134,55 @@ void redisDbPersistentData::restoreSnapshot(const redisDbPersistentDataSnapshot serverAssert(size() == expectedSize); } +// This function is all about minimizing the amount of work done under global lock +// when there has been lots of changes since snapshot creation a naive endSnapshot() +// will block for a very long time and will cause latency spikes. +// +// Note that this function uses a lot more CPU time than a simple endSnapshot(), we +// have some internal heuristics to do a synchronous endSnapshot if it makes sense +void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot) +{ + aeAcquireLock(); + if (m_pdbSnapshotASYNC && m_pdbSnapshotASYNC->m_mvccCheckpoint <= psnapshot->m_mvccCheckpoint) + { + // Free a stale async snapshot so consolidate_children can clean it up later + endSnapshot(m_pdbSnapshotASYNC); // FAST: just a ref decrement + m_pdbSnapshotASYNC = nullptr; + } + + size_t elements = dictSize(m_pdictTombstone); + // if neither dict is rehashing then the merge is O(1) so don't count the size + if (dictIsRehashing(psnapshot->m_pdict) || dictIsRehashing(m_pdict)) + elements += dictSize(m_pdict); + if (elements < 1000000 || psnapshot != m_spdbSnapshotHOLDER.get()) // heuristic + { + // For small snapshots it makes more sense just to merge it directly + endSnapshot(psnapshot); + aeReleaseLock(); + return; + } + + // OK this is a big snapshot so lets do the merge work outside the lock + auto psnapshotT = createSnapshot(LLONG_MAX, false); + endSnapshot(psnapshot); // this will just dec the ref count since our new snapshot has a ref + aeReleaseLock(); + + // do the expensive work of merging snapshots outside the ref + reinterpret_cast(this)->consolidate_children(this, true /* fForce */); + + // Final Cleanup + aeAcquireLock(); + if (m_pdbSnapshotASYNC == nullptr) + m_pdbSnapshotASYNC = psnapshotT; + else + endSnapshot(psnapshotT); // finally clena up our temp snapshot + aeReleaseLock(); +} + void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot) { - // Note: This function is dependent on GlobalLocksAcquried(), but rdb background saving has a weird case where - // a seperate thread holds the lock for it. Yes that's pretty crazy and should be fixed somehow... - + serverAssert(GlobalLocksAcquired()); + if (m_spdbSnapshotHOLDER.get() != psnapshot) { if (m_spdbSnapshotHOLDER == nullptr) @@ -215,7 +266,12 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn m_spdbSnapshotHOLDER->m_pdict->iterators--; } - m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER->m_spdbSnapshotHOLDER); + auto spsnapshotFree = std::move(m_spdbSnapshotHOLDER); + m_spdbSnapshotHOLDER = std::move(spsnapshotFree->m_spdbSnapshotHOLDER); + if (serverTL != nullptr) + g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(spsnapshotFree)); + + // Sanity Checks serverAssert(m_spdbSnapshotHOLDER != nullptr || m_pdbSnapshot == nullptr); serverAssert(m_pdbSnapshot == m_spdbSnapshotHOLDER.get() || m_pdbSnapshot == nullptr); serverAssert((m_refCount == 0 && m_pdict->iterators == 0) || (m_refCount != 0 && m_pdict->iterators == 1)); @@ -340,14 +396,14 @@ void redisDbPersistentData::consolidate_snapshot() } psnapshot->m_refCount++; // ensure it's not free'd aeReleaseLock(); - psnapshot->consolidate_children(this); + psnapshot->consolidate_children(this, false /* fForce */); aeAcquireLock(); endSnapshot(psnapshot); aeReleaseLock(); } // only call this on the "real" database to consolidate the first child -void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *pdbPrimary) +void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce) { static fastlock s_lock {"consolidate_children"}; // this lock ensures only one thread is consolidating at a time @@ -355,14 +411,15 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData * if (!lock.try_lock()) return; // this is a best effort function - if (snapshot_depth() < 4) + if (!fForce && snapshot_depth() < 4) return; + serverAssert(snapshot_depth() > 1); auto spdb = std::unique_ptr(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot()); spdb->initialize(); dictExpand(spdb->m_pdict, m_pdbSnapshot->size()); - m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o){ + m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o) { if (o != nullptr) { dictAdd(spdb->m_pdict, sdsdupshared(key), o.unsafe_robjcast()); incrRefCount(o);