From 3813d5835e9f0d87392473f41c19b8b72ce952ea Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 23 May 2020 16:03:24 -0400 Subject: [PATCH] During RDB load we can only handle clients on the main thread as our workers are not yet created. This change also unifies the accept logic across the socket types Former-commit-id: 59cda9cfb96eca55c4b445309970bd95277ebc5c --- src/networking.cpp | 123 +++++++++++++++++++------------------------- src/replication.cpp | 1 + src/server.cpp | 13 +++-- 3 files changed, 64 insertions(+), 73 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 579e81561..af97f229b 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1283,11 +1283,60 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel) } } +void acceptOnThread(connection *conn, int flags, char *cip) +{ + int ielCur = ielFromEventLoop(serverTL->el); + + int ielTarget = 0; + if (g_pserver->loading) + { + ielTarget = IDX_EVENT_LOOP_MAIN; // During load only the main thread is active + } + else if (g_fTestMode) + { + // On test mode we don't want any bunching of clients + while (cserver.cthreads > 1 && ielTarget == IDX_EVENT_LOOP_MAIN) + ielTarget = rand() % cserver.cthreads; + } + else + { + ielTarget = chooseBestThreadForAccept(); + } + + rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed); + if (ielTarget != ielCur) + { + char *szT = nullptr; + if (cip != nullptr) + { + szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); + memcpy(szT, cip, NET_IP_STR_LEN); + } + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT]{ + acceptCommonHandler(conn,flags,szT,ielTarget); + if (!g_fTestMode && !g_pserver->loading) + rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); + zfree(szT); + }); + + if (res == AE_OK) + return; + // If res != AE_OK we can still try to accept on the local thread + } + if (!g_fTestMode && !g_pserver->loading) + rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); + + aeAcquireLock(); + acceptCommonHandler(conn,flags,cip,ielCur); + aeReleaseLock(); +} + void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; UNUSED(mask); UNUSED(privdata); + UNUSED(el); while(max--) { cfd = anetTcpAccept(serverTL->neterr, fd, cip, sizeof(cip), &cport); @@ -1298,55 +1347,8 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { return; } serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); - int ielCur = ielFromEventLoop(el); - if (!g_fTestMode) - { - { - int ielTarget = chooseBestThreadForAccept(); - rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed); - if (ielTarget != ielCur) - { - char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); - memcpy(szT, cip, NET_IP_STR_LEN); - int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{ - acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,ielTarget); - rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); - zfree(szT); - }); - - if (res == AE_OK) - continue; - } - rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); - } - - LLocalThread: - aeAcquireLock(); - acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip,ielCur); - aeReleaseLock(); - } - else - { - // In test mode we want a good distribution among threads and avoid the main thread - // since the main thread is most likely to work - int iel = IDX_EVENT_LOOP_MAIN; - while (cserver.cthreads > 1 && iel == IDX_EVENT_LOOP_MAIN) - iel = rand() % cserver.cthreads; - if (iel == ielFromEventLoop(el)) - goto LLocalThread; - char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); - memcpy(szT, cip, NET_IP_STR_LEN); - int res = aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{ - acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,iel); - zfree(szT); - }); - if (res != AE_OK) - { - zfree(szT); - goto LLocalThread; - } - } + acceptOnThread(connCreateAcceptedSocket(cfd), 0, cip); } } @@ -1357,7 +1359,6 @@ void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(mask); UNUSED(privdata); - int ielCur = ielFromEventLoop(el); while(max--) { cfd = anetTcpAccept(serverTL->neterr, fd, cip, sizeof(cip), &cport); if (cfd == ANET_ERR) { @@ -1367,9 +1368,8 @@ void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) { return; } serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); - aeAcquireLock(); - acceptCommonHandler(connCreateAcceptedTLS(cfd, g_pserver->tls_auth_clients),0,cip,ielCur); - aeReleaseLock(); + + acceptOnThread(connCreateAcceptedTLS(cfd, g_pserver->tls_auth_clients), 0, cip); } } @@ -1379,7 +1379,6 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(mask); UNUSED(privdata); - int iel = ielFromEventLoop(el); while(max--) { cfd = anetUnixAccept(serverTL->neterr, fd); if (cfd == ANET_ERR) { @@ -1390,23 +1389,7 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } serverLog(LL_VERBOSE,"Accepted connection to %s", g_pserver->unixsocket); - aeAcquireLock(); - int ielTarget = rand() % cserver.cthreads; - if (ielTarget == iel) - { - LLocalThread: - acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,iel); - } - else - { - int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{ - acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,ielTarget); - }); - if (res != AE_OK) - goto LLocalThread; - } - aeReleaseLock(); - + acceptOnThread(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL); } } diff --git a/src/replication.cpp b/src/replication.cpp index 48b9f21f8..a0ca766fc 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1449,6 +1449,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, continue; client *slave = (client*)connGetPrivateData(conn); + std::unique_lock ul(slave->lock); // Normally it would be bug to talk a client conn from a different thread, but here we know nobody else will // be sending anything while in this replication state so it is OK if ((nwritten = connWrite(conn, g_pserver->rdb_pipe_buff, g_pserver->rdb_pipe_bufflen)) == -1) { diff --git a/src/server.cpp b/src/server.cpp index d7dcdaebf..8d2a10129 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2923,9 +2923,9 @@ static void initNetworkingThread(int iel, int fReusePort) static void initNetworking(int fReusePort) { - int celListen = (fReusePort) ? cserver.cthreads : 1; - for (int iel = 0; iel < celListen; ++iel) - initNetworkingThread(iel, fReusePort); + // We only initialize the main thread here, since RDB load is a special case that processes + // clients before our server threads are launched. + initNetworkingThread(IDX_EVENT_LOOP_MAIN, fReusePort); /* Open the listening Unix domain socket. */ if (g_pserver->unixsocket != NULL) { @@ -5298,6 +5298,13 @@ void *workerThreadMain(void *parg) serverLog(LOG_INFO, "Thread %d alive.", iel); serverTL = g_pserver->rgthreadvar+iel; // set the TLS threadsafe global + if (iel != IDX_EVENT_LOOP_MAIN) + { + aeAcquireLock(); + initNetworkingThread(iel, cserver.cthreads > 1); + aeReleaseLock(); + } + moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run aeEventLoop *el = g_pserver->rgthreadvar[iel].el; try