2019-11-19 23:03:20 -05:00
|
|
|
#include "AsyncWorkQueue.h"
|
|
|
|
#include "server.h"
|
|
|
|
|
|
|
|
AsyncWorkQueue::AsyncWorkQueue(int nthreads)
|
|
|
|
{
|
|
|
|
for (int i = 0; i < nthreads; ++i)
|
|
|
|
{
|
|
|
|
m_vecthreads.emplace_back([&]{
|
|
|
|
WorkerThreadMain();
|
|
|
|
});
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
void AsyncWorkQueue::WorkerThreadMain()
|
|
|
|
{
|
2019-11-28 19:00:51 -05:00
|
|
|
redisServerThreadVars vars;
|
2019-11-19 23:03:20 -05:00
|
|
|
serverTL = &vars;
|
|
|
|
|
|
|
|
vars.clients_pending_asyncwrite = listCreate();
|
|
|
|
|
2019-11-23 19:00:27 -05:00
|
|
|
aeAcquireLock();
|
|
|
|
m_vecpthreadVars.push_back(&vars);
|
|
|
|
aeReleaseLock();
|
|
|
|
|
2019-11-19 23:03:20 -05:00
|
|
|
while (!m_fQuitting)
|
|
|
|
{
|
|
|
|
std::unique_lock<std::mutex> lock(m_mutex);
|
|
|
|
m_cvWakeup.wait(lock);
|
2019-11-28 19:00:51 -05:00
|
|
|
|
2019-11-19 23:03:20 -05:00
|
|
|
while (!m_workqueue.empty())
|
|
|
|
{
|
|
|
|
WorkItem task = std::move(m_workqueue.front());
|
2019-11-28 19:00:51 -05:00
|
|
|
m_workqueue.pop_front();
|
2019-11-19 23:03:20 -05:00
|
|
|
|
|
|
|
lock.unlock();
|
2019-11-28 19:00:51 -05:00
|
|
|
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
|
2019-11-19 23:03:20 -05:00
|
|
|
task.fnAsync();
|
2019-11-28 19:00:51 -05:00
|
|
|
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
|
2019-11-19 23:03:20 -05:00
|
|
|
lock.lock();
|
|
|
|
}
|
|
|
|
|
|
|
|
lock.unlock();
|
2019-11-28 19:00:51 -05:00
|
|
|
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
|
2019-11-19 23:03:20 -05:00
|
|
|
aeAcquireLock();
|
|
|
|
ProcessPendingAsyncWrites();
|
|
|
|
aeReleaseLock();
|
2019-11-28 19:00:51 -05:00
|
|
|
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
|
|
|
|
serverTL->gcEpoch = 0;
|
2019-11-19 23:03:20 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
listRelease(vars.clients_pending_asyncwrite);
|
|
|
|
}
|
|
|
|
|
2019-11-23 19:00:27 -05:00
|
|
|
bool AsyncWorkQueue::removeClientAsyncWrites(client *c)
|
|
|
|
{
|
|
|
|
bool fFound = false;
|
|
|
|
aeAcquireLock();
|
|
|
|
for (auto pvars : m_vecpthreadVars)
|
|
|
|
{
|
|
|
|
listIter li;
|
|
|
|
listNode *ln;
|
|
|
|
listRewind(pvars->clients_pending_asyncwrite, &li);
|
|
|
|
while ((ln = listNext(&li)) != nullptr)
|
|
|
|
{
|
|
|
|
if (c == listNodeValue(ln))
|
|
|
|
{
|
|
|
|
listDelNode(pvars->clients_pending_asyncwrite, ln);
|
|
|
|
fFound = true;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
aeReleaseLock();
|
|
|
|
return fFound;
|
|
|
|
}
|
|
|
|
|
2019-11-19 23:03:20 -05:00
|
|
|
void AsyncWorkQueue::abandonThreads()
|
|
|
|
{
|
|
|
|
std::unique_lock<std::mutex> lock(m_mutex);
|
|
|
|
m_fQuitting = true;
|
|
|
|
m_cvWakeup.notify_all();
|
|
|
|
for (auto &thread : m_vecthreads)
|
|
|
|
{
|
|
|
|
thread.detach();
|
|
|
|
}
|
|
|
|
m_vecthreads.clear();
|
|
|
|
}
|
|
|
|
|
|
|
|
AsyncWorkQueue::~AsyncWorkQueue()
|
|
|
|
{
|
|
|
|
serverAssert(!GlobalLocksAcquired() || m_vecthreads.empty());
|
|
|
|
std::unique_lock<std::mutex> lock(m_mutex);
|
|
|
|
m_fQuitting = true;
|
|
|
|
m_cvWakeup.notify_all();
|
|
|
|
lock.unlock();
|
|
|
|
|
|
|
|
abandonThreads();
|
|
|
|
}
|
|
|
|
|
2019-11-28 19:00:51 -05:00
|
|
|
void AsyncWorkQueue::AddWorkFunction(std::function<void()> &&fnAsync, bool fHiPri)
|
2019-11-19 23:03:20 -05:00
|
|
|
{
|
|
|
|
std::unique_lock<std::mutex> lock(m_mutex);
|
2019-11-28 19:00:51 -05:00
|
|
|
if (fHiPri)
|
|
|
|
m_workqueue.emplace_front(std::move(fnAsync));
|
|
|
|
else
|
|
|
|
m_workqueue.emplace_back(std::move(fnAsync));
|
2019-11-19 23:03:20 -05:00
|
|
|
m_cvWakeup.notify_one();
|
|
|
|
}
|