From e685a4f4a19285b6ac4df5c70161e3b11437a4c4 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 19 Nov 2019 23:03:20 -0500 Subject: [PATCH] Add missing files Former-commit-id: 8fa2951e21c5c75a0bb4dc3ef50891123166b76a --- src/AsyncWorkQueue.cpp | 73 ++++++++++++++++++++++++++++++++++++++++++ src/AsyncWorkQueue.h | 36 +++++++++++++++++++++ 2 files changed, 109 insertions(+) create mode 100644 src/AsyncWorkQueue.cpp create mode 100644 src/AsyncWorkQueue.h diff --git a/src/AsyncWorkQueue.cpp b/src/AsyncWorkQueue.cpp new file mode 100644 index 000000000..daa10ed78 --- /dev/null +++ b/src/AsyncWorkQueue.cpp @@ -0,0 +1,73 @@ +#include "AsyncWorkQueue.h" +#include "server.h" + +AsyncWorkQueue::AsyncWorkQueue(int nthreads) +{ + for (int i = 0; i < nthreads; ++i) + { + m_vecthreads.emplace_back([&]{ + WorkerThreadMain(); + }); + } +} + +void AsyncWorkQueue::WorkerThreadMain() +{ + static redisServerThreadVars vars; + memset(&vars, 0, sizeof(redisServerThreadVars)); + serverTL = &vars; + + vars.clients_pending_asyncwrite = listCreate(); + + while (!m_fQuitting) + { + std::unique_lock lock(m_mutex); + m_cvWakeup.wait(lock); + while (!m_workqueue.empty()) + { + WorkItem task = std::move(m_workqueue.front()); + m_workqueue.pop(); + + lock.unlock(); + task.fnAsync(); + lock.lock(); + } + + lock.unlock(); + aeAcquireLock(); + ProcessPendingAsyncWrites(); + aeReleaseLock(); + } + + listRelease(vars.clients_pending_asyncwrite); +} + +void AsyncWorkQueue::abandonThreads() +{ + std::unique_lock lock(m_mutex); + m_fQuitting = true; + m_cvWakeup.notify_all(); + for (auto &thread : m_vecthreads) + { + thread.detach(); + } + m_vecthreads.clear(); +} + +AsyncWorkQueue::~AsyncWorkQueue() +{ + serverAssert(!GlobalLocksAcquired() || m_vecthreads.empty()); + std::unique_lock lock(m_mutex); + m_fQuitting = true; + m_cvWakeup.notify_all(); + lock.unlock(); + + abandonThreads(); +} + +void AsyncWorkQueue::AddWorkFunction(std::function &&fnAsync) +{ + std::unique_lock lock(m_mutex); + m_workqueue.emplace(std::move(fnAsync)); + m_cvWakeup.notify_one(); +} \ No newline at end of file diff --git a/src/AsyncWorkQueue.h b/src/AsyncWorkQueue.h new file mode 100644 index 000000000..459d5dccf --- /dev/null +++ b/src/AsyncWorkQueue.h @@ -0,0 +1,36 @@ +#pragma once +#include "fastlock.h" +#include +#include +#include +#include +#include +#include +#include + +class AsyncWorkQueue +{ + struct WorkItem + { + WorkItem(std::function &&fnAsync) + : fnAsync(std::move(fnAsync)) + {} + + WorkItem(WorkItem&&) = default; + std::function fnAsync; + }; + std::vector m_vecthreads; + std::queue m_workqueue; + std::mutex m_mutex; + std::condition_variable m_cvWakeup; + std::atomic m_fQuitting { false }; + + void WorkerThreadMain(); +public: + AsyncWorkQueue(int nthreads); + ~AsyncWorkQueue(); + + void AddWorkFunction(std::function &&fnAsync); + + void abandonThreads(); +}; \ No newline at end of file