Use the moodycamel concurrentqueue for the multithreaded load instead of a mutex and condition variables

Former-commit-id: d5a59113dbfedaf7b62a650cff58a2e8ec01826f
John Sully 2021-10-04 07:33:03 +00:00
parent 8a2f2bcb91
commit d29df021b1
2 changed files with 45 additions and 57 deletions
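
The change replaces the mutex/condition-variable protected listJobs with a moodycamel::BlockingConcurrentQueue: producers batch jobs into a local vector and push them with enqueue_bulk, the worker thread drains them with wait_dequeue_bulk_timed, and shutdown is signalled through an atomic flag after a final flush. Below is a minimal standalone sketch of that pattern, assuming only the library calls that appear in the diff; every other name is illustrative and not the KeyDB code itself.

// Sketch of the batched producer/consumer pattern adopted by this commit.
#include <blockingconcurrentqueue.h>
#include <atomic>
#include <chrono>
#include <cstdio>
#include <thread>
#include <vector>

struct Job { int id; };

int main() {
    moodycamel::BlockingConcurrentQueue<Job*> queueJobs;
    std::atomic<bool> fExit{false};

    std::thread worker([&] {
        Job *rgjob[64];
        for (;;) {
            // Block for up to 5 ms waiting for a batch, mirroring the
            // wait_dequeue_bulk_timed loop in the diff below.
            size_t cjobs = queueJobs.wait_dequeue_bulk_timed(rgjob, 64, std::chrono::milliseconds(5));
            for (size_t i = 0; i < cjobs; ++i) {
                std::printf("processed job %d\n", rgjob[i]->id);
                delete rgjob[i];
            }
            // Exit only once the producer has signalled and the queue has drained.
            if (cjobs == 0 && fExit.load(std::memory_order_acquire) && queueJobs.size_approx() == 0)
                break;
        }
    });

    // Producer side: batch jobs locally and push them in bulk to cut contention.
    std::vector<Job*> vecbatch;
    for (int i = 0; i < 1000; ++i) {
        vecbatch.push_back(new Job{i});
        if (vecbatch.size() >= 64) {
            queueJobs.enqueue_bulk(vecbatch.data(), vecbatch.size());
            vecbatch.clear();
        }
    }
    if (!vecbatch.empty())
        queueJobs.enqueue_bulk(vecbatch.data(), vecbatch.size());

    // Publish all queued work before raising the exit flag, as endWork() does.
    std::atomic_thread_fence(std::memory_order_seq_cst);
    fExit.store(true, std::memory_order_release);
    worker.join();
    return 0;
}

Batching on the producer side keeps contention on the queue low while millions of keys are loaded, and the 5 ms dequeue timeout lets the worker notice the exit flag even when the queue is idle.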

View File

@@ -2639,17 +2639,15 @@ class rdbAsyncWorkThread
{
rdbSaveInfo *rsi;
int rdbflags;
list *listJobs;
std::mutex mutex;
std::condition_variable cv;
std::condition_variable cvThrottle;
moodycamel::BlockingConcurrentQueue<JobBase*> queueJobs;
fastlock m_lockPause { "rdbAsyncWork-Pause"};
bool fLaunched = false;
bool fExit = false;
std::atomic<int> fExit {false};
std::atomic<size_t> ckeysLoaded;
std::atomic<int> cstorageWritesInFlight;
std::atomic<bool> workerThreadDone;
std::thread m_thread;
std::vector<JobBase*> vecbatch;
long long now;
long long lastPing = -1;
@@ -2664,14 +2662,11 @@ public:
{
ckeysLoaded = 0;
cstorageWritesInFlight = 0;
listJobs = listCreate();
listSetFreeMethod(listJobs, listFreeMethod);
}
~rdbAsyncWorkThread() {
if (m_thread.joinable())
endWork();
listRelease(listJobs);
}
void start() {
@@ -2680,26 +2675,24 @@ public:
fLaunched = true;
}
void throttle(std::unique_lock<std::mutex> &l) {
if (listLength(listJobs) > 0 && (listLength(listJobs) % 1024 == 0) && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) {
cvThrottle.wait(l);
while (cstorageWritesInFlight.load(std::memory_order_relaxed) && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) {
l.unlock();
void throttle() {
if (g_pserver->m_pstorageFactory && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) {
while ((cstorageWritesInFlight.load(std::memory_order_relaxed) || queueJobs.size_approx()) && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) {
usleep(1);
pauseExecution();
ProcessWhileBlocked();
resumeExecution();
l.lock();
}
}
}
void enqueue(std::unique_ptr<rdbInsertJob> &spjob) {
std::unique_lock<std::mutex> l(mutex);
throttle(l);
listAddNodeTail(listJobs, spjob.release());
if (listLength(listJobs) == 1)
cv.notify_one();
void enqueue(std::unique_ptr<rdbInsertJob> &spjob) {
vecbatch.push_back(spjob.release());
if (vecbatch.size() >= 64) {
queueJobs.enqueue_bulk(vecbatch.data(), vecbatch.size());
vecbatch.clear();
throttle();
}
}
void pauseExecution() {
@@ -2711,12 +2704,9 @@ public:
}
void enqueue(std::function<void()> &&fn) {
JobBase *pjob = new rdbFunctionJob(std::move(fn));
std::unique_lock<std::mutex> l(mutex);
throttle(l);
listAddNodeTail(listJobs, pjob);
if (listLength(listJobs) == 1)
cv.notify_one();
std::unique_ptr<JobBase> spjob = std::make_unique<rdbFunctionJob>(std::move(fn));
queueJobs.enqueue(spjob.release());
throttle();
}
void ProcessWhileBlocked() {
@@ -2739,11 +2729,13 @@ public:
size_t ckeys() { return ckeysLoaded; }
size_t endWork() {
std::unique_lock<std::mutex> l(mutex);
if (!vecbatch.empty()) {
queueJobs.enqueue_bulk(vecbatch.data(), vecbatch.size());
vecbatch.clear();
}
std::atomic_thread_fence(std::memory_order_seq_cst); // The queue must have transferred to the consumer before we call fExit
serverAssert(fLaunched);
fExit = true;
cv.notify_one();
l.unlock();
if (g_pserver->m_pstorageFactory) {
// If we have a storage provider it can take some time to complete and we want to process events in the meantime
while (!workerThreadDone) {
@@ -2760,7 +2752,7 @@ public:
}
fLaunched = false;
fExit = false;
serverAssert(listLength(listJobs) == 0);
serverAssert(queueJobs.size_approx() == 0);
return ckeysLoaded;
}
@@ -2863,40 +2855,35 @@ public:
}
for (;;) {
std::unique_lock<std::mutex> lock(queue.mutex);
if (listLength(queue.listJobs) == 0) {
if (queue.fExit)
break;
queue.cv.wait(lock);
if (listLength(queue.listJobs) == 0 && queue.fExit)
if (queue.queueJobs.size_approx() == 0) {
if (queue.fExit.load(std::memory_order_relaxed))
break;
}
pqueue->cvThrottle.notify_one();
list *listJobs = queue.listJobs;
queue.listJobs = listCreate();
listSetFreeMethod(queue.listJobs, listFreeMethod);
lock.unlock();
if (queue.fExit.load(std::memory_order_seq_cst) && queue.queueJobs.size_approx() == 0)
break;
vars.gcEpoch = g_pserver->garbageCollector.startEpoch();
while (listLength(listJobs)) {
JobBase *rgjob[64];
int cjobs = 0;
while ((cjobs = pqueue->queueJobs.wait_dequeue_bulk_timed(rgjob, 64, std::chrono::milliseconds(5))) > 0) {
std::unique_lock<fastlock> ulPause(pqueue->m_lockPause);
JobBase *pjobBase = ((JobBase*)listNodeValue(listFirst(listJobs)));
switch (pjobBase->type)
{
case JobBase::JobType::Insert:
pqueue->processJob(*static_cast<rdbInsertJob*>(pjobBase));
break;
for (int ijob = 0; ijob < cjobs; ++ijob) {
JobBase *pjob = rgjob[ijob];
switch (pjob->type)
{
case JobBase::JobType::Insert:
pqueue->processJob(*static_cast<rdbInsertJob*>(pjob));
break;
case JobBase::JobType::Function:
static_cast<rdbFunctionJob*>(pjobBase)->m_fn();
break;
case JobBase::JobType::Function:
static_cast<rdbFunctionJob*>(pjob)->m_fn();
break;
}
delete pjob;
}
// Pop from the list
listDelNode(listJobs, listFirst(listJobs));
}
listRelease(listJobs);
g_pserver->garbageCollector.endEpoch(vars.gcEpoch);
}
@@ -2906,8 +2893,6 @@ public:
}
queue.workerThreadDone = true;
std::unique_lock<std::mutex> lock(queue.mutex);
serverAssert(listLength(queue.listJobs) == 0);
ProcessPendingAsyncWrites();
listRelease(vars.clients_pending_asyncwrite);
aeSetThreadOwnsLockOverride(false);

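The rewritten throttle() also drops cvThrottle: instead of waiting on a condition variable, the loader polls the queue's approximate depth and the in-flight storage-write counter while memory pressure persists, servicing events in between. A rough sketch of that back-pressure loop, with overMemoryLimit(), pauseWorker(), resumeWorker() and serviceEvents() as illustrative stand-ins for getMaxmemoryState(), pauseExecution(), resumeExecution() and ProcessWhileBlocked():

#include <blockingconcurrentqueue.h>
#include <atomic>
#include <unistd.h>

static moodycamel::BlockingConcurrentQueue<int> queueJobs;
static std::atomic<int> cstorageWritesInFlight{0};

static bool overMemoryLimit() { return false; }  // stand-in for getMaxmemoryState()
static void pauseWorker() {}                     // stand-in for pauseExecution()
static void resumeWorker() {}                    // stand-in for resumeExecution()
static void serviceEvents() {}                   // stand-in for ProcessWhileBlocked()

void throttle() {
    if (!overMemoryLimit())
        return;
    // size_approx() is only an estimate, which is acceptable here: the memory
    // limit is re-checked on every iteration, so a stale count merely delays
    // or shortens the wait slightly.
    while ((cstorageWritesInFlight.load(std::memory_order_relaxed) || queueJobs.size_approx())
           && overMemoryLimit()) {
        usleep(1);       // yield briefly
        pauseWorker();   // stop the consumer so pending events can run safely
        serviceEvents();
        resumeWorker();
    }
}
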
View File

@@ -39,6 +39,9 @@
#include "rio.h"
#include "atomicvar.h"
#include <concurrentqueue.h>
#include <blockingconcurrentqueue.h>
#include <stdio.h>
#include <stdlib.h>
#include <cmath>