diff --git a/src/networking.c b/src/networking.c index 7d8470489..3a36badb8 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1711,12 +1711,12 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); - freeClient(c); + freeClientAsync(c); return; } } else if (nread == 0) { serverLog(LL_VERBOSE, "Client closed connection"); - freeClient(c); + freeClientAsync(c); return; } else if (c->flags & CLIENT_MASTER) { /* Append the query buffer to the pending (not applied) buffer @@ -1739,7 +1739,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { // serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); - freeClient(c); + freeClientAsync(c); return; } @@ -2538,8 +2538,10 @@ void *IOThreadMain(void *myid) { client *c = listNodeValue(ln); if (io_threads_op == IO_THREADS_OP_WRITE) { writeToClient(c->fd,c,0); - } else { + } else if (io_threads_op == IO_THREADS_OP_READ) { readQueryFromClient(NULL,c->fd,c,0); + } else { + serverPanic("io_threads_op value is unknown"); } } listEmpty(io_threads_list[id]); @@ -2632,7 +2634,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { /* Start threads if needed. */ if (!io_threads_active) startThreadedIO(); - if (tio_debug) printf("%d TOTAL pending clients\n", processed); + if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed); /* Distribute the clients across N different lists. */ listIter li; @@ -2649,6 +2651,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { /* Give the start condition to the waiting threads, by setting the * start condition atomic var. */ + io_threads_op = IO_THREADS_OP_WRITE; for (int j = 0; j < server.io_threads_num; j++) { int count = listLength(io_threads_list[j]); io_threads_pending[j] = count; @@ -2661,7 +2664,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { pending += io_threads_pending[j]; if (pending == 0) break; } - if (tio_debug) printf("All threads finshed\n"); + if (tio_debug) printf("I/O WRITE All threads finshed\n"); /* Run the list of clients again to install the write handler where * needed. */ @@ -2699,4 +2702,48 @@ int postponeClientRead(client *c) { } int handleClientsWithPendingReadsUsingThreads(void) { + if (!io_threads_active) return 0; + int processed = listLength(server.clients_pending_read); + if (processed == 0) return 0; + + if (tio_debug) printf("%d TOTAL READ pending clients\n", processed); + + /* Distribute the clients across N different lists. */ + listIter li; + listNode *ln; + listRewind(server.clients_pending_read,&li); + int item_id = 0; + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + int target_id = item_id % server.io_threads_num; + listAddNodeTail(io_threads_list[target_id],c); + item_id++; + } + + /* Give the start condition to the waiting threads, by setting the + * start condition atomic var. */ + io_threads_op = IO_THREADS_OP_READ; + for (int j = 0; j < server.io_threads_num; j++) { + int count = listLength(io_threads_list[j]); + io_threads_pending[j] = count; + } + + /* Wait for all threads to end their work. */ + while(1) { + unsigned long pending = 0; + for (int j = 0; j < server.io_threads_num; j++) + pending += io_threads_pending[j]; + if (pending == 0) break; + } + if (tio_debug) printf("I/O READ All threads finshed\n"); + + /* Run the list of clients again to process the new buffers. */ + listRewind(server.clients_pending_read,&li); + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + c->flags &= ~CLIENT_PENDING_READ; + processInputBufferAndReplicate(c); + } + listEmpty(server.clients_pending_read); + return processed; } diff --git a/src/server.c b/src/server.c index ef6b85c44..e0c48b097 100644 --- a/src/server.c +++ b/src/server.c @@ -2092,6 +2092,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { void afterSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); if (moduleCount()) moduleAcquireGIL(); + handleClientsWithPendingReadsUsingThreads(); } /* =========================== Server initialization ======================== */ diff --git a/src/server.h b/src/server.h index dcfcb55fb..0d7882419 100644 --- a/src/server.h +++ b/src/server.h @@ -1578,6 +1578,7 @@ int clientsArePaused(void); int processEventsWhileBlocked(void); int handleClientsWithPendingWrites(void); int handleClientsWithPendingWritesUsingThreads(void); +int handleClientsWithPendingReadsUsingThreads(void); int stopThreadedIOIfNeeded(void); int clientHasPendingReplies(client *c); void unlinkClient(client *c);