From 9bf7f302a77e69bad40c3d13639537049ece433c Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 25 Mar 2019 17:05:06 +0000 Subject: [PATCH] Threaded IO: stop threads when no longer needed + C11 in Makefile. Now threads are stopped even when the connections drop immediately to zero, not allowing the networking code to detect the condition and stop the threads. serverCron() will handle that. --- src/Makefile | 2 +- src/networking.c | 29 ++++++++++++++++++++++++----- src/server.c | 3 +++ src/server.h | 1 + 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/src/Makefile b/src/Makefile index 93cfdc28e..1c80e547f 100644 --- a/src/Makefile +++ b/src/Makefile @@ -20,7 +20,7 @@ DEPENDENCY_TARGETS=hiredis linenoise lua NODEPS:=clean distclean # Default settings -STD=-std=c99 -pedantic -DREDIS_STATIC='' +1TD=-std=c11 -pedantic -DREDIS_STATIC='' ifneq (,$(findstring clang,$(CC))) ifneq (,$(findstring FreeBSD,$(uname_S))) STD+=-Wno-c11-extensions diff --git a/src/networking.c b/src/networking.c index 17d6b1866..d61e1f044 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2527,7 +2527,7 @@ void *IOThreadMain(void *myid) { void initThreadedIO(void) { pthread_t tid; - server.io_threads_num = 4; + server.io_threads_num = 8; io_threads_active = 0; /* We start with threads not active. */ for (int i = 0; i < server.io_threads_num; i++) { pthread_mutex_init(&io_threads_mutex[i],NULL); @@ -2543,6 +2543,7 @@ void initThreadedIO(void) { } void startThreadedIO(void) { + printf("S"); fflush(stdout); if (tio_debug) printf("--- STARTING THREADED IO ---\n"); serverAssert(io_threads_active == 0); for (int j = 0; j < server.io_threads_num; j++) @@ -2551,6 +2552,7 @@ void startThreadedIO(void) { } void stopThreadedIO(void) { + printf("E"); fflush(stdout); if (tio_debug) printf("--- STOPPING THREADED IO ---\n"); serverAssert(io_threads_active == 1); for (int j = 0; j < server.io_threads_num; j++) @@ -2558,19 +2560,36 @@ void stopThreadedIO(void) { io_threads_active = 0; } +/* This function checks if there are not enough pending clients to justify + * taking the I/O threads active: in that case I/O threads are stopped if + * currently active. + * + * The function returns 0 if the I/O threading should be used becuase there + * are enough active threads, otherwise 1 is returned and the I/O threads + * could be possibly stopped (if already active) as a side effect. */ +int stopThreadedIOIfNeeded(void) { + int pending = listLength(server.clients_pending_write); + if (pending < (server.io_threads_num*2)) { + if (io_threads_active) stopThreadedIO(); + return 1; + } else { + return 0; + } +} + int handleClientsWithPendingWritesUsingThreads(void) { int processed = listLength(server.clients_pending_write); if (processed == 0) return 0; /* Return ASAP if there are no clients. */ /* If we have just a few clients to serve, don't use I/O threads, but the * boring synchronous code. */ - if (processed < (server.io_threads_num*2)) { - if (io_threads_active) stopThreadedIO(); + if (stopThreadedIOIfNeeded()) { return handleClientsWithPendingWrites(); - } else { - if (!io_threads_active) startThreadedIO(); } + /* Start threads if needed. */ + if (!io_threads_active) startThreadedIO(); + if (tio_debug) printf("%d TOTAL pending clients\n", processed); /* Distribute the clients across N different lists. */ diff --git a/src/server.c b/src/server.c index de5a814d0..325c9010c 100644 --- a/src/server.c +++ b/src/server.c @@ -2001,6 +2001,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { migrateCloseTimedoutSockets(); } + /* Stop the I/O threads if we don't have enough pending work. */ + stopThreadedIOIfNeeded(); + /* Start a scheduled BGSAVE if the corresponding flag is set. This is * useful when we are forced to postpone a BGSAVE because an AOF * rewrite is in progress. diff --git a/src/server.h b/src/server.h index d2a563c96..96ee37887 100644 --- a/src/server.h +++ b/src/server.h @@ -1579,6 +1579,7 @@ int clientsArePaused(void); int processEventsWhileBlocked(void); int handleClientsWithPendingWrites(void); int handleClientsWithPendingWritesUsingThreads(void); +int stopThreadedIOIfNeeded(void); int clientHasPendingReplies(client *c); void unlinkClient(client *c); int writeToClient(int fd, client *c, int handler_installed);