Pause execution during rdbLoadProgressCallback as its too risky to let it run

Former-commit-id: e70c01cb3e756d1e02ed190b76c73b7b7010c0d3
This commit is contained in:
John Sully 2021-05-06 00:42:49 +00:00
parent 40fdb3ce05
commit 442aa5bbd9

View File

@ -2361,6 +2361,7 @@ class rdbAsyncWorkThread
std::mutex mutex;
std::condition_variable cv;
std::condition_variable cvThrottle;
fastlock m_lockPause { "rdbAsyncWork-Pause"};
bool fLaunched = false;
bool fExit = false;
std::atomic<size_t> ckeysLoaded;
@ -2409,6 +2410,14 @@ public:
cv.notify_one();
}
void pauseExecution() {
m_lockPause.lock();
}
void resumeExecution() {
m_lockPause.unlock();
}
void enqueue(std::function<void()> &&fn) {
std::unique_lock<std::mutex> l(mutex);
bool fNotify = queuefn.empty();
@ -2459,6 +2468,7 @@ public:
vars.gcEpoch = g_pserver->garbageCollector.startEpoch();
while (listLength(listJobs)) {
std::unique_lock<fastlock> ulPause(pqueue->m_lockPause);
rdbInsertJob &job = *((rdbInsertJob*)listNodeValue(listFirst(listJobs)));
redisObjectStack keyobj;
@ -2547,6 +2557,7 @@ public:
listRelease(listJobs);
for (auto &fn : queuefn) {
std::unique_lock<fastlock> ulPause(pqueue->m_lockPause);
fn();
}
@ -2573,8 +2584,8 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
(r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys)))
{
rdbAsyncWorkThread *pwthread = reinterpret_cast<rdbAsyncWorkThread*>(r->chksum_arg);
if (pwthread && g_pserver->fActiveReplica)
pwthread->endWork(); // We can't have the work queue modifying the database while processEventsWhileBlocked does its thing
if (pwthread)
pwthread->pauseExecution(); // We can't have the work queue modifying the database while processEventsWhileBlocked does its thing
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
@ -2593,8 +2604,8 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb, ping_argv, 1);
decrRefCount(ping_argv[0]);
if (pwthread && g_pserver->fActiveReplica)
pwthread->start();
if (pwthread)
pwthread->resumeExecution();
r->keys_since_last_callback = 0;
}