From 47480943560ab4d110ac4d80f26356c93e733096 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 11 Feb 2020 03:44:28 -0500 Subject: [PATCH] Fix race condition in allocating connections to threads Former-commit-id: 52434a583aa7114ff5658226441ab82ed3110a57 --- src/networking.cpp | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 54e04406f..9d94f5171 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1003,29 +1003,23 @@ int clientHasPendingReplies(client *c) { return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP); } +static std::atomic rgacceptsInFlight[MAX_EVENT_LOOPS]; int chooseBestThreadForAccept() { - listIter li; - listNode *ln; - int rgcclients[MAX_EVENT_LOOPS] = {0}; - - listRewind(g_pserver->clients, &li); - while ((ln = listNext(&li)) != nullptr) - { - client *c = (client*)listNodeValue(ln); - if (c->iel < 0) - continue; - - rgcclients[c->iel]++; - } - int ielMinLoad = 0; + int cclientsMin = INT_MAX; for (int iel = 0; iel < cserver.cthreads; ++iel) { - if (rgcclients[iel] < cserver.thread_min_client_threshold) + int cclientsThread; + atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread); + cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed); + if (cclientsThread < cserver.thread_min_client_threshold) return iel; - if (rgcclients[iel] < rgcclients[ielMinLoad]) + if (cclientsThread < cclientsMin) + { + cclientsMin = cclientsThread; ielMinLoad = iel; + } } return ielMinLoad; } @@ -1134,18 +1128,21 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { { { int ielTarget = chooseBestThreadForAccept(); + rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed); if (ielTarget != ielCur) { char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); memcpy(szT, cip, NET_IP_STR_LEN); int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{ acceptCommonHandler(cfd,0,szT, ielTarget); + rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); zfree(szT); }); if (res == AE_OK) continue; } + rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); } LLocalThread: