From 76698beeafc45684462e056e86a556b2bcbb3f78 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 2 Mar 2021 04:16:20 +0000 Subject: [PATCH] Drastically improve perf when loading an RDB with a storage provider Former-commit-id: 0133b42d54676e8fac2c5cb006cc87988dced268 --- src/db.cpp | 77 ++++++++++++++++++++++++++++++++++++++-------------- src/server.h | 12 ++------ 2 files changed, 60 insertions(+), 29 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 9d05e8082..66c19552b 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -49,10 +49,25 @@ struct dbBackup { int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key); int expireIfNeeded(redisDb *db, robj *key, robj *o); void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add); +void changedescDtor(void *privdata, void *obj); std::unique_ptr deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset); sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o); +dictType dictChangeDescType { + dictSdsHash, /* hash function */ + NULL, /* key dup */ + NULL, /* val dup */ + dictSdsKeyCompare, /* key compare */ + nullptr, /* key destructor */ + changedescDtor /* val destructor */ +}; + +void changedescDtor(void *, void *obj) { + redisDbPersistentData::changedesc *desc = (redisDbPersistentData::changedesc*)obj; + delete desc; +} + /* Update LFU when an object is accessed. * Firstly, decrement the counter if the decrement time is reached. * Then logarithmically increment the counter, and update the access time. */ @@ -419,12 +434,13 @@ bool redisDbPersistentData::syncDelete(robj *key) fDeleted = (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) || fDeleted; if (fDeleted) { - auto itrChange = m_setchanged.find(szFromObj(key)); - if (itrChange != m_setchanged.end()) + dictEntry *de = dictUnlink(m_dictChanged, szFromObj(key)); + if (de != nullptr) { - if (!itrChange->fUpdate) + changedesc *desc = (changedesc*)dictGetVal(de); + if (!desc->fUpdate) --m_cnewKeysPending; - m_setchanged.erase(itrChange); + dictFreeUnlinkedEntry(m_dictChanged, de); } if (m_pdbSnapshot != nullptr) @@ -2505,7 +2521,7 @@ void redisDbPersistentData::clear(void(callback)(void*)) dictEmpty(m_pdict,callback); if (m_fTrackingChanges) { - m_setchanged.clear(); + dictEmpty(m_dictChanged, nullptr); m_cnewKeysPending = 0; m_fAllChanged++; } @@ -2708,17 +2724,20 @@ bool redisDbPersistentData::processChanges(bool fSnapshot) if (m_spstorage != nullptr) { - if (!m_fAllChanged && m_setchanged.empty() && m_cnewKeysPending == 0) + if (!m_fAllChanged && dictSize(m_dictChanged) == 0 && m_cnewKeysPending == 0) return false; m_spstorage->beginWriteBatch(); serverAssert(m_pdbSnapshotStorageFlush == nullptr); - if (fSnapshot && !m_fAllChanged && m_setchanged.size() > 100) + if (fSnapshot && !m_fAllChanged && dictSize(m_dictChanged) > 100) { // Do a snapshot based process if possible m_pdbSnapshotStorageFlush = createSnapshot(getMvccTstamp(), true /* optional */); if (m_pdbSnapshotStorageFlush) { - m_setchangedStorageFlush = std::move(m_setchanged); + if (m_dictChangedStorageFlush) + dictRelease(m_dictChangedStorageFlush); + m_dictChangedStorageFlush = m_dictChanged; + m_dictChanged = dictCreate(&dictChangeDescType, nullptr); } } @@ -2732,13 +2751,17 @@ bool redisDbPersistentData::processChanges(bool fSnapshot) } else { - for (auto &change : m_setchanged) + dictIterator *di = dictGetIterator(m_dictChanged); + dictEntry *de; + while ((de = dictNext(di)) != nullptr) { - serializeAndStoreChange(m_spstorage.get(), this, change); + changedesc *change = (changedesc*)dictGetVal(de); + serializeAndStoreChange(m_spstorage.get(), this, *change); } + dictReleaseIterator(di); } } - m_setchanged.clear(); + dictEmpty(m_dictChanged, nullptr); m_cnewKeysPending = 0; } return (m_spstorage != nullptr); @@ -2748,12 +2771,16 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot ** { if (m_pdbSnapshotStorageFlush) { - - for (auto &change : m_setchangedStorageFlush) + dictIterator *di = dictGetIterator(m_dictChangedStorageFlush); + dictEntry *de; + while ((de = dictNext(di)) != nullptr) { - serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, change); + changedesc *change = (changedesc*)dictGetVal(de); + serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, *change); } - m_setchangedStorageFlush.clear(); + dictReleaseIterator(di); + dictRelease(m_dictChangedStorageFlush); + m_dictChangedStorageFlush = nullptr; *psnapshotFree = m_pdbSnapshotStorageFlush; m_pdbSnapshotStorageFlush = nullptr; } @@ -2773,6 +2800,12 @@ redisDbPersistentData::~redisDbPersistentData() dictRelease(m_pdict); if (m_pdictTombstone) dictRelease(m_pdictTombstone); + + if (m_dictChanged) + dictRelease(m_dictChanged); + if (m_dictChangedStorageFlush) + dictRelease(m_dictChangedStorageFlush); + delete m_setexpire; } @@ -2815,8 +2848,8 @@ bool redisDbPersistentData::removeCachedValue(const char *key) if (m_spstorage != nullptr) m_spstorage->batch_lock(); - auto itr = m_setchanged.find(key); - if (itr != m_setchanged.end()) + dictEntry *de = dictFind(m_dictChanged, key); + if (de != nullptr) { if (m_spstorage != nullptr) m_spstorage->batch_unlock(); @@ -2837,6 +2870,9 @@ void redisDbPersistentData::trackChanges(bool fBulk) m_fTrackingChanges.fetch_add(1, std::memory_order_relaxed); if (fBulk) m_fAllChanged.fetch_add(1, std::memory_order_acq_rel); + + if (m_dictChanged == nullptr) + m_dictChanged = dictCreate(&dictChangeDescType, nullptr); } void redisDbPersistentData::removeAllCachedValues() @@ -2855,9 +2891,10 @@ void redisDbPersistentData::removeAllCachedValues() void redisDbPersistentData::trackkey(const char *key, bool fUpdate) { if (m_fTrackingChanges && !m_fAllChanged && m_spstorage) { - auto itr = m_setchanged.find(key); - if (itr == m_setchanged.end()) { - m_setchanged.emplace(sdsdupshared(key), fUpdate); + dictEntry *de = dictFind(m_dictChanged, key); + if (de == nullptr) { + changedesc *desc = new changedesc(sdsdupshared(key), fUpdate); + dictAdd(m_dictChanged, (void*)desc->strkey.get(), desc); if (!fUpdate) ++m_cnewKeysPending; } diff --git a/src/server.h b/src/server.h index 8e29ac196..2c962cb60 100644 --- a/src/server.h +++ b/src/server.h @@ -1042,6 +1042,7 @@ class redisDbPersistentDataSnapshot; class redisDbPersistentData { friend void dictDbKeyDestructor(void *privdata, void *key); + friend void changedescDtor(void*, void*); friend class redisDbPersistentDataSnapshot; public: @@ -1153,13 +1154,6 @@ private: changedesc(const char *strkey, bool fUpdate) : strkey(strkey), fUpdate(fUpdate) {} }; - struct changedescCmp - { - using is_transparent = void; // C++14 to allow comparisons with different types - bool operator()(const changedesc &a, const changedesc &b) const { return a.strkey < b.strkey; } - bool operator()(const changedesc &a, const char *key) const { return a.strkey < sdsview(key); } - bool operator()(const char *key, const changedesc &b) const { return sdsview(key) < b.strkey; } - }; static void serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const changedesc &change); @@ -1174,7 +1168,7 @@ private: dict *m_pdictTombstone = nullptr; /* Track deletes when we have a snapshot */ std::atomic m_fTrackingChanges {0}; // Note: Stack based std::atomic m_fAllChanged {0}; - std::set m_setchanged; + dict *m_dictChanged = nullptr; size_t m_cnewKeysPending = 0; std::shared_ptr m_spstorage = nullptr; @@ -1189,7 +1183,7 @@ private: const redisDbPersistentDataSnapshot *m_pdbSnapshotASYNC = nullptr; const redisDbPersistentDataSnapshot *m_pdbSnapshotStorageFlush = nullptr; - std::set m_setchangedStorageFlush; + dict *m_dictChangedStorageFlush = nullptr; int m_refCount = 0; };