diff --git a/src/rdb.cpp b/src/rdb.cpp index 8c03fa60b..7198124c3 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2361,6 +2361,7 @@ class rdbAsyncWorkThread std::mutex mutex; std::condition_variable cv; std::condition_variable cvThrottle; + fastlock m_lockPause { "rdbAsyncWork-Pause"}; bool fLaunched = false; bool fExit = false; std::atomic ckeysLoaded; @@ -2409,6 +2410,14 @@ public: cv.notify_one(); } + void pauseExecution() { + m_lockPause.lock(); + } + + void resumeExecution() { + m_lockPause.unlock(); + } + void enqueue(std::function &&fn) { std::unique_lock l(mutex); bool fNotify = queuefn.empty(); @@ -2459,6 +2468,7 @@ public: vars.gcEpoch = g_pserver->garbageCollector.startEpoch(); while (listLength(listJobs)) { + std::unique_lock ulPause(pqueue->m_lockPause); rdbInsertJob &job = *((rdbInsertJob*)listNodeValue(listFirst(listJobs))); redisObjectStack keyobj; @@ -2547,6 +2557,7 @@ public: listRelease(listJobs); for (auto &fn : queuefn) { + std::unique_lock ulPause(pqueue->m_lockPause); fn(); } @@ -2573,8 +2584,8 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { (r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys))) { rdbAsyncWorkThread *pwthread = reinterpret_cast(r->chksum_arg); - if (pwthread && g_pserver->fActiveReplica) - pwthread->endWork(); // We can't have the work queue modifying the database while processEventsWhileBlocked does its thing + if (pwthread) + pwthread->pauseExecution(); // We can't have the work queue modifying the database while processEventsWhileBlocked does its thing listIter li; listNode *ln; listRewind(g_pserver->masters, &li); @@ -2593,8 +2604,8 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { ping_argv[0] = createStringObject("PING",4); replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb, ping_argv, 1); decrRefCount(ping_argv[0]); - if (pwthread && g_pserver->fActiveReplica) - pwthread->start(); + if (pwthread) + pwthread->resumeExecution(); r->keys_since_last_callback = 0; }