diff --git a/src/ae.cpp b/src/ae.cpp index e6177c46c..490826b8e 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -144,8 +144,17 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) case AE_ASYNC_OP::PostCppFunction: { + if (cmd.pctl != nullptr) + cmd.pctl->mutexcv.lock(); + std::unique_lock ulock(g_lock); (*cmd.pfn)(); + + if (cmd.pctl != nullptr) + { + cmd.pctl->cv.notify_all(); + cmd.pctl->mutexcv.unlock(); + } delete cmd.pfn; } break; @@ -186,6 +195,11 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg) { + if (eventLoop == g_eventLoopThisThread) + { + proc(arg); + return AE_OK; + } aeCommand cmd; cmd.op = AE_ASYNC_OP::PostFunction; cmd.proc = proc; @@ -195,14 +209,33 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg) return AE_OK; } -int aePostFunction(aeEventLoop *eventLoop, std::function fn) +int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous) { + if (eventLoop == g_eventLoopThisThread) + { + fn(); + return AE_OK; + } + aeCommand cmd; cmd.op = AE_ASYNC_OP::PostCppFunction; cmd.pfn = new std::function(fn); + cmd.pctl = nullptr; + if (fSynchronous) + cmd.pctl = new aeCommandControl(); + std::unique_lock ulock(cmd.pctl->mutexcv, std::defer_lock); + if (fSynchronous) + cmd.pctl->mutexcv.lock(); auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); AE_ASSERT(size == sizeof(cmd)); - return AE_OK; + int ret = AE_OK; + if (fSynchronous) + { + cmd.pctl->cv.wait(ulock); + ret = cmd.pctl->rval; + delete cmd.pctl; + } + return ret; } aeEventLoop *aeCreateEventLoop(int setsize) { @@ -232,7 +265,8 @@ aeEventLoop *aeCreateEventLoop(int setsize) { if (pipe(rgfd) < 0) goto err; eventLoop->fdCmdRead = rgfd[0]; - eventLoop->fdCmdWrite = rgfd[1]; + eventLoop->fdCmdWrite = rgfd[1];; + fcntl(eventLoop->fdCmdWrite, F_SETFL, O_NONBLOCK); fcntl(eventLoop->fdCmdRead, F_SETFL, O_NONBLOCK); eventLoop->cevents = 0; aeCreateFileEvent(eventLoop, eventLoop->fdCmdRead, AE_READABLE|AE_READ_THREADSAFE, aeProcessCmd, NULL); @@ -325,8 +359,7 @@ void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask) cmd.fd = fd; cmd.mask = mask; auto cb = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); - if (cb != sizeof(cmd)) - fprintf(stderr, "Failed to write to pipe.\n"); + AE_ASSERT(cb == sizeof(cmd)); } extern "C" void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) diff --git a/src/ae.h b/src/ae.h index affe5363f..f1aa0380c 100644 --- a/src/ae.h +++ b/src/ae.h @@ -131,7 +131,7 @@ aeEventLoop *aeCreateEventLoop(int setsize); int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg); #ifdef __cplusplus } // EXTERN C -int aePostFunction(aeEventLoop *eventLoop, std::function fn); +int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous = false); extern "C" { #endif void aeDeleteEventLoop(aeEventLoop *eventLoop); diff --git a/src/networking.cpp b/src/networking.cpp index 226751571..659fb65ba 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1154,6 +1154,7 @@ void disconnectSlaves(void) { * This is used by freeClient() and replicationCacheMaster(). */ void unlinkClient(client *c) { listNode *ln; + AssertCorrectThread(c); /* If this is marked as current client unset it. */ if (server.current_client == c) server.current_client = NULL; @@ -1196,6 +1197,13 @@ void unlinkClient(client *c) { listDelNode(server.rgthreadvar[c->iel].unblocked_clients,ln); c->flags &= ~CLIENT_UNBLOCKED; } + + if (c->flags & CLIENT_PENDING_ASYNCWRITE) { + ln = listSearchKey(server.rgthreadvar[c->iel].clients_pending_asyncwrite,c); + serverAssert(ln != NULL); + listDelNode(server.rgthreadvar[c->iel].clients_pending_asyncwrite,ln); + c->flags &= ~CLIENT_PENDING_ASYNCWRITE; + } } void freeClient(client *c) { diff --git a/src/replication.cpp b/src/replication.cpp index 33b15d8d1..179a48028 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -298,6 +298,9 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; addReplyProtoAsync(slave,buf,buflen); } + + if (listLength(slaves)) + ProcessPendingAsyncWrites(); // flush them to their respective threads } void replicationFeedMonitors(client *c, list *monitors, int dictid, robj **argv, int argc) { @@ -2039,7 +2042,10 @@ void replicationHandleMasterDisconnection(void) { * the slaves only if we'll have to do a full resync with our master. */ } +void replicaofCommandCore(client *c); void replicaofCommand(client *c) { + // Changing the master needs to be done on the main thread. + /* SLAVEOF is not allowed in cluster mode as replication is automatically * configured using the current address of the master node. */ if (server.cluster_enabled) { @@ -2047,6 +2053,23 @@ void replicaofCommand(client *c) { return; } + if ((serverTL - server.rgthreadvar) == IDX_EVENT_LOOP_MAIN) + { + replicaofCommandCore(c); + } + else + { + aeReleaseLock(); + aePostFunction(server.rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [=]{ + replicaofCommandCore(c); + }, true /*fSync*/); + aeAcquireLock(); + } +} + +void replicaofCommandCore(client *c) { + + /* The special host/port combination "NO" "ONE" turns the instance * into a master. Otherwise the new master address is set. */ if (!strcasecmp((const char*)ptrFromObj(c->argv[1]),"no") && @@ -2068,7 +2091,7 @@ void replicaofCommand(client *c) { if (server.masterhost && !strcasecmp(server.masterhost,(const char*)ptrFromObj(c->argv[1])) && server.masterport == port) { serverLog(LL_NOTICE,"REPLICAOF would result into synchronization with the master we are already connected with. No operation performed."); - addReplySds(c,sdsnew("+OK Already connected to specified master\r\n")); + addReplySdsAsync(c,sdsnew("+OK Already connected to specified master\r\n")); return; } /* There was no previous master or the user specified a different one, @@ -2079,7 +2102,7 @@ void replicaofCommand(client *c) { server.masterhost, server.masterport, client); sdsfree(client); } - addReply(c,shared.ok); + addReplyAsync(c,shared.ok); } /* ROLE command: provide information about the role of the instance diff --git a/src/server.c b/src/server.c index 21d295a42..a2fd4b337 100644 --- a/src/server.c +++ b/src/server.c @@ -3181,7 +3181,7 @@ void preventCommandReplication(client *c) { c->flags |= CLIENT_PREVENT_REPL_PROP; } -static void ProcessPendingAsyncWrites() +void ProcessPendingAsyncWrites() { while(listLength(serverTL->clients_pending_asyncwrite)) { client *c = (client*)listNodeValue(listFirst(serverTL->clients_pending_asyncwrite)); @@ -5046,8 +5046,9 @@ int main(int argc, char **argv) { initServer(); - server.cthreads = 2; //testing + server.cthreads = 1; //testing initNetworking(1 /* fReusePort */); + serverTL = &server.rgthreadvar[IDX_EVENT_LOOP_MAIN]; if (background || server.pidfile) createPidFile(); redisSetProcTitle(argv[0]); diff --git a/src/server.h b/src/server.h index 2613bcbe1..10fa38c55 100644 --- a/src/server.h +++ b/src/server.h @@ -1641,6 +1641,8 @@ void addReplySdsAsync(client *c, sds s); void addReplyBulkSdsAsync(client *c, sds s); void addReplyPushLenAsync(client *c, long length); +void ProcessPendingAsyncWrites(void); + #ifdef __GNUC__ void addReplyErrorFormat(client *c, const char *fmt, ...) __attribute__((format(printf, 2, 3)));