During RDB load we can only handle clients on the main thread as our workers are not yet created. This change also unifies the accept logic across the socket types

Former-commit-id: 59cda9cfb96eca55c4b445309970bd95277ebc5c
This commit is contained in:
John Sully 2020-05-23 16:03:24 -04:00
parent 3c0e1a1baf
commit 3813d5835e
3 changed files with 64 additions and 73 deletions

View File

@ -1283,11 +1283,60 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel)
} }
} }
void acceptOnThread(connection *conn, int flags, char *cip)
{
int ielCur = ielFromEventLoop(serverTL->el);
int ielTarget = 0;
if (g_pserver->loading)
{
ielTarget = IDX_EVENT_LOOP_MAIN; // During load only the main thread is active
}
else if (g_fTestMode)
{
// On test mode we don't want any bunching of clients
while (cserver.cthreads > 1 && ielTarget == IDX_EVENT_LOOP_MAIN)
ielTarget = rand() % cserver.cthreads;
}
else
{
ielTarget = chooseBestThreadForAccept();
}
rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed);
if (ielTarget != ielCur)
{
char *szT = nullptr;
if (cip != nullptr)
{
szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
memcpy(szT, cip, NET_IP_STR_LEN);
}
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT]{
acceptCommonHandler(conn,flags,szT,ielTarget);
if (!g_fTestMode && !g_pserver->loading)
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
zfree(szT);
});
if (res == AE_OK)
return;
// If res != AE_OK we can still try to accept on the local thread
}
if (!g_fTestMode && !g_pserver->loading)
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
aeAcquireLock();
acceptCommonHandler(conn,flags,cip,ielCur);
aeReleaseLock();
}
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL; int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN]; char cip[NET_IP_STR_LEN];
UNUSED(mask); UNUSED(mask);
UNUSED(privdata); UNUSED(privdata);
UNUSED(el);
while(max--) { while(max--) {
cfd = anetTcpAccept(serverTL->neterr, fd, cip, sizeof(cip), &cport); cfd = anetTcpAccept(serverTL->neterr, fd, cip, sizeof(cip), &cport);
@ -1298,55 +1347,8 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
return; return;
} }
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
int ielCur = ielFromEventLoop(el);
if (!g_fTestMode) acceptOnThread(connCreateAcceptedSocket(cfd), 0, cip);
{
{
int ielTarget = chooseBestThreadForAccept();
rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed);
if (ielTarget != ielCur)
{
char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
memcpy(szT, cip, NET_IP_STR_LEN);
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,ielTarget);
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
zfree(szT);
});
if (res == AE_OK)
continue;
}
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
}
LLocalThread:
aeAcquireLock();
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip,ielCur);
aeReleaseLock();
}
else
{
// In test mode we want a good distribution among threads and avoid the main thread
// since the main thread is most likely to work
int iel = IDX_EVENT_LOOP_MAIN;
while (cserver.cthreads > 1 && iel == IDX_EVENT_LOOP_MAIN)
iel = rand() % cserver.cthreads;
if (iel == ielFromEventLoop(el))
goto LLocalThread;
char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
memcpy(szT, cip, NET_IP_STR_LEN);
int res = aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,iel);
zfree(szT);
});
if (res != AE_OK)
{
zfree(szT);
goto LLocalThread;
}
}
} }
} }
@ -1357,7 +1359,6 @@ void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(mask); UNUSED(mask);
UNUSED(privdata); UNUSED(privdata);
int ielCur = ielFromEventLoop(el);
while(max--) { while(max--) {
cfd = anetTcpAccept(serverTL->neterr, fd, cip, sizeof(cip), &cport); cfd = anetTcpAccept(serverTL->neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) { if (cfd == ANET_ERR) {
@ -1367,9 +1368,8 @@ void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
return; return;
} }
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
aeAcquireLock();
acceptCommonHandler(connCreateAcceptedTLS(cfd, g_pserver->tls_auth_clients),0,cip,ielCur); acceptOnThread(connCreateAcceptedTLS(cfd, g_pserver->tls_auth_clients), 0, cip);
aeReleaseLock();
} }
} }
@ -1379,7 +1379,6 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(mask); UNUSED(mask);
UNUSED(privdata); UNUSED(privdata);
int iel = ielFromEventLoop(el);
while(max--) { while(max--) {
cfd = anetUnixAccept(serverTL->neterr, fd); cfd = anetUnixAccept(serverTL->neterr, fd);
if (cfd == ANET_ERR) { if (cfd == ANET_ERR) {
@ -1390,23 +1389,7 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
} }
serverLog(LL_VERBOSE,"Accepted connection to %s", g_pserver->unixsocket); serverLog(LL_VERBOSE,"Accepted connection to %s", g_pserver->unixsocket);
aeAcquireLock(); acceptOnThread(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL);
int ielTarget = rand() % cserver.cthreads;
if (ielTarget == iel)
{
LLocalThread:
acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,iel);
}
else
{
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{
acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,ielTarget);
});
if (res != AE_OK)
goto LLocalThread;
}
aeReleaseLock();
} }
} }

View File

@ -1449,6 +1449,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
continue; continue;
client *slave = (client*)connGetPrivateData(conn); client *slave = (client*)connGetPrivateData(conn);
std::unique_lock<fastlock> ul(slave->lock);
// Normally it would be bug to talk a client conn from a different thread, but here we know nobody else will // Normally it would be bug to talk a client conn from a different thread, but here we know nobody else will
// be sending anything while in this replication state so it is OK // be sending anything while in this replication state so it is OK
if ((nwritten = connWrite(conn, g_pserver->rdb_pipe_buff, g_pserver->rdb_pipe_bufflen)) == -1) { if ((nwritten = connWrite(conn, g_pserver->rdb_pipe_buff, g_pserver->rdb_pipe_bufflen)) == -1) {

View File

@ -2923,9 +2923,9 @@ static void initNetworkingThread(int iel, int fReusePort)
static void initNetworking(int fReusePort) static void initNetworking(int fReusePort)
{ {
int celListen = (fReusePort) ? cserver.cthreads : 1; // We only initialize the main thread here, since RDB load is a special case that processes
for (int iel = 0; iel < celListen; ++iel) // clients before our server threads are launched.
initNetworkingThread(iel, fReusePort); initNetworkingThread(IDX_EVENT_LOOP_MAIN, fReusePort);
/* Open the listening Unix domain socket. */ /* Open the listening Unix domain socket. */
if (g_pserver->unixsocket != NULL) { if (g_pserver->unixsocket != NULL) {
@ -5298,6 +5298,13 @@ void *workerThreadMain(void *parg)
serverLog(LOG_INFO, "Thread %d alive.", iel); serverLog(LOG_INFO, "Thread %d alive.", iel);
serverTL = g_pserver->rgthreadvar+iel; // set the TLS threadsafe global serverTL = g_pserver->rgthreadvar+iel; // set the TLS threadsafe global
if (iel != IDX_EVENT_LOOP_MAIN)
{
aeAcquireLock();
initNetworkingThread(iel, cserver.cthreads > 1);
aeReleaseLock();
}
moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run
aeEventLoop *el = g_pserver->rgthreadvar[iel].el; aeEventLoop *el = g_pserver->rgthreadvar[iel].el;
try try