diff --git a/src/networking.c b/src/networking.c index 503a85d69..0c6716c50 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2384,8 +2384,9 @@ parseResult handleParseResults(client *c) { /* Process the completion of an IO write operation for a client. * This function handles various post-write tasks, including updating client state, + * allow_async_writes - A flag indicating whether I/O threads can handle pending writes for this client. * returns 1 if processing completed successfully, 0 if processing is skipped. */ -int processClientIOWriteDone(client *c) { +int processClientIOWriteDone(client *c, int allow_async_writes) { /* memory barrier acquire to get the latest client state */ atomic_thread_fence(memory_order_acquire); /* If a client is protected, don't proceed to check the write results as it may trigger conn close. */ @@ -2414,7 +2415,7 @@ int processClientIOWriteDone(client *c) { installClientWriteHandler(c); } else { /* If we can send the client to the I/O thread, let it handle the write. */ - if (trySendWriteToIOThreads(c) == C_OK) return 1; + if (allow_async_writes && trySendWriteToIOThreads(c) == C_OK) return 1; /* Try again in the next eventloop */ putClientInPendingWriteQueue(c); } @@ -2442,7 +2443,7 @@ int processIOThreadsWriteDone(void) { /* Client is still waiting for a pending I/O - skip it */ if (c->io_write_state == CLIENT_PENDING_IO || c->io_read_state == CLIENT_PENDING_IO) continue; - processed += processClientIOWriteDone(c); + processed += processClientIOWriteDone(c, 1); } return processed; @@ -4663,7 +4664,8 @@ int processIOThreadsReadDone(void) { if (c->io_write_state == CLIENT_PENDING_IO || c->io_read_state == CLIENT_PENDING_IO) continue; /* If the write job is done, process it ASAP to free the buffer and handle connection errors */ if (c->io_write_state == CLIENT_COMPLETED_IO) { - processClientIOWriteDone(c); + int allow_async_writes = 0; /* Don't send writes for the client to IO threads before processing the reads */ + processClientIOWriteDone(c, allow_async_writes); } /* memory barrier acquire to get the updated client state */ atomic_thread_fence(memory_order_acquire);