From d5ee9cb1be1638992662f74fea0e5c7d0c37c8ce Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 17 Sep 2021 17:27:19 +0000 Subject: [PATCH] Unify job types so everything is processed in order Former-commit-id: 625aa97e4cf16337e8b052b7a27491a0ab09110f --- src/rdb.cpp | 75 +++++++++++++++++++++++++++++++++++++---------------- 1 file changed, 53 insertions(+), 22 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index 0afe08267..edbf1ccaa 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2563,7 +2563,25 @@ void stopSaving(int success) { NULL); } -struct rdbInsertJob + +class JobBase +{ +public: + enum class JobType { + Function, + Insert + }; + + JobType type; + + JobBase(JobType type) + : type(type) + {} + + virtual ~JobBase() = default; +}; + +struct rdbInsertJob : public JobBase { redisDb *db = nullptr; sds key = nullptr; @@ -2579,8 +2597,13 @@ struct rdbInsertJob decrRefCount(subkey); } - rdbInsertJob() = default; - rdbInsertJob(rdbInsertJob &&src) { + rdbInsertJob() + : JobBase(JobBase::JobType::Insert) + {} + + rdbInsertJob(rdbInsertJob &&src) + : JobBase(JobBase::JobType::Insert) + { db = src.db; src.db = nullptr; key = src.key; @@ -2602,12 +2625,21 @@ struct rdbInsertJob } }; +struct rdbFunctionJob : public JobBase +{ +public: + std::function m_fn; + + rdbFunctionJob(std::function &&fn) + : JobBase(JobBase::JobType::Function), m_fn(fn) + {} +}; + class rdbAsyncWorkThread { rdbSaveInfo *rsi; int rdbflags; list *listJobs; - std::vector> queuefn; // for custom jobs std::mutex mutex; std::condition_variable cv; std::condition_variable cvThrottle; @@ -2622,7 +2654,7 @@ class rdbAsyncWorkThread long long lastPing = -1; static void listFreeMethod(const void *v) { - delete reinterpret_cast(v); + delete reinterpret_cast(v); } public: @@ -2679,10 +2711,11 @@ public: } void enqueue(std::function &&fn) { + JobBase *pjob = new rdbFunctionJob(std::move(fn)); std::unique_lock l(mutex); - bool fNotify = queuefn.empty(); - queuefn.push_back(std::move(fn)); - if (fNotify) + throttle(l); + listAddNodeTail(listJobs, pjob); + if (listLength(listJobs) == 1) cv.notify_one(); } @@ -2728,7 +2761,6 @@ public: fLaunched = false; fExit = false; serverAssert(listLength(listJobs) == 0); - serverAssert(queuefn.empty()); return ckeysLoaded; } @@ -2832,11 +2864,11 @@ public: for (;;) { std::unique_lock lock(queue.mutex); - if (listLength(queue.listJobs) == 0 && queue.queuefn.empty()) { + if (listLength(queue.listJobs) == 0) { if (queue.fExit) break; queue.cv.wait(lock); - if (listLength(queue.listJobs) == 0 && queue.queuefn.empty() && queue.fExit) + if (listLength(queue.listJobs) == 0 && queue.fExit) break; } pqueue->cvThrottle.notify_one(); @@ -2844,27 +2876,27 @@ public: list *listJobs = queue.listJobs; queue.listJobs = listCreate(); listSetFreeMethod(queue.listJobs, listFreeMethod); - - auto queuefn = std::move(queue.queuefn); lock.unlock(); vars.gcEpoch = g_pserver->garbageCollector.startEpoch(); while (listLength(listJobs)) { std::unique_lock ulPause(pqueue->m_lockPause); - rdbInsertJob &job = *((rdbInsertJob*)listNodeValue(listFirst(listJobs))); + JobBase *pjobBase = ((JobBase*)listNodeValue(listFirst(listJobs))); - pqueue->processJob(job); + switch (pjobBase->type) + { + case JobBase::JobType::Insert: + pqueue->processJob(*static_cast(pjobBase)); + break; + case JobBase::JobType::Function: + static_cast(pjobBase)->m_fn(); + break; + } // Pop from the list listDelNode(listJobs, listFirst(listJobs)); } listRelease(listJobs); - - for (auto &fn : queuefn) { - std::unique_lock ulPause(pqueue->m_lockPause); - fn(); - } - g_pserver->garbageCollector.endEpoch(vars.gcEpoch); } @@ -2875,7 +2907,6 @@ public: queue.workerThreadDone = true; std::unique_lock lock(queue.mutex); - serverAssert(queue.queuefn.empty()); serverAssert(listLength(queue.listJobs) == 0); ProcessPendingAsyncWrites(); listRelease(vars.clients_pending_asyncwrite);