diff --git a/src/ae.cpp b/src/ae.cpp index 77a1e1e36..d1a220e7c 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -152,7 +152,8 @@ aeEventLoop *aeCreateEventLoop(int setsize) { eventLoop->fdCmdRead = rgfd[0]; eventLoop->fdCmdWrite = rgfd[1]; fcntl(eventLoop->fdCmdRead, F_SETFL, O_NONBLOCK); - aeCreateFileEvent(eventLoop, eventLoop->fdCmdRead, AE_READABLE|AE_THREADSAFE, aeProcessCmd, NULL); + eventLoop->cevents = 0; + aeCreateFileEvent(eventLoop, eventLoop->fdCmdRead, AE_READABLE|AE_READ_THREADSAFE, aeProcessCmd, NULL); return eventLoop; @@ -257,6 +258,9 @@ extern "C" void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask) * is removed. */ if (mask & AE_WRITABLE) mask |= AE_BARRIER; + if (mask & AE_WRITABLE) mask |= AE_WRITE_THREADSAFE; + if (mask & AE_READABLE) mask |= AE_READ_THREADSAFE; + aeApiDelEvent(eventLoop, fd, mask); fe->mask = fe->mask & (~mask); if (fd == eventLoop->maxfd && fe->mask == AE_NONE) { @@ -441,9 +445,9 @@ static int processTimeEvents(aeEventLoop *eventLoop) { extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int mask, int fd) { -#define LOCK_IF_NECESSARY(fe) \ +#define LOCK_IF_NECESSARY(fe, tsmask) \ std::unique_lock ulock(g_lock, std::defer_lock); \ - if (!(fe->mask & AE_THREADSAFE)) \ + if (!(fe->mask & tsmask)) \ ulock.lock() int fired = 0; /* Number of events fired for current fd. */ @@ -468,7 +472,7 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma * Fire the readable event if the call sequence is not * inverted. */ if (!invert && fe->mask & mask & AE_READABLE) { - LOCK_IF_NECESSARY(fe); + LOCK_IF_NECESSARY(fe, AE_READ_THREADSAFE); fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } @@ -476,7 +480,7 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma /* Fire the writable event. */ if (fe->mask & mask & AE_WRITABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { - LOCK_IF_NECESSARY(fe); + LOCK_IF_NECESSARY(fe, AE_WRITE_THREADSAFE); fe->wfileProc(eventLoop,fd,fe->clientData,mask); fired++; } @@ -486,7 +490,7 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma * after the writable one. */ if (invert && fe->mask & mask & AE_READABLE) { if (!fired || fe->wfileProc != fe->rfileProc) { - LOCK_IF_NECESSARY(fe); + LOCK_IF_NECESSARY(fe, AE_READ_THREADSAFE); fe->rfileProc(eventLoop,fd,fe->clientData,mask); fired++; } @@ -567,7 +571,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) /* After sleep callback. */ if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) { std::unique_lock ulock(g_lock, std::defer_lock); - if (!(eventLoop->beforesleepFlags & AE_THREADSAFE)) + if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) ulock.lock(); eventLoop->aftersleep(eventLoop); } @@ -586,6 +590,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) if (flags & AE_TIME_EVENTS) processed += processTimeEvents(eventLoop); + eventLoop->cevents += processed; return processed; /* return the number of processed file/time events */ } @@ -617,7 +622,7 @@ void aeMain(aeEventLoop *eventLoop) { while (!eventLoop->stop) { if (eventLoop->beforesleep != NULL) { std::unique_lock ulock(g_lock, std::defer_lock); - if (!(eventLoop->beforesleepFlags & AE_THREADSAFE)) + if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) ulock.lock(); eventLoop->beforesleep(eventLoop); } diff --git a/src/ae.h b/src/ae.h index 271f0cdf1..5bbe0013a 100644 --- a/src/ae.h +++ b/src/ae.h @@ -51,7 +51,9 @@ extern "C" { loop iteration. Useful when you want to persist things to disk before sending replies, and want to do that in a group fashion. */ -#define AE_THREADSAFE 8 /* Ok to run concurrently */ +#define AE_READ_THREADSAFE 8 +#define AE_WRITE_THREADSAFE 16 +#define AE_SLEEP_THREADSAFE 32 #define AE_FILE_EVENTS 1 #define AE_TIME_EVENTS 2 @@ -118,6 +120,7 @@ typedef struct aeEventLoop { struct fastlock flock; int fdCmdWrite; int fdCmdRead; + int cevents; } aeEventLoop; /* Prototypes */ diff --git a/src/config.c b/src/config.c index 75be21db8..e0d7e4042 100644 --- a/src/config.c +++ b/src/config.c @@ -971,7 +971,7 @@ void configSetCommand(client *c) { if ((unsigned int) aeGetSetSize(server.rgel[IDX_EVENT_LOOP_MAIN]) < server.maxclients + CONFIG_FDSET_INCR) { - for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel) + for (int iel = 0; iel < server.cel; ++iel) { if (aeResizeSetSize(server.rgel[iel], server.maxclients + CONFIG_FDSET_INCR) == AE_ERR) diff --git a/src/networking.c b/src/networking.c index 66c234a78..ada1fe662 100644 --- a/src/networking.c +++ b/src/networking.c @@ -94,7 +94,7 @@ client *createClient(int fd, int iel) { anetEnableTcpNoDelay(NULL,fd); if (server.tcpkeepalive) anetKeepAlive(NULL,fd,server.tcpkeepalive); - if (aeCreateFileEvent(server.rgel[iel],fd,AE_READABLE|AE_THREADSAFE, + if (aeCreateFileEvent(server.rgel[iel],fd,AE_READABLE|AE_READ_THREADSAFE, readQueryFromClient, c) == AE_ERR) { close(fd); @@ -850,36 +850,9 @@ static void acceptCommonHandler(int fd, int flags, char *ip, int iel) { c->flags |= flags; } -struct AcceptCommonHandlerAsyncArgs -{ - int fd; - int flags; - char cip[NET_IP_STR_LEN]; - int fUseCip; - int iel; -}; -static void AcceptCommonHandlerAsync(void *args) -{ - struct AcceptCommonHandlerAsyncArgs *aargs = args; - acceptCommonHandler(aargs->fd, aargs->flags, aargs->cip, aargs->iel); - zfree(args); -} -static void EnqueueAcceptCommonHandler(int fd, int flags, char *ip, int iel) -{ - struct AcceptCommonHandlerAsyncArgs *args = zmalloc(sizeof(struct AcceptCommonHandlerAsyncArgs), MALLOC_LOCAL); - args->fd = fd; - args->flags = flags; - if (ip != NULL) - memcpy(args->cip, ip, NET_IP_STR_LEN); - args->fUseCip = (ip != NULL); - args->iel = iel; - aePostFunction(server.rgel[iel], AcceptCommonHandlerAsync, args); -} - void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int cport, cfd, max = MAX_ACCEPTS_PER_CALL; char cip[NET_IP_STR_LEN]; - UNUSED(el); UNUSED(mask); UNUSED(privdata); @@ -892,24 +865,12 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { return; } serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); - int ielCur = 0; - for (; ielCur < MAX_EVENT_LOOPS; ++ielCur) - { - if (el == server.rgel[ielCur]) - break; - } - serverAssert(ielCur < MAX_EVENT_LOOPS); - int iel = rand() % MAX_EVENT_LOOPS; - if (iel == ielCur) - { - aeAcquireLock(); - acceptCommonHandler(cfd,0,cip, iel); - aeReleaseLock(); - } - else - { - EnqueueAcceptCommonHandler(cfd, 0, cip, iel); - } + int ielCur = ielFromEventLoop(el); + + // We always accept on the same thread + aeAcquireLock(); + acceptCommonHandler(cfd,0,cip, ielCur); + aeReleaseLock(); } } @@ -927,8 +888,13 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { "Accepting client connection: %s", server.neterr); return; } + int ielCur = ielFromEventLoop(el); serverLog(LL_VERBOSE,"Accepted connection to %s", server.unixsocket); - EnqueueAcceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, rand() % MAX_EVENT_LOOPS); + + aeAcquireLock(); + acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielCur); + aeReleaseLock(); + } } @@ -1324,7 +1290,7 @@ void protectClient(client *c) { void unprotectClient(client *c) { if (c->flags & CLIENT_PROTECTED) { c->flags &= ~CLIENT_PROTECTED; - aeCreateFileEvent(server.rgel[c->iel],c->fd,AE_READABLE|AE_THREADSAFE,readQueryFromClient,c); + aeCreateFileEvent(server.rgel[c->iel],c->fd,AE_READABLE|AE_READ_THREADSAFE,readQueryFromClient,c); if (clientHasPendingReplies(c)) clientInstallWriteHandler(c); } } @@ -1678,14 +1644,6 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(el); UNUSED(mask); - int iel = 0; - for (; iel < MAX_EVENT_LOOPS; ++iel) - { - if (server.rgel[iel] == el) - break; - } - serverAssert(iel == c->iel); - readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query diff --git a/src/server.c b/src/server.c index defbca692..bc14279f4 100644 --- a/src/server.c +++ b/src/server.c @@ -2111,13 +2111,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { void beforeSleepLite(struct aeEventLoop *eventLoop) { - int iel = 0; - for (; iel < MAX_EVENT_LOOPS; ++iel) - { - if (server.rgel[iel] == eventLoop) - break; - } - serverAssert(iel < MAX_EVENT_LOOPS); + int iel = ielFromEventLoop(eventLoop); /* Try to process pending commands for clients that were just unblocked. */ if (listLength(server.rgunblocked_clients[iel])) @@ -2451,6 +2445,9 @@ void initServerConfig(void) { * script to the slave / AOF. This is the new way starting from * Redis 5. However it is possible to revert it via redis.conf. */ server.lua_always_replicate_commands = 1; + + /* Multithreading */ + server.cel = CONFIG_DEFAULT_THREADS; } extern char **environ; @@ -2737,9 +2734,24 @@ void resetServerStats(void) { server.aof_delayed_fsync = 0; } -void initServer(void) { - int j; +void initServerAcceptHandlers(void) +{ + /* Create an event handler for accepting new connections in TCP and Unix + * domain sockets. */ + for (int j = 0; j < server.ipfd_count; j++) { + int iel = j % server.cel; + if (aeCreateFileEvent(server.rgel[iel], server.ipfd[j], AE_READABLE|AE_READ_THREADSAFE, + acceptTcpHandler,NULL) == AE_ERR) + { + serverPanic( + "Unrecoverable error creating server.ipfd file event."); + } + } + if (server.sofd > 0 && aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.sofd,AE_READABLE|AE_READ_THREADSAFE, + acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); +} +void initServer(void) { signal(SIGHUP, SIG_IGN); signal(SIGPIPE, SIG_IGN); setupSignalHandlers(); @@ -2809,7 +2821,7 @@ void initServer(void) { } /* Create the Redis databases, and initialize other internal state. */ - for (j = 0; j < server.dbnum; j++) { + for (int j = 0; j < server.dbnum; j++) { server.db[j].pdict = dictCreate(&dbDictType,NULL); server.db[j].expires = dictCreate(&keyptrDictType,NULL); server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL); @@ -2863,23 +2875,6 @@ void initServer(void) { exit(1); } - /* Create an event handler for accepting new connections in TCP and Unix - * domain sockets. */ - for (j = 0; j < server.ipfd_count; j++) { - for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel) - { - if (aeCreateFileEvent(server.rgel[iel], server.ipfd[j], AE_READABLE|AE_THREADSAFE, - acceptTcpHandler,NULL) == AE_ERR) - { - serverPanic( - "Unrecoverable error creating server.ipfd file event."); - } - } - } - if (server.sofd > 0 && aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.sofd,AE_READABLE, - acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); - - /* Register a readable event for the pipe used to awake the event loop * when a blocked client in a module needs attention. */ if (aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN], server.module_blocked_pipe[0], AE_READABLE, @@ -4794,9 +4789,10 @@ int redisIsSupervised(int mode) { void *workerThreadMain(void *parg) { int iel = (int)((int64_t)parg); + serverLog(LOG_INFO, "Thread %d alive.", iel); int isMainThread = (iel == IDX_EVENT_LOOP_MAIN); - aeSetBeforeSleepProc(server.rgel[iel], isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_THREADSAFE); + aeSetBeforeSleepProc(server.rgel[iel], isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE); aeSetAfterSleepProc(server.rgel[iel], isMainThread ? afterSleep : NULL, 0); aeMain(server.rgel[iel]); aeDeleteEventLoop(server.rgel[iel]); @@ -5004,8 +5000,14 @@ int main(int argc, char **argv) { serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory); } + + + server.cel = 4; //testing + initServerAcceptHandlers(); + + serverAssert(server.cel > 0 && server.cel <= MAX_EVENT_LOOPS); pthread_t rgthread[MAX_EVENT_LOOPS]; - for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel) + for (int iel = 0; iel < server.cel; ++iel) { pthread_create(rgthread + iel, NULL, workerThreadMain, (void*)((int64_t)iel)); } diff --git a/src/server.h b/src/server.h index 92abbb841..c9e95db71 100644 --- a/src/server.h +++ b/src/server.h @@ -169,6 +169,8 @@ typedef long long mstime_t; /* millisecond time type. */ #define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */ #define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */ +#define CONFIG_DEFAULT_THREADS 1 + #define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */ #define ACTIVE_EXPIRE_CYCLE_FAST_DURATION 1000 /* Microseconds */ #define ACTIVE_EXPIRE_CYCLE_SLOW_TIME_PERC 25 /* CPU max % for keys collection */ @@ -1008,7 +1010,7 @@ struct clusterState; #define CHILD_INFO_TYPE_RDB 0 #define CHILD_INFO_TYPE_AOF 1 -#define MAX_EVENT_LOOPS 2 +#define MAX_EVENT_LOOPS 16 #define IDX_EVENT_LOOP_MAIN 0 struct redisServer { @@ -2279,9 +2281,21 @@ int memtest_preserving_test(unsigned long *m, size_t bytes, int passes); void mixDigest(unsigned char *digest, void *ptr, size_t len); void xorDigest(unsigned char *digest, void *ptr, size_t len); +inline int ielFromEventLoop(const aeEventLoop *eventLoop) +{ + int iel = 0; + for (; iel < server.cel; ++iel) + { + if (server.rgel[iel] == eventLoop) + break; + } + serverAssert(iel < server.cel); + return iel; +} + #define redisDebug(fmt, ...) \ printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__) #define redisDebugMark() \ printf("-- MARK %s:%d --\n", __FILE__, __LINE__) -#endif +#endif \ No newline at end of file