diff --git a/src/AsyncWorkQueue.cpp b/src/AsyncWorkQueue.cpp index daa10ed78..ece5518ba 100644 --- a/src/AsyncWorkQueue.cpp +++ b/src/AsyncWorkQueue.cpp @@ -19,6 +19,10 @@ void AsyncWorkQueue::WorkerThreadMain() vars.clients_pending_asyncwrite = listCreate(); + aeAcquireLock(); + m_vecpthreadVars.push_back(&vars); + aeReleaseLock(); + while (!m_fQuitting) { std::unique_lock lock(m_mutex); @@ -42,6 +46,28 @@ void AsyncWorkQueue::WorkerThreadMain() listRelease(vars.clients_pending_asyncwrite); } +bool AsyncWorkQueue::removeClientAsyncWrites(client *c) +{ + bool fFound = false; + aeAcquireLock(); + for (auto pvars : m_vecpthreadVars) + { + listIter li; + listNode *ln; + listRewind(pvars->clients_pending_asyncwrite, &li); + while ((ln = listNext(&li)) != nullptr) + { + if (c == listNodeValue(ln)) + { + listDelNode(pvars->clients_pending_asyncwrite, ln); + fFound = true; + } + } + } + aeReleaseLock(); + return fFound; +} + void AsyncWorkQueue::abandonThreads() { std::unique_lock lock(m_mutex); diff --git a/src/AsyncWorkQueue.h b/src/AsyncWorkQueue.h index 459d5dccf..b3144f585 100644 --- a/src/AsyncWorkQueue.h +++ b/src/AsyncWorkQueue.h @@ -20,6 +20,7 @@ class AsyncWorkQueue std::function fnAsync; }; std::vector m_vecthreads; + std::vector m_vecpthreadVars; std::queue m_workqueue; std::mutex m_mutex; std::condition_variable m_cvWakeup; @@ -31,6 +32,7 @@ public: ~AsyncWorkQueue(); void AddWorkFunction(std::function &&fnAsync); + bool removeClientAsyncWrites(struct client *c); void abandonThreads(); }; \ No newline at end of file