Fix crash in async IO threads with TLS (#1011)

Fix for https://github.com/valkey-io/valkey/issues/997

Root Cause Analysis:
1. Two different jobs (READ and WRITE) may be sent to the same IO
thread.
2. When processing the read job in `processIOThreadsReadDone`, the IO
thread may find that the write job has also been completed.
3. In this case, the IO thread calls `processClientIOWriteDone` to first
process the completed write job and free the COBs
affbea5dc1/src/networking.c (L4666)
4. If there are pending writes (resulting from pipeline commands), a new
async IO write job is sent before processing the completed read job
affbea5dc1/src/networking.c (L2417)
When sending the write job, the `TLS_CONN_FLAG_POSTPONE_UPDATE_STATE`
flag is set to prevent the IO thread from updating the event loop, which
is not thread-safe.
5. Upon resuming the read job processing, the flag is cleared,
affbea5dc1/src/networking.c (L4685)
causing the IO thread to update the event loop.

Fix:
Prevent sending async write job for pending writes when a read job is
about to be processed.

Testing:
The issue could not be reproduced due to its rare occurrence, which
requires multiple specific conditions to align simultaneously.

Signed-off-by: Uri Yagelnik <uriy@amazon.com>
This commit is contained in:
uriyage 2024-09-10 21:20:10 +03:00 committed by GitHub
parent affbea5dc1
commit 9f0c80187e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -2384,8 +2384,9 @@ parseResult handleParseResults(client *c) {
/* Process the completion of an IO write operation for a client.
* This function handles various post-write tasks, including updating client state,
* allow_async_writes - A flag indicating whether I/O threads can handle pending writes for this client.
* returns 1 if processing completed successfully, 0 if processing is skipped. */
int processClientIOWriteDone(client *c) {
int processClientIOWriteDone(client *c, int allow_async_writes) {
/* memory barrier acquire to get the latest client state */
atomic_thread_fence(memory_order_acquire);
/* If a client is protected, don't proceed to check the write results as it may trigger conn close. */
@ -2414,7 +2415,7 @@ int processClientIOWriteDone(client *c) {
installClientWriteHandler(c);
} else {
/* If we can send the client to the I/O thread, let it handle the write. */
if (trySendWriteToIOThreads(c) == C_OK) return 1;
if (allow_async_writes && trySendWriteToIOThreads(c) == C_OK) return 1;
/* Try again in the next eventloop */
putClientInPendingWriteQueue(c);
}
@ -2442,7 +2443,7 @@ int processIOThreadsWriteDone(void) {
/* Client is still waiting for a pending I/O - skip it */
if (c->io_write_state == CLIENT_PENDING_IO || c->io_read_state == CLIENT_PENDING_IO) continue;
processed += processClientIOWriteDone(c);
processed += processClientIOWriteDone(c, 1);
}
return processed;
@ -4663,7 +4664,8 @@ int processIOThreadsReadDone(void) {
if (c->io_write_state == CLIENT_PENDING_IO || c->io_read_state == CLIENT_PENDING_IO) continue;
/* If the write job is done, process it ASAP to free the buffer and handle connection errors */
if (c->io_write_state == CLIENT_COMPLETED_IO) {
processClientIOWriteDone(c);
int allow_async_writes = 0; /* Don't send writes for the client to IO threads before processing the reads */
processClientIOWriteDone(c, allow_async_writes);
}
/* memory barrier acquire to get the updated client state */
atomic_thread_fence(memory_order_acquire);