From df6d8170aec2989c5ea48a082ab09d3ff370b8c5 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 23 Nov 2019 19:00:27 -0500 Subject: [PATCH] Add support for removing pending writes for clients we're about to free Former-commit-id: 67c1fd322e1a12b1b22707d67713d60c97c34cef --- src/AsyncWorkQueue.cpp | 26 ++++++++++++++++++++++++++ src/AsyncWorkQueue.h | 2 ++ 2 files changed, 28 insertions(+) diff --git a/src/AsyncWorkQueue.cpp b/src/AsyncWorkQueue.cpp index daa10ed78..ece5518ba 100644 --- a/src/AsyncWorkQueue.cpp +++ b/src/AsyncWorkQueue.cpp @@ -19,6 +19,10 @@ void AsyncWorkQueue::WorkerThreadMain() vars.clients_pending_asyncwrite = listCreate(); + aeAcquireLock(); + m_vecpthreadVars.push_back(&vars); + aeReleaseLock(); + while (!m_fQuitting) { std::unique_lock lock(m_mutex); @@ -42,6 +46,28 @@ void AsyncWorkQueue::WorkerThreadMain() listRelease(vars.clients_pending_asyncwrite); } +bool AsyncWorkQueue::removeClientAsyncWrites(client *c) +{ + bool fFound = false; + aeAcquireLock(); + for (auto pvars : m_vecpthreadVars) + { + listIter li; + listNode *ln; + listRewind(pvars->clients_pending_asyncwrite, &li); + while ((ln = listNext(&li)) != nullptr) + { + if (c == listNodeValue(ln)) + { + listDelNode(pvars->clients_pending_asyncwrite, ln); + fFound = true; + } + } + } + aeReleaseLock(); + return fFound; +} + void AsyncWorkQueue::abandonThreads() { std::unique_lock lock(m_mutex); diff --git a/src/AsyncWorkQueue.h b/src/AsyncWorkQueue.h index 459d5dccf..b3144f585 100644 --- a/src/AsyncWorkQueue.h +++ b/src/AsyncWorkQueue.h @@ -20,6 +20,7 @@ class AsyncWorkQueue std::function fnAsync; }; std::vector m_vecthreads; + std::vector m_vecpthreadVars; std::queue m_workqueue; std::mutex m_mutex; std::condition_variable m_cvWakeup; @@ -31,6 +32,7 @@ public: ~AsyncWorkQueue(); void AddWorkFunction(std::function &&fnAsync); + bool removeClientAsyncWrites(struct client *c); void abandonThreads(); }; \ No newline at end of file