diff --git a/src/networking.c b/src/networking.c index cd241dac2..17d6b1866 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2491,24 +2491,34 @@ void *IOThreadMain(void *myid) { long id = (unsigned long)myid; while(1) { - /* ... Wait for start ... */ - pthread_mutex_lock(&io_threads_mutex[id]); - if (io_threads_pending[id]) { - if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id])); - - /* ... Process ... */ - listIter li; - listNode *ln; - listRewind(io_threads_list[id],&li); - while((ln = listNext(&li))) { - client *c = listNodeValue(ln); - writeToClient(c->fd,c,0); - io_threads_pending[id]--; - } - listEmpty(io_threads_list[id]); + /* Wait for start */ + for (int j = 0; j < 1000000; j++) { + if (io_threads_pending[id] != 0) break; } - pthread_mutex_unlock(&io_threads_mutex[id]); + /* Give the main thread a chance to stop this thread. */ + if (io_threads_pending[id] == 0) { + pthread_mutex_lock(&io_threads_mutex[id]); + pthread_mutex_unlock(&io_threads_mutex[id]); + continue; + } + + serverAssert(io_threads_pending[id] != 0); + + if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id])); + + /* Process: note that the main thread will never touch our list + * before we drop the pending count to 0. */ + listIter li; + listNode *ln; + listRewind(io_threads_list[id],&li); + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + writeToClient(c->fd,c,0); + } + listEmpty(io_threads_list[id]); + io_threads_pending[id] = 0; + if (tio_debug) printf("[%ld] Done\n", id); } } @@ -2572,13 +2582,17 @@ int handleClientsWithPendingWritesUsingThreads(void) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; int target_id = item_id % server.io_threads_num; - pthread_mutex_lock(&io_threads_mutex[target_id]); listAddNodeTail(io_threads_list[target_id],c); - io_threads_pending[target_id]++; - pthread_mutex_unlock(&io_threads_mutex[target_id]); item_id++; } + /* Give the start condition to the waiting threads, by setting the + * start condition atomic var. */ + for (int j = 0; j < server.io_threads_num; j++) { + int count = listLength(io_threads_list[j]); + io_threads_pending[j] = count; + } + /* Wait for all threads to end their work. */ while(1) { unsigned long pending = 0;