Threaded IO: stop threads when no longer needed + C11 in Makefile.
Now threads are stopped even when the connections drop immediately to zero, not allowing the networking code to detect the condition and stop the threads. serverCron() will handle that.
This commit is contained in:
parent
ea35a81c42
commit
9bf7f302a7
@ -20,7 +20,7 @@ DEPENDENCY_TARGETS=hiredis linenoise lua
|
||||
NODEPS:=clean distclean
|
||||
|
||||
# Default settings
|
||||
STD=-std=c99 -pedantic -DREDIS_STATIC=''
|
||||
1TD=-std=c11 -pedantic -DREDIS_STATIC=''
|
||||
ifneq (,$(findstring clang,$(CC)))
|
||||
ifneq (,$(findstring FreeBSD,$(uname_S)))
|
||||
STD+=-Wno-c11-extensions
|
||||
|
@ -2527,7 +2527,7 @@ void *IOThreadMain(void *myid) {
|
||||
void initThreadedIO(void) {
|
||||
pthread_t tid;
|
||||
|
||||
server.io_threads_num = 4;
|
||||
server.io_threads_num = 8;
|
||||
io_threads_active = 0; /* We start with threads not active. */
|
||||
for (int i = 0; i < server.io_threads_num; i++) {
|
||||
pthread_mutex_init(&io_threads_mutex[i],NULL);
|
||||
@ -2543,6 +2543,7 @@ void initThreadedIO(void) {
|
||||
}
|
||||
|
||||
void startThreadedIO(void) {
|
||||
printf("S"); fflush(stdout);
|
||||
if (tio_debug) printf("--- STARTING THREADED IO ---\n");
|
||||
serverAssert(io_threads_active == 0);
|
||||
for (int j = 0; j < server.io_threads_num; j++)
|
||||
@ -2551,6 +2552,7 @@ void startThreadedIO(void) {
|
||||
}
|
||||
|
||||
void stopThreadedIO(void) {
|
||||
printf("E"); fflush(stdout);
|
||||
if (tio_debug) printf("--- STOPPING THREADED IO ---\n");
|
||||
serverAssert(io_threads_active == 1);
|
||||
for (int j = 0; j < server.io_threads_num; j++)
|
||||
@ -2558,19 +2560,36 @@ void stopThreadedIO(void) {
|
||||
io_threads_active = 0;
|
||||
}
|
||||
|
||||
/* This function checks if there are not enough pending clients to justify
|
||||
* taking the I/O threads active: in that case I/O threads are stopped if
|
||||
* currently active.
|
||||
*
|
||||
* The function returns 0 if the I/O threading should be used becuase there
|
||||
* are enough active threads, otherwise 1 is returned and the I/O threads
|
||||
* could be possibly stopped (if already active) as a side effect. */
|
||||
int stopThreadedIOIfNeeded(void) {
|
||||
int pending = listLength(server.clients_pending_write);
|
||||
if (pending < (server.io_threads_num*2)) {
|
||||
if (io_threads_active) stopThreadedIO();
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int handleClientsWithPendingWritesUsingThreads(void) {
|
||||
int processed = listLength(server.clients_pending_write);
|
||||
if (processed == 0) return 0; /* Return ASAP if there are no clients. */
|
||||
|
||||
/* If we have just a few clients to serve, don't use I/O threads, but the
|
||||
* boring synchronous code. */
|
||||
if (processed < (server.io_threads_num*2)) {
|
||||
if (io_threads_active) stopThreadedIO();
|
||||
if (stopThreadedIOIfNeeded()) {
|
||||
return handleClientsWithPendingWrites();
|
||||
} else {
|
||||
if (!io_threads_active) startThreadedIO();
|
||||
}
|
||||
|
||||
/* Start threads if needed. */
|
||||
if (!io_threads_active) startThreadedIO();
|
||||
|
||||
if (tio_debug) printf("%d TOTAL pending clients\n", processed);
|
||||
|
||||
/* Distribute the clients across N different lists. */
|
||||
|
@ -2001,6 +2001,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
migrateCloseTimedoutSockets();
|
||||
}
|
||||
|
||||
/* Stop the I/O threads if we don't have enough pending work. */
|
||||
stopThreadedIOIfNeeded();
|
||||
|
||||
/* Start a scheduled BGSAVE if the corresponding flag is set. This is
|
||||
* useful when we are forced to postpone a BGSAVE because an AOF
|
||||
* rewrite is in progress.
|
||||
|
@ -1579,6 +1579,7 @@ int clientsArePaused(void);
|
||||
int processEventsWhileBlocked(void);
|
||||
int handleClientsWithPendingWrites(void);
|
||||
int handleClientsWithPendingWritesUsingThreads(void);
|
||||
int stopThreadedIOIfNeeded(void);
|
||||
int clientHasPendingReplies(client *c);
|
||||
void unlinkClient(client *c);
|
||||
int writeToClient(int fd, client *c, int handler_installed);
|
||||
|
Loading…
x
Reference in New Issue
Block a user