Add missing files

Former-commit-id: 8fa2951e21c5c75a0bb4dc3ef50891123166b76a
This commit is contained in:
John Sully 2019-11-19 23:03:20 -05:00
parent 59b23ddfac
commit e685a4f4a1
2 changed files with 109 additions and 0 deletions

73
src/AsyncWorkQueue.cpp Normal file
View File

@ -0,0 +1,73 @@
#include "AsyncWorkQueue.h"
#include "server.h"
AsyncWorkQueue::AsyncWorkQueue(int nthreads)
{
for (int i = 0; i < nthreads; ++i)
{
m_vecthreads.emplace_back([&]{
WorkerThreadMain();
});
}
}
void AsyncWorkQueue::WorkerThreadMain()
{
static redisServerThreadVars vars;
memset(&vars, 0, sizeof(redisServerThreadVars));
serverTL = &vars;
vars.clients_pending_asyncwrite = listCreate();
while (!m_fQuitting)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_cvWakeup.wait(lock);
while (!m_workqueue.empty())
{
WorkItem task = std::move(m_workqueue.front());
m_workqueue.pop();
lock.unlock();
task.fnAsync();
lock.lock();
}
lock.unlock();
aeAcquireLock();
ProcessPendingAsyncWrites();
aeReleaseLock();
}
listRelease(vars.clients_pending_asyncwrite);
}
void AsyncWorkQueue::abandonThreads()
{
std::unique_lock<std::mutex> lock(m_mutex);
m_fQuitting = true;
m_cvWakeup.notify_all();
for (auto &thread : m_vecthreads)
{
thread.detach();
}
m_vecthreads.clear();
}
AsyncWorkQueue::~AsyncWorkQueue()
{
serverAssert(!GlobalLocksAcquired() || m_vecthreads.empty());
std::unique_lock<std::mutex> lock(m_mutex);
m_fQuitting = true;
m_cvWakeup.notify_all();
lock.unlock();
abandonThreads();
}
void AsyncWorkQueue::AddWorkFunction(std::function<void()> &&fnAsync)
{
std::unique_lock<std::mutex> lock(m_mutex);
m_workqueue.emplace(std::move(fnAsync));
m_cvWakeup.notify_one();
}

36
src/AsyncWorkQueue.h Normal file
View File

@ -0,0 +1,36 @@
#pragma once
#include "fastlock.h"
#include <vector>
#include <queue>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <atomic>
#include <functional>
class AsyncWorkQueue
{
struct WorkItem
{
WorkItem(std::function<void()> &&fnAsync)
: fnAsync(std::move(fnAsync))
{}
WorkItem(WorkItem&&) = default;
std::function<void()> fnAsync;
};
std::vector<std::thread> m_vecthreads;
std::queue<WorkItem> m_workqueue;
std::mutex m_mutex;
std::condition_variable m_cvWakeup;
std::atomic<bool> m_fQuitting { false };
void WorkerThreadMain();
public:
AsyncWorkQueue(int nthreads);
~AsyncWorkQueue();
void AddWorkFunction(std::function<void()> &&fnAsync);
void abandonThreads();
};