Merge branch 'unstable' of https://github.com/JohnSully/KeyDB into unstable

Former-commit-id: 63ae679fe1104f7cde4c9b5efe4a72be6b3435db
This commit is contained in:
John Sully 2020-02-16 18:12:11 -05:00
commit e5f23dbd07

View File

@ -1003,29 +1003,23 @@ int clientHasPendingReplies(client *c) {
return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP); return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP);
} }
static std::atomic<int> rgacceptsInFlight[MAX_EVENT_LOOPS];
int chooseBestThreadForAccept() int chooseBestThreadForAccept()
{ {
listIter li;
listNode *ln;
int rgcclients[MAX_EVENT_LOOPS] = {0};
listRewind(g_pserver->clients, &li);
while ((ln = listNext(&li)) != nullptr)
{
client *c = (client*)listNodeValue(ln);
if (c->iel < 0)
continue;
rgcclients[c->iel]++;
}
int ielMinLoad = 0; int ielMinLoad = 0;
int cclientsMin = INT_MAX;
for (int iel = 0; iel < cserver.cthreads; ++iel) for (int iel = 0; iel < cserver.cthreads; ++iel)
{ {
if (rgcclients[iel] < cserver.thread_min_client_threshold) int cclientsThread;
atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread);
cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed);
if (cclientsThread < cserver.thread_min_client_threshold)
return iel; return iel;
if (rgcclients[iel] < rgcclients[ielMinLoad]) if (cclientsThread < cclientsMin)
{
cclientsMin = cclientsThread;
ielMinLoad = iel; ielMinLoad = iel;
}
} }
return ielMinLoad; return ielMinLoad;
} }
@ -1134,18 +1128,21 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
{ {
{ {
int ielTarget = chooseBestThreadForAccept(); int ielTarget = chooseBestThreadForAccept();
rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed);
if (ielTarget != ielCur) if (ielTarget != ielCur)
{ {
char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
memcpy(szT, cip, NET_IP_STR_LEN); memcpy(szT, cip, NET_IP_STR_LEN);
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{ int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{
acceptCommonHandler(cfd,0,szT, ielTarget); acceptCommonHandler(cfd,0,szT, ielTarget);
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
zfree(szT); zfree(szT);
}); });
if (res == AE_OK) if (res == AE_OK)
continue; continue;
} }
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
} }
LLocalThread: LLocalThread: