From 9f0c80187e55517a8ee23f4ad31a65622e45fb84 Mon Sep 17 00:00:00 2001 From: uriyage <78144248+uriyage@users.noreply.github.com> Date: Tue, 10 Sep 2024 21:20:10 +0300 Subject: [PATCH] Fix crash in async IO threads with TLS (#1011) Fix for https://github.com/valkey-io/valkey/issues/997 Root Cause Analysis: 1. Two different jobs (READ and WRITE) may be sent to the same IO thread. 2. When processing the read job in `processIOThreadsReadDone`, the IO thread may find that the write job has also been completed. 3. In this case, the IO thread calls `processClientIOWriteDone` to first process the completed write job and free the COBs https://github.com/valkey-io/valkey/blob/affbea5dc1ae1a0d80019c4f313d2bf1c3fcb7f9/src/networking.c#L4666 4. If there are pending writes (resulting from pipeline commands), a new async IO write job is sent before processing the completed read job https://github.com/valkey-io/valkey/blob/affbea5dc1ae1a0d80019c4f313d2bf1c3fcb7f9/src/networking.c#L2417 When sending the write job, the `TLS_CONN_FLAG_POSTPONE_UPDATE_STATE` flag is set to prevent the IO thread from updating the event loop, which is not thread-safe. 5. Upon resuming the read job processing, the flag is cleared, https://github.com/valkey-io/valkey/blob/affbea5dc1ae1a0d80019c4f313d2bf1c3fcb7f9/src/networking.c#L4685 causing the IO thread to update the event loop. Fix: Prevent sending async write job for pending writes when a read job is about to be processed. Testing: The issue could not be reproduced due to its rare occurrence, which requires multiple specific conditions to align simultaneously. Signed-off-by: Uri Yagelnik --- src/networking.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/networking.c b/src/networking.c index 503a85d69..0c6716c50 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2384,8 +2384,9 @@ parseResult handleParseResults(client *c) { /* Process the completion of an IO write operation for a client. * This function handles various post-write tasks, including updating client state, + * allow_async_writes - A flag indicating whether I/O threads can handle pending writes for this client. * returns 1 if processing completed successfully, 0 if processing is skipped. */ -int processClientIOWriteDone(client *c) { +int processClientIOWriteDone(client *c, int allow_async_writes) { /* memory barrier acquire to get the latest client state */ atomic_thread_fence(memory_order_acquire); /* If a client is protected, don't proceed to check the write results as it may trigger conn close. */ @@ -2414,7 +2415,7 @@ int processClientIOWriteDone(client *c) { installClientWriteHandler(c); } else { /* If we can send the client to the I/O thread, let it handle the write. */ - if (trySendWriteToIOThreads(c) == C_OK) return 1; + if (allow_async_writes && trySendWriteToIOThreads(c) == C_OK) return 1; /* Try again in the next eventloop */ putClientInPendingWriteQueue(c); } @@ -2442,7 +2443,7 @@ int processIOThreadsWriteDone(void) { /* Client is still waiting for a pending I/O - skip it */ if (c->io_write_state == CLIENT_PENDING_IO || c->io_read_state == CLIENT_PENDING_IO) continue; - processed += processClientIOWriteDone(c); + processed += processClientIOWriteDone(c, 1); } return processed; @@ -4663,7 +4664,8 @@ int processIOThreadsReadDone(void) { if (c->io_write_state == CLIENT_PENDING_IO || c->io_read_state == CLIENT_PENDING_IO) continue; /* If the write job is done, process it ASAP to free the buffer and handle connection errors */ if (c->io_write_state == CLIENT_COMPLETED_IO) { - processClientIOWriteDone(c); + int allow_async_writes = 0; /* Don't send writes for the client to IO threads before processing the reads */ + processClientIOWriteDone(c, allow_async_writes); } /* memory barrier acquire to get the updated client state */ atomic_thread_fence(memory_order_acquire);