Add support for removing pending writes for clients we're about to free
Former-commit-id: 67c1fd322e1a12b1b22707d67713d60c97c34cef
This commit is contained in:
parent
fda0f82d64
commit
87ca381ea2
@ -19,6 +19,10 @@ void AsyncWorkQueue::WorkerThreadMain()
|
||||
|
||||
vars.clients_pending_asyncwrite = listCreate();
|
||||
|
||||
aeAcquireLock();
|
||||
m_vecpthreadVars.push_back(&vars);
|
||||
aeReleaseLock();
|
||||
|
||||
while (!m_fQuitting)
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(m_mutex);
|
||||
@ -42,6 +46,28 @@ void AsyncWorkQueue::WorkerThreadMain()
|
||||
listRelease(vars.clients_pending_asyncwrite);
|
||||
}
|
||||
|
||||
bool AsyncWorkQueue::removeClientAsyncWrites(client *c)
|
||||
{
|
||||
bool fFound = false;
|
||||
aeAcquireLock();
|
||||
for (auto pvars : m_vecpthreadVars)
|
||||
{
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(pvars->clients_pending_asyncwrite, &li);
|
||||
while ((ln = listNext(&li)) != nullptr)
|
||||
{
|
||||
if (c == listNodeValue(ln))
|
||||
{
|
||||
listDelNode(pvars->clients_pending_asyncwrite, ln);
|
||||
fFound = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
aeReleaseLock();
|
||||
return fFound;
|
||||
}
|
||||
|
||||
void AsyncWorkQueue::abandonThreads()
|
||||
{
|
||||
std::unique_lock<std::mutex> lock(m_mutex);
|
||||
|
@ -20,6 +20,7 @@ class AsyncWorkQueue
|
||||
std::function<void()> fnAsync;
|
||||
};
|
||||
std::vector<std::thread> m_vecthreads;
|
||||
std::vector<struct redisServerThreadVars*> m_vecpthreadVars;
|
||||
std::queue<WorkItem> m_workqueue;
|
||||
std::mutex m_mutex;
|
||||
std::condition_variable m_cvWakeup;
|
||||
@ -31,6 +32,7 @@ public:
|
||||
~AsyncWorkQueue();
|
||||
|
||||
void AddWorkFunction(std::function<void()> &&fnAsync);
|
||||
bool removeClientAsyncWrites(struct client *c);
|
||||
|
||||
void abandonThreads();
|
||||
};
|
Loading…
x
Reference in New Issue
Block a user