writes shouldn't be under the global lock

Former-commit-id: bcfd9327cb4fcf5e1fca9477862919817ddc5ab8
This commit is contained in:
John Sully 2019-02-26 00:19:38 -05:00
parent da1850efa3
commit 9f52d518c3
4 changed files with 13 additions and 2 deletions

View File

@ -3702,7 +3702,10 @@ void moduleHandleBlockedClients(void) {
{ {
c->flags |= CLIENT_PENDING_WRITE; c->flags |= CLIENT_PENDING_WRITE;
AssertCorrectThread(c); AssertCorrectThread(c);
fastlock_lock(&server.rgthreadvar[c->iel].lockPendingWrite);
listAddNodeHead(server.rgthreadvar[c->iel].clients_pending_write,c); listAddNodeHead(server.rgthreadvar[c->iel].clients_pending_write,c);
fastlock_unlock(&server.rgthreadvar[c->iel].lockPendingWrite);
} }
} }

View File

@ -258,6 +258,7 @@ void clientInstallWriteHandler(client *c) {
* a system call. We'll only really install the write handler if * a system call. We'll only really install the write handler if
* we'll not be able to write the whole reply at once. */ * we'll not be able to write the whole reply at once. */
c->flags |= CLIENT_PENDING_WRITE; c->flags |= CLIENT_PENDING_WRITE;
std::unique_lock<fastlock> lockf(server.rgthreadvar[c->iel].lockPendingWrite);
listAddNodeHead(server.rgthreadvar[c->iel].clients_pending_write,c); listAddNodeHead(server.rgthreadvar[c->iel].clients_pending_write,c);
} }
} }
@ -1203,6 +1204,7 @@ void unlinkClient(client *c) {
/* Remove from the list of pending writes if needed. */ /* Remove from the list of pending writes if needed. */
if (c->flags & CLIENT_PENDING_WRITE) { if (c->flags & CLIENT_PENDING_WRITE) {
std::unique_lock<fastlock> lockf(server.rgthreadvar[c->iel].lockPendingWrite);
ln = listSearchKey(server.rgthreadvar[c->iel].clients_pending_write,c); ln = listSearchKey(server.rgthreadvar[c->iel].clients_pending_write,c);
serverAssert(ln != NULL); serverAssert(ln != NULL);
listDelNode(server.rgthreadvar[c->iel].clients_pending_write,ln); listDelNode(server.rgthreadvar[c->iel].clients_pending_write,ln);
@ -1462,7 +1464,6 @@ int writeToClient(int fd, client *c, int handler_installed) {
} }
else else
{ {
lock.unlock();
freeClientAsync(c); freeClientAsync(c);
} }
@ -1567,6 +1568,7 @@ int handleClientsWithPendingWrites(int iel) {
listIter li; listIter li;
listNode *ln; listNode *ln;
std::unique_lock<fastlock> lockf(server.rgthreadvar[iel].lockPendingWrite);
list *list = server.rgthreadvar[iel].clients_pending_write; list *list = server.rgthreadvar[iel].clients_pending_write;
int processed = listLength(list); int processed = listLength(list);
serverAssert(iel == (serverTL - server.rgthreadvar)); serverAssert(iel == (serverTL - server.rgthreadvar));

View File

@ -2109,7 +2109,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
flushAppendOnlyFile(0); flushAppendOnlyFile(0);
/* Handle writes with pending output buffers. */ /* Handle writes with pending output buffers. */
aeReleaseLock();
handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN); handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN);
aeAcquireLock();
/* Before we are going to sleep, let the threads access the dataset by /* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this * releasing the GIL. Redis main thread will not touch anything at this
@ -2126,10 +2128,10 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
if (listLength(server.rgthreadvar[iel].unblocked_clients)) { if (listLength(server.rgthreadvar[iel].unblocked_clients)) {
processUnblockedClients(iel); processUnblockedClients(iel);
} }
aeReleaseLock();
/* Handle writes with pending output buffers. */ /* Handle writes with pending output buffers. */
handleClientsWithPendingWrites(iel); handleClientsWithPendingWrites(iel);
aeReleaseLock();
} }
/* This function is called immadiately after the event loop multiplexing /* This function is called immadiately after the event loop multiplexing
@ -2813,6 +2815,8 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
exit(1); exit(1);
} }
fastlock_init(&pvar->lockPendingWrite);
if (!fMain) if (!fMain)
{ {
if (aeCreateTimeEvent(pvar->el, 1, serverCronLite, NULL, NULL) == AE_ERR) { if (aeCreateTimeEvent(pvar->el, 1, serverCronLite, NULL, NULL) == AE_ERR) {
@ -5036,6 +5040,7 @@ int main(int argc, char **argv) {
int background = server.daemonize && !server.supervised; int background = server.daemonize && !server.supervised;
if (background) daemonize(); if (background) daemonize();
server.cthreads = 2;
initServer(); initServer();
initNetworking(server.cthreads > 1 /* fReusePort */); initNetworking(server.cthreads > 1 /* fReusePort */);

View File

@ -1051,6 +1051,7 @@ struct redisServerThreadVars {
list *clients_pending_write; /* There is to write or install handler. */ list *clients_pending_write; /* There is to write or install handler. */
list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */ list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */
list *clients_pending_asyncwrite; list *clients_pending_asyncwrite;
struct fastlock lockPendingWrite;
}; };
struct redisServer { struct redisServer {