From aed3d33499bf7e5ce19d7c3b0163e95cdcbc2353 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 24 Mar 2020 00:21:12 -0400 Subject: [PATCH] Prevent issue where count can be out of sync temporarily, causing crashes where we expect the count to be perfect Former-commit-id: 77c9f36413c6f0cbb0b13a7ec746746c97faadcd --- src/aof.cpp | 4 ++-- src/db.cpp | 18 ++++++------------ src/rdb.cpp | 4 ++-- src/server.cpp | 17 ++++++++--------- src/server.h | 19 +++++++++---------- 5 files changed, 27 insertions(+), 35 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index b9e70734e..cb01f3978 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -899,8 +899,8 @@ int loadAppendOnlyFile(char *filename) { loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */ for (int idb = 0; idb < cserver.dbnum; ++idb) { - auto vec = g_pserver->db[idb]->processChanges(); - g_pserver->db[idb]->commitChanges(vec); + g_pserver->db[idb]->processChanges(); + g_pserver->db[idb]->commitChanges(); } fclose(fp); freeFakeClient(fakeClient); diff --git a/src/db.cpp b/src/db.cpp index 9adf8cd11..bf256edc4 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2257,13 +2257,12 @@ void redisDbPersistentData::storeDatabase() dictReleaseIterator(di); } -redisDbPersistentData::changelist redisDbPersistentData::processChanges() +void redisDbPersistentData::processChanges() { serverAssert(GlobalLocksAcquired()); --m_fTrackingChanges; serverAssert(m_fTrackingChanges >= 0); - changelist vecRet; if (m_spstorage != nullptr) { @@ -2285,23 +2284,18 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges() continue; robj *o = (robj*)dictGetVal(de); sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o); - vecRet.emplace_back(std::move(change), unique_sds_ptr(temp)); + m_spstorage->insert(change.strkey.get(), sdslen(change.strkey.get()), temp, sdslen(temp), change.fUpdate); + sdsfree(temp); } } m_setchanged.clear(); m_cnewKeysPending = 0; } } - - return vecRet; } -void redisDbPersistentData::commitChanges(const changelist &vec) +void redisDbPersistentData::commitChanges() { - for (auto &pair : vec) - { - m_spstorage->insert(pair.first.strkey.get(), sdslen(pair.first.strkey.get()), pair.second.get(), sdslen(pair.second.get()), pair.first.fUpdate); - } if (m_spstorage != nullptr) m_spstorage->endWriteBatch(); } @@ -2379,8 +2373,8 @@ void redisDbPersistentData::removeAllCachedValues() // First we have to flush the tracked changes if (m_fTrackingChanges) { - auto vec = processChanges(); - commitChanges(vec); + processChanges(); + commitChanges(); trackChanges(false); } diff --git a/src/rdb.cpp b/src/rdb.cpp index e350adf15..05180c7ed 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2534,8 +2534,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { for (int idb = 0; idb < cserver.dbnum; ++idb) { - auto vec = g_pserver->db[idb]->processChanges(); - g_pserver->db[idb]->commitChanges(vec); + g_pserver->db[idb]->processChanges(); + g_pserver->db[idb]->commitChanges(); } return C_OK; diff --git a/src/server.cpp b/src/server.cpp index bb5f47a80..dbcb4f542 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2339,21 +2339,20 @@ void beforeSleep(struct aeEventLoop *eventLoop) { static thread_local bool fFirstRun = true; // note: we also copy the DB pointer in case a DB swap is done while the lock is released - std::vector> vecchanges; + std::vector vecdb; // note we cache the database pointer in case a dbswap is done while the lock is released if (!fFirstRun) { - for (int idb = 0; idb < cserver.dbnum; ++idb) - { - auto vec = g_pserver->db[idb]->processChanges(); - vecchanges.emplace_back(g_pserver->db[idb], std::move(vec)); + for (int idb = 0; idb < cserver.dbnum; ++idb) { + vecdb.push_back(g_pserver->db[idb]); + g_pserver->db[idb]->processChanges(); } - } - else { + } else { fFirstRun = false; } aeReleaseLock(); - for (auto &pair : vecchanges) - pair.first->commitChanges(pair.second); + for (redisDb *db : vecdb) + db->commitChanges(); + handleClientsWithPendingWrites(iel); if (serverTL->gcEpoch != 0) diff --git a/src/server.h b/src/server.h index 7fdee9aac..7f1953098 100644 --- a/src/server.h +++ b/src/server.h @@ -1293,16 +1293,8 @@ public: // to allow you to release the global lock before commiting. To prevent deadlocks you *must* // either release the global lock or keep the same global lock between the two functions as // a second look is kept to ensure writes to secondary storage are ordered - struct changedesc - { - sdsimmutablestring strkey; - bool fUpdate; - - changedesc(const char *strkey, bool fUpdate) : strkey(strkey), fUpdate(fUpdate) {} - }; - typedef std::vector> changelist; - changelist processChanges(); - void commitChanges(const changelist &vec); + void processChanges(); + void commitChanges(); // This should only be used if you look at the key, we do not fixup // objects stored elsewhere @@ -1326,6 +1318,13 @@ protected: uint64_t m_mvccCheckpoint = 0; private: + struct changedesc + { + sdsimmutablestring strkey; + bool fUpdate; + + changedesc(const char *strkey, bool fUpdate) : strkey(strkey), fUpdate(fUpdate) {} + }; struct changedescCmp { using is_transparent = void; // C++14 to allow comparisons with different types