Unify job types so everything is processed in order

Former-commit-id: 625aa97e4cf16337e8b052b7a27491a0ab09110f
This commit is contained in:
John Sully 2021-09-17 17:27:19 +00:00
parent 5ed9217c15
commit d5ee9cb1be

View File

@ -2563,7 +2563,25 @@ void stopSaving(int success) {
NULL); NULL);
} }
struct rdbInsertJob
class JobBase
{
public:
enum class JobType {
Function,
Insert
};
JobType type;
JobBase(JobType type)
: type(type)
{}
virtual ~JobBase() = default;
};
struct rdbInsertJob : public JobBase
{ {
redisDb *db = nullptr; redisDb *db = nullptr;
sds key = nullptr; sds key = nullptr;
@ -2579,8 +2597,13 @@ struct rdbInsertJob
decrRefCount(subkey); decrRefCount(subkey);
} }
rdbInsertJob() = default; rdbInsertJob()
rdbInsertJob(rdbInsertJob &&src) { : JobBase(JobBase::JobType::Insert)
{}
rdbInsertJob(rdbInsertJob &&src)
: JobBase(JobBase::JobType::Insert)
{
db = src.db; db = src.db;
src.db = nullptr; src.db = nullptr;
key = src.key; key = src.key;
@ -2602,12 +2625,21 @@ struct rdbInsertJob
} }
}; };
struct rdbFunctionJob : public JobBase
{
public:
std::function<void()> m_fn;
rdbFunctionJob(std::function<void()> &&fn)
: JobBase(JobBase::JobType::Function), m_fn(fn)
{}
};
class rdbAsyncWorkThread class rdbAsyncWorkThread
{ {
rdbSaveInfo *rsi; rdbSaveInfo *rsi;
int rdbflags; int rdbflags;
list *listJobs; list *listJobs;
std::vector<std::function<void()>> queuefn; // for custom jobs
std::mutex mutex; std::mutex mutex;
std::condition_variable cv; std::condition_variable cv;
std::condition_variable cvThrottle; std::condition_variable cvThrottle;
@ -2622,7 +2654,7 @@ class rdbAsyncWorkThread
long long lastPing = -1; long long lastPing = -1;
static void listFreeMethod(const void *v) { static void listFreeMethod(const void *v) {
delete reinterpret_cast<const rdbInsertJob*>(v); delete reinterpret_cast<const JobBase*>(v);
} }
public: public:
@ -2679,10 +2711,11 @@ public:
} }
void enqueue(std::function<void()> &&fn) { void enqueue(std::function<void()> &&fn) {
JobBase *pjob = new rdbFunctionJob(std::move(fn));
std::unique_lock<std::mutex> l(mutex); std::unique_lock<std::mutex> l(mutex);
bool fNotify = queuefn.empty(); throttle(l);
queuefn.push_back(std::move(fn)); listAddNodeTail(listJobs, pjob);
if (fNotify) if (listLength(listJobs) == 1)
cv.notify_one(); cv.notify_one();
} }
@ -2728,7 +2761,6 @@ public:
fLaunched = false; fLaunched = false;
fExit = false; fExit = false;
serverAssert(listLength(listJobs) == 0); serverAssert(listLength(listJobs) == 0);
serverAssert(queuefn.empty());
return ckeysLoaded; return ckeysLoaded;
} }
@ -2832,11 +2864,11 @@ public:
for (;;) { for (;;) {
std::unique_lock<std::mutex> lock(queue.mutex); std::unique_lock<std::mutex> lock(queue.mutex);
if (listLength(queue.listJobs) == 0 && queue.queuefn.empty()) { if (listLength(queue.listJobs) == 0) {
if (queue.fExit) if (queue.fExit)
break; break;
queue.cv.wait(lock); queue.cv.wait(lock);
if (listLength(queue.listJobs) == 0 && queue.queuefn.empty() && queue.fExit) if (listLength(queue.listJobs) == 0 && queue.fExit)
break; break;
} }
pqueue->cvThrottle.notify_one(); pqueue->cvThrottle.notify_one();
@ -2844,27 +2876,27 @@ public:
list *listJobs = queue.listJobs; list *listJobs = queue.listJobs;
queue.listJobs = listCreate(); queue.listJobs = listCreate();
listSetFreeMethod(queue.listJobs, listFreeMethod); listSetFreeMethod(queue.listJobs, listFreeMethod);
auto queuefn = std::move(queue.queuefn);
lock.unlock(); lock.unlock();
vars.gcEpoch = g_pserver->garbageCollector.startEpoch(); vars.gcEpoch = g_pserver->garbageCollector.startEpoch();
while (listLength(listJobs)) { while (listLength(listJobs)) {
std::unique_lock<fastlock> ulPause(pqueue->m_lockPause); std::unique_lock<fastlock> ulPause(pqueue->m_lockPause);
rdbInsertJob &job = *((rdbInsertJob*)listNodeValue(listFirst(listJobs))); JobBase *pjobBase = ((JobBase*)listNodeValue(listFirst(listJobs)));
pqueue->processJob(job); switch (pjobBase->type)
{
case JobBase::JobType::Insert:
pqueue->processJob(*static_cast<rdbInsertJob*>(pjobBase));
break;
case JobBase::JobType::Function:
static_cast<rdbFunctionJob*>(pjobBase)->m_fn();
break;
}
// Pop from the list // Pop from the list
listDelNode(listJobs, listFirst(listJobs)); listDelNode(listJobs, listFirst(listJobs));
} }
listRelease(listJobs); listRelease(listJobs);
for (auto &fn : queuefn) {
std::unique_lock<fastlock> ulPause(pqueue->m_lockPause);
fn();
}
g_pserver->garbageCollector.endEpoch(vars.gcEpoch); g_pserver->garbageCollector.endEpoch(vars.gcEpoch);
} }
@ -2875,7 +2907,6 @@ public:
queue.workerThreadDone = true; queue.workerThreadDone = true;
std::unique_lock<std::mutex> lock(queue.mutex); std::unique_lock<std::mutex> lock(queue.mutex);
serverAssert(queue.queuefn.empty());
serverAssert(listLength(queue.listJobs) == 0); serverAssert(listLength(queue.listJobs) == 0);
ProcessPendingAsyncWrites(); ProcessPendingAsyncWrites();
listRelease(vars.clients_pending_asyncwrite); listRelease(vars.clients_pending_asyncwrite);