diff --git a/src/ae.cpp b/src/ae.cpp index 17408e316..77a1e1e36 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -30,7 +30,9 @@ * POSSIBILITY OF SUCH DAMAGE. */ +#include #include +#include #include #include #include @@ -41,11 +43,17 @@ #include #include "ae.h" +#include "fastlock.h" extern "C" { #include "zmalloc.h" #include "config.h" } +fastlock g_lock; +thread_local aeEventLoop *g_eventLoopThisThread = NULL; + +#define AE_ASSERT(x) if (!(x)) do { fprintf(stderr, "AE_ASSER FAILURE\n"); *((volatile int*)0) = 1; } while(0) + /* Include the best multiplexing layer supported by this system. * The following should be ordered by performances, descending. */ #ifdef HAVE_EVPORT @@ -62,6 +70,59 @@ extern "C" { #endif #endif +enum class AE_ASYNC_OP +{ + PostFunction, + DeleteFileEvent, +}; +typedef struct aeCommand +{ + AE_ASYNC_OP op; + int fd; + int mask; + aePostFunctionProc *proc; + void *clientData; +} aeCommand; + +void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) +{ + aeCommand cmd; + for (;;) + { + auto cb = read(fd, &cmd, sizeof(aeCommand)); + if (cb != sizeof(cmd)) + { + if (errno == EAGAIN) + break; + fprintf(stderr, "Failed to read pipe.\n"); + } + switch (cmd.op) + { + case AE_ASYNC_OP::DeleteFileEvent: + aeDeleteFileEvent(eventLoop, cmd.fd, cmd.mask); + break; + + case AE_ASYNC_OP::PostFunction: + { + std::unique_lock ulock(g_lock); + ((aePostFunctionProc*)cmd.proc)(cmd.clientData); + break; + } + } + } +} + +int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg) +{ + aeCommand cmd; + cmd.op = AE_ASYNC_OP::PostFunction; + cmd.proc = proc; + cmd.clientData = arg; + auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); + AE_ASSERT(size == sizeof(cmd)); + return AE_OK; +} + aeEventLoop *aeCreateEventLoop(int setsize) { aeEventLoop *eventLoop; int i; @@ -83,6 +144,16 @@ aeEventLoop *aeCreateEventLoop(int setsize) { * vector with it. */ for (i = 0; i < setsize; i++) eventLoop->events[i].mask = AE_NONE; + + fastlock_init(&eventLoop->flock); + int rgfd[2]; + if (pipe(rgfd) < 0) + goto err; + eventLoop->fdCmdRead = rgfd[0]; + eventLoop->fdCmdWrite = rgfd[1]; + fcntl(eventLoop->fdCmdRead, F_SETFL, O_NONBLOCK); + aeCreateFileEvent(eventLoop, eventLoop->fdCmdRead, AE_READABLE|AE_THREADSAFE, aeProcessCmd, NULL); + return eventLoop; err: @@ -107,6 +178,7 @@ int aeGetSetSize(aeEventLoop *eventLoop) { * * Otherwise AE_OK is returned and the operation is successful. */ int aeResizeSetSize(aeEventLoop *eventLoop, int setsize) { + AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop); int i; if (setsize == eventLoop->setsize) return AE_OK; @@ -129,19 +201,25 @@ extern "C" void aeDeleteEventLoop(aeEventLoop *eventLoop) { zfree(eventLoop->events); zfree(eventLoop->fired); zfree(eventLoop); + fastlock_free(&eventLoop->flock); + close(eventLoop->fdCmdRead); + close(eventLoop->fdCmdWrite); } extern "C" void aeStop(aeEventLoop *eventLoop) { + AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop); eventLoop->stop = 1; } extern "C" int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData) { + AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop); if (fd >= eventLoop->setsize) { errno = ERANGE; return AE_ERR; } + aeFileEvent *fe = &eventLoop->events[fd]; if (aeApiAddEvent(eventLoop, fd, mask) == -1) @@ -155,8 +233,22 @@ extern "C" int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, return AE_OK; } +void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask) +{ + if (eventLoop == g_eventLoopThisThread) + return aeDeleteFileEvent(eventLoop, fd, mask); + aeCommand cmd; + cmd.op = AE_ASYNC_OP::DeleteFileEvent; + cmd.fd = fd; + cmd.mask = mask; + auto cb = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); + if (cb != sizeof(cmd)) + fprintf(stderr, "Failed to write to pipe.\n"); +} + extern "C" void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) { + AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop); if (fd >= eventLoop->setsize) return; aeFileEvent *fe = &eventLoop->events[fd]; if (fe->mask == AE_NONE) return; @@ -178,6 +270,7 @@ extern "C" void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) } extern "C" int aeGetFileEvents(aeEventLoop *eventLoop, int fd) { + AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop); if (fd >= eventLoop->setsize) return 0; aeFileEvent *fe = &eventLoop->events[fd]; @@ -211,6 +304,7 @@ extern "C" long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long millise aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { + AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop); long long id = eventLoop->timeEventNextId++; aeTimeEvent *te; @@ -231,6 +325,7 @@ extern "C" long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long millise extern "C" int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id) { + AE_ASSERT(g_eventLoopThisThread == NULL || g_eventLoopThisThread == eventLoop); aeTimeEvent *te = eventLoop->timeEventHead; while(te) { if (te->id == id) { @@ -270,6 +365,7 @@ static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) /* Process time events */ static int processTimeEvents(aeEventLoop *eventLoop) { + std::unique_lock ulock(g_lock); int processed = 0; aeTimeEvent *te; long long maxId; @@ -343,6 +439,62 @@ static int processTimeEvents(aeEventLoop *eventLoop) { return processed; } +extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int mask, int fd) +{ +#define LOCK_IF_NECESSARY(fe) \ + std::unique_lock ulock(g_lock, std::defer_lock); \ + if (!(fe->mask & AE_THREADSAFE)) \ + ulock.lock() + + int fired = 0; /* Number of events fired for current fd. */ + + /* Normally we execute the readable event first, and the writable + * event laster. This is useful as sometimes we may be able + * to serve the reply of a query immediately after processing the + * query. + * + * However if AE_BARRIER is set in the mask, our application is + * asking us to do the reverse: never fire the writable event + * after the readable. In such a case, we invert the calls. + * This is useful when, for instance, we want to do things + * in the beforeSleep() hook, like fsynching a file to disk, + * before replying to a client. */ + int invert = fe->mask & AE_BARRIER; + + /* Note the "fe->mask & mask & ..." code: maybe an already + * processed event removed an element that fired and we still + * didn't processed, so we check if the event is still valid. + * + * Fire the readable event if the call sequence is not + * inverted. */ + if (!invert && fe->mask & mask & AE_READABLE) { + LOCK_IF_NECESSARY(fe); + fe->rfileProc(eventLoop,fd,fe->clientData,mask); + fired++; + } + + /* Fire the writable event. */ + if (fe->mask & mask & AE_WRITABLE) { + if (!fired || fe->wfileProc != fe->rfileProc) { + LOCK_IF_NECESSARY(fe); + fe->wfileProc(eventLoop,fd,fe->clientData,mask); + fired++; + } + } + + /* If we have to invert the call, fire the readable event now + * after the writable one. */ + if (invert && fe->mask & mask & AE_READABLE) { + if (!fired || fe->wfileProc != fe->rfileProc) { + LOCK_IF_NECESSARY(fe); + fe->rfileProc(eventLoop,fd,fe->clientData,mask); + fired++; + } + } + +#undef LOCK_IF_NECESSARY +} + /* Process every pending time event, then every pending file event * (that may be registered by time event callbacks just processed). * Without special flags the function sleeps until some file event @@ -413,55 +565,19 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) numevents = aeApiPoll(eventLoop, tvp); /* After sleep callback. */ - if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) + if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) { + std::unique_lock ulock(g_lock, std::defer_lock); + if (!(eventLoop->beforesleepFlags & AE_THREADSAFE)) + ulock.lock(); eventLoop->aftersleep(eventLoop); + } for (j = 0; j < numevents; j++) { aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; int mask = eventLoop->fired[j].mask; int fd = eventLoop->fired[j].fd; - int fired = 0; /* Number of events fired for current fd. */ - /* Normally we execute the readable event first, and the writable - * event laster. This is useful as sometimes we may be able - * to serve the reply of a query immediately after processing the - * query. - * - * However if AE_BARRIER is set in the mask, our application is - * asking us to do the reverse: never fire the writable event - * after the readable. In such a case, we invert the calls. - * This is useful when, for instance, we want to do things - * in the beforeSleep() hook, like fsynching a file to disk, - * before replying to a client. */ - int invert = fe->mask & AE_BARRIER; - - /* Note the "fe->mask & mask & ..." code: maybe an already - * processed event removed an element that fired and we still - * didn't processed, so we check if the event is still valid. - * - * Fire the readable event if the call sequence is not - * inverted. */ - if (!invert && fe->mask & mask & AE_READABLE) { - fe->rfileProc(eventLoop,fd,fe->clientData,mask); - fired++; - } - - /* Fire the writable event. */ - if (fe->mask & mask & AE_WRITABLE) { - if (!fired || fe->wfileProc != fe->rfileProc) { - fe->wfileProc(eventLoop,fd,fe->clientData,mask); - fired++; - } - } - - /* If we have to invert the call, fire the readable event now - * after the writable one. */ - if (invert && fe->mask & mask & AE_READABLE) { - if (!fired || fe->wfileProc != fe->rfileProc) { - fe->rfileProc(eventLoop,fd,fe->clientData,mask); - fired++; - } - } + ProcessEventCore(eventLoop, fe, mask, fd); processed++; } @@ -497,9 +613,14 @@ int aeWait(int fd, int mask, long long milliseconds) { void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; + g_eventLoopThisThread = eventLoop; while (!eventLoop->stop) { - if (eventLoop->beforesleep != NULL) + if (eventLoop->beforesleep != NULL) { + std::unique_lock ulock(g_lock, std::defer_lock); + if (!(eventLoop->beforesleepFlags & AE_THREADSAFE)) + ulock.lock(); eventLoop->beforesleep(eventLoop); + } aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } } @@ -508,10 +629,22 @@ const char *aeGetApiName(void) { return aeApiName(); } -void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep) { +void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep, int flags) { eventLoop->beforesleep = beforesleep; + eventLoop->beforesleepFlags = flags; } -void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) { +void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep, int flags) { eventLoop->aftersleep = aftersleep; + eventLoop->aftersleepFlags = flags; } + +void aeAcquireLock() +{ + g_lock.lock(); +} + +void aeReleaseLock() +{ + g_lock.unlock(); +} \ No newline at end of file diff --git a/src/ae.h b/src/ae.h index a6ee1d05b..271f0cdf1 100644 --- a/src/ae.h +++ b/src/ae.h @@ -34,6 +34,7 @@ #define __AE_H__ #include +#include "fastlock.h" #ifdef __cplusplus extern "C" { @@ -71,6 +72,7 @@ typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData); typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData); typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop); +typedef void aePostFunctionProc(void *pvArgs); /* File event structure */ typedef struct aeFileEvent { @@ -110,16 +112,23 @@ typedef struct aeEventLoop { int stop; void *apidata; /* This is used for polling API specific data */ aeBeforeSleepProc *beforesleep; + int beforesleepFlags; aeBeforeSleepProc *aftersleep; + int aftersleepFlags; + struct fastlock flock; + int fdCmdWrite; + int fdCmdRead; } aeEventLoop; /* Prototypes */ aeEventLoop *aeCreateEventLoop(int setsize); +int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg); void aeDeleteEventLoop(aeEventLoop *eventLoop); void aeStop(aeEventLoop *eventLoop); int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData); void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask); +void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask); int aeGetFileEvents(aeEventLoop *eventLoop, int fd); long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, @@ -129,11 +138,14 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags); int aeWait(int fd, int mask, long long milliseconds); void aeMain(aeEventLoop *eventLoop); const char *aeGetApiName(void); -void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep); -void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep); +void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep, int flags); +void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep, int flags); int aeGetSetSize(aeEventLoop *eventLoop); int aeResizeSetSize(aeEventLoop *eventLoop, int setsize); +void aeAcquireLock(); +void aeReleaseLock(); + #ifdef __cplusplus } #endif diff --git a/src/ae_epoll.cpp b/src/ae_epoll.cpp index cadcc3f51..05638ebdc 100644 --- a/src/ae_epoll.cpp +++ b/src/ae_epoll.cpp @@ -83,7 +83,11 @@ static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { if (mask & AE_READABLE) ee.events |= EPOLLIN; if (mask & AE_WRITABLE) ee.events |= EPOLLOUT; ee.data.fd = fd; - if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1; + if (epoll_ctl(state->epfd,op,fd,&ee) == -1) + { + perror("epoll_ctl failed"); + return -1; + } return 0; } diff --git a/src/aof.c b/src/aof.c index c71f88aa6..39195b510 100644 --- a/src/aof.c +++ b/src/aof.c @@ -105,7 +105,7 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { ln = listFirst(server.aof_rewrite_buf_blocks); block = ln ? ln->value : NULL; if (server.aof_stop_sending_diff || !block) { - aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child, + aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_write_data_to_child, AE_WRITABLE); return; } @@ -162,8 +162,8 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { /* Install a file event to send data to the rewrite child if there is * not one already. */ - if (aeGetFileEvents(server.el,server.aof_pipe_write_data_to_child) == 0) { - aeCreateFileEvent(server.el, server.aof_pipe_write_data_to_child, + if (aeGetFileEvents(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_write_data_to_child) == 0) { + aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], server.aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); } } @@ -738,7 +738,7 @@ int loadAppendOnlyFile(char *filename) { /* Serve the clients from time to time */ if (!(loops++ % 1000)) { loadingProgress(ftello(fp)); - processEventsWhileBlocked(); + processEventsWhileBlocked(IDX_EVENT_LOOP_MAIN); } if (fgets(buf,sizeof(buf),fp) == NULL) { @@ -1470,7 +1470,7 @@ void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Remove the handler since this can be called only one time during a * rewrite. */ - aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE); + aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_read_ack_from_child,AE_READABLE); } /* Create the pipes used for parent - child process IPC during rewrite. @@ -1488,7 +1488,7 @@ int aofCreatePipes(void) { /* Parent -> children data is non blocking. */ if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error; if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error; - if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error; + if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error; server.aof_pipe_write_data_to_child = fds[1]; server.aof_pipe_read_data_from_parent = fds[0]; @@ -1507,8 +1507,8 @@ error: } void aofClosePipes(void) { - aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE); - aeDeleteFileEvent(server.el,server.aof_pipe_write_data_to_child,AE_WRITABLE); + aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_read_ack_from_child,AE_READABLE); + aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.aof_pipe_write_data_to_child,AE_WRITABLE); close(server.aof_pipe_write_data_to_child); close(server.aof_pipe_read_data_from_parent); close(server.aof_pipe_write_ack_to_parent); diff --git a/src/blocked.c b/src/blocked.c index 2ac57b5db..7b9800496 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -109,15 +109,15 @@ void blockClient(client *c, int btype) { /* This function is called in the beforeSleep() function of the event loop * in order to process the pending input buffer of clients that were * unblocked after a blocking operation. */ -void processUnblockedClients(void) { +void processUnblockedClients(int iel) { listNode *ln; client *c; - while (listLength(server.unblocked_clients)) { - ln = listFirst(server.unblocked_clients); + while (listLength(server.rgunblocked_clients[iel])) { + ln = listFirst(server.rgunblocked_clients[iel]); serverAssert(ln != NULL); c = ln->value; - listDelNode(server.unblocked_clients,ln); + listDelNode(server.rgunblocked_clients[iel],ln); c->flags &= ~CLIENT_UNBLOCKED; /* Process remaining data in the input buffer, unless the client @@ -153,7 +153,7 @@ void queueClientForReprocessing(client *c) { * blocking operation, don't add back it into the list multiple times. */ if (!(c->flags & CLIENT_UNBLOCKED)) { c->flags |= CLIENT_UNBLOCKED; - listAddNodeTail(server.unblocked_clients,c); + listAddNodeTail(server.rgunblocked_clients[c->iel],c); } } diff --git a/src/cluster.c b/src/cluster.c index 11eb170fc..eb2749b1b 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -493,7 +493,7 @@ void clusterInit(void) { int j; for (j = 0; j < server.cfd_count; j++) { - if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE, + if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], server.cfd[j], AE_READABLE, clusterAcceptHandler, NULL) == AE_ERR) serverPanic("Unrecoverable error creating Redis Cluster " "file event."); @@ -601,7 +601,7 @@ clusterLink *createClusterLink(clusterNode *node) { * with this link will have the 'link' field set to NULL. */ void freeClusterLink(clusterLink *link) { if (link->fd != -1) { - aeDeleteFileEvent(server.el, link->fd, AE_READABLE|AE_WRITABLE); + aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], link->fd, AE_READABLE|AE_WRITABLE); } sdsfree(link->sndbuf); sdsfree(link->rcvbuf); @@ -645,7 +645,7 @@ void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) { * node identity. */ link = createClusterLink(NULL); link->fd = cfd; - aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link); + aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],cfd,AE_READABLE,clusterReadHandler,link); } } @@ -2132,7 +2132,7 @@ void clusterWriteHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } sdsrange(link->sndbuf,nwritten,-1); if (sdslen(link->sndbuf) == 0) - aeDeleteFileEvent(server.el, link->fd, AE_WRITABLE); + aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], link->fd, AE_WRITABLE); } /* Read data. Try to read the first field of the header first to check the @@ -2208,7 +2208,7 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) { * from event handlers that will do stuff with the same link later. */ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { if (sdslen(link->sndbuf) == 0 && msglen != 0) - aeCreateFileEvent(server.el,link->fd,AE_WRITABLE|AE_BARRIER, + aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],link->fd,AE_WRITABLE|AE_BARRIER, clusterWriteHandler,link); link->sndbuf = sdscatlen(link->sndbuf, msg, msglen); @@ -3402,7 +3402,7 @@ void clusterCron(void) { link = createClusterLink(node); link->fd = fd; node->link = link; - aeCreateFileEvent(server.el,link->fd,AE_READABLE, + aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],link->fd,AE_READABLE, clusterReadHandler,link); /* Queue a PING in the new connection ASAP: this is crucial * to avoid false positives in failure detection. diff --git a/src/config.c b/src/config.c index 7e9b19d76..75be21db8 100644 --- a/src/config.c +++ b/src/config.c @@ -968,15 +968,18 @@ void configSetCommand(client *c) { server.maxclients = orig_value; return; } - if ((unsigned int) aeGetSetSize(server.el) < + if ((unsigned int) aeGetSetSize(server.rgel[IDX_EVENT_LOOP_MAIN]) < server.maxclients + CONFIG_FDSET_INCR) { - if (aeResizeSetSize(server.el, - server.maxclients + CONFIG_FDSET_INCR) == AE_ERR) + for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel) { - addReplyError(c,"The event loop API used by Redis is not able to handle the specified number of clients"); - server.maxclients = orig_value; - return; + if (aeResizeSetSize(server.rgel[iel], + server.maxclients + CONFIG_FDSET_INCR) == AE_ERR) + { + addReplyError(c,"The event loop API used by Redis is not able to handle the specified number of clients"); + server.maxclients = orig_value; + return; + } } } } diff --git a/src/fastlock.h b/src/fastlock.h index 864c86822..7d06bbdb4 100644 --- a/src/fastlock.h +++ b/src/fastlock.h @@ -21,6 +21,11 @@ struct fastlock int m_lock; #ifdef __cplusplus + fastlock() + { + fastlock_init(this); + } + void lock() { fastlock_lock(this); diff --git a/src/module.c b/src/module.c index 45fa4e293..64c9fb917 100644 --- a/src/module.c +++ b/src/module.c @@ -2696,7 +2696,7 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch /* Create the client and dispatch the command. */ va_start(ap, fmt); - c = createClient(-1); + c = createClient(-1, IDX_EVENT_LOOP_MAIN); c->puser = NULL; /* Root user. */ argv = moduleCreateArgvFromUserFormat(cmdname,fmt,&argc,&flags,ap); replicate = flags & REDISMODULE_ARGV_REPLICATE; @@ -3546,7 +3546,7 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */ bc->free_privdata = free_privdata; bc->privdata = NULL; - bc->reply_client = createClient(-1); + bc->reply_client = createClient(-1, IDX_EVENT_LOOP_MAIN); bc->reply_client->flags |= CLIENT_MODULE; bc->dbid = c->db->id; c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0; @@ -3692,7 +3692,7 @@ void moduleHandleBlockedClients(void) { !(c->flags & CLIENT_PENDING_WRITE)) { c->flags |= CLIENT_PENDING_WRITE; - listAddNodeHead(server.clients_pending_write,c); + listAddNodeHead(server.rgclients_pending_write[IDX_EVENT_LOOP_MAIN],c); } } @@ -3794,7 +3794,7 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { * access it safely from another thread, so we create a fake client here * in order to keep things like the currently selected database and similar * things. */ - ctx->client = createClient(-1); + ctx->client = createClient(-1, IDX_EVENT_LOOP_MAIN); if (bc) selectDb(ctx->client,bc->dbid); return ctx; } @@ -4300,7 +4300,7 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod if (memcmp(ri.key,&key,sizeof(key)) == 0) { /* This is the first key, we need to re-install the timer according * to the just added event. */ - aeDeleteTimeEvent(server.el,aeTimer); + aeDeleteTimeEvent(server.rgel[IDX_EVENT_LOOP_MAIN],aeTimer); aeTimer = -1; } raxStop(&ri); @@ -4309,7 +4309,7 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod /* If we have no main timer (the old one was invalidated, or this is the * first module timer we have), install one. */ if (aeTimer == -1) - aeTimer = aeCreateTimeEvent(server.el,period,moduleTimerHandler,NULL,NULL); + aeTimer = aeCreateTimeEvent(server.rgel[IDX_EVENT_LOOP_MAIN],period,moduleTimerHandler,NULL,NULL); return key; } @@ -4659,7 +4659,7 @@ void moduleInitModulesSystem(void) { /* Set up the keyspace notification susbscriber list and static client */ moduleKeyspaceSubscribers = listCreate(); - moduleFreeContextReusedClient = createClient(-1); + moduleFreeContextReusedClient = createClient(-1, IDX_EVENT_LOOP_MAIN); moduleFreeContextReusedClient->flags |= CLIENT_MODULE; moduleFreeContextReusedClient->puser = NULL; /* root user. */ diff --git a/src/networking.c b/src/networking.c index 1c917af2a..66c234a78 100644 --- a/src/networking.c +++ b/src/networking.c @@ -82,7 +82,7 @@ void linkClient(client *c) { raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL); } -client *createClient(int fd) { +client *createClient(int fd, int iel) { client *c = zmalloc(sizeof(client), MALLOC_LOCAL); /* passing -1 as fd it is possible to create a non connected client. @@ -94,7 +94,7 @@ client *createClient(int fd) { anetEnableTcpNoDelay(NULL,fd); if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); - if (aeCreateFileEvent(server.el,fd,AE_READABLE, + if (aeCreateFileEvent(server.rgel[iel],fd,AE_READABLE|AE_THREADSAFE, readQueryFromClient, c) == AE_ERR) { close(fd); @@ -106,6 +106,7 @@ client *createClient(int fd) { selectDb(c,0); uint64_t client_id; atomicGetIncr(server.next_client_id,client_id,1); + c->iel = iel; c->id = client_id; c->resp = 2; c->fd = fd; @@ -186,7 +187,7 @@ void clientInstallWriteHandler(client *c) { * a system call. We'll only really install the write handler if * we'll not be able to write the whole reply at once. */ c->flags |= CLIENT_PENDING_WRITE; - listAddNodeHead(server.clients_pending_write,c); + listAddNodeHead(server.rgclients_pending_write[c->iel],c); } } @@ -779,9 +780,9 @@ int clientHasPendingReplies(client *c) { } #define MAX_ACCEPTS_PER_CALL 1000 -static void acceptCommonHandler(int fd, int flags, char *ip) { +static void acceptCommonHandler(int fd, int flags, char *ip, int iel) { client *c; - if ((c = createClient(fd)) == NULL) { + if ((c = createClient(fd, iel)) == NULL) { serverLog(LL_WARNING, "Error registering fd event for the new client: %s (fd=%d)", strerror(errno),fd); @@ -849,6 +850,32 @@ static void acceptCommonHandler(int fd, int flags, char *ip) { c->flags |= flags; } +struct AcceptCommonHandlerAsyncArgs +{ + int fd; + int flags; + char cip[NET_IP_STR_LEN]; + int fUseCip; + int iel; +}; +static void AcceptCommonHandlerAsync(void *args) +{ + struct AcceptCommonHandlerAsyncArgs *aargs = args; + acceptCommonHandler(aargs->fd, aargs->flags, aargs->cip, aargs->iel); + zfree(args); +} +static void EnqueueAcceptCommonHandler(int fd, int flags, char *ip, int iel) +{ + struct AcceptCommonHandlerAsyncArgs *args = zmalloc(sizeof(struct AcceptCommonHandlerAsyncArgs), MALLOC_LOCAL); + args->fd = fd; + args->flags = flags; + if (ip != NULL) + memcpy(args->cip, ip, NET_IP_STR_LEN); + args->fUseCip = (ip != NULL); + args->iel = iel; + aePostFunction(server.rgel[iel], AcceptCommonHandlerAsync, args); +} + void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; @@ -865,7 +892,24 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { return; } serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); - acceptCommonHandler(cfd,0,cip); + int ielCur = 0; + for (; ielCur < MAX_EVENT_LOOPS; ++ielCur) + { + if (el == server.rgel[ielCur]) + break; + } + serverAssert(ielCur < MAX_EVENT_LOOPS); + int iel = rand() % MAX_EVENT_LOOPS; + if (iel == ielCur) + { + aeAcquireLock(); + acceptCommonHandler(cfd,0,cip, iel); + aeReleaseLock(); + } + else + { + EnqueueAcceptCommonHandler(cfd, 0, cip, iel); + } } } @@ -884,7 +928,7 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { return; } serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket); - acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL); + EnqueueAcceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, rand() % MAX_EVENT_LOOPS); } } @@ -928,26 +972,26 @@ void unlinkClient(client *c) { } /* Unregister async I/O handlers and close the socket. */ - aeDeleteFileEvent(server.el,c->fd,AE_READABLE); - aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); + aeDeleteFileEventAsync(server.rgel[c->iel],c->fd,AE_READABLE); + aeDeleteFileEventAsync(server.rgel[c->iel],c->fd,AE_WRITABLE); close(c->fd); c->fd = -1; } /* Remove from the list of pending writes if needed. */ if (c->flags & CLIENT_PENDING_WRITE) { - ln = listSearchKey(server.clients_pending_write,c); + ln = listSearchKey(server.rgclients_pending_write[c->iel],c); serverAssert(ln != NULL); - listDelNode(server.clients_pending_write,ln); + listDelNode(server.rgclients_pending_write[c->iel],ln); c->flags &= ~CLIENT_PENDING_WRITE; } /* When client was just unblocked because of a blocking operation, * remove it from the list of unblocked clients. */ if (c->flags & CLIENT_UNBLOCKED) { - ln = listSearchKey(server.unblocked_clients,c); + ln = listSearchKey(server.rgunblocked_clients[c->iel],c); serverAssert(ln != NULL); - listDelNode(server.unblocked_clients,ln); + listDelNode(server.rgunblocked_clients[c->iel],ln); c->flags &= ~CLIENT_UNBLOCKED; } } @@ -1092,6 +1136,7 @@ int writeToClient(int fd, client *c, int handler_installed) { while(clientHasPendingReplies(c)) { if (c->bufpos > 0) { nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen); + if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; @@ -1113,6 +1158,7 @@ int writeToClient(int fd, client *c, int handler_installed) { } nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen); + if (nwritten <= 0) break; c->sentlen += nwritten; totwritten += nwritten; @@ -1145,7 +1191,8 @@ int writeToClient(int fd, client *c, int handler_installed) { zmalloc_used_memory() < server.maxmemory) && !(c->flags & CLIENT_SLAVE)) break; } - server.stat_net_output_bytes += totwritten; + + __atomic_fetch_add(&server.stat_net_output_bytes, totwritten, __ATOMIC_RELAXED); if (nwritten == -1) { if (errno == EAGAIN) { nwritten = 0; @@ -1165,7 +1212,7 @@ int writeToClient(int fd, client *c, int handler_installed) { } if (!clientHasPendingReplies(c)) { c->sentlen = 0; - if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); + if (handler_installed) aeDeleteFileEvent(server.rgel[c->iel],c->fd,AE_WRITABLE); /* Close connection after entire reply has been sent. */ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { @@ -1187,16 +1234,17 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { * we can just write the replies to the client output buffer without any * need to use a syscall in order to install the writable event handler, * get it called, and so forth. */ -int handleClientsWithPendingWrites(void) { +int handleClientsWithPendingWrites(int iel) { listIter li; listNode *ln; - int processed = listLength(server.clients_pending_write); + list *pending_writes = server.rgclients_pending_write[iel]; + int processed = listLength(pending_writes); - listRewind(server.clients_pending_write,&li); + listRewind(pending_writes,&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; - listDelNode(server.clients_pending_write,ln); + listDelNode(pending_writes,ln); /* If a client is protected, don't do anything, * that may trigger write error or recreate handler. */ @@ -1219,7 +1267,7 @@ int handleClientsWithPendingWrites(void) { { ae_flags |= AE_BARRIER; } - if (aeCreateFileEvent(server.el, c->fd, ae_flags, + if (aeCreateFileEvent(server.rgel[c->iel], c->fd, ae_flags, sendReplyToClient, c) == AE_ERR) { freeClientAsync(c); @@ -1268,15 +1316,15 @@ void resetClient(client *c) { * path, it is not really released, but only marked for later release. */ void protectClient(client *c) { c->flags |= CLIENT_PROTECTED; - aeDeleteFileEvent(server.el,c->fd,AE_READABLE); - aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); + aeDeleteFileEvent(server.rgel[c->iel],c->fd,AE_READABLE); + aeDeleteFileEvent(server.rgel[c->iel],c->fd,AE_WRITABLE); } /* This will undo the client protection done by protectClient() */ void unprotectClient(client *c) { if (c->flags & CLIENT_PROTECTED) { c->flags &= ~CLIENT_PROTECTED; - aeCreateFileEvent(server.el,c->fd,AE_READABLE,readQueryFromClient,c); + aeCreateFileEvent(server.rgel[c->iel],c->fd,AE_READABLE|AE_THREADSAFE,readQueryFromClient,c); if (clientHasPendingReplies(c)) clientInstallWriteHandler(c); } } @@ -1630,6 +1678,14 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(el); UNUSED(mask); + int iel = 0; + for (; iel < MAX_EVENT_LOOPS; ++iel) + { + if (server.rgel[iel] == el) + break; + } + serverAssert(iel == c->iel); + readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query @@ -1650,18 +1706,24 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { qblen = sdslen(c->querybuf); if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); + nread = read(fd, c->querybuf+qblen, readlen); + if (nread == -1) { if (errno == EAGAIN) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); + aeAcquireLock(); freeClient(c); + aeReleaseLock(); return; } } else if (nread == 0) { serverLog(LL_VERBOSE, "Client closed connection"); + aeAcquireLock(); freeClient(c); + aeReleaseLock(); return; } else if (c->flags & CLIENT_MASTER) { /* Append the query buffer to the pending (not applied) buffer @@ -1682,7 +1744,9 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); + aeAcquireLock(); freeClient(c); + aeReleaseLock(); return; } @@ -1692,7 +1756,9 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { * was actually applied to the master state: this quantity, and its * corresponding part of the replication stream, will be propagated to * the sub-slaves and to the replication backlog. */ + aeAcquireLock(); processInputBufferAndReplicate(c); + aeReleaseLock(); } void getClientsMaxBuffers(unsigned long *longest_output_list, @@ -1775,7 +1841,7 @@ sds catClientInfoString(sds s, client *client) { if (p == flags) *p++ = 'N'; *p++ = '\0'; - emask = client->fd == -1 ? 0 : aeGetFileEvents(server.el,client->fd); + emask = client->fd == -1 ? 0 : aeGetFileEvents(server.rgel[client->iel],client->fd); p = events; if (emask & AE_READABLE) *p++ = 'r'; if (emask & AE_WRITABLE) *p++ = 'w'; @@ -2323,7 +2389,7 @@ void flushSlavesOutputBuffers(void) { * of put_online_on_ack is to postpone the moment it is installed. * This is what we want since slaves in this state should not receive * writes before the first ACK. */ - events = aeGetFileEvents(server.el,slave->fd); + events = aeGetFileEvents(server.rgel[IDX_EVENT_LOOP_MAIN],slave->fd); if (events & AE_WRITABLE && slave->replstate == SLAVE_STATE_ONLINE && clientHasPendingReplies(slave)) @@ -2395,13 +2461,13 @@ int clientsArePaused(void) { * write, close sequence needed to serve a client. * * The function returns the total number of events processed. */ -int processEventsWhileBlocked(void) { +int processEventsWhileBlocked(int iel) { int iterations = 4; /* See the function top-comment. */ int count = 0; while (iterations--) { int events = 0; - events += aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); - events += handleClientsWithPendingWrites(); + events += aeProcessEvents(server.rgel[iel], AE_FILE_EVENTS|AE_DONT_WAIT); + events += handleClientsWithPendingWrites(iel); if (!events) break; count += events; } diff --git a/src/rdb.c b/src/rdb.c index 849751d32..590cc7cd5 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1862,7 +1862,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { if (server.masterhost && server.repl_state == REPL_STATE_TRANSFER) replicationSendNewlineToMaster(); loadingProgress(r->processed_bytes); - processEventsWhileBlocked(); + processEventsWhileBlocked(IDX_EVENT_LOOP_MAIN); } } diff --git a/src/replication.c b/src/replication.c index e625af854..83d5d901f 100644 --- a/src/replication.c +++ b/src/replication.c @@ -856,7 +856,7 @@ void putSlaveOnline(client *slave) { slave->replstate = SLAVE_STATE_ONLINE; slave->repl_put_online_on_ack = 0; slave->repl_ack_time = server.unixtime; /* Prevent false timeout. */ - if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, + if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], slave->fd, AE_WRITABLE, sendReplyToClient, slave) == AE_ERR) { serverLog(LL_WARNING,"Unable to register writable event for replica bulk transfer: %s", strerror(errno)); freeClient(slave); @@ -918,7 +918,7 @@ void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { if (slave->repldboff == slave->repldbsize) { close(slave->repldbfd); slave->repldbfd = -1; - aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); + aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],slave->fd,AE_WRITABLE); putSlaveOnline(slave); } } @@ -989,8 +989,8 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { slave->replpreamble = sdscatprintf(sdsempty(),"$%lld\r\n", (unsigned long long) slave->repldbsize); - aeDeleteFileEvent(server.el,slave->fd,AE_WRITABLE); - if (aeCreateFileEvent(server.el, slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { + aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],slave->fd,AE_WRITABLE); + if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], slave->fd, AE_WRITABLE, sendBulkToSlave, slave) == AE_ERR) { freeClient(slave); continue; } @@ -1075,7 +1075,7 @@ void replicationEmptyDbCallback(void *privdata) { * performed, this function materializes the master client we store * at server.master, starting from the specified file descriptor. */ void replicationCreateMasterClient(int fd, int dbid) { - server.master = createClient(fd); + server.master = createClient(fd, IDX_EVENT_LOOP_MAIN); server.master->flags |= CLIENT_MASTER; server.master->authenticated = 1; server.master->reploff = server.master_initial_offset; @@ -1276,7 +1276,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { * handler, otherwise it will get called recursively since * rdbLoad() will call the event loop to process events from time to * time for non blocking loading. */ - aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); + aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.repl_transfer_s,AE_READABLE); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory"); rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; if (rdbLoad(server.rdb_filename,&rsi) != C_OK) { @@ -1464,7 +1464,7 @@ int slaveTryPartialResynchronization(int fd, int read_reply) { if (reply != NULL) { serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply); sdsfree(reply); - aeDeleteFileEvent(server.el,fd,AE_READABLE); + aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],fd,AE_READABLE); return PSYNC_WRITE_ERROR; } return PSYNC_WAIT_REPLY; @@ -1479,7 +1479,7 @@ int slaveTryPartialResynchronization(int fd, int read_reply) { return PSYNC_WAIT_REPLY; } - aeDeleteFileEvent(server.el,fd,AE_READABLE); + aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],fd,AE_READABLE); if (!strncmp(reply,"+FULLRESYNC",11)) { char *replid = NULL, *offset = NULL; @@ -1626,7 +1626,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event."); /* Delete the writable event so that the readable event remains * registered and we can wait for the PONG reply. */ - aeDeleteFileEvent(server.el,fd,AE_WRITABLE); + aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],fd,AE_WRITABLE); server.repl_state = REPL_STATE_RECEIVE_PONG; /* Send the PING, don't check for errors at all, we have the timeout * that will take care about this. */ @@ -1841,7 +1841,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Setup the non blocking download of the bulk file. */ - if (aeCreateFileEvent(server.el,fd, AE_READABLE,readSyncBulkPayload,NULL) + if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],fd, AE_READABLE,readSyncBulkPayload,NULL) == AE_ERR) { serverLog(LL_WARNING, @@ -1860,7 +1860,7 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { return; error: - aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); + aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],fd,AE_READABLE|AE_WRITABLE); if (dfd != -1) close(dfd); close(fd); server.repl_transfer_s = -1; @@ -1884,7 +1884,7 @@ int connectWithMaster(void) { return C_ERR; } - if (aeCreateFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == + if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],fd,AE_READABLE|AE_WRITABLE,syncWithMaster,NULL) == AE_ERR) { close(fd); @@ -1905,7 +1905,7 @@ int connectWithMaster(void) { void undoConnectWithMaster(void) { int fd = server.repl_transfer_s; - aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); + aeDeleteFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],fd,AE_READABLE|AE_WRITABLE); close(fd); server.repl_transfer_s = -1; } @@ -2236,7 +2236,7 @@ void replicationResurrectCachedMaster(int newfd) { /* Re-add to the list of clients. */ linkClient(server.master); - if (aeCreateFileEvent(server.el, newfd, AE_READABLE, + if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], newfd, AE_READABLE, readQueryFromClient, server.master)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); freeClientAsync(server.master); /* Close ASAP. */ @@ -2245,7 +2245,7 @@ void replicationResurrectCachedMaster(int newfd) { /* We may also need to install the write handler as well if there is * pending data in the write buffers. */ if (clientHasPendingReplies(server.master)) { - if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE, + if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], newfd, AE_WRITABLE, sendReplyToClient, server.master)) { serverLog(LL_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); freeClientAsync(server.master); /* Close ASAP. */ diff --git a/src/scripting.c b/src/scripting.c index 2f44a511d..9ebfd51fc 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -1104,7 +1104,7 @@ void scriptingInit(int setup) { * Note: there is no need to create it again when this function is called * by scriptingReset(). */ if (server.lua_client == NULL) { - server.lua_client = createClient(-1); + server.lua_client = createClient(-1, IDX_EVENT_LOOP_MAIN); server.lua_client->flags |= CLIENT_LUA; } @@ -1278,7 +1278,7 @@ void luaMaskCountHook(lua_State *lua, lua_Debug *ar) { * here when the EVAL command will return. */ protectClient(server.lua_caller); } - if (server.lua_timedout) processEventsWhileBlocked(); + if (server.lua_timedout) processEventsWhileBlocked(IDX_EVENT_LOOP_MAIN); if (server.lua_kill) { serverLog(LL_WARNING,"Lua script killed by user with SCRIPT KILL."); lua_pushstring(lua,"Script killed by user with SCRIPT KILL..."); diff --git a/src/sentinel.c b/src/sentinel.c index 6fa3cf84f..78416597b 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -2013,7 +2013,7 @@ void sentinelReconnectInstance(sentinelRedisInstance *ri) { link->pending_commands = 0; link->cc_conn_time = mstime(); link->cc->data = link; - redisAeAttach(server.el,link->cc); + redisAeAttach(server.rgel[IDX_EVENT_LOOP_MAIN],link->cc); redisAsyncSetConnectCallback(link->cc, sentinelLinkEstablishedCallback); redisAsyncSetDisconnectCallback(link->cc, @@ -2037,7 +2037,7 @@ void sentinelReconnectInstance(sentinelRedisInstance *ri) { link->pc_conn_time = mstime(); link->pc->data = link; - redisAeAttach(server.el,link->pc); + redisAeAttach(server.rgel[IDX_EVENT_LOOP_MAIN],link->pc); redisAsyncSetConnectCallback(link->pc, sentinelLinkEstablishedCallback); redisAsyncSetDisconnectCallback(link->pc, diff --git a/src/server.c b/src/server.c index 4794571c8..defbca692 100644 --- a/src/server.c +++ b/src/server.c @@ -1619,6 +1619,16 @@ void getExpansiveClientsInfo(size_t *in_usage, size_t *out_usage) { *out_usage = o; } + +void AsyncClientCron(void *pv) +{ + client *c = (client*)pv; + mstime_t now = mstime(); + if (clientsCronHandleTimeout(c,now)) return; + if (clientsCronResizeQueryBuffer(c)) return; + if (clientsCronTrackExpansiveClients(c)) return; +} + /* This function is called by serverCron() and is used in order to perform * operations on clients that are important to perform constantly. For instance * we use this function in order to disconnect clients after a timeout, including @@ -1661,12 +1671,20 @@ void clientsCron(void) { listRotate(server.clients); head = listFirst(server.clients); c = listNodeValue(head); - /* The following functions do different service checks on the client. - * The protocol is that they return non-zero if the client was - * terminated. */ - if (clientsCronHandleTimeout(c,now)) continue; - if (clientsCronResizeQueryBuffer(c)) continue; - if (clientsCronTrackExpansiveClients(c)) continue; + if (c->iel == IDX_EVENT_LOOP_MAIN) + { + /* The following functions do different service checks on the client. + * The protocol is that they return non-zero if the client was + * terminated. */ + if (clientsCronHandleTimeout(c,now)) continue; + if (clientsCronResizeQueryBuffer(c)) continue; + if (clientsCronTrackExpansiveClients(c)) continue; + } + else if (IDX_EVENT_LOOP_MAIN > 1) + { + aePostFunction(server.rgel[c->iel], AsyncClientCron, c); + } + } } @@ -2070,14 +2088,20 @@ void beforeSleep(struct aeEventLoop *eventLoop) { moduleHandleBlockedClients(); /* Try to process pending commands for clients that were just unblocked. */ - if (listLength(server.unblocked_clients)) - processUnblockedClients(); + if (listLength(server.rgunblocked_clients[IDX_EVENT_LOOP_MAIN])) + { + aeReleaseLock(); + processUnblockedClients(IDX_EVENT_LOOP_MAIN); + aeAcquireLock(); + } /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); /* Handle writes with pending output buffers. */ - handleClientsWithPendingWrites(); + aeReleaseLock(); + handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN); + aeAcquireLock(); /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this @@ -2085,6 +2109,24 @@ void beforeSleep(struct aeEventLoop *eventLoop) { if (moduleCount()) moduleReleaseGIL(); } +void beforeSleepLite(struct aeEventLoop *eventLoop) +{ + int iel = 0; + for (; iel < MAX_EVENT_LOOPS; ++iel) + { + if (server.rgel[iel] == eventLoop) + break; + } + serverAssert(iel < MAX_EVENT_LOOPS); + + /* Try to process pending commands for clients that were just unblocked. */ + if (listLength(server.rgunblocked_clients[iel])) + processUnblockedClients(iel); + + /* Handle writes with pending output buffers. */ + handleClientsWithPendingWrites(iel); +} + /* This function is called immadiately after the event loop multiplexing * API returned, and the control is going to soon return to Redis by invoking * the different events callbacks. */ @@ -2702,6 +2744,8 @@ void initServer(void) { signal(SIGPIPE, SIG_IGN); setupSignalHandlers(); + fastlock_init(&server.flock); + if (server.syslog_enabled) { openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT, server.syslog_facility); @@ -2715,9 +2759,12 @@ void initServer(void) { server.clients_to_close = listCreate(); server.slaves = listCreate(); server.monitors = listCreate(); - server.clients_pending_write = listCreate(); + for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel) + { + server.rgclients_pending_write[iel] = listCreate(); + server.rgunblocked_clients[iel] = listCreate(); + } server.slaveseldb = -1; /* Force to emit the first SELECT command. */ - server.unblocked_clients = listCreate(); server.ready_keys = listCreate(); server.clients_waiting_acks = listCreate(); server.get_ack_from_slaves = 0; @@ -2726,12 +2773,15 @@ void initServer(void) { createSharedObjects(); adjustOpenFilesLimit(); - server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); - if (server.el == NULL) { - serverLog(LL_WARNING, - "Failed creating the event loop. Error message: '%s'", - strerror(errno)); - exit(1); + for (int i = 0; i < MAX_EVENT_LOOPS; ++i) + { + server.rgel[i] = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); + if (server.rgel[i] == NULL) { + serverLog(LL_WARNING, + "Failed creating the event loop. Error message: '%s'", + strerror(errno)); + exit(1); + } } server.db = zmalloc(sizeof(redisDb)*server.dbnum, MALLOC_LOCAL); @@ -2808,7 +2858,7 @@ void initServer(void) { /* Create the timer callback, this is our way to process many background * operations incrementally, like clients timeout, eviction of unaccessed * expired keys and so forth. */ - if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { + if (aeCreateTimeEvent(server.rgel[IDX_EVENT_LOOP_MAIN], 1, serverCron, NULL, NULL) == AE_ERR) { serverPanic("Can't create event loop timers."); exit(1); } @@ -2816,20 +2866,23 @@ void initServer(void) { /* Create an event handler for accepting new connections in TCP and Unix * domain sockets. */ for (j = 0; j < server.ipfd_count; j++) { - if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, - acceptTcpHandler,NULL) == AE_ERR) - { - serverPanic( - "Unrecoverable error creating server.ipfd file event."); - } + for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel) + { + if (aeCreateFileEvent(server.rgel[iel], server.ipfd[j], AE_READABLE|AE_THREADSAFE, + acceptTcpHandler,NULL) == AE_ERR) + { + serverPanic( + "Unrecoverable error creating server.ipfd file event."); + } + } } - if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, + if (server.sofd > 0 && aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.sofd,AE_READABLE, acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); /* Register a readable event for the pipe used to awake the event loop * when a blocked client in a module needs attention. */ - if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE, + if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], server.module_blocked_pipe[0], AE_READABLE, moduleBlockedClientPipeReadable,NULL) == AE_ERR) { serverPanic( "Error registering the readable event for the module " @@ -4738,6 +4791,17 @@ int redisIsSupervised(int mode) { return 0; } +void *workerThreadMain(void *parg) +{ + int iel = (int)((int64_t)parg); + + int isMainThread = (iel == IDX_EVENT_LOOP_MAIN); + aeSetBeforeSleepProc(server.rgel[iel], isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_THREADSAFE); + aeSetAfterSleepProc(server.rgel[iel], isMainThread ? afterSleep : NULL, 0); + aeMain(server.rgel[iel]); + aeDeleteEventLoop(server.rgel[iel]); + return NULL; +} int main(int argc, char **argv) { struct timeval tv; @@ -4940,10 +5004,13 @@ int main(int argc, char **argv) { serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory); } - aeSetBeforeSleepProc(server.el,beforeSleep); - aeSetAfterSleepProc(server.el,afterSleep); - aeMain(server.el); - aeDeleteEventLoop(server.el); + pthread_t rgthread[MAX_EVENT_LOOPS]; + for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel) + { + pthread_create(rgthread + iel, NULL, workerThreadMain, (void*)((int64_t)iel)); + } + void *pretT; + pthread_join(rgthread[IDX_EVENT_LOOP_MAIN], &pretT); return 0; } diff --git a/src/server.h b/src/server.h index 7d1f34e40..92abbb841 100644 --- a/src/server.h +++ b/src/server.h @@ -51,6 +51,7 @@ typedef long long mstime_t; /* millisecond time type. */ +#include "fastlock.h" #include "ae.h" /* Event driven programming library */ #include "sds.h" /* Dynamic safe strings */ #include "dict.h" /* Hash tables */ @@ -845,6 +846,8 @@ typedef struct client { /* Response buffer */ int bufpos; char buf[PROTO_REPLY_CHUNK_BYTES]; + + int iel; /* the event loop index we're registered with */ } client; struct saveparam { @@ -1005,6 +1008,9 @@ struct clusterState; #define CHILD_INFO_TYPE_RDB 0 #define CHILD_INFO_TYPE_AOF 1 +#define MAX_EVENT_LOOPS 2 +#define IDX_EVENT_LOOP_MAIN 0 + struct redisServer { /* General */ pid_t pid; /* Main process pid. */ @@ -1019,7 +1025,8 @@ struct redisServer { redisDb *db; dict *commands; /* Command table */ dict *orig_commands; /* Command table before command renaming. */ - aeEventLoop *el; + int cel; + aeEventLoop *rgel[MAX_EVENT_LOOPS]; unsigned int lruclock; /* Clock for LRU eviction */ int shutdown_asap; /* SHUTDOWN needed ASAP */ int activerehashing; /* Incremental rehash in serverCron() */ @@ -1051,7 +1058,7 @@ struct redisServer { int cfd_count; /* Used slots in cfd[] */ list *clients; /* List of active clients */ list *clients_to_close; /* Clients to close asynchronously */ - list *clients_pending_write; /* There is to write or install handler. */ + list *rgclients_pending_write[MAX_EVENT_LOOPS]; /* There is to write or install handler. */ list *slaves, *monitors; /* List of slaves and MONITORs */ client *current_client; /* Current client, only used on crash report */ rax *clients_index; /* Active clients dictionary by client ID. */ @@ -1272,7 +1279,7 @@ struct redisServer { /* Blocked clients */ unsigned int blocked_clients; /* # of clients executing a blocking cmd.*/ unsigned int blocked_clients_by_type[BLOCKED_NUM]; - list *unblocked_clients; /* list of clients to unblock before next loop */ + list *rgunblocked_clients[MAX_EVENT_LOOPS]; /* list of clients to unblock before next loop */ list *ready_keys; /* List of readyList structures for BLPOP & co */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ @@ -1362,6 +1369,8 @@ struct redisServer { pthread_mutex_t lruclock_mutex; pthread_mutex_t next_client_id_mutex; pthread_mutex_t unixtime_mutex; + + struct fastlock flock; }; typedef struct pubsubPattern { @@ -1504,7 +1513,7 @@ size_t redisPopcount(void *s, long count); void redisSetProcTitle(char *title); /* networking.c -- Networking and Client related operations */ -client *createClient(int fd); +client *createClient(int fd, int iel); void closeTimedoutClients(void); void freeClient(client *c); void freeClientAsync(client *c); @@ -1571,8 +1580,8 @@ void disconnectSlaves(void); int listenToPort(int port, int *fds, int *count); void pauseClients(mstime_t duration); int clientsArePaused(void); -int processEventsWhileBlocked(void); -int handleClientsWithPendingWrites(void); +int processEventsWhileBlocked(int iel); +int handleClientsWithPendingWrites(int iel); int clientHasPendingReplies(client *c); void unlinkClient(client *c); int writeToClient(int fd, client *c, int handler_installed); @@ -2014,7 +2023,7 @@ int ldbPendingChildren(void); sds luaCreateFunction(client *c, lua_State *lua, robj *body); /* Blocked clients */ -void processUnblockedClients(void); +void processUnblockedClients(int iel); void blockClient(client *c, int btype); void unblockClient(client *c); void queueClientForReprocessing(client *c);