Merge branch 'unstable' of https://github.com/JohnSully/KeyDB into unstable

Former-commit-id: 63ae679fe1104f7cde4c9b5efe4a72be6b3435db
This commit is contained in:
John Sully 2020-02-16 18:12:11 -05:00
commit fd375eb95c

View File

@ -1003,29 +1003,23 @@ int clientHasPendingReplies(client *c) {
return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP);
}
static std::atomic<int> rgacceptsInFlight[MAX_EVENT_LOOPS];
int chooseBestThreadForAccept()
{
listIter li;
listNode *ln;
int rgcclients[MAX_EVENT_LOOPS] = {0};
listRewind(g_pserver->clients, &li);
while ((ln = listNext(&li)) != nullptr)
{
client *c = (client*)listNodeValue(ln);
if (c->iel < 0)
continue;
rgcclients[c->iel]++;
}
int ielMinLoad = 0;
int cclientsMin = INT_MAX;
for (int iel = 0; iel < cserver.cthreads; ++iel)
{
if (rgcclients[iel] < cserver.thread_min_client_threshold)
int cclientsThread;
atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread);
cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed);
if (cclientsThread < cserver.thread_min_client_threshold)
return iel;
if (rgcclients[iel] < rgcclients[ielMinLoad])
if (cclientsThread < cclientsMin)
{
cclientsMin = cclientsThread;
ielMinLoad = iel;
}
}
return ielMinLoad;
}
@ -1134,18 +1128,21 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
{
{
int ielTarget = chooseBestThreadForAccept();
rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed);
if (ielTarget != ielCur)
{
char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
memcpy(szT, cip, NET_IP_STR_LEN);
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{
acceptCommonHandler(cfd,0,szT, ielTarget);
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
zfree(szT);
});
if (res == AE_OK)
continue;
}
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
}
LLocalThread: