diff --git a/src/ae.cpp b/src/ae.cpp index 90c148510..b92cd4a67 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -273,7 +273,8 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg) cmd.proc = proc; cmd.clientData = arg; auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); - AE_ASSERT(size == sizeof(cmd)); + if (size != sizeof(cmd)) + return AE_ERR; return AE_OK; } @@ -296,6 +297,8 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch } auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); + if (size != sizeof(cmd)) + return AE_ERR; AE_ASSERT(size == sizeof(cmd)); int ret = AE_OK; if (fSynchronous) diff --git a/src/aof.cpp b/src/aof.cpp index b82be9a34..65647bbba 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -167,11 +167,12 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { * not one already. */ if (!g_pserver->aof_rewrite_pending) { g_pserver->aof_rewrite_pending = true; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { g_pserver->aof_rewrite_pending = false; if (g_pserver->aof_pipe_write_data_to_child >= 0) aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); }); + serverAssert(res == AE_OK); // we can't handle an error here } } @@ -1563,16 +1564,18 @@ error: void aofClosePipes(void) { int fdAofAckPipe = g_pserver->aof_pipe_read_ack_from_child; - aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{ + int res = aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{ aeDeleteFileEventAsync(serverTL->el,fdAofAckPipe,AE_READABLE); close (fdAofAckPipe); }); + serverAssert(res == AE_OK); int fdAofWritePipe = g_pserver->aof_pipe_write_data_to_child; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{ + res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{ aeDeleteFileEventAsync(serverTL->el,fdAofWritePipe,AE_WRITABLE); close(fdAofWritePipe); }); + serverAssert(res == AE_OK); g_pserver->aof_pipe_write_data_to_child = -1; close(g_pserver->aof_pipe_read_data_from_parent); diff --git a/src/cluster.cpp b/src/cluster.cpp index f6a6e03dc..c20b0f4c4 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -295,6 +295,15 @@ int clusterLoadConfig(char *filename) { if (clusterGetMaxEpoch() > g_pserver->cluster->currentEpoch) { g_pserver->cluster->currentEpoch = clusterGetMaxEpoch(); } + + if (dictSize(g_pserver->cluster->nodes) > 1 && cserver.thread_min_client_threshold < 100) + { + // Because we expect the individual load of a client to be much less in a cluster (it will spread over multiple server) + // we can increase the grouping of clients on a single thread within reason + cserver.thread_min_client_threshold *= dictSize(g_pserver->cluster->nodes); + cserver.thread_min_client_threshold = std::min(cserver.thread_min_client_threshold, 200); + serverLog(LL_NOTICE, "Expanding min-clients-per-thread to %d due to cluster", cserver.thread_min_client_threshold); + } return C_OK; fmterr: @@ -623,9 +632,10 @@ void freeClusterLink(clusterLink *link) { if (link->node) link->node->link = NULL; link->node = nullptr; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{ + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{ freeClusterLink(link); }); + serverAssert(res == AE_OK); return; } if (link->fd != -1) { diff --git a/src/config.cpp b/src/config.cpp index c056b98dc..2aaad825e 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -805,6 +805,11 @@ void loadServerConfigFromString(char *config) { } else if (!strcasecmp(argv[0],"enable-pro")) { cserver.fUsePro = true; break; + } else if (!strcasecmp(argv[0],"min-clients-per-thread") && argc == 2) { + cserver.thread_min_client_threshold = atoi(argv[1]); + if (cserver.thread_min_client_threshold < 0 || cserver.thread_min_client_threshold > 400) { + err = "min-thread-client must be between 0 and 400"; goto loaderr; + } } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } diff --git a/src/networking.cpp b/src/networking.cpp index 97744c410..097df0f87 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1003,6 +1003,33 @@ int clientHasPendingReplies(client *c) { return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP); } +int chooseBestThreadForAccept(int ielCur) +{ + listIter li; + listNode *ln; + int rgcclients[MAX_EVENT_LOOPS] = {0}; + + listRewind(g_pserver->clients, &li); + while ((ln = listNext(&li)) != nullptr) + { + client *c = (client*)listNodeValue(ln); + if (c->iel < 0) + continue; + + rgcclients[c->iel]++; + } + + int ielMinLoad = 0; + for (int iel = 0; iel < cserver.cthreads; ++iel) + { + if (rgcclients[iel] < cserver.thread_min_client_threshold) + return iel; + if (rgcclients[iel] < rgcclients[ielMinLoad]) + ielMinLoad = iel; + } + return ielMinLoad; +} + #define MAX_ACCEPTS_PER_CALL 1000 static void acceptCommonHandler(int fd, int flags, char *ip, int iel) { client *c; @@ -1105,7 +1132,22 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { if (!g_fTestMode) { - // We always accept on the same thread + { + int ielTarget = chooseBestThreadForAccept(ielCur); + if (ielTarget != ielCur) + { + char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); + memcpy(szT, cip, NET_IP_STR_LEN); + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{ + acceptCommonHandler(cfd,0,szT, ielTarget); + zfree(szT); + }); + + if (res == AE_OK) + continue; + } + } + LLocalThread: aeAcquireLock(); acceptCommonHandler(cfd,0,cip, ielCur); @@ -1122,10 +1164,15 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { goto LLocalThread; char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); memcpy(szT, cip, NET_IP_STR_LEN); - aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{ + int res = aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{ acceptCommonHandler(cfd,0,szT, iel); zfree(szT); }); + if (res != AE_OK) + { + zfree(szT); + goto LLocalThread; + } } } } @@ -1151,13 +1198,16 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int ielTarget = rand() % cserver.cthreads; if (ielTarget == ielCur) { + LLocalThread: acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielCur); } else { - aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{ + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{ acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielTarget); }); + if (res != AE_OK) + goto LLocalThread; } aeReleaseLock(); @@ -2529,7 +2579,7 @@ NULL { int iel = client->iel; freeClientAsync(client); - aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { + aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { // note: failure is OK freeClientsInAsyncFreeQueue(iel); }); } diff --git a/src/server.h b/src/server.h index 22d983b0f..3ff023677 100644 --- a/src/server.h +++ b/src/server.h @@ -1604,6 +1604,7 @@ struct redisServerConst { unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */ bool fUsePro = false; + int thread_min_client_threshold = 50; }; struct redisServer {