From a8d177e75811550103ef90c3082f13f8d39d9a85 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 6 May 2021 00:42:49 +0000 Subject: [PATCH] Pause execution during rdbLoadProgressCallback as its too risky to let it run Former-commit-id: e70c01cb3e756d1e02ed190b76c73b7b7010c0d3 --- src/rdb.cpp | 19 +++++++++++++++---- src/storage/rocksdbfactory.cpp | 22 +++++++++++----------- 2 files changed, 26 insertions(+), 15 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index 8c03fa60b..7198124c3 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2361,6 +2361,7 @@ class rdbAsyncWorkThread std::mutex mutex; std::condition_variable cv; std::condition_variable cvThrottle; + fastlock m_lockPause { "rdbAsyncWork-Pause"}; bool fLaunched = false; bool fExit = false; std::atomic ckeysLoaded; @@ -2409,6 +2410,14 @@ public: cv.notify_one(); } + void pauseExecution() { + m_lockPause.lock(); + } + + void resumeExecution() { + m_lockPause.unlock(); + } + void enqueue(std::function &&fn) { std::unique_lock l(mutex); bool fNotify = queuefn.empty(); @@ -2459,6 +2468,7 @@ public: vars.gcEpoch = g_pserver->garbageCollector.startEpoch(); while (listLength(listJobs)) { + std::unique_lock ulPause(pqueue->m_lockPause); rdbInsertJob &job = *((rdbInsertJob*)listNodeValue(listFirst(listJobs))); redisObjectStack keyobj; @@ -2547,6 +2557,7 @@ public: listRelease(listJobs); for (auto &fn : queuefn) { + std::unique_lock ulPause(pqueue->m_lockPause); fn(); } @@ -2573,8 +2584,8 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { (r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys))) { rdbAsyncWorkThread *pwthread = reinterpret_cast(r->chksum_arg); - if (pwthread && g_pserver->fActiveReplica) - pwthread->endWork(); // We can't have the work queue modifying the database while processEventsWhileBlocked does its thing + if (pwthread) + pwthread->pauseExecution(); // We can't have the work queue modifying the database while processEventsWhileBlocked does its thing listIter li; listNode *ln; listRewind(g_pserver->masters, &li); @@ -2593,8 +2604,8 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { ping_argv[0] = createStringObject("PING",4); replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb, ping_argv, 1); decrRefCount(ping_argv[0]); - if (pwthread && g_pserver->fActiveReplica) - pwthread->start(); + if (pwthread) + pwthread->resumeExecution(); r->keys_since_last_callback = 0; } diff --git a/src/storage/rocksdbfactory.cpp b/src/storage/rocksdbfactory.cpp index 2a781dccc..1350c4fd1 100644 --- a/src/storage/rocksdbfactory.cpp +++ b/src/storage/rocksdbfactory.cpp @@ -53,17 +53,6 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons options.create_missing_column_families = true; rocksdb::DB *db = nullptr; - if (rgchConfig != nullptr) - { - std::string options_string(rgchConfig, cchConfig); - rocksdb::Status status; - if (!(status = rocksdb::GetDBOptionsFromString(options, options_string, &options)).ok()) - { - fprintf(stderr, "Failed to parse FLASH options: %s\r\n", status.ToString().c_str()); - exit(EXIT_FAILURE); - } - } - options.max_background_compactions = 4; options.max_background_flushes = 2; options.bytes_per_sync = 1048576; @@ -90,6 +79,17 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons cf_options.level_compaction_dynamic_level_bytes = true; veccoldesc.push_back(rocksdb::ColumnFamilyDescriptor(std::to_string(idb), cf_options)); } + + if (rgchConfig != nullptr) + { + std::string options_string(rgchConfig, cchConfig); + rocksdb::Status status; + if (!(status = rocksdb::GetDBOptionsFromString(options, options_string, &options)).ok()) + { + fprintf(stderr, "Failed to parse FLASH options: %s\r\n", status.ToString().c_str()); + exit(EXIT_FAILURE); + } + } std::vector handles; status = rocksdb::DB::Open(options, dbfile, veccoldesc, &handles, &db);