From 9f52d518c3a7ff1f1f26683735dda3e43c0624f4 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 26 Feb 2019 00:19:38 -0500 Subject: [PATCH] writes shouldn't be under the global lock Former-commit-id: bcfd9327cb4fcf5e1fca9477862919817ddc5ab8 --- src/module.c | 3 +++ src/networking.cpp | 4 +++- src/server.c | 7 ++++++- src/server.h | 1 + 4 files changed, 13 insertions(+), 2 deletions(-) diff --git a/src/module.c b/src/module.c index 54a36e0c1..7cb059b34 100644 --- a/src/module.c +++ b/src/module.c @@ -3702,7 +3702,10 @@ void moduleHandleBlockedClients(void) { { c->flags |= CLIENT_PENDING_WRITE; AssertCorrectThread(c); + + fastlock_lock(&server.rgthreadvar[c->iel].lockPendingWrite); listAddNodeHead(server.rgthreadvar[c->iel].clients_pending_write,c); + fastlock_unlock(&server.rgthreadvar[c->iel].lockPendingWrite); } } diff --git a/src/networking.cpp b/src/networking.cpp index 2cddccf92..244a83335 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -258,6 +258,7 @@ void clientInstallWriteHandler(client *c) { * a system call. We'll only really install the write handler if * we'll not be able to write the whole reply at once. */ c->flags |= CLIENT_PENDING_WRITE; + std::unique_lock lockf(server.rgthreadvar[c->iel].lockPendingWrite); listAddNodeHead(server.rgthreadvar[c->iel].clients_pending_write,c); } } @@ -1203,6 +1204,7 @@ void unlinkClient(client *c) { /* Remove from the list of pending writes if needed. */ if (c->flags & CLIENT_PENDING_WRITE) { + std::unique_lock lockf(server.rgthreadvar[c->iel].lockPendingWrite); ln = listSearchKey(server.rgthreadvar[c->iel].clients_pending_write,c); serverAssert(ln != NULL); listDelNode(server.rgthreadvar[c->iel].clients_pending_write,ln); @@ -1462,7 +1464,6 @@ int writeToClient(int fd, client *c, int handler_installed) { } else { - lock.unlock(); freeClientAsync(c); } @@ -1567,6 +1568,7 @@ int handleClientsWithPendingWrites(int iel) { listIter li; listNode *ln; + std::unique_lock lockf(server.rgthreadvar[iel].lockPendingWrite); list *list = server.rgthreadvar[iel].clients_pending_write; int processed = listLength(list); serverAssert(iel == (serverTL - server.rgthreadvar)); diff --git a/src/server.c b/src/server.c index 32b7069dd..cb921fd40 100644 --- a/src/server.c +++ b/src/server.c @@ -2109,7 +2109,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) { flushAppendOnlyFile(0); /* Handle writes with pending output buffers. */ + aeReleaseLock(); handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN); + aeAcquireLock(); /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this @@ -2126,10 +2128,10 @@ void beforeSleepLite(struct aeEventLoop *eventLoop) if (listLength(server.rgthreadvar[iel].unblocked_clients)) { processUnblockedClients(iel); } + aeReleaseLock(); /* Handle writes with pending output buffers. */ handleClientsWithPendingWrites(iel); - aeReleaseLock(); } /* This function is called immadiately after the event loop multiplexing @@ -2813,6 +2815,8 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) exit(1); } + fastlock_init(&pvar->lockPendingWrite); + if (!fMain) { if (aeCreateTimeEvent(pvar->el, 1, serverCronLite, NULL, NULL) == AE_ERR) { @@ -5036,6 +5040,7 @@ int main(int argc, char **argv) { int background = server.daemonize && !server.supervised; if (background) daemonize(); + server.cthreads = 2; initServer(); initNetworking(server.cthreads > 1 /* fReusePort */); diff --git a/src/server.h b/src/server.h index b8e3fa4e7..cb0977585 100644 --- a/src/server.h +++ b/src/server.h @@ -1051,6 +1051,7 @@ struct redisServerThreadVars { list *clients_pending_write; /* There is to write or install handler. */ list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */ list *clients_pending_asyncwrite; + struct fastlock lockPendingWrite; }; struct redisServer {