diff --git a/src/rdb.cpp b/src/rdb.cpp index edbf1ccaa..e5ec4804b 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2639,17 +2639,15 @@ class rdbAsyncWorkThread { rdbSaveInfo *rsi; int rdbflags; - list *listJobs; - std::mutex mutex; - std::condition_variable cv; - std::condition_variable cvThrottle; + moodycamel::BlockingConcurrentQueue queueJobs; fastlock m_lockPause { "rdbAsyncWork-Pause"}; bool fLaunched = false; - bool fExit = false; + std::atomic fExit {false}; std::atomic ckeysLoaded; std::atomic cstorageWritesInFlight; std::atomic workerThreadDone; std::thread m_thread; + std::vector vecbatch; long long now; long long lastPing = -1; @@ -2664,14 +2662,11 @@ public: { ckeysLoaded = 0; cstorageWritesInFlight = 0; - listJobs = listCreate(); - listSetFreeMethod(listJobs, listFreeMethod); } ~rdbAsyncWorkThread() { if (m_thread.joinable()) endWork(); - listRelease(listJobs); } void start() { @@ -2680,26 +2675,24 @@ public: fLaunched = true; } - void throttle(std::unique_lock &l) { - if (listLength(listJobs) > 0 && (listLength(listJobs) % 1024 == 0) && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) { - cvThrottle.wait(l); - while (cstorageWritesInFlight.load(std::memory_order_relaxed) && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) { - l.unlock(); + void throttle() { + if (g_pserver->m_pstorageFactory && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) { + while ((cstorageWritesInFlight.load(std::memory_order_relaxed) || queueJobs.size_approx()) && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) { usleep(1); pauseExecution(); ProcessWhileBlocked(); resumeExecution(); - l.lock(); } } } - void enqueue(std::unique_ptr &spjob) { - std::unique_lock l(mutex); - throttle(l); - listAddNodeTail(listJobs, spjob.release()); - if (listLength(listJobs) == 1) - cv.notify_one(); + void enqueue(std::unique_ptr &spjob) { + vecbatch.push_back(spjob.release()); + if (vecbatch.size() >= 64) { + queueJobs.enqueue_bulk(vecbatch.data(), vecbatch.size()); + vecbatch.clear(); + throttle(); + } } void pauseExecution() { @@ -2711,12 +2704,9 @@ public: } void enqueue(std::function &&fn) { - JobBase *pjob = new rdbFunctionJob(std::move(fn)); - std::unique_lock l(mutex); - throttle(l); - listAddNodeTail(listJobs, pjob); - if (listLength(listJobs) == 1) - cv.notify_one(); + std::unique_ptr spjob = std::make_unique(std::move(fn)); + queueJobs.enqueue(spjob.release()); + throttle(); } void ProcessWhileBlocked() { @@ -2739,11 +2729,13 @@ public: size_t ckeys() { return ckeysLoaded; } size_t endWork() { - std::unique_lock l(mutex); + if (!vecbatch.empty()) { + queueJobs.enqueue_bulk(vecbatch.data(), vecbatch.size()); + vecbatch.clear(); + } + std::atomic_thread_fence(std::memory_order_seq_cst); // The queue must have transferred to the consumer before we call fExit serverAssert(fLaunched); fExit = true; - cv.notify_one(); - l.unlock(); if (g_pserver->m_pstorageFactory) { // If we have a storage provider it can take some time to complete and we want to process events in the meantime while (!workerThreadDone) { @@ -2760,7 +2752,7 @@ public: } fLaunched = false; fExit = false; - serverAssert(listLength(listJobs) == 0); + serverAssert(queueJobs.size_approx() == 0); return ckeysLoaded; } @@ -2863,40 +2855,35 @@ public: } for (;;) { - std::unique_lock lock(queue.mutex); - if (listLength(queue.listJobs) == 0) { - if (queue.fExit) - break; - queue.cv.wait(lock); - if (listLength(queue.listJobs) == 0 && queue.fExit) + if (queue.queueJobs.size_approx() == 0) { + if (queue.fExit.load(std::memory_order_relaxed)) break; } - pqueue->cvThrottle.notify_one(); - - list *listJobs = queue.listJobs; - queue.listJobs = listCreate(); - listSetFreeMethod(queue.listJobs, listFreeMethod); - lock.unlock(); + + if (queue.fExit.load(std::memory_order_seq_cst) && queue.queueJobs.size_approx() == 0) + break; vars.gcEpoch = g_pserver->garbageCollector.startEpoch(); - while (listLength(listJobs)) { + JobBase *rgjob[64]; + int cjobs = 0; + while ((cjobs = pqueue->queueJobs.wait_dequeue_bulk_timed(rgjob, 64, std::chrono::milliseconds(5))) > 0) { std::unique_lock ulPause(pqueue->m_lockPause); - JobBase *pjobBase = ((JobBase*)listNodeValue(listFirst(listJobs))); - switch (pjobBase->type) - { - case JobBase::JobType::Insert: - pqueue->processJob(*static_cast(pjobBase)); - break; + for (int ijob = 0; ijob < cjobs; ++ijob) { + JobBase *pjob = rgjob[ijob]; + switch (pjob->type) + { + case JobBase::JobType::Insert: + pqueue->processJob(*static_cast(pjob)); + break; - case JobBase::JobType::Function: - static_cast(pjobBase)->m_fn(); - break; + case JobBase::JobType::Function: + static_cast(pjob)->m_fn(); + break; + } + delete pjob; } - // Pop from the list - listDelNode(listJobs, listFirst(listJobs)); } - listRelease(listJobs); g_pserver->garbageCollector.endEpoch(vars.gcEpoch); } @@ -2906,8 +2893,6 @@ public: } queue.workerThreadDone = true; - std::unique_lock lock(queue.mutex); - serverAssert(listLength(queue.listJobs) == 0); ProcessPendingAsyncWrites(); listRelease(vars.clients_pending_asyncwrite); aeSetThreadOwnsLockOverride(false); diff --git a/src/server.h b/src/server.h index 197a7a7be..1153fde23 100644 --- a/src/server.h +++ b/src/server.h @@ -39,6 +39,9 @@ #include "rio.h" #include "atomicvar.h" +#include +#include + #include #include #include