During RDB load we can only handle clients on the main thread as our workers are not yet created. This change also unifies the accept logic across the socket types

Former-commit-id: 59cda9cfb96eca55c4b445309970bd95277ebc5c
This commit is contained in:
John Sully 2020-05-23 16:03:24 -04:00
parent 5d9afccfdd
commit 18b8a83ecb
3 changed files with 64 additions and 73 deletions

View File

@ -1283,11 +1283,60 @@ static void acceptCommonHandler(connection *conn, int flags, char *ip, int iel)
}
}
void acceptOnThread(connection *conn, int flags, char *cip)
{
int ielCur = ielFromEventLoop(serverTL->el);
int ielTarget = 0;
if (g_pserver->loading)
{
ielTarget = IDX_EVENT_LOOP_MAIN; // During load only the main thread is active
}
else if (g_fTestMode)
{
// On test mode we don't want any bunching of clients
while (cserver.cthreads > 1 && ielTarget == IDX_EVENT_LOOP_MAIN)
ielTarget = rand() % cserver.cthreads;
}
else
{
ielTarget = chooseBestThreadForAccept();
}
rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed);
if (ielTarget != ielCur)
{
char *szT = nullptr;
if (cip != nullptr)
{
szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
memcpy(szT, cip, NET_IP_STR_LEN);
}
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [conn, flags, ielTarget, szT]{
acceptCommonHandler(conn,flags,szT,ielTarget);
if (!g_fTestMode && !g_pserver->loading)
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
zfree(szT);
});
if (res == AE_OK)
return;
// If res != AE_OK we can still try to accept on the local thread
}
if (!g_fTestMode && !g_pserver->loading)
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
aeAcquireLock();
acceptCommonHandler(conn,flags,cip,ielCur);
aeReleaseLock();
}
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(mask);
UNUSED(privdata);
UNUSED(el);
while(max--) {
cfd = anetTcpAccept(serverTL->neterr, fd, cip, sizeof(cip), &cport);
@ -1298,55 +1347,8 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
int ielCur = ielFromEventLoop(el);
if (!g_fTestMode)
{
{
int ielTarget = chooseBestThreadForAccept();
rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed);
if (ielTarget != ielCur)
{
char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
memcpy(szT, cip, NET_IP_STR_LEN);
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,ielTarget);
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
zfree(szT);
});
if (res == AE_OK)
continue;
}
rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed);
}
LLocalThread:
aeAcquireLock();
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip,ielCur);
aeReleaseLock();
}
else
{
// In test mode we want a good distribution among threads and avoid the main thread
// since the main thread is most likely to work
int iel = IDX_EVENT_LOOP_MAIN;
while (cserver.cthreads > 1 && iel == IDX_EVENT_LOOP_MAIN)
iel = rand() % cserver.cthreads;
if (iel == ielFromEventLoop(el))
goto LLocalThread;
char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL);
memcpy(szT, cip, NET_IP_STR_LEN);
int res = aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,szT,iel);
zfree(szT);
});
if (res != AE_OK)
{
zfree(szT);
goto LLocalThread;
}
}
acceptOnThread(connCreateAcceptedSocket(cfd), 0, cip);
}
}
@ -1357,7 +1359,6 @@ void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(mask);
UNUSED(privdata);
int ielCur = ielFromEventLoop(el);
while(max--) {
cfd = anetTcpAccept(serverTL->neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
@ -1367,9 +1368,8 @@ void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
aeAcquireLock();
acceptCommonHandler(connCreateAcceptedTLS(cfd, g_pserver->tls_auth_clients),0,cip,ielCur);
aeReleaseLock();
acceptOnThread(connCreateAcceptedTLS(cfd, g_pserver->tls_auth_clients), 0, cip);
}
}
@ -1379,7 +1379,6 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(mask);
UNUSED(privdata);
int iel = ielFromEventLoop(el);
while(max--) {
cfd = anetUnixAccept(serverTL->neterr, fd);
if (cfd == ANET_ERR) {
@ -1390,23 +1389,7 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
}
serverLog(LL_VERBOSE,"Accepted connection to %s", g_pserver->unixsocket);
aeAcquireLock();
int ielTarget = rand() % cserver.cthreads;
if (ielTarget == iel)
{
LLocalThread:
acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,iel);
}
else
{
int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{
acceptCommonHandler(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL,ielTarget);
});
if (res != AE_OK)
goto LLocalThread;
}
aeReleaseLock();
acceptOnThread(connCreateAcceptedSocket(cfd),CLIENT_UNIX_SOCKET,NULL);
}
}

View File

@ -1449,6 +1449,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
continue;
client *slave = (client*)connGetPrivateData(conn);
std::unique_lock<fastlock> ul(slave->lock);
// Normally it would be bug to talk a client conn from a different thread, but here we know nobody else will
// be sending anything while in this replication state so it is OK
if ((nwritten = connWrite(conn, g_pserver->rdb_pipe_buff, g_pserver->rdb_pipe_bufflen)) == -1) {

View File

@ -2923,9 +2923,9 @@ static void initNetworkingThread(int iel, int fReusePort)
static void initNetworking(int fReusePort)
{
int celListen = (fReusePort) ? cserver.cthreads : 1;
for (int iel = 0; iel < celListen; ++iel)
initNetworkingThread(iel, fReusePort);
// We only initialize the main thread here, since RDB load is a special case that processes
// clients before our server threads are launched.
initNetworkingThread(IDX_EVENT_LOOP_MAIN, fReusePort);
/* Open the listening Unix domain socket. */
if (g_pserver->unixsocket != NULL) {
@ -5298,6 +5298,13 @@ void *workerThreadMain(void *parg)
serverLog(LOG_INFO, "Thread %d alive.", iel);
serverTL = g_pserver->rgthreadvar+iel; // set the TLS threadsafe global
if (iel != IDX_EVENT_LOOP_MAIN)
{
aeAcquireLock();
initNetworkingThread(iel, cserver.cthreads > 1);
aeReleaseLock();
}
moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run
aeEventLoop *el = g_pserver->rgthreadvar[iel].el;
try