Merge branch 'redis_6_merge' into keydbpro
Former-commit-id: 4bd4159e2d8a7cdd0af9719776dca3e7d161d166
This commit is contained in:
commit
2af2e5c730
@ -1037,29 +1037,23 @@ int clientHasPendingReplies(client *c) {
|
||||
return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP);
|
||||
}
|
||||
|
||||
static std::atomic<int> rgacceptsInFlight[MAX_EVENT_LOOPS];
|
||||
int chooseBestThreadForAccept()
|
||||
{
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
int rgcclients[MAX_EVENT_LOOPS] = {0};
|
||||
|
||||
listRewind(g_pserver->clients, &li);
|
||||
while ((ln = listNext(&li)) != nullptr)
|
||||
{
|
||||
client *c = (client*)listNodeValue(ln);
|
||||
if (c->iel < 0)
|
||||
continue;
|
||||
|
||||
rgcclients[c->iel]++;
|
||||
}
|
||||
|
||||
int ielMinLoad = 0;
|
||||
int cclientsMin = INT_MAX;
|
||||
for (int iel = 0; iel < cserver.cthreads; ++iel)
|
||||
{
|
||||
if (rgcclients[iel] < cserver.thread_min_client_threshold)
|
||||
int cclientsThread;
|
||||
atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread);
|
||||
cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed);
|
||||
if (cclientsThread < cserver.thread_min_client_threshold)
|
||||
return iel;
|
||||
if (rgcclients[iel] < rgcclients[ielMinLoad])
|
||||
if (cclientsThread < cclientsMin)
|
||||
{
|
||||
cclientsMin = cclientsThread;
|
||||
ielMinLoad = iel;
|
||||
}
|
||||
}
|
||||
return ielMinLoad;
|
||||
}
|
||||
@ -1207,18 +1201,21 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
{
|
||||
{
|
||||
int ielTarget = chooseBestThreadForAccept();
|
||||
rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed);
|
||||
if (ielTarget != ielCur)
|
||||
{
|
||||
char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
|
||||
memcpy(szT, cip, NET_IP_STR_LEN);
|
||||
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{
|
||||
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,ielTarget);
|
||||
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
|
||||
zfree(szT);
|
||||
});
|
||||
|
||||
if (res == AE_OK)
|
||||
continue;
|
||||
}
|
||||
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
LLocalThread:
|
||||
|
Loading…
x
Reference in New Issue
Block a user