From f3fb4e320997c8e182f4444af1fac8265202eea4 Mon Sep 17 00:00:00 2001 From: John Sully Date: Wed, 1 Sep 2021 04:15:28 +0000 Subject: [PATCH] Don't be in tracking mode during load as processChangesAsync works outside the normal system Former-commit-id: 8d31ce6eafea1cea2f9f4ea25e44306efef28fa3 --- src/db.cpp | 1 + src/rdb.cpp | 23 +++++++++++++++++++++++ src/server.h | 2 ++ 3 files changed, 26 insertions(+) diff --git a/src/db.cpp b/src/db.cpp index 0079598f2..c883bddba 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2902,6 +2902,7 @@ bool redisDbPersistentData::processChanges(bool fSnapshot) void redisDbPersistentData::processChangesAsync(std::atomic &pendingJobs) { ++pendingJobs; + serverAssert(!m_fAllChanged); dictEmpty(m_dictChanged, nullptr); dict *dictNew = dictCreate(&dbDictType, nullptr); std::swap(dictNew, m_pdict); diff --git a/src/rdb.cpp b/src/rdb.cpp index ec4be50e0..4858f75f1 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2927,6 +2927,16 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { bool fLastKeyExpired = false; std::unique_ptr spjob; + // If we're tracking changes we need to reset this + bool fTracking = g_pserver->db[0]->FTrackingChanges(); + if (fTracking) { + // We don't want to track here because processChangesAsync is outside the normal scope handling + for (int idb = 0; idb < cserver.dbnum; ++idb) { + if (g_pserver->db[idb]->processChanges(false)) + g_pserver->db[idb]->commitChanges(); + } + } + rdb->update_cksum = rdbLoadProgressCallback; rdb->chksum_arg = &wqueue; rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes; @@ -3249,6 +3259,12 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } wqueue.endWork(); + if (fTracking) { + // Reset track changes + for (int idb = 0; idb < cserver.dbnum; ++idb) { + g_pserver->db[idb]->trackChanges(false); + } + } return C_OK; @@ -3257,6 +3273,13 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { * the RDB file from a socket during initial SYNC (diskless replica mode), * we'll report the error to the caller, so that we can retry. */ eoferr: + if (fTracking) { + // Reset track changes + for (int idb = 0; idb < cserver.dbnum; ++idb) { + g_pserver->db[idb]->trackChanges(false); + } + } + wqueue.endWork(); if (key != nullptr) { diff --git a/src/server.h b/src/server.h index d35f12c4a..197a7a7be 100644 --- a/src/server.h +++ b/src/server.h @@ -1154,6 +1154,7 @@ public: void setStorageProvider(StorageCache *pstorage); void trackChanges(bool fBulk, size_t sizeHint = 0); + bool FTrackingChanges() const { return !!m_fTrackingChanges; } // Process and commit changes for secondary storage. Note that process and commit are seperated // to allow you to release the global lock before commiting. To prevent deadlocks you *must* @@ -1338,6 +1339,7 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::prefetchKeysAsync; using redisDbPersistentData::prepOverwriteForSnapshot; using redisDbPersistentData::FRehashing; + using redisDbPersistentData::FTrackingChanges; public: expireset::setiter expireitr;