
Currently, when running tests with IO threads, we set the `events-per-io-thread` config to 0. This activates the IO threads 100% of the time, regardless of the number of IO events. This causes issues with tests that run multiple server instances, as it drains the machine's CPU resources. As a result, tests can have very long runtimes, especially on limited instances. For example, in https://github.com/valkey-io/valkey/actions/runs/10066315827/job/27827426986?pr=804, the `Cluster consistency during live resharding` test ran for 1 hour and 41 minutes.

This PR addresses the issue by:
1. Deactivating IO threads when there are no IO events
2. Continuing to offload all IO events to IO threads

Tested on a 16-core instance: after implementing these changes, the runtime of the `Cluster consistency during live resharding` test dropped from 7 minutes and 14 seconds to 3 minutes and 28 seconds.

Signed-off-by: Uri Yagelnik <uriy@amazon.com>
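As a rough sketch of the new scaling rule (illustrative only; the real logic, including the clamping to the configured thread count, is in adjustIOThreadsByEventLoad() below):

    /* events-per-io-thread == 0 still offloads everything, but idle event
     * loops now scale back to a single active thread instead of keeping
     * every IO thread spinning. */
    int target_threads = events_per_io_thread == 0 ? (numevents + 1)
                                                   : numevents / events_per_io_thread;
    target_threads = max(1, min(target_threads, io_threads_num));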
#include "io_threads.h"

static __thread int thread_id = 0; /* Thread local var */
static pthread_t io_threads[IO_THREADS_MAX_NUM] = {0};
static pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];

/* IO jobs queue functions - Used to send jobs from the main-thread to the IO thread. */
typedef void (*job_handler)(void *);
typedef struct iojob {
    job_handler handler;
    void *data;
} iojob;

typedef struct IOJobQueue {
    iojob *ring_buffer;
    size_t size;
    _Atomic size_t head __attribute__((aligned(CACHE_LINE_SIZE))); /* Next write index for producer (main-thread) */
    _Atomic size_t tail __attribute__((aligned(CACHE_LINE_SIZE))); /* Next read index for consumer (IO-thread) */
} IOJobQueue;

IOJobQueue io_jobs[IO_THREADS_MAX_NUM] = {0};
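
/* Design note: each IOJobQueue is a single-producer/single-consumer ring
 * buffer - the main thread is the only producer (see the inMainThread()
 * asserts in the push path) and exactly one IO thread consumes from it - so
 * acquire/release atomics suffice and no locks are taken on the hot path.
 * Since "full" is detected as (head + 1) % size == tail, a queue of `size`
 * slots holds at most size - 1 jobs. The aligned(CACHE_LINE_SIZE)
 * attributes keep head and tail on separate cache lines so the producer and
 * consumer do not false-share. */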

/* Initialize the job queue with a specified number of items. */
static void IOJobQueue_init(IOJobQueue *jq, size_t item_count) {
    debugServerAssertWithInfo(NULL, NULL, inMainThread());
    jq->ring_buffer = zcalloc(item_count * sizeof(iojob));
    jq->size = item_count; /* Total number of items */
    jq->head = 0;
    jq->tail = 0;
}

/* Clean up the job queue and free allocated memory. */
static void IOJobQueue_cleanup(IOJobQueue *jq) {
    debugServerAssertWithInfo(NULL, NULL, inMainThread());
    zfree(jq->ring_buffer);
    memset(jq, 0, sizeof(*jq));
}

static int IOJobQueue_isFull(const IOJobQueue *jq) {
    debugServerAssertWithInfo(NULL, NULL, inMainThread());
    size_t current_head = atomic_load_explicit(&jq->head, memory_order_relaxed);
    /* We don't use memory_order_acquire for the tail for performance reasons:
     * in the worst case we will just wrongly assume the buffer is full and the main thread will do the job by itself. */
    size_t current_tail = atomic_load_explicit(&jq->tail, memory_order_relaxed);
    size_t next_head = (current_head + 1) % jq->size;
    return next_head == current_tail;
}

/* Attempt to push a new job to the queue from the main thread.
 * The caller must ensure the queue is not full before calling this function. */
static void IOJobQueue_push(IOJobQueue *jq, job_handler handler, void *data) {
    debugServerAssertWithInfo(NULL, NULL, inMainThread());
    /* Assert the queue is not full - should not happen as the caller should check for it before. */
    serverAssert(!IOJobQueue_isFull(jq));

    /* No need to use an atomic acquire for the head, as the main thread is the only one that writes to the head index. */
    size_t current_head = atomic_load_explicit(&jq->head, memory_order_relaxed);
    size_t next_head = (current_head + 1) % jq->size;

    /* We store the job's fields directly to avoid allocating a new iojob structure. */
    serverAssert(jq->ring_buffer[current_head].data == NULL);
    serverAssert(jq->ring_buffer[current_head].handler == NULL);
    jq->ring_buffer[current_head].data = data;
    jq->ring_buffer[current_head].handler = handler;

    /* memory_order_release to make sure the data is visible to the consumer (the IO thread). */
    atomic_store_explicit(&jq->head, next_head, memory_order_release);
}
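
/* A minimal producer-side sketch (hypothetical caller, for illustration):
 * there is no blocking push, so callers check for space first and fall back
 * to doing the work inline on the main thread when the queue is full:
 *
 *   IOJobQueue *jq = &io_jobs[tid];
 *   if (!IOJobQueue_isFull(jq)) {
 *       IOJobQueue_push(jq, myHandler, my_data);
 *   } else {
 *       myHandler(my_data);  // main thread does the job itself
 *   }
 *
 * (myHandler/my_data are placeholders; the trySend*ToIOThreads() functions
 * below follow this pattern.) */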

/* Returns the number of jobs currently available for consumption in the given job queue.
 *
 * This function ensures memory visibility for the jobs by
 * using a memory acquire fence when there are jobs available. */
static size_t IOJobQueue_availableJobs(const IOJobQueue *jq) {
    debugServerAssertWithInfo(NULL, NULL, !inMainThread());
    /* We use memory_order_acquire to make sure the head and the job's fields are visible to the consumer (IO thread). */
    size_t current_head = atomic_load_explicit(&jq->head, memory_order_acquire);
    size_t current_tail = atomic_load_explicit(&jq->tail, memory_order_relaxed);

    if (current_head >= current_tail) {
        return current_head - current_tail;
    } else {
        return jq->size - (current_tail - current_head);
    }
}

/* Checks if the job queue is empty.
 * Returns 1 if the buffer is currently empty, 0 otherwise.
 * Called by the main-thread only.
 * This function uses relaxed memory order, so the caller needs to use an acquire
 * memory fence before calling this function to be sure it has the latest index
 * from the other thread, especially when called repeatedly. */
static int IOJobQueue_isEmpty(const IOJobQueue *jq) {
    size_t current_head = atomic_load_explicit(&jq->head, memory_order_relaxed);
    size_t current_tail = atomic_load_explicit(&jq->tail, memory_order_relaxed);
    return current_head == current_tail;
}

/* Removes the next job from the given job queue by advancing the tail index.
 * Called by the IO thread.
 * The caller must ensure that the queue is not empty before calling this function.
 * This function uses relaxed memory order, so the caller needs to use a release memory fence
 * after calling this function to make sure the updated tail is visible to the producer (main thread). */
static void IOJobQueue_removeJob(IOJobQueue *jq) {
    debugServerAssertWithInfo(NULL, NULL, !inMainThread());
    size_t current_tail = atomic_load_explicit(&jq->tail, memory_order_relaxed);
    jq->ring_buffer[current_tail].data = NULL;
    jq->ring_buffer[current_tail].handler = NULL;
    atomic_store_explicit(&jq->tail, (current_tail + 1) % jq->size, memory_order_relaxed);
}

/* Retrieves the next job handler and data from the job queue without removal.
 * Called by the consumer (IO thread). Caller must ensure queue is not empty. */
static void IOJobQueue_peek(const IOJobQueue *jq, job_handler *handler, void **data) {
    debugServerAssertWithInfo(NULL, NULL, !inMainThread());
    size_t current_tail = atomic_load_explicit(&jq->tail, memory_order_relaxed);
    iojob *job = &jq->ring_buffer[current_tail];
    *handler = job->handler;
    *data = job->data;
}
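
/* A minimal consumer-side sketch (for illustration; IOThreadMain() below is
 * the real consumer): the IO thread peeks, runs the handler, and only then
 * removes the job, so an empty queue observed by the main thread implies no
 * job is mid-execution:
 *
 *   size_t n = IOJobQueue_availableJobs(jq);
 *   for (size_t i = 0; i < n; i++) {
 *       job_handler handler; void *data;
 *       IOJobQueue_peek(jq, &handler, &data);
 *       handler(data);
 *       IOJobQueue_removeJob(jq);
 *   }
 *   atomic_thread_fence(memory_order_release);  // publish the new tail
 */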

/* End of IO job queue functions */

int inMainThread(void) {
    return thread_id == 0;
}

int getIOThreadID(void) {
    return thread_id;
}

/* Drains the I/O threads queue by waiting for all jobs to be processed.
 * This function must be called from the main thread. */
void drainIOThreadsQueue(void) {
    serverAssert(inMainThread());
    for (int i = 1; i < IO_THREADS_MAX_NUM; i++) { /* No need to drain thread 0, which is the main thread. */
        while (!IOJobQueue_isEmpty(&io_jobs[i])) {
            /* memory barrier acquire to get the latest job queue state */
            atomic_thread_fence(memory_order_acquire);
        }
    }
}

/* Wait until the IO-thread is done with the client */
void waitForClientIO(client *c) {
    /* No need to wait if the client was not offloaded to the IO thread. */
    if (c->io_read_state == CLIENT_IDLE && c->io_write_state == CLIENT_IDLE) return;

    /* Wait for read operation to complete if pending. */
    while (c->io_read_state == CLIENT_PENDING_IO) {
        atomic_thread_fence(memory_order_acquire);
    }

    /* Wait for write operation to complete if pending. */
    while (c->io_write_state == CLIENT_PENDING_IO) {
        atomic_thread_fence(memory_order_acquire);
    }

    /* Final memory barrier to ensure all changes are visible */
    atomic_thread_fence(memory_order_acquire);
}
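
/* Note on the waiting pattern above: both drainIOThreadsQueue() and
 * waitForClientIO() busy-wait with an acquire fence on each iteration. The
 * fence pairs with the release operations the IO threads perform when they
 * finish a job (see the release fence at the bottom of IOThreadMain()'s
 * loop), so once the main thread observes the empty/idle state it also
 * observes every write the IO thread made while handling the job. */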

/* Adjusts the number of active I/O threads based on the current event load.
 * If increase_only is non-zero, only allows increasing the number of threads. */
void adjustIOThreadsByEventLoad(int numevents, int increase_only) {
    if (server.io_threads_num == 1) return; /* All I/O is being done by the main thread. */
    debugServerAssertWithInfo(NULL, NULL, server.io_threads_num > 1);
    /* When events_per_io_thread is set to 0, we offload all events to the IO threads.
     * This is used mainly for testing purposes. */
    int target_threads = server.events_per_io_thread == 0 ? (numevents + 1) : numevents / server.events_per_io_thread;

    target_threads = max(1, min(target_threads, server.io_threads_num));

    if (target_threads == server.active_io_threads_num) return;

    if (target_threads < server.active_io_threads_num) {
        if (increase_only) return;

        int threads_to_deactivate_num = server.active_io_threads_num - target_threads;
        for (int i = 0; i < threads_to_deactivate_num; i++) {
            int tid = server.active_io_threads_num - 1;
            IOJobQueue *jq = &io_jobs[tid];
            /* We can't lock the thread if it may have pending jobs */
            if (!IOJobQueue_isEmpty(jq)) return;
            pthread_mutex_lock(&io_threads_mutex[tid]);
            server.active_io_threads_num--;
        }
    } else {
        int threads_to_activate_num = target_threads - server.active_io_threads_num;
        for (int i = 0; i < threads_to_activate_num; i++) {
            pthread_mutex_unlock(&io_threads_mutex[server.active_io_threads_num]);
            server.active_io_threads_num++;
        }
    }
}
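
/* A worked example of the scaling rule above (numbers are illustrative,
 * assuming io-threads = 4 and events-per-io-thread = 2):
 *
 *   numevents = 0  -> target_threads = 0  -> clamped up to 1
 *   numevents = 5  -> target_threads = 2
 *   numevents = 50 -> target_threads = 25 -> clamped down to 4
 *
 * With events-per-io-thread = 0 (the testing configuration mentioned in the
 * commit message), target_threads = numevents + 1, so any nonzero event load
 * activates threads, while an idle event loop drops back to a single active
 * thread and no longer burns CPU in the spin loops. */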

/* This function performs polling on the given event loop and updates the server's
 * IO fired events count and poll state. */
void IOThreadPoll(void *data) {
    aeEventLoop *el = (aeEventLoop *)data;
    struct timeval tvp = {0, 0};
    int num_events = aePoll(el, &tvp);

    server.io_ae_fired_events = num_events;
    atomic_store_explicit(&server.io_poll_state, AE_IO_STATE_DONE, memory_order_release);
}

static void *IOThreadMain(void *myid) {
    /* The ID is the thread ID number (from 1 to server.io_threads_num-1). ID 0 is the main thread. */
    long id = (long)myid;
    char thdname[32];

    serverAssert(server.io_threads_num > 0);
    serverAssert(id > 0 && id < server.io_threads_num);
    snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
    valkey_set_thread_title(thdname);
    serverSetCpuAffinity(server.server_cpulist);
    makeThreadKillable();
    initSharedQueryBuf();

    thread_id = (int)id;
    size_t jobs_to_process = 0;
    IOJobQueue *jq = &io_jobs[id];
    while (1) {
        /* Wait for jobs */
        for (int j = 0; j < 1000000; j++) {
            jobs_to_process = IOJobQueue_availableJobs(jq);
            if (jobs_to_process) break;
        }

        /* Give the main thread a chance to stop this thread. */
        if (jobs_to_process == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        for (size_t j = 0; j < jobs_to_process; j++) {
            job_handler handler;
            void *data;
            /* We keep the job in the queue until it's processed. This ensures that if the main thread checks
             * and finds the queue empty, it can be certain that the IO thread is not currently handling any job. */
            IOJobQueue_peek(jq, &handler, &data);
            handler(data);
            /* Remove the job after it was processed */
            IOJobQueue_removeJob(jq);
        }
        /* Memory barrier to make sure the main thread sees the updated tail index.
         * We do it once per loop and not per tail-update for optimization reasons.
         * As the main-thread's main concern is to check if the queue is empty, it's enough to do it once at the end. */
        atomic_thread_fence(memory_order_release);
    }
    freeSharedQueryBuf();
    return NULL;
}

#define IO_JOB_QUEUE_SIZE 2048
static void createIOThread(int id) {
    pthread_t tid;
    pthread_mutex_init(&io_threads_mutex[id], NULL);
    IOJobQueue_init(&io_jobs[id], IO_JOB_QUEUE_SIZE);
    pthread_mutex_lock(&io_threads_mutex[id]); /* Thread will be stopped. */
    if (pthread_create(&tid, NULL, IOThreadMain, (void *)(long)id) != 0) {
        serverLog(LL_WARNING, "Fatal: Can't initialize IO thread, pthread_create failed with: %s", strerror(errno));
        exit(1);
    }
    io_threads[id] = tid;
}
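
/* Note on the activation mechanism: createIOThread() locks the thread's
 * mutex before spawning it, so every IO thread starts parked. When the spin
 * loop in IOThreadMain() finds no jobs, the thread blocks on
 * pthread_mutex_lock() of its own mutex; adjustIOThreadsByEventLoad()
 * activates a thread simply by unlocking that mutex and deactivates it by
 * locking the mutex again once its queue is drained. The mutex carries no
 * data - it is used purely as a park/unpark gate. */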

/* Terminates the IO thread specified by id.
 * Called on server shutdown */
static void shutdownIOThread(int id) {
    int err;
    pthread_t tid = io_threads[id];
    if (tid == pthread_self()) return;
    if (tid == 0) return;

    pthread_cancel(tid);

    if ((err = pthread_join(tid, NULL)) != 0) {
        serverLog(LL_WARNING, "IO thread(tid:%lu) can not be joined: %s", (unsigned long)tid, strerror(err));
    } else {
        serverLog(LL_NOTICE, "IO thread(tid:%lu) terminated", (unsigned long)tid);
    }

    IOJobQueue_cleanup(&io_jobs[id]);
}

void killIOThreads(void) {
    for (int j = 1; j < server.io_threads_num; j++) { /* We don't kill thread 0, which is the main thread. */
        shutdownIOThread(j);
    }
}

/* Initialize the data structures needed for I/O threads. */
void initIOThreads(void) {
    server.active_io_threads_num = 1; /* We start with threads not active. */
    server.io_poll_state = AE_IO_STATE_NONE;
    server.io_ae_fired_events = 0;

    /* Don't spawn any thread if the user selected a single thread:
     * we'll handle I/O directly from the main thread. */
    if (server.io_threads_num == 1) return;

    serverAssert(server.io_threads_num <= IO_THREADS_MAX_NUM);

    /* Spawn and initialize the I/O threads. */
    for (int i = 1; i < server.io_threads_num; i++) {
        createIOThread(i);
    }
}

int trySendReadToIOThreads(client *c) {
    if (server.active_io_threads_num <= 1) return C_ERR;
    if (!server.io_threads_do_reads) return C_ERR;
    /* If an IO thread is already reading, return C_OK to make sure the main thread will not handle it. */
    if (c->io_read_state != CLIENT_IDLE) return C_OK;
    /* Currently, replica/master reads are not offloaded and are processed synchronously. */
    if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
    /* With the Lua debug client we may call connWrite directly in the main thread */
    if (c->flag.lua_debug) return C_ERR;
    /* For simplicity let the main-thread handle the blocked clients */
    if (c->flag.blocked || c->flag.unblocked) return C_ERR;
    if (c->flag.close_asap) return C_ERR;
    size_t tid = (c->id % (server.active_io_threads_num - 1)) + 1;

    /* Handle the case where the client has a pending IO write job on a different thread:
     * 1. A write job is still pending (io_write_state == CLIENT_PENDING_IO)
     * 2. The pending job is on a different thread (c->cur_tid != tid)
     *
     * This situation can occur if active_io_threads_num increased since the
     * original job assignment. In this case, we keep the job on its current
     * thread to ensure the same thread handles the client's I/O operations. */
    if (c->io_write_state == CLIENT_PENDING_IO && c->cur_tid != (uint8_t)tid) tid = c->cur_tid;

    IOJobQueue *jq = &io_jobs[tid];
    if (IOJobQueue_isFull(jq)) return C_ERR;

    c->cur_tid = tid;
    c->read_flags = canParseCommand(c) ? 0 : READ_FLAGS_DONT_PARSE;
    c->read_flags |= authRequired(c) ? READ_FLAGS_AUTH_REQUIRED : 0;

    c->io_read_state = CLIENT_PENDING_IO;
    connSetPostponeUpdateState(c->conn, 1);
    IOJobQueue_push(jq, ioThreadReadQueryFromClient, c);
    c->flag.pending_read = 1;
    listLinkNodeTail(server.clients_pending_io_read, &c->pending_read_list_node);
    return C_OK;
}
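
/* A sketch of how the return value is meant to be consumed (hypothetical
 * call site, for illustration; the real caller lives in the networking
 * code):
 *
 *   if (trySendReadToIOThreads(c) == C_OK) return;  // IO thread owns the read
 *   // ... otherwise the main thread performs the read synchronously ...
 *
 * Note the asymmetry above: a read that is already pending on an IO thread
 * also returns C_OK, which keeps the main thread from touching a connection
 * an IO thread may currently be reading from. */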

/* This function attempts to offload the client's write to an I/O thread.
 * Returns C_OK if the client's writes were successfully offloaded to an I/O thread,
 * or C_ERR if the client is not eligible for offloading. */
int trySendWriteToIOThreads(client *c) {
    if (server.active_io_threads_num <= 1) return C_ERR;
    /* The I/O thread is already writing for this client. */
    if (c->io_write_state != CLIENT_IDLE) return C_OK;
    /* Nothing to write */
    if (!clientHasPendingReplies(c)) return C_ERR;
    /* Currently, replica/master writes are not offloaded and are processed synchronously. */
    if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
    /* We can't offload debugged clients as the main-thread may read at the same time */
    if (c->flag.lua_debug) return C_ERR;

    size_t tid = (c->id % (server.active_io_threads_num - 1)) + 1;
    /* Handle the case where the client has a pending IO read job on a different thread:
     * 1. A read job is still pending (io_read_state == CLIENT_PENDING_IO)
     * 2. The pending job is on a different thread (c->cur_tid != tid)
     *
     * This situation can occur if active_io_threads_num increased since the
     * original job assignment. In this case, we keep the job on its current
     * thread to ensure the same thread handles the client's I/O operations. */
    if (c->io_read_state == CLIENT_PENDING_IO && c->cur_tid != (uint8_t)tid) tid = c->cur_tid;

    IOJobQueue *jq = &io_jobs[tid];
    if (IOJobQueue_isFull(jq)) return C_ERR;

    c->cur_tid = tid;
    if (c->flag.pending_write) {
        /* We move the client to the io pending write queue */
        listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
    } else {
        c->flag.pending_write = 1;
    }
    serverAssert(c->clients_pending_write_node.prev == NULL && c->clients_pending_write_node.next == NULL);
    listLinkNodeTail(server.clients_pending_io_write, &c->clients_pending_write_node);

    /* Save the last block of the reply list to io_last_reply_block and the used
     * position to io_last_bufpos. The I/O thread will write only up to
     * io_last_bufpos, regardless of the c->bufpos value. This is to prevent I/O
     * threads from reading data that might be invalid in their local CPU cache. */
    c->io_last_reply_block = listLast(c->reply);
    if (c->io_last_reply_block) {
        c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
    } else {
        c->io_last_bufpos = (size_t)c->bufpos;
    }
    serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0);

    /* The main-thread will update the client state after the I/O thread completes the write. */
    connSetPostponeUpdateState(c->conn, 1);
    c->write_flags = 0;
    c->io_write_state = CLIENT_PENDING_IO;

    IOJobQueue_push(jq, ioThreadWriteToClient, c);
    return C_OK;
}

/* Internal function to free the client's argv in an IO thread. */
void IOThreadFreeArgv(void *data) {
    robj **argv = (robj **)data;
    int last_arg = 0;
    for (int i = 0;; i++) {
        robj *o = argv[i];
        if (o == NULL) {
            continue;
        }

        /* The main-thread sets the refcount to 0 to indicate that this is the last argument to free */
        if (o->refcount == 0) {
            last_arg = 1;
            o->refcount = 1;
        }

        decrRefCount(o);

        if (last_arg) {
            break;
        }
    }

    zfree(argv);
}
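
/* A worked example of the refcount-0 sentinel used above (illustrative
 * values): suppose argc == 3 and the main thread left argv as
 *
 *   argv[0] = NULL   (shared object, already released on the main thread)
 *   argv[1] = objA   (refcount 1)
 *   argv[2] = objB   (refcount set to 0: the "last one" sentinel)
 *
 * The loop skips argv[0], frees objA, then sees objB's refcount of 0,
 * restores it to 1 so decrRefCount() performs a normal final release, and
 * stops. This way the array length never has to be passed to the IO
 * thread. */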

/* This function attempts to offload the free of the client's argv to an IO thread.
 * Returns C_OK if the client's argv was successfully offloaded to an IO thread,
 * C_ERR otherwise. */
int tryOffloadFreeArgvToIOThreads(client *c) {
    if (server.active_io_threads_num <= 1 || c->argc == 0) {
        return C_ERR;
    }

    size_t tid = (c->id % (server.active_io_threads_num - 1)) + 1;

    IOJobQueue *jq = &io_jobs[tid];
    if (IOJobQueue_isFull(jq)) {
        return C_ERR;
    }

    int last_arg_to_free = -1;

    /* Prepare the argv */
    for (int j = 0; j < c->argc; j++) {
        if (c->argv[j]->refcount > 1) {
            decrRefCount(c->argv[j]);
            /* Set argv[j] to NULL to avoid a double free */
            c->argv[j] = NULL;
        } else {
            last_arg_to_free = j;
        }
    }

    /* If there is no argv to free, free the argv array on the main thread */
    if (last_arg_to_free == -1) {
        zfree(c->argv);
        return C_OK;
    }

    /* We set the refcount of the last arg to free to 0 to indicate that
     * this is the last argument to free. With this approach, we don't need to
     * send the argc to the IO thread and we can send just the argv ptr. */
    c->argv[last_arg_to_free]->refcount = 0;

    /* Must succeed as we checked the free space before. */
    IOJobQueue_push(jq, IOThreadFreeArgv, c->argv);

    return C_OK;
}

/* This function attempts to offload the free of an object to an IO thread.
 * Returns C_OK if the object was successfully offloaded to an IO thread,
 * C_ERR otherwise. */
int tryOffloadFreeObjToIOThreads(robj *obj) {
    if (server.active_io_threads_num <= 1) {
        return C_ERR;
    }

    if (obj->refcount > 1) return C_ERR;

    /* We select the thread ID in a round-robin fashion. */
    size_t tid = (server.stat_io_freed_objects % (server.active_io_threads_num - 1)) + 1;

    IOJobQueue *jq = &io_jobs[tid];
    if (IOJobQueue_isFull(jq)) {
        return C_ERR;
    }

    IOJobQueue_push(jq, decrRefCountVoid, obj);
    server.stat_io_freed_objects++;
    return C_OK;
}

/* This function retrieves the results of the IO thread poll.
 * Returns the number of fired events if the IO thread has finished processing poll events, 0 otherwise. */
static int getIOThreadPollResults(aeEventLoop *eventLoop) {
    int io_state;
    io_state = atomic_load_explicit(&server.io_poll_state, memory_order_acquire);
    if (io_state == AE_IO_STATE_POLL) {
        /* The IO thread is still processing poll events. */
        return 0;
    }

    /* The IO thread is done processing poll events. */
    serverAssert(io_state == AE_IO_STATE_DONE);
    server.stat_poll_processed_by_io_threads++;
    server.io_poll_state = AE_IO_STATE_NONE;

    /* Remove the custom poll proc. */
    aeSetCustomPollProc(eventLoop, NULL);
    aeSetPollProtect(eventLoop, 0);
    return server.io_ae_fired_events;
}

void trySendPollJobToIOThreads(void) {
    if (server.active_io_threads_num <= 1) {
        return;
    }

    /* If there are no pending jobs, let the main thread do the poll-wait by itself. */
    if (listLength(server.clients_pending_io_write) + listLength(server.clients_pending_io_read) == 0) {
        return;
    }

    /* If the IO thread is already processing poll events, don't send another job. */
    if (server.io_poll_state != AE_IO_STATE_NONE) {
        return;
    }

    /* The poll is sent to the last thread. While a random thread could have been selected,
     * the last thread has a slightly better chance of being less loaded compared to other threads,
     * as we activate the lowest-numbered threads first. */
    int tid = server.active_io_threads_num - 1;
    IOJobQueue *jq = &io_jobs[tid];
    if (IOJobQueue_isFull(jq)) return; /* The main thread will handle the poll itself. */

    server.io_poll_state = AE_IO_STATE_POLL;
    aeSetCustomPollProc(server.el, getIOThreadPollResults);
    aeSetPollProtect(server.el, 1);
    IOJobQueue_push(jq, IOThreadPoll, server.el);
}
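
/* The poll offload above is a three-state handshake on server.io_poll_state:
 *
 *   AE_IO_STATE_NONE -> AE_IO_STATE_POLL   main thread queues IOThreadPoll()
 *                                          and installs getIOThreadPollResults()
 *                                          as the custom poll proc
 *   AE_IO_STATE_POLL -> AE_IO_STATE_DONE   IO thread finishes aePoll() and
 *                                          publishes io_ae_fired_events with
 *                                          a release store
 *   AE_IO_STATE_DONE -> AE_IO_STATE_NONE   main thread consumes the result
 *                                          (acquire load) and uninstalls the
 *                                          custom poll proc
 *
 * This lets an IO thread wait on the kernel while the main thread keeps
 * processing the jobs already in flight. */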