diff --git a/src/ae.cpp b/src/ae.cpp index d074d702a..e6177c46c 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -121,9 +121,17 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) case AE_ASYNC_OP::CreateFileEvent: { - std::unique_lock ulock(cmd.pctl->mutexcv); - std::atomic_store(&cmd.pctl->rval, aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData)); - cmd.pctl->cv.notify_all(); + if (cmd.pctl != nullptr) + { + cmd.pctl->mutexcv.lock(); + std::atomic_store(&cmd.pctl->rval, aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData)); + cmd.pctl->cv.notify_all(); + cmd.pctl->mutexcv.unlock(); + } + else + { + aeCreateFileEvent(eventLoop, cmd.fd, cmd.mask, cmd.fproc, cmd.clientData); + } } break; @@ -145,8 +153,8 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) } } -int aeCreateRemoteFileEventSync(aeEventLoop *eventLoop, int fd, int mask, - aeFileProc *proc, void *clientData) +int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, + aeFileProc *proc, void *clientData, int fSynchronous) { if (eventLoop == g_eventLoopThisThread) return aeCreateFileEvent(eventLoop, fd, mask, proc, clientData); @@ -157,14 +165,22 @@ int aeCreateRemoteFileEventSync(aeEventLoop *eventLoop, int fd, int mask, cmd.mask = mask; cmd.fproc = proc; cmd.clientData = clientData; - cmd.pctl = new aeCommandControl(); + cmd.pctl = nullptr; + if (fSynchronous) + cmd.pctl = new aeCommandControl(); - std::unique_lock ulock(cmd.pctl->mutexcv); + std::unique_lock ulock(cmd.pctl->mutexcv, std::defer_lock); + if (fSynchronous) + cmd.pctl->mutexcv.lock(); auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); AE_ASSERT(size == sizeof(cmd)); - cmd.pctl->cv.wait(ulock); - int ret = cmd.pctl->rval; - delete cmd.pctl; + int ret = AE_OK; + if (fSynchronous) + { + cmd.pctl->cv.wait(ulock); + ret = cmd.pctl->rval; + delete cmd.pctl; + } return ret; } @@ -691,7 +707,9 @@ void aeMain(aeEventLoop *eventLoop) { ulock.lock(); eventLoop->beforesleep(eventLoop); } + AE_ASSERT(!aeThreadOwnsLock()); // we should have relinquished it after processing aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); + AE_ASSERT(!aeThreadOwnsLock()); // we should have relinquished it after processing } } diff --git a/src/ae.h b/src/ae.h index 3eb0af502..affe5363f 100644 --- a/src/ae.h +++ b/src/ae.h @@ -138,8 +138,10 @@ void aeDeleteEventLoop(aeEventLoop *eventLoop); void aeStop(aeEventLoop *eventLoop); int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData); -int aeCreateRemoteFileEventSync(aeEventLoop *eventLoop, int fd, int mask, - aeFileProc *proc, void *clientData); + +int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, + aeFileProc *proc, void *clientData, int fSynchronous); + void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask); int aeGetFileEvents(aeEventLoop *eventLoop, int fd); diff --git a/src/aof.c b/src/aof.c index 0a7fdd9ab..b0f52e2a6 100644 --- a/src/aof.c +++ b/src/aof.c @@ -97,7 +97,7 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { aofrwblock *block; ssize_t nwritten; serverAssert(el == server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el); - + UNUSED(el); UNUSED(fd); UNUSED(privdata); @@ -1497,7 +1497,7 @@ int aofCreatePipes(void) { /* Parent -> children data is non blocking. */ if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error; if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error; - if (aeCreateRemoteFileEventSync(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error; + if (aeCreateRemoteFileEvent(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, fds[2], AE_READABLE, aofChildPipeReadable, NULL, TRUE) == AE_ERR) goto error; server.aof_pipe_write_data_to_child = fds[1]; server.aof_pipe_read_data_from_parent = fds[0]; diff --git a/src/blocked.c b/src/blocked.c index aad18ee31..d96e817d9 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -110,6 +110,8 @@ void blockClient(client *c, int btype) { * in order to process the pending input buffer of clients that were * unblocked after a blocking operation. */ void processUnblockedClients(int iel) { + serverAssert(aeThreadOwnsLock()); + listNode *ln; client *c; list *unblocked_clients = server.rgthreadvar[iel].unblocked_clients; @@ -127,11 +129,9 @@ void processUnblockedClients(int iel) { * client is not blocked before to proceed, but things may change and * the code is conceptually more correct this way. */ if (!(c->flags & CLIENT_BLOCKED)) { - aeAcquireLock(); if (c->querybuf && sdslen(c->querybuf) > 0) { processInputBufferAndReplicate(c); } - aeReleaseLock(); } } } @@ -155,7 +155,7 @@ void processUnblockedClients(int iel) { void queueClientForReprocessing(client *c) { /* The client may already be into the unblocked list because of a previous * blocking operation, don't add back it into the list multiple times. */ - AssertCorrectThread(c); + serverAssert(aeThreadOwnsLock()); if (!(c->flags & CLIENT_UNBLOCKED)) { c->flags |= CLIENT_UNBLOCKED; listAddNodeTail(server.rgthreadvar[c->iel].unblocked_clients,c); diff --git a/src/networking.cpp b/src/networking.cpp index 2363ae2a6..226751571 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -32,6 +32,7 @@ #include #include #include +#include static void setProtocolError(const char *errstr, client *c); void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync); @@ -233,13 +234,10 @@ void clientInstallWriteHandler(client *c) { } void clientInstallAsyncWriteHandler(client *c) { - UNUSED(c); -#if 0 // Not yet if (!(c->flags & CLIENT_PENDING_ASYNCWRITE)) { c->flags |= CLIENT_PENDING_ASYNCWRITE; listAddNodeHead(serverTL->clients_pending_asyncwrite,c); } -#endif } /* This function is called every time we are going to transmit new data @@ -436,6 +434,10 @@ void addReplyProto(client *c, const char *s, size_t len) { addReplyProtoCore(c, s, len, false); } +void addReplyProtoAsync(client *c, const char *s, size_t len) { + addReplyProtoCore(c, s, len, true); +} + /* Low level function called by the addReplyError...() functions. * It emits the protocol for a Redis error, in the form: * @@ -1127,10 +1129,24 @@ static void freeClientArgv(client *c) { * when we resync with our own master and want to force all our slaves to * resync with us as well. */ void disconnectSlaves(void) { - while (listLength(server.slaves)) { - listNode *ln = listFirst(server.slaves); - freeClient((client*)ln->value); + std::vector vecfreeImmediate; + listNode *ln; + listIter li; + listRewind(server.slaves, &li); + while ((ln = listNext(&li))) { + client *c = (client*)ln->value; + if (c->iel == serverTL - server.rgthreadvar) + { + vecfreeImmediate.push_back(c); + } + else + { + freeClientAsync(c); + } } + + for (client *c : vecfreeImmediate) + freeClient(c); } /* Remove the specified client from global lists where the client could @@ -2706,6 +2722,9 @@ int clientsArePaused(void) { int processEventsWhileBlocked(int iel) { int iterations = 4; /* See the function top-comment. */ int count = 0; + + // BUGBUG - This function isn't fair - why should clients on this thread get to run, but not clients elsewhere? + // We mix up replies when releasing the lock here so more work is needed to fix this while (iterations--) { int events = 0; events += aeProcessEvents(server.rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT); diff --git a/src/pubsub.c b/src/pubsub.c index e516bde4b..0fee17bdc 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -51,13 +51,13 @@ void addReplyPubsubMessage(client *c, robj *channel, robj *msg) { * this message format also includes the pattern that matched the message. */ void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) { if (c->resp == 2) - addReply(c,shared.mbulkhdr[4]); + addReplyAsync(c,shared.mbulkhdr[4]); else - addReplyPushLen(c,4); - addReply(c,shared.pmessagebulk); - addReplyBulk(c,pat); - addReplyBulk(c,channel); - addReplyBulk(c,msg); + addReplyPushLenAsync(c,4); + addReplyAsync(c,shared.pmessagebulk); + addReplyBulkAsync(c,pat); + addReplyBulkAsync(c,channel); + addReplyBulkAsync(c,msg); } /* Send the pubsub subscription notification to the client. */ diff --git a/src/replication.cpp b/src/replication.cpp index bdf6bf0f4..33b15d8d1 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -296,7 +296,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle /* Don't feed slaves that are still waiting for BGSAVE to start */ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; - addReplyProto(slave,buf,buflen); + addReplyProtoAsync(slave,buf,buflen); } } @@ -334,7 +334,7 @@ void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, listRewind(monitors,&li); while((ln = listNext(&li))) { client *monitor = (client*)ln->value; - addReply(monitor,cmdobj); + addReplyAsync(monitor,cmdobj); } decrRefCount(cmdobj); } diff --git a/src/scripting.c b/src/scripting.c index f97808356..a3d290bbf 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -368,6 +368,9 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { struct redisCommand *cmd; client *c = server.lua_client; sds reply; + + // Ensure our client is on the right thread + c->iel = serverTL - server.rgthreadvar; /* Cached across calls. */ static robj **argv = NULL; @@ -1278,7 +1281,7 @@ void luaMaskCountHook(lua_State *lua, lua_Debug *ar) { * here when the EVAL command will return. */ protectClient(server.lua_caller); } - if (server.lua_timedout) processEventsWhileBlocked(IDX_EVENT_LOOP_MAIN); + if (server.lua_timedout) processEventsWhileBlocked(serverTL - server.rgthreadvar); if (server.lua_kill) { serverLog(LL_WARNING,"Lua script killed by user with SCRIPT KILL."); lua_pushstring(lua,"Script killed by user with SCRIPT KILL..."); diff --git a/src/server.c b/src/server.c index 76f066d54..21d295a42 100644 --- a/src/server.c +++ b/src/server.c @@ -2104,9 +2104,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Try to process pending commands for clients that were just unblocked. */ if (listLength(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].unblocked_clients)) { - aeReleaseLock(); processUnblockedClients(IDX_EVENT_LOOP_MAIN); - aeAcquireLock(); } /* Write the AOF buffer on disk */ @@ -2129,9 +2127,12 @@ void beforeSleepLite(struct aeEventLoop *eventLoop) /* Try to process pending commands for clients that were just unblocked. */ if (listLength(server.rgthreadvar[iel].unblocked_clients)) { + aeAcquireLock(); processUnblockedClients(iel); + aeReleaseLock(); } + /* Handle writes with pending output buffers. */ handleClientsWithPendingWrites(iel); } @@ -3213,7 +3214,7 @@ static void ProcessPendingAsyncWrites() ae_flags |= AE_BARRIER; } - if (aeCreateRemoteFileEventSync(server.rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c) == AE_ERR) + if (aeCreateRemoteFileEvent(server.rgthreadvar[c->iel].el, c->fd, ae_flags, sendReplyToClient, c, FALSE) == AE_ERR) freeClientAsync(c); } } @@ -5045,7 +5046,7 @@ int main(int argc, char **argv) { initServer(); - server.cthreads = 1; //testing + server.cthreads = 2; //testing initNetworking(1 /* fReusePort */); if (background || server.pidfile) createPidFile(); diff --git a/src/server.h b/src/server.h index 213739c41..2613bcbe1 100644 --- a/src/server.h +++ b/src/server.h @@ -1044,7 +1044,7 @@ struct redisServerThreadVars { int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */ int ipfd_count; /* Used slots in ipfd[] */ list *clients_pending_write; /* There is to write or install handler. */ - list *unblocked_clients; /* list of clients to unblock before next loop */ + list *unblocked_clients; /* list of clients to unblock before next loop NOT THREADSAFE */ list *clients_pending_asyncwrite; }; @@ -1628,6 +1628,7 @@ void unprotectClient(client *c); // Special Thread-safe addReply() commands for posting messages to clients from a different thread void addReplyAsync(client *c, robj *obj); void addReplyArrayLenAsync(client *c, long length); +void addReplyProtoAsync(client *c, const char *s, size_t len); void addReplyBulkAsync(client *c, robj *obj); void addReplyBulkCBufferAsync(client *c, const void *p, size_t len); void addReplyErrorAsync(client *c, const char *err);