diff --git a/src/networking.cpp b/src/networking.cpp index 579e81561..af97f229b 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1283,11 +1283,60 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel) } } +void acceptOnThread(connection *conn, int flags, char *cip) +{ + int ielCur = ielFromEventLoop(serverTL->el); + + int ielTarget = 0; + if (g_pserver->loading) + { + ielTarget = IDX_EVENT_LOOP_MAIN; // During load only the main thread is active + } + else if (g_fTestMode) + { + // On test mode we don't want any bunching of clients + while (cserver.cthreads > 1 && ielTarget == IDX_EVENT_LOOP_MAIN) + ielTarget = rand() % cserver.cthreads; + } + else + { + ielTarget = chooseBestThreadForAccept(); + } + + rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed); + if (ielTarget != ielCur) + { + char *szT = nullptr; + if (cip != nullptr) + { + szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); + memcpy(szT, cip, NET_IP_STR_LEN); + } + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT]{ + acceptCommonHandler(conn,flags,szT,ielTarget); + if (!g_fTestMode && !g_pserver->loading) + rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); + zfree(szT); + }); + + if (res == AE_OK) + return; + // If res != AE_OK we can still try to accept on the local thread + } + if (!g_fTestMode && !g_pserver->loading) + rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); + + aeAcquireLock(); + acceptCommonHandler(conn,flags,cip,ielCur); + aeReleaseLock(); +} + void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; UNUSED(mask); UNUSED(privdata); + UNUSED(el); while(max--) { cfd = anetTcpAccept(serverTL->neterr, fd, cip, sizeof(cip), &cport); @@ -1298,55 +1347,8 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { return; } serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); - int ielCur = ielFromEventLoop(el); - if (!g_fTestMode) - { - { - int ielTarget = chooseBestThreadForAccept(); - rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed); - if (ielTarget != ielCur) - { - char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); - memcpy(szT, cip, NET_IP_STR_LEN); - int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{ - acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,ielTarget); - rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); - zfree(szT); - }); - - if (res == AE_OK) - continue; - } - rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); - } - - LLocalThread: - aeAcquireLock(); - acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip,ielCur); - aeReleaseLock(); - } - else - { - // In test mode we want a good distribution among threads and avoid the main thread - // since the main thread is most likely to work - int iel = IDX_EVENT_LOOP_MAIN; - while (cserver.cthreads > 1 && iel == IDX_EVENT_LOOP_MAIN) - iel = rand() % cserver.cthreads; - if (iel == ielFromEventLoop(el)) - goto LLocalThread; - char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); - memcpy(szT, cip, NET_IP_STR_LEN); - int res = aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{ - acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,iel); - zfree(szT); - }); - if (res != AE_OK) - { - zfree(szT); - goto LLocalThread; - } - } + acceptOnThread(connCreateAcceptedSocket(cfd), 0, cip); } } @@ -1357,7 +1359,6 @@ void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(mask); UNUSED(privdata); - int ielCur = ielFromEventLoop(el); while(max--) { cfd = anetTcpAccept(serverTL->neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { @@ -1367,9 +1368,8 @@ void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) { return; } serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); - aeAcquireLock(); - acceptCommonHandler(connCreateAcceptedTLS(cfd, g_pserver->tls_auth_clients),0,cip,ielCur); - aeReleaseLock(); + + acceptOnThread(connCreateAcceptedTLS(cfd, g_pserver->tls_auth_clients), 0, cip); } } @@ -1379,7 +1379,6 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(mask); UNUSED(privdata); - int iel = ielFromEventLoop(el); while(max--) { cfd = anetUnixAccept(serverTL->neterr, fd); if (cfd == ANET_ERR) { @@ -1390,23 +1389,7 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } serverLog(LL_VERBOSE,"Accepted connection to %s", g_pserver->unixsocket); - aeAcquireLock(); - int ielTarget = rand() % cserver.cthreads; - if (ielTarget == iel) - { - LLocalThread: - acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,iel); - } - else - { - int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{ - acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,ielTarget); - }); - if (res != AE_OK) - goto LLocalThread; - } - aeReleaseLock(); - + acceptOnThread(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL); } } diff --git a/src/replication.cpp b/src/replication.cpp index 48b9f21f8..a0ca766fc 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1449,6 +1449,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, continue; client *slave = (client*)connGetPrivateData(conn); + std::unique_lock ul(slave->lock); // Normally it would be bug to talk a client conn from a different thread, but here we know nobody else will // be sending anything while in this replication state so it is OK if ((nwritten = connWrite(conn, g_pserver->rdb_pipe_buff, g_pserver->rdb_pipe_bufflen)) == -1) { diff --git a/src/server.cpp b/src/server.cpp index d7dcdaebf..8d2a10129 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2923,9 +2923,9 @@ static void initNetworkingThread(int iel, int fReusePort) static void initNetworking(int fReusePort) { - int celListen = (fReusePort) ? cserver.cthreads : 1; - for (int iel = 0; iel < celListen; ++iel) - initNetworkingThread(iel, fReusePort); + // We only initialize the main thread here, since RDB load is a special case that processes + // clients before our server threads are launched. + initNetworkingThread(IDX_EVENT_LOOP_MAIN, fReusePort); /* Open the listening Unix domain socket. */ if (g_pserver->unixsocket != NULL) { @@ -5298,6 +5298,13 @@ void *workerThreadMain(void *parg) serverLog(LOG_INFO, "Thread %d alive.", iel); serverTL = g_pserver->rgthreadvar+iel; // set the TLS threadsafe global + if (iel != IDX_EVENT_LOOP_MAIN) + { + aeAcquireLock(); + initNetworkingThread(iel, cserver.cthreads > 1); + aeReleaseLock(); + } + moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run aeEventLoop *el = g_pserver->rgthreadvar[iel].el; try