From 7d5fb5df3f2dff8102ead8542e7697a5bc312e94 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 13 Jan 2020 18:53:12 +0100 Subject: [PATCH] Setting N I/O threads should mean N-1 additional + 1 main thread. --- src/networking.c | 47 +++++++++++++++++++++++++---------------------- 1 file changed, 25 insertions(+), 22 deletions(-) diff --git a/src/networking.c b/src/networking.c index 73dc4afca..a2e454d4b 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2658,9 +2658,9 @@ int io_threads_active; /* Are the threads currently spinning waiting I/O? */ int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */ /* This is the list of clients each thread will serve when threaded I/O is - * used. We spawn N threads, and the N+1 list is used for the clients that - * are processed by the main thread itself (this is why ther is "+1"). */ -list *io_threads_list[IO_THREADS_MAX_NUM+1]; + * used. We spawn io_threads_num-1 threads, since one is the main thread + * itself. */ +list *io_threads_list[IO_THREADS_MAX_NUM]; void *IOThreadMain(void *myid) { /* The ID is the thread number (from 0 to server.iothreads_num-1), and is @@ -2720,12 +2720,16 @@ void initThreadedIO(void) { exit(1); } - /* Spawn the I/O threads. */ + /* Spawn and initialize the I/O threads. */ for (int i = 0; i < server.io_threads_num; i++) { + /* Things we do for all the threads including the main thread. */ + io_threads_list[i] = listCreate(); + if (i == 0) continue; /* Thread 0 is the main thread. */ + + /* Things we do only for the additional threads. */ pthread_t tid; pthread_mutex_init(&io_threads_mutex[i],NULL); io_threads_pending[i] = 0; - io_threads_list[i] = listCreate(); pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); @@ -2733,14 +2737,13 @@ void initThreadedIO(void) { } io_threads[i] = tid; } - io_threads_list[server.io_threads_num] = listCreate(); /* For main thread */ } void startThreadedIO(void) { if (tio_debug) { printf("S"); fflush(stdout); } if (tio_debug) printf("--- STARTING THREADED IO ---\n"); serverAssert(io_threads_active == 0); - for (int j = 0; j < server.io_threads_num; j++) + for (int j = 1; j < server.io_threads_num; j++) pthread_mutex_unlock(&io_threads_mutex[j]); io_threads_active = 1; } @@ -2754,7 +2757,7 @@ void stopThreadedIO(void) { (int) listLength(server.clients_pending_read), (int) listLength(server.clients_pending_write)); serverAssert(io_threads_active == 1); - for (int j = 0; j < server.io_threads_num; j++) + for (int j = 1; j < server.io_threads_num; j++) pthread_mutex_lock(&io_threads_mutex[j]); io_threads_active = 0; } @@ -2805,7 +2808,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; - int target_id = item_id % (server.io_threads_num+1); + int target_id = item_id % server.io_threads_num; listAddNodeTail(io_threads_list[target_id],c); item_id++; } @@ -2813,23 +2816,23 @@ int handleClientsWithPendingWritesUsingThreads(void) { /* Give the start condition to the waiting threads, by setting the * start condition atomic var. */ io_threads_op = IO_THREADS_OP_WRITE; - for (int j = 0; j < server.io_threads_num; j++) { + for (int j = 1; j < server.io_threads_num; j++) { int count = listLength(io_threads_list[j]); io_threads_pending[j] = count; } - /* Also use the main thread to process a slide of clients. */ - listRewind(io_threads_list[server.io_threads_num],&li); + /* Also use the main thread to process a slice of clients. */ + listRewind(io_threads_list[0],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); writeToClient(c,0); } - listEmpty(io_threads_list[server.io_threads_num]); + listEmpty(io_threads_list[0]); - /* Wait for all threads to end their work. */ + /* Wait for all the other threads to end their work. */ while(1) { unsigned long pending = 0; - for (int j = 0; j < server.io_threads_num; j++) + for (int j = 1; j < server.io_threads_num; j++) pending += io_threads_pending[j]; if (pending == 0) break; } @@ -2890,7 +2893,7 @@ int handleClientsWithPendingReadsUsingThreads(void) { int item_id = 0; while((ln = listNext(&li))) { client *c = listNodeValue(ln); - int target_id = item_id % (server.io_threads_num+1); + int target_id = item_id % server.io_threads_num; listAddNodeTail(io_threads_list[target_id],c); item_id++; } @@ -2898,23 +2901,23 @@ int handleClientsWithPendingReadsUsingThreads(void) { /* Give the start condition to the waiting threads, by setting the * start condition atomic var. */ io_threads_op = IO_THREADS_OP_READ; - for (int j = 0; j < server.io_threads_num; j++) { + for (int j = 1; j < server.io_threads_num; j++) { int count = listLength(io_threads_list[j]); io_threads_pending[j] = count; } - /* Also use the main thread to process a slide of clients. */ - listRewind(io_threads_list[server.io_threads_num],&li); + /* Also use the main thread to process a slice of clients. */ + listRewind(io_threads_list[0],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); readQueryFromClient(c->conn); } - listEmpty(io_threads_list[server.io_threads_num]); + listEmpty(io_threads_list[0]); - /* Wait for all threads to end their work. */ + /* Wait for all the other threads to end their work. */ while(1) { unsigned long pending = 0; - for (int j = 0; j < server.io_threads_num; j++) + for (int j = 1; j < server.io_threads_num; j++) pending += io_threads_pending[j]; if (pending == 0) break; }