Async IO threads (#758)

This PR is 1 of 3 PRs intended to achieve the goal of 1 million requests
per second, as detailed by [dan touitou](https://github.com/touitou-dan)
in https://github.com/valkey-io/valkey/issues/22. This PR modifies the
IO threads to be fully asynchronous, which is a first and necessary step
to allow more work offloading and better utilization of the IO threads.

### Current IO threads state:

Valkey IO threads were introduced in Redis 6.0 to allow better
utilization of multi-core machines. Before this, Redis was
single-threaded and could only use one CPU core for network and command
processing. The introduction of IO threads helps in offloading the IO
operations to multiple threads.

**Current IO Threads flow:**

1. Initialization: When Redis starts, it initializes a specified number
of IO threads. These threads are in addition to the main thread, each
thread starts with an empty list, the main thread will populate that
list in each event-loop with pending-read-clients or
pending-write-clients.
2. Read Phase: The main thread accepts incoming connections and reads
requests from clients. The reading of requests are offloaded to IO
threads. The main thread puts the clients ready-to-read in a list and
set the global io_threads_op to IO_THREADS_OP_READ, the IO threads pick
the clients up, perform the read operation and parse the first incoming
command.
3. Command Processing: After reading the requests, command processing is
still single-threaded and handled by the main thread.
4. Write Phase: Similar to the read phase, the write phase is also be
offloaded to IO threads. The main thread prepares the response in the
clients’ output buffer then the main thread puts the client in the list,
and sets the global io_threads_op to the IO_THREADS_OP_WRITE. The IO
threads then pick the clients up and perform the write operation to send
the responses back to clients.
5. Synchronization: The main-thread communicate with the threads on how
many jobs left per each thread with atomic counter. The main-thread
doesn’t access the clients while being handled by the IO threads.

**Issues with current implementation:**

* Underutilized Cores: The current implementation of IO-threads leads to
the underutilization of CPU cores.
* The main thread remains responsible for a significant portion of
IO-related tasks that could be offloaded to IO-threads.
* When the main-thread is processing client’s commands, the IO threads
are idle for a considerable amount of time.
* Notably, the main thread's performance during the IO-related tasks is
constrained by the speed of the slowest IO-thread.
* Limited Offloading: Currently, Since the Main-threads waits
synchronously for the IO threads, the Threads perform only read-parse,
and write operations, with parsing done only for the first command. If
the threads can do work asynchronously we may offload more work to the
threads reducing the load from the main-thread.
* TLS: Currently, we don't support IO threads with TLS (where offloading
IO would be more beneficial) since TLS read/write operations are not
thread-safe with the current implementation.

### Suggested change

Non-blocking main thread - The main thread and IO threads will operate
in parallel to maximize efficiency. The main thread will not be blocked
by IO operations. It will continue to process commands independently of
the IO thread's activities.

**Implementation details**

**Inter-thread communication.**

* We use a static, lock-free ring buffer of fixed size (2048 jobs) for
the main thread to send jobs and for the IO to receive them. If the ring
buffer fills up, the main thread will handle the task itself, acting as
back pressure (in case IO operations are more expensive than command
processing). A static ring buffer is a better candidate than a dynamic
job queue as it eliminates the need for allocation/freeing per job.
* An IO job will be in the format: ` [void* function-call-back | void
*data] `where data is either a client to read/write from and the
function-ptr is the function to be called with the data for example
readQueryFromClient using this format we can use it later to offload
other types of works to the IO threads.
* The Ring buffer is one way from the main-thread to the IO thread, Upon
read/write event the main thread will send a read/write job then in
before sleep it will iterate over the pending read/write clients to
checking for each client if the IO threads has already finished handling
it. The IO thread signals it has finished handling a client read/write
by toggling an atomic flag read_state / write_state on the client
struct.

**Thread Safety**

As suggested in this solution, the IO threads are reading from and
writing to the clients' buffers while the main thread may access those
clients.
We must ensure no race conditions or unsafe access occurs while keeping
the Valkey code simple and lock free.

Minimal Action in the IO Threads
The main change is to limit the IO thread operations to the bare
minimum. The IO thread will access only the client's struct and only the
necessary fields in this struct.
The IO threads will be responsible for the following:

* Read Operation: The IO thread will only read and parse a single
command. It will not update the server stats, handle read errors, or
parsing errors. These tasks will be taken care of by the main thread.
* Write Operation: The IO thread will only write the available data. It
will not free the client's replies, handle write errors, or update the
server statistics.


To achieve this without code duplication, the read/write code has been
refactored into smaller, independent components:

* Functions that perform only the read/parse/write calls.
* Functions that handle the read/parse/write results.

This refactor accounts for the majority of the modifications in this PR.

**Client Struct Safe Access**

As we ensure that the IO threads access memory only within the client
struct, we need to ensure thread safety only for the client's struct's
shared fields.

* Query Buffer 
* Command parsing - The main thread will not try to parse a command from
the query buffer when a client is offloaded to the IO thread.
* Client's memory checks in client-cron - The main thread will not
access the client query buffer if it is offloaded and will handle the
querybuf grow/shrink when the client is back.
* CLIENT LIST command - The main thread will busy-wait for the IO thread
to finish handling the client, falling back to the current behavior
where the main thread waits for the IO thread to finish their
processing.
* Output Buffer 
* The IO thread will not change the client's bufpos and won't free the
client's reply lists. These actions will be done by the main thread on
the client's return from the IO thread.
* bufpos / block→used: As the main thread may change the bufpos, the
reply-block→used, or add/delete blocks to the reply list while the IO
thread writes, we add two fields to the client struct: io_last_bufpos
and io_last_reply_block. The IO thread will write until the
io_last_bufpos, which was set by the main-thread before sending the
client to the IO thread. If more data has been added to the cob in
between, it will be written in the next write-job. In addition, the main
thread will not trim or merge reply blocks while the client is
offloaded.
* Parsing Fields 
    * Client's cmd, argc, argv, reqtype, etc., are set during parsing.
* The main thread will indicate to the IO thread not to parse a cmd if
the client is not reset. In this case, the IO thread will only read from
the network and won't attempt to parse a new command.
* The main thread won't access the c→cmd/c→argv in the CLIENT LIST
command as stated before it will busy wait for the IO threads.
* Client Flags 
* c→flags, which may be changed by the main thread in multiple places,
won't be accessed by the IO thread. Instead, the main thread will set
the c→io_flags with the information necessary for the IO thread to know
the client's state.
* Client Close 
* On freeClient, the main thread will busy wait for the IO thread to
finish processing the client's read/write before proceeding to free the
client.
* Client's Memory Limits 
* The IO thread won't handle the qb/cob limits. In case a client crosses
the qb limit, the IO thread will stop reading for it, letting the main
thread know that the client crossed the limit.

**TLS**

TLS is currently not supported with IO threads for the following
reasons:

1. Pending reads - If SSL has pending data that has already been read
from the socket, there is a risk of not calling the read handler again.
To handle this, a list is used to hold the pending clients. With IO
threads, multiple threads can access the list concurrently.
2. Event loop modification - Currently, the TLS code
registers/unregisters the file descriptor from the event loop depending
on the read/write results. With IO threads, multiple threads can modify
the event loop struct simultaneously.
3. The same client can be sent to 2 different threads concurrently
(https://github.com/redis/redis/issues/12540).

Those issues were handled in the current PR:

1. The IO thread only performs the read operation. The main thread will
check for pending reads after the client returns from the IO thread and
will be the only one to access the pending list.
2. The registering/unregistering of events will be similarly postponed
and handled by the main thread only.
3. Each client is being sent to the same dedicated thread (c→id %
num_of_threads).


**Sending Replies Immediately with IO threads.**

Currently, after processing a command, we add the client to the
pending_writes_list. Only after processing all the clients do we send
all the replies. Since the IO threads are now working asynchronously, we
can send the reply immediately after processing the client’s requests,
reducing the command latency. However, if we are using AOF=always, we
must wait for the AOF buffer to be written, in which case we revert to
the current behavior.

**IO threads dynamic adjustment**

Currently, we use an all-or-nothing approach when activating the IO
threads. The current logic is as follows: if the number of pending write
clients is greater than twice the number of threads (including the main
thread), we enable all threads; otherwise, we enable none. For example,
if 8 IO threads are defined, we enable all 8 threads if there are 16
pending clients; else, we enable none.
It makes more sense to enable partial activation of the IO threads. If
we have 10 pending clients, we will enable 5 threads, and so on. This
approach allows for a more granular and efficient allocation of
resources based on the current workload.

In addition, the user will now be able to change the number of I/O
threads at runtime. For example, when decreasing the number of threads
from 4 to 2, threads 3 and 4 will be closed after flushing their job
queues.

**Tests**

Currently, we run the io-threads tests with 4 IO threads
(443d80f168/.github/workflows/daily.yml (L353)).
This means that we will not activate the IO threads unless there are 8
(threads * 2) pending write clients per single loop, which is unlikely
to happened in most of tests, meaning the IO threads are not currently
being tested.

To enforce the main thread to always offload work to the IO threads,
regardless of the number of pending events, we add an
events-per-io-thread configuration with a default value of 2. When set
to 0, this configuration will force the main thread to always offload
work to the IO threads.

When we offload every single read/write operation to the IO threads, the
IO-threads are running with 100% CPU when running multiple tests
concurrently some tests fail as a result of larger than expected command
latencies. To address this issue, we have to add some after or wait_for
calls to some of the tests to ensure they pass with IO threads as well.

Signed-off-by: Uri Yagelnik <uriy@amazon.com>
This commit is contained in:
uriyage 2024-07-09 06:01:39 +03:00 committed by GitHub
parent 5f0ccf1478
commit bbfd041895
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
38 changed files with 1551 additions and 969 deletions

View File

@ -358,10 +358,10 @@ jobs:
run: sudo apt-get install tcl8.6 tclx
- name: test
if: true && !contains(github.event.inputs.skiptests, 'valkey')
run: ./runtest --config io-threads 4 --config io-threads-do-reads yes --accurate --verbose --tags network --dump-logs ${{github.event.inputs.test_args}}
run: ./runtest --config io-threads 2 --config events-per-io-thread 0 --accurate --verbose --tags network --dump-logs ${{github.event.inputs.test_args}}
- name: cluster tests
if: true && !contains(github.event.inputs.skiptests, 'cluster')
run: ./runtest-cluster --config io-threads 4 --config io-threads-do-reads yes ${{github.event.inputs.cluster_test_args}}
run: ./runtest-cluster --config io-threads 2 --config events-per-io-thread 0 ${{github.event.inputs.cluster_test_args}}
test-ubuntu-reclaim-cache:
runs-on: ubuntu-latest

View File

@ -401,7 +401,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)

View File

@ -392,7 +392,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
}
/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop);
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) eventLoop->aftersleep(eventLoop, numevents);
for (j = 0; j < numevents; j++) {
int fd = eventLoop->fired[j].fd;
@ -489,6 +489,6 @@ void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep
eventLoop->beforesleep = beforesleep;
}
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep) {
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep) {
eventLoop->aftersleep = aftersleep;
}

View File

@ -68,6 +68,7 @@ typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData,
typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
typedef void aeBeforeSleepProc(struct aeEventLoop *eventLoop);
typedef void aeAfterSleepProc(struct aeEventLoop *eventLoop, int numevents);
/* File event structure */
typedef struct aeFileEvent {
@ -107,7 +108,7 @@ typedef struct aeEventLoop {
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
aeAfterSleepProc *aftersleep;
int flags;
} aeEventLoop;
@ -130,7 +131,7 @@ int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeAfterSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);

View File

@ -146,7 +146,7 @@ void processUnblockedClients(void) {
if (!c->flag.blocked) {
/* If we have a queued command, execute it now. */
if (processPendingCommandAndInputBuffer(c) == C_ERR) {
c = NULL;
continue;
}
}
beforeNextClient(c);

View File

@ -590,6 +590,9 @@ void loadServerConfigFromString(char *config) {
if (server.config_hz < CONFIG_MIN_HZ) server.config_hz = CONFIG_MIN_HZ;
if (server.config_hz > CONFIG_MAX_HZ) server.config_hz = CONFIG_MAX_HZ;
/* To ensure backward compatibility when io_threads_num is according to the previous maximum of 128. */
if (server.io_threads_num > IO_THREADS_MAX_NUM) server.io_threads_num = IO_THREADS_MAX_NUM;
sdsfreesplitres(lines, totlines);
reading_config_file = 0;
return;
@ -3023,7 +3026,7 @@ standardConfig static_configs[] = {
/* Bool configs */
createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL),
createBoolConfig("daemonize", NULL, IMMUTABLE_CONFIG, server.daemonize, 0, NULL, NULL),
createBoolConfig("io-threads-do-reads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, server.io_threads_do_reads, 0, NULL, NULL), /* Read + parse from threads? */
createBoolConfig("io-threads-do-reads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, server.io_threads_do_reads, 1, NULL, NULL), /* Read + parse from threads */
createBoolConfig("always-show-logo", NULL, IMMUTABLE_CONFIG, server.always_show_logo, 0, NULL, NULL),
createBoolConfig("protected-mode", NULL, MODIFIABLE_CONFIG, server.protected_mode, 1, NULL, NULL),
createBoolConfig("rdbcompression", NULL, MODIFIABLE_CONFIG, server.rdb_compression, 1, NULL, NULL),
@ -3124,6 +3127,7 @@ standardConfig static_configs[] = {
createIntConfig("databases", NULL, IMMUTABLE_CONFIG, 1, INT_MAX, server.dbnum, 16, INTEGER_CONFIG, NULL, NULL),
createIntConfig("port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.port, 6379, INTEGER_CONFIG, NULL, updatePort), /* TCP port. */
createIntConfig("io-threads", NULL, DEBUG_CONFIG | IMMUTABLE_CONFIG, 1, 128, server.io_threads_num, 1, INTEGER_CONFIG, NULL, NULL), /* Single threaded by default */
createIntConfig("events-per-io-thread", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.events_per_io_thread, 2, INTEGER_CONFIG, NULL, NULL),
createIntConfig("auto-aof-rewrite-percentage", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.aof_rewrite_perc, 100, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-replica-validity-factor", "cluster-slave-validity-factor", MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_replica_validity_factor, 10, INTEGER_CONFIG, NULL, NULL), /* replica max data age factor. */
createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL),

View File

@ -264,6 +264,15 @@ void setproctitle(const char *fmt, ...);
#error "Undefined or invalid BYTE_ORDER"
#endif
/* Cache line alignment */
#ifndef CACHE_LINE_SIZE
#if defined(__aarch64__) && defined(__APPLE__)
#define CACHE_LINE_SIZE 128
#else
#define CACHE_LINE_SIZE 64
#endif /* __aarch64__ && __APPLE__ */
#endif /* CACHE_LINE_SIZE */
#if (__i386 || __amd64 || __powerpc__) && __GNUC__
#define GNUC_VERSION (__GNUC__ * 10000 + __GNUC_MINOR__ * 100 + __GNUC_PATCHLEVEL__)
#if defined(__clang__)
@ -329,7 +338,7 @@ void setcpuaffinity(const char *cpulist);
#define HAVE_FADVISE
#endif
#define IO_THREADS_MAX_NUM 128
#define IO_THREADS_MAX_NUM 16
#ifndef CACHE_LINE_SIZE
#if defined(__aarch64__) && defined(__APPLE__)

View File

@ -112,6 +112,12 @@ typedef struct ConnectionType {
int (*has_pending_data)(void);
int (*process_pending_data)(void);
/* Postpone update state - with IO threads & TLS we don't want the IO threads to update the event loop events - let
* the main-thread do it */
void (*postpone_update_state)(struct connection *conn, int);
/* Called by the main-thread */
void (*update_state)(struct connection *conn);
/* TLS specified methods */
sds (*get_peer_cert)(struct connection *conn);
} ConnectionType;
@ -456,4 +462,16 @@ static inline int connIsTLS(connection *conn) {
return conn && conn->type == connectionTypeTls();
}
static inline void connUpdateState(connection *conn) {
if (conn->type->update_state) {
conn->type->update_state(conn);
}
}
static inline void connSetPostponeUpdateState(connection *conn, int on) {
if (conn->type->postpone_update_state) {
conn->type->postpone_update_state(conn, on);
}
}
#endif /* __REDIS_CONNECTION_H */

View File

@ -37,6 +37,7 @@
#include "fpconv_dtoa.h"
#include "cluster.h"
#include "threads_mngr.h"
#include "io_threads.h"
#include <arpa/inet.h>
#include <signal.h>
@ -2159,6 +2160,7 @@ void removeSigSegvHandlers(void) {
}
void printCrashReport(void) {
server.crashed = 1;
/* Log INFO and CLIENT LIST */
logServerInfo();

View File

@ -928,7 +928,7 @@ void ldbEndSession(client *c) {
/* If it's a fork()ed session, we just exit. */
if (ldb.forked) {
writeToClient(c, 0);
writeToClient(c);
serverLog(LL_NOTICE, "Lua debugging session child exiting");
exitFromChild(0);
} else {

377
src/io_threads.c Normal file
View File

@ -0,0 +1,377 @@
#include "io_threads.h"
static __thread int thread_id = 0; /* Thread local var */
static pthread_t io_threads[IO_THREADS_MAX_NUM] = {0};
static pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM];
/* IO jobs queue functions - Used to send jobs from the main-thread to the IO thread. */
typedef void (*job_handler)(void *);
typedef struct iojob {
job_handler handler;
void *data;
} iojob;
typedef struct IOJobQueue {
iojob *ring_buffer;
size_t size;
_Atomic size_t head __attribute__((aligned(CACHE_LINE_SIZE))); /* Next write index for producer (main-thread) */
_Atomic size_t tail __attribute__((aligned(CACHE_LINE_SIZE))); /* Next read index for consumer (IO-thread) */
} IOJobQueue;
IOJobQueue io_jobs[IO_THREADS_MAX_NUM] = {0};
/* Initialize the job queue with a specified number of items. */
static void IOJobQueue_init(IOJobQueue *jq, size_t item_count) {
debugServerAssertWithInfo(NULL, NULL, inMainThread());
jq->ring_buffer = zcalloc(item_count * sizeof(iojob));
jq->size = item_count; /* Total number of items */
jq->head = 0;
jq->tail = 0;
}
/* Clean up the job queue and free allocated memory. */
static void IOJobQueue_cleanup(IOJobQueue *jq) {
debugServerAssertWithInfo(NULL, NULL, inMainThread());
zfree(jq->ring_buffer);
memset(jq, 0, sizeof(*jq));
}
static int IOJobQueue_isFull(const IOJobQueue *jq) {
debugServerAssertWithInfo(NULL, NULL, inMainThread());
size_t current_head = atomic_load_explicit(&jq->head, memory_order_relaxed);
/* We don't use memory_order_acquire for the tail due to performance reasons,
* In the worst case we will just assume wrongly the buffer is full and the main thread will do the job by itself. */
size_t current_tail = atomic_load_explicit(&jq->tail, memory_order_relaxed);
size_t next_head = (current_head + 1) % jq->size;
return next_head == current_tail;
}
/* Attempt to push a new job to the queue from the main thread.
* the caller must ensure the queue is not full before calling this function. */
static void IOJobQueue_push(IOJobQueue *jq, job_handler handler, void *data) {
debugServerAssertWithInfo(NULL, NULL, inMainThread());
/* Assert the queue is not full - should not happen as the caller should check for it before. */
serverAssert(!IOJobQueue_isFull(jq));
/* No need to use atomic acquire for the head, as the main thread is the only one that writes to the head index. */
size_t current_head = atomic_load_explicit(&jq->head, memory_order_relaxed);
size_t next_head = (current_head + 1) % jq->size;
/* We store directly the job's fields to avoid allocating a new iojob structure. */
serverAssert(jq->ring_buffer[current_head].data == NULL);
serverAssert(jq->ring_buffer[current_head].handler == NULL);
jq->ring_buffer[current_head].data = data;
jq->ring_buffer[current_head].handler = handler;
/* memory_order_release to make sure the data is visible to the consumer (the IO thread). */
atomic_store_explicit(&jq->head, next_head, memory_order_release);
}
/* Returns the number of jobs currently available for consumption in the given job queue.
*
* This function ensures memory visibility for the jobs by
* using a memory acquire fence when there are jobs available. */
static size_t IOJobQueue_availableJobs(const IOJobQueue *jq) {
debugServerAssertWithInfo(NULL, NULL, !inMainThread());
/* We use memory_order_acquire to make sure the head and the job's fields are visible to the consumer (IO thread). */
size_t current_head = atomic_load_explicit(&jq->head, memory_order_acquire);
size_t current_tail = atomic_load_explicit(&jq->tail, memory_order_relaxed);
if (current_head >= current_tail) {
return current_head - current_tail;
} else {
return jq->size - (current_tail - current_head);
}
}
/* Checks if the job Queue is empty.
* returns 1 if the buffer is currently empty, 0 otherwise.
* Called by the main-thread only.
* This function uses relaxed memory order, so the caller need to use an acquire
* memory fence before calling this function to be sure it has the latest index
* from the other thread, especially when called repeatedly. */
static int IOJobQueue_isEmpty(const IOJobQueue *jq) {
size_t current_head = atomic_load_explicit(&jq->head, memory_order_relaxed);
size_t current_tail = atomic_load_explicit(&jq->tail, memory_order_relaxed);
return current_head == current_tail;
}
/* Removes the next job from the given job queue by advancing the tail index.
* Called by the IO thread.
* The caller must ensure that the queue is not empty before calling this function.
* This function uses relaxed memory order, so the caller need to use an release memory fence
* after calling this function to make sure the updated tail is visible to the producer (main thread). */
static void IOJobQueue_removeJob(IOJobQueue *jq) {
debugServerAssertWithInfo(NULL, NULL, !inMainThread());
size_t current_tail = atomic_load_explicit(&jq->tail, memory_order_relaxed);
jq->ring_buffer[current_tail].data = NULL;
jq->ring_buffer[current_tail].handler = NULL;
atomic_store_explicit(&jq->tail, (current_tail + 1) % jq->size, memory_order_relaxed);
}
/* Retrieves the next job handler and data from the job queue without removal.
* Called by the consumer (IO thread). Caller must ensure queue is not empty.*/
static void IOJobQueue_peek(const IOJobQueue *jq, job_handler *handler, void **data) {
debugServerAssertWithInfo(NULL, NULL, !inMainThread());
size_t current_tail = atomic_load_explicit(&jq->tail, memory_order_relaxed);
iojob *job = &jq->ring_buffer[current_tail];
*handler = job->handler;
*data = job->data;
}
/* End of IO job queue functions */
int inMainThread(void) {
return thread_id == 0;
}
/* Wait until the IO-thread is done with the client */
void waitForClientIO(client *c) {
/* No need to wait if the client was not offloaded to the IO thread. */
if (c->io_read_state == CLIENT_IDLE && c->io_write_state == CLIENT_IDLE) return;
/* Wait for read operation to complete if pending. */
while (c->io_read_state == CLIENT_PENDING_IO) {
atomic_thread_fence(memory_order_acquire);
}
/* Wait for write operation to complete if pending. */
while (c->io_write_state == CLIENT_PENDING_IO) {
atomic_thread_fence(memory_order_acquire);
}
/* Final memory barrier to ensure all changes are visible */
atomic_thread_fence(memory_order_acquire);
}
/** Adjusts the number of active I/O threads based on the current event load.
* If increase_only is non-zero, only allows increasing the number of threads.*/
void adjustIOThreadsByEventLoad(int numevents, int increase_only) {
if (server.io_threads_num == 1) return; /* All I/O is being done by the main thread. */
debugServerAssertWithInfo(NULL, NULL, server.io_threads_num > 1);
int target_threads =
server.events_per_io_thread == 0 ? server.io_threads_num : numevents / server.events_per_io_thread;
target_threads = max(1, min(target_threads, server.io_threads_num));
if (target_threads == server.active_io_threads_num) return;
if (target_threads < server.active_io_threads_num) {
if (increase_only) return;
int threads_to_deactivate_num = server.active_io_threads_num - target_threads;
for (int i = 0; i < threads_to_deactivate_num; i++) {
int tid = server.active_io_threads_num - 1;
IOJobQueue *jq = &io_jobs[tid];
/* We can't lock the thread if it may have pending jobs */
if (!IOJobQueue_isEmpty(jq)) return;
pthread_mutex_lock(&io_threads_mutex[tid]);
server.active_io_threads_num--;
}
} else {
int threads_to_activate_num = target_threads - server.active_io_threads_num;
for (int i = 0; i < threads_to_activate_num; i++) {
pthread_mutex_unlock(&io_threads_mutex[server.active_io_threads_num]);
server.active_io_threads_num++;
}
}
}
static void *IOThreadMain(void *myid) {
/* The ID is the thread ID number (from 1 to server.io_threads_num-1). ID 0 is the main thread. */
long id = (long)myid;
char thdname[32];
serverAssert(server.io_threads_num > 0);
serverAssert(id > 0 && id < server.io_threads_num);
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
valkey_set_thread_title(thdname);
serverSetCpuAffinity(server.server_cpulist);
makeThreadKillable();
initSharedQueryBuf();
thread_id = (int)id;
size_t jobs_to_process = 0;
IOJobQueue *jq = &io_jobs[id];
while (1) {
/* Wait for jobs */
for (int j = 0; j < 1000000; j++) {
jobs_to_process = IOJobQueue_availableJobs(jq);
if (jobs_to_process) break;
}
/* Give the main thread a chance to stop this thread. */
if (jobs_to_process == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
for (size_t j = 0; j < jobs_to_process; j++) {
job_handler handler;
void *data;
/* We keep the job in the queue until it's processed. This ensures that if the main thread checks
* and finds the queue empty, it can be certain that the IO thread is not currently handling any job. */
IOJobQueue_peek(jq, &handler, &data);
handler(data);
/* Remove the job after it was processed */
IOJobQueue_removeJob(jq);
}
/* Memory barrier to make sure the main thread sees the updated tail index.
* We do it once per loop and not per tail-update for optimization reasons.
* As the main-thread main concern is to check if the queue is empty, it's enough to do it once at the end. */
atomic_thread_fence(memory_order_release);
}
freeSharedQueryBuf();
return NULL;
}
#define IO_JOB_QUEUE_SIZE 2048
static void createIOThread(int id) {
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[id], NULL);
IOJobQueue_init(&io_jobs[id], IO_JOB_QUEUE_SIZE);
pthread_mutex_lock(&io_threads_mutex[id]); /* Thread will be stopped. */
if (pthread_create(&tid, NULL, IOThreadMain, (void *)(long)id) != 0) {
serverLog(LL_WARNING, "Fatal: Can't initialize IO thread, pthread_create failed with: %s", strerror(errno));
exit(1);
}
io_threads[id] = tid;
}
/* Terminates the IO thread specified by id.
* Called on server shutdown */
static void shutdownIOThread(int id) {
int err;
pthread_t tid = io_threads[id];
if (tid == pthread_self()) return;
if (tid == 0) return;
pthread_cancel(tid);
if ((err = pthread_join(tid, NULL)) != 0) {
serverLog(LL_WARNING, "IO thread(tid:%lu) can not be joined: %s", (unsigned long)tid, strerror(err));
} else {
serverLog(LL_NOTICE, "IO thread(tid:%lu) terminated", (unsigned long)tid);
}
IOJobQueue_cleanup(&io_jobs[id]);
}
void killIOThreads(void) {
for (int j = 1; j < server.io_threads_num; j++) { /* We don't kill thread 0, which is the main thread. */
shutdownIOThread(j);
}
}
/* Initialize the data structures needed for I/O threads. */
void initIOThreads(void) {
server.active_io_threads_num = 1; /* We start with threads not active. */
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
if (server.io_threads_num == 1) return;
serverAssert(server.io_threads_num <= IO_THREADS_MAX_NUM);
/* Spawn and initialize the I/O threads. */
for (int i = 1; i < server.io_threads_num; i++) {
createIOThread(i);
}
}
int trySendReadToIOThreads(client *c) {
if (server.active_io_threads_num <= 1) return C_ERR;
if (!server.io_threads_do_reads) return C_ERR;
/* If IO thread is areadty reading, return C_OK to make sure the main thread will not handle it. */
if (c->io_read_state != CLIENT_IDLE) return C_OK;
/* Currently, replica/master writes are not offloaded and are processed synchronously. */
if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* With Lua debug client we may call connWrite directly in the main thread */
if (c->flag.lua_debug) return C_ERR;
/* For simplicity let the main-thread handle the blocked clients */
if (c->flag.blocked || c->flag.unblocked) return C_ERR;
if (c->flag.close_asap) return C_ERR;
size_t tid = (c->id % (server.active_io_threads_num - 1)) + 1;
/* Handle case where client has a pending IO write job on a different thread:
* 1. A write job is still pending (io_write_state == CLIENT_PENDING_IO)
* 2. The pending job is on a different thread (c->cur_tid != tid)
*
* This situation can occur if active_io_threads_num increased since the
* original job assignment. In this case, we keep the job on its current
* thread to ensure the same thread handles the client's I/O operations. */
if (c->io_write_state == CLIENT_PENDING_IO && c->cur_tid != (uint8_t)tid) tid = c->cur_tid;
IOJobQueue *jq = &io_jobs[tid];
if (IOJobQueue_isFull(jq)) return C_ERR;
c->cur_tid = tid;
c->read_flags = canParseCommand(c) ? 0 : READ_FLAGS_DONT_PARSE;
c->read_flags |= authRequired(c) ? READ_FLAGS_AUTH_REQUIRED : 0;
c->io_read_state = CLIENT_PENDING_IO;
connSetPostponeUpdateState(c->conn, 1);
IOJobQueue_push(jq, ioThreadReadQueryFromClient, c);
c->flag.pending_read = 1;
listLinkNodeTail(server.clients_pending_io_read, &c->pending_read_list_node);
return C_OK;
}
/* This function attempts to offload the client's write to an I/O thread.
* Returns C_OK if the client's writes were successfully offloaded to an I/O thread,
* or C_ERR if the client is not eligible for offloading. */
int trySendWriteToIOThreads(client *c) {
if (server.active_io_threads_num <= 1) return C_ERR;
/* The I/O thread is already writing for this client. */
if (c->io_write_state != CLIENT_IDLE) return C_OK;
/* Nothing to write */
if (!clientHasPendingReplies(c)) return C_ERR;
/* Currently, replica/master writes are not offloaded and are processed synchronously. */
if (c->flag.primary || getClientType(c) == CLIENT_TYPE_REPLICA) return C_ERR;
/* We can't offload debugged clients as the main-thread may read at the same time */
if (c->flag.lua_debug) return C_ERR;
size_t tid = (c->id % (server.active_io_threads_num - 1)) + 1;
/* Handle case where client has a pending IO read job on a different thread:
* 1. A read job is still pending (io_read_state == CLIENT_PENDING_IO)
* 2. The pending job is on a different thread (c->cur_tid != tid)
*
* This situation can occur if active_io_threads_num increased since the
* original job assignment. In this case, we keep the job on its current
* thread to ensure the same thread handles the client's I/O operations. */
if (c->io_read_state == CLIENT_PENDING_IO && c->cur_tid != (uint8_t)tid) tid = c->cur_tid;
IOJobQueue *jq = &io_jobs[tid];
if (IOJobQueue_isFull(jq)) return C_ERR;
c->cur_tid = tid;
if (c->flag.pending_write) {
/* We move the client to the io pending write queue */
listUnlinkNode(server.clients_pending_write, &c->clients_pending_write_node);
} else {
c->flag.pending_write = 1;
}
serverAssert(c->clients_pending_write_node.prev == NULL && c->clients_pending_write_node.next == NULL);
listLinkNodeTail(server.clients_pending_io_write, &c->clients_pending_write_node);
/* Save the last block of the reply list to io_last_reply_block and the used
* position to io_last_bufpos. The I/O thread will write only up to
* io_last_bufpos, regardless of the c->bufpos value. This is to prevent I/O
* threads from reading data that might be invalid in their local CPU cache. */
c->io_last_reply_block = listLast(c->reply);
if (c->io_last_reply_block) {
c->io_last_bufpos = ((clientReplyBlock *)listNodeValue(c->io_last_reply_block))->used;
} else {
c->io_last_bufpos = (size_t)c->bufpos;
}
serverAssert(c->bufpos > 0 || c->io_last_bufpos > 0);
/* The main-thread will update the client state after the I/O thread completes the write. */
connSetPostponeUpdateState(c->conn, 1);
c->write_flags = 0;
c->io_write_state = CLIENT_PENDING_IO;
IOJobQueue_push(jq, ioThreadWriteToClient, c);
return C_OK;
}

13
src/io_threads.h Normal file
View File

@ -0,0 +1,13 @@
#ifndef IO_THREADS_H
#define IO_THREADS_H
#include "server.h"
void initIOThreads(void);
void killIOThreads(void);
int inMainThread(void);
int trySendReadToIOThreads(client *c);
int trySendWriteToIOThreads(client *c);
void adjustIOThreadsByEventLoad(int numevents, int increase_only);
#endif /* IO_THREADS_H */

File diff suppressed because it is too large Load Diff

View File

@ -2931,7 +2931,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
processModuleLoadingProgressEvent(0);
}
if (server.repl_state == REPL_STATE_TRANSFER && rioCheckType(r) == RIO_TYPE_CONN) {
atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes, len, memory_order_relaxed);
server.stat_net_repl_input_bytes += len;
}
}

View File

@ -765,9 +765,11 @@ int primaryTryPartialResynchronization(client *c, long long psync_offset) {
}
/* If we reached this point, we are able to perform a partial resync:
* 1) Set client state to make it a replica.
* 2) Inform the client we can continue with +CONTINUE
* 3) Send the backlog data (from the offset to the end) to the replica. */
* 1) Make sure no IO operations are being performed before changing the client state.
* 2) Set client state to make it a replica.
* 3) Inform the client we can continue with +CONTINUE
* 4) Send the backlog data (from the offset to the end) to the replica. */
waitForClientIO(c);
c->flag.replica = 1;
c->repl_state = REPLICA_STATE_ONLINE;
c->repl_ack_time = server.unixtime;
@ -1009,6 +1011,8 @@ void syncCommand(client *c) {
c->repl_state = REPLICA_STATE_WAIT_BGSAVE_START;
if (server.repl_disable_tcp_nodelay) connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */
c->repldbfd = -1;
/* Wait for any IO pending operation to finish before changing the client state */
waitForClientIO(c);
c->flag.replica = 1;
listAddNodeTail(server.replicas, c);
@ -1377,7 +1381,7 @@ void sendBulkToReplica(connection *conn) {
freeClient(replica);
return;
}
atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes, nwritten, memory_order_relaxed);
server.stat_net_repl_output_bytes += nwritten;
sdsrange(replica->replpreamble, nwritten, -1);
if (sdslen(replica->replpreamble) == 0) {
sdsfree(replica->replpreamble);
@ -1405,7 +1409,7 @@ void sendBulkToReplica(connection *conn) {
return;
}
replica->repldboff += nwritten;
atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes, nwritten, memory_order_relaxed);
server.stat_net_repl_output_bytes += nwritten;
if (replica->repldboff == replica->repldbsize) {
closeRepldbfd(replica);
connSetWriteHandler(replica->conn, NULL);
@ -1447,7 +1451,7 @@ void rdbPipeWriteHandler(struct connection *conn) {
return;
} else {
replica->repldboff += nwritten;
atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes, nwritten, memory_order_relaxed);
server.stat_net_repl_output_bytes += nwritten;
if (replica->repldboff < server.rdb_pipe_bufflen) {
replica->repl_last_partial_write = server.unixtime;
return; /* more data to write.. */
@ -1520,7 +1524,7 @@ void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData,
/* Note: when use diskless replication, 'repldboff' is the offset
* of 'rdb_pipe_buff' sent rather than the offset of entire RDB. */
replica->repldboff = nwritten;
atomic_fetch_add_explicit(&server.stat_net_repl_output_bytes, nwritten, memory_order_relaxed);
server.stat_net_repl_output_bytes += nwritten;
}
/* If we were unable to write all the data to one of the replicas,
* setup write handler (and disable pipe read handler, below) */
@ -1831,7 +1835,7 @@ void readSyncBulkPayload(connection *conn) {
} else {
/* nread here is returned by connSyncReadLine(), which calls syncReadLine() and
* convert "\r\n" to '\0' so 1 byte is lost. */
atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes, nread + 1, memory_order_relaxed);
server.stat_net_repl_input_bytes += nread + 1;
}
if (buf[0] == '-') {
@ -1900,7 +1904,7 @@ void readSyncBulkPayload(connection *conn) {
cancelReplicationHandshake(1);
return;
}
atomic_fetch_add_explicit(&server.stat_net_repl_input_bytes, nread, memory_order_relaxed);
server.stat_net_repl_input_bytes += nread;
/* When a mark is used, we want to detect EOF asap in order to avoid
* writing the EOF mark into the file... */

View File

@ -39,6 +39,7 @@
#include "syscheck.h"
#include "threads_mngr.h"
#include "fmtargs.h"
#include "io_threads.h"
#include <time.h>
#include <signal.h>
@ -754,6 +755,8 @@ int clientsCronResizeQueryBuffer(client *c) {
* The buffer peak will be reset back to the buffer position every server.reply_buffer_peak_reset_time milliseconds
* The function always returns 0 as it never terminates the client. */
int clientsCronResizeOutputBuffer(client *c, mstime_t now_ms) {
if (c->io_write_state != CLIENT_IDLE) return 0;
size_t new_buffer_size = 0;
char *oldbuf = NULL;
const size_t buffer_target_shrink_size = c->buf_usable_size / 2;
@ -904,7 +907,6 @@ void removeClientFromMemUsageBucket(client *c, int allow_eviction) {
* returns 1 if client eviction for this client is allowed, 0 otherwise.
*/
int updateClientMemUsageAndBucket(client *c) {
serverAssert(io_threads_op == IO_THREADS_OP_IDLE && c->conn);
int allow_eviction = clientEvictionAllowed(c);
removeClientFromMemUsageBucket(c, allow_eviction);
@ -997,6 +999,7 @@ void clientsCron(void) {
head = listFirst(server.clients);
c = listNodeValue(head);
listRotateHeadToTail(server.clients);
if (c->io_read_state != CLIENT_IDLE || c->io_write_state != CLIENT_IDLE) continue;
/* The following functions do different service checks on the client.
* The protocol is that they return non-zero if the client was
* terminated. */
@ -1075,8 +1078,7 @@ void databasesCron(void) {
static inline void updateCachedTimeWithUs(int update_daylight_info, const long long ustime) {
server.ustime = ustime;
server.mstime = server.ustime / 1000;
time_t unixtime = server.mstime / 1000;
atomic_store_explicit(&server.unixtime, unixtime, memory_order_relaxed);
server.unixtime = server.mstime / 1000;
/* To get information about daylight saving time, we need to call
* localtime_r and cache the result. However calling localtime_r in this
@ -1257,23 +1259,18 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
monotime cron_start = getMonotonicUs();
run_with_period(100) {
long long stat_net_input_bytes, stat_net_output_bytes;
long long stat_net_repl_input_bytes, stat_net_repl_output_bytes;
stat_net_input_bytes = atomic_load_explicit(&server.stat_net_input_bytes, memory_order_relaxed);
stat_net_output_bytes = atomic_load_explicit(&server.stat_net_output_bytes, memory_order_relaxed);
stat_net_repl_input_bytes = atomic_load_explicit(&server.stat_net_repl_input_bytes, memory_order_relaxed);
stat_net_repl_output_bytes = atomic_load_explicit(&server.stat_net_repl_output_bytes, memory_order_relaxed);
monotime current_time = getMonotonicUs();
long long factor = 1000000; // us
trackInstantaneousMetric(STATS_METRIC_COMMAND, server.stat_numcommands, current_time, factor);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT, stat_net_input_bytes + stat_net_repl_input_bytes, current_time,
factor);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT, stat_net_output_bytes + stat_net_repl_output_bytes,
trackInstantaneousMetric(STATS_METRIC_NET_INPUT, server.stat_net_input_bytes + server.stat_net_repl_input_bytes,
current_time, factor);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION, stat_net_repl_input_bytes, current_time, factor);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION, stat_net_repl_output_bytes, current_time, factor);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT,
server.stat_net_output_bytes + server.stat_net_repl_output_bytes, current_time,
factor);
trackInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION, server.stat_net_repl_input_bytes, current_time,
factor);
trackInstantaneousMetric(STATS_METRIC_NET_OUTPUT_REPLICATION, server.stat_net_repl_output_bytes, current_time,
factor);
trackInstantaneousMetric(STATS_METRIC_EL_CYCLE, server.duration_stats[EL_DURATION_TYPE_EL].cnt, current_time,
factor);
trackInstantaneousMetric(STATS_METRIC_EL_DURATION, server.duration_stats[EL_DURATION_TYPE_EL].sum,
@ -1433,9 +1430,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
migrateCloseTimedoutSockets();
}
/* Stop the I/O threads if we don't have enough pending work. */
stopThreadedIOIfNeeded();
/* Resize tracking keys table if needed. This is also done at every
* command execution, but we want to be sure that if the last command
* executed changes the value via CONFIG SET, the server will perform
@ -1580,23 +1574,31 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
* events to handle. */
if (ProcessingEventsWhileBlocked) {
uint64_t processed = 0;
processed += handleClientsWithPendingReadsUsingThreads();
processed += processIOThreadsReadDone();
processed += connTypeProcessPendingData();
if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) flushAppendOnlyFile(0);
processed += handleClientsWithPendingWrites();
int last_procssed = 0;
do {
/* Try to process all the pending IO events. */
last_procssed = processIOThreadsReadDone() + processIOThreadsWriteDone();
processed += last_procssed;
} while (last_procssed != 0);
processed += freeClientsInAsyncFreeQueue();
server.events_processed_while_blocked += processed;
return;
}
/* We should handle pending reads clients ASAP after event loop. */
handleClientsWithPendingReadsUsingThreads();
processIOThreadsReadDone();
/* Handle pending data(typical TLS). (must be done before flushAppendOnlyFile) */
connTypeProcessPendingData();
/* If any connection type(typical TLS) still has pending unread data don't sleep at all. */
int dont_sleep = connTypeHasPendingData();
/* If any connection type(typical TLS) still has pending unread data or if there are clients
* with pending IO reads/writes, don't sleep at all. */
int dont_sleep = connTypeHasPendingData() || listLength(server.clients_pending_io_read) > 0 ||
listLength(server.clients_pending_io_write) > 0;
/* Call the Cluster before sleep function. Note that this function
* may change the state of Cluster (from ok to fail or vice versa),
@ -1659,7 +1661,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
long long prev_fsynced_reploff = server.fsynced_reploff;
/* Write the AOF buffer on disk,
* must be done before handleClientsWithPendingWritesUsingThreads,
* must be done before handleClientsWithPendingWrites,
* in case of appendfsync=always. */
if (server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) flushAppendOnlyFile(0);
@ -1679,7 +1681,14 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
}
/* Handle writes with pending output buffers. */
handleClientsWithPendingWritesUsingThreads();
handleClientsWithPendingWrites();
/* Try to process more IO reads that are ready to be processed. */
if (server.aof_fsync != AOF_FSYNC_ALWAYS) {
processIOThreadsReadDone();
}
processIOThreadsWriteDone();
/* Record cron time in beforeSleep. This does not include the time consumed by AOF writing and IO writing above. */
monotime cron_start_time_after_write = getMonotonicUs();
@ -1729,7 +1738,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* This function is called immediately after the event loop multiplexing
* API returned, and the control is going to soon return to the server by invoking
* the different events callbacks. */
void afterSleep(struct aeEventLoop *eventLoop) {
void afterSleep(struct aeEventLoop *eventLoop, int numevents) {
UNUSED(eventLoop);
/********************* WARNING ********************
* Do NOT add anything above moduleAcquireGIL !!! *
@ -1761,6 +1770,8 @@ void afterSleep(struct aeEventLoop *eventLoop) {
if (!ProcessingEventsWhileBlocked) {
server.cmd_time_snapshot = server.mstime;
}
adjustIOThreadsByEventLoad(numevents, 0);
}
/* =========================== Server initialization ======================== */
@ -2478,10 +2489,10 @@ void resetServerStats(void) {
server.stat_sync_partial_ok = 0;
server.stat_sync_partial_err = 0;
server.stat_io_reads_processed = 0;
atomic_store_explicit(&server.stat_total_reads_processed, 0, memory_order_relaxed);
server.stat_total_reads_processed = 0;
server.stat_io_writes_processed = 0;
atomic_store_explicit(&server.stat_total_writes_processed, 0, memory_order_relaxed);
atomic_store_explicit(&server.stat_client_qbuf_limit_disconnections, 0, memory_order_relaxed);
server.stat_total_writes_processed = 0;
server.stat_client_qbuf_limit_disconnections = 0;
server.stat_client_outbuf_limit_disconnections = 0;
for (j = 0; j < STATS_METRIC_COUNT; j++) {
server.inst_metric[j].idx = 0;
@ -2492,10 +2503,10 @@ void resetServerStats(void) {
server.stat_aof_rewrites = 0;
server.stat_rdb_saves = 0;
server.stat_aofrw_consecutive_failures = 0;
atomic_store_explicit(&server.stat_net_input_bytes, 0, memory_order_relaxed);
atomic_store_explicit(&server.stat_net_output_bytes, 0, memory_order_relaxed);
atomic_store_explicit(&server.stat_net_repl_input_bytes, 0, memory_order_relaxed);
atomic_store_explicit(&server.stat_net_repl_output_bytes, 0, memory_order_relaxed);
server.stat_net_input_bytes = 0;
server.stat_net_output_bytes = 0;
server.stat_net_repl_input_bytes = 0;
server.stat_net_repl_output_bytes = 0;
server.stat_unexpected_error_replies = 0;
server.stat_total_error_replies = 0;
server.stat_dump_payload_sanitizations = 0;
@ -2545,7 +2556,8 @@ void initServer(void) {
server.replicas = listCreate();
server.monitors = listCreate();
server.clients_pending_write = listCreate();
server.clients_pending_read = listCreate();
server.clients_pending_io_write = listCreate();
server.clients_pending_io_read = listCreate();
server.clients_timeout_table = raxNew();
server.replication_allowed = 1;
server.replicas_eldb = -1; /* Force to emit the first SELECT command. */
@ -2641,6 +2653,7 @@ void initServer(void) {
server.rdb_last_load_keys_expired = 0;
server.rdb_last_load_keys_loaded = 0;
server.dirty = 0;
server.crashed = 0;
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
server.stat_starttime = time(NULL);
@ -2796,7 +2809,7 @@ void initListeners(void) {
* see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */
void InitServerLast(void) {
bioInit();
initThreadedIO();
initIOThreads();
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}
@ -5395,7 +5408,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"lru_clock:%u\r\n", server.lruclock,
"executable:%s\r\n", server.executable ? server.executable : "",
"config_file:%s\r\n", server.configfile ? server.configfile : "",
"io_threads_active:%i\r\n", server.io_threads_active,
"io_threads_active:%i\r\n", server.active_io_threads_num > 1,
"availability_zone:%s\r\n", server.availability_zone));
/* clang-format on */
@ -5630,23 +5643,10 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
/* Stats */
if (all_sections || (dictFind(section_dict, "stats") != NULL)) {
long long stat_total_reads_processed, stat_total_writes_processed;
long long stat_net_input_bytes, stat_net_output_bytes;
long long stat_net_repl_input_bytes, stat_net_repl_output_bytes;
long long current_eviction_exceeded_time =
server.stat_last_eviction_exceeded_time ? (long long)elapsedUs(server.stat_last_eviction_exceeded_time) : 0;
long long current_active_defrag_time =
server.stat_last_active_defrag_time ? (long long)elapsedUs(server.stat_last_active_defrag_time) : 0;
long long stat_client_qbuf_limit_disconnections;
stat_total_reads_processed = atomic_load_explicit(&server.stat_total_reads_processed, memory_order_relaxed);
stat_total_writes_processed = atomic_load_explicit(&server.stat_total_writes_processed, memory_order_relaxed);
stat_net_input_bytes = atomic_load_explicit(&server.stat_net_input_bytes, memory_order_relaxed);
stat_net_output_bytes = atomic_load_explicit(&server.stat_net_output_bytes, memory_order_relaxed);
stat_net_repl_input_bytes = atomic_load_explicit(&server.stat_net_repl_input_bytes, memory_order_relaxed);
stat_net_repl_output_bytes = atomic_load_explicit(&server.stat_net_repl_output_bytes, memory_order_relaxed);
stat_client_qbuf_limit_disconnections =
atomic_load_explicit(&server.stat_client_qbuf_limit_disconnections, memory_order_relaxed);
if (sections++) info = sdscat(info, "\r\n");
/* clang-format off */
@ -5654,10 +5654,10 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"total_connections_received:%lld\r\n", server.stat_numconnections,
"total_commands_processed:%lld\r\n", server.stat_numcommands,
"instantaneous_ops_per_sec:%lld\r\n", getInstantaneousMetric(STATS_METRIC_COMMAND),
"total_net_input_bytes:%lld\r\n", stat_net_input_bytes + stat_net_repl_input_bytes,
"total_net_output_bytes:%lld\r\n", stat_net_output_bytes + stat_net_repl_output_bytes,
"total_net_repl_input_bytes:%lld\r\n", stat_net_repl_input_bytes,
"total_net_repl_output_bytes:%lld\r\n", stat_net_repl_output_bytes,
"total_net_input_bytes:%lld\r\n", server.stat_net_input_bytes + server.stat_net_repl_input_bytes,
"total_net_output_bytes:%lld\r\n", server.stat_net_output_bytes + server.stat_net_repl_output_bytes,
"total_net_repl_input_bytes:%lld\r\n", server.stat_net_repl_input_bytes,
"total_net_repl_output_bytes:%lld\r\n", server.stat_net_repl_output_bytes,
"instantaneous_input_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT)/1024,
"instantaneous_output_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_OUTPUT)/1024,
"instantaneous_input_repl_kbps:%.2f\r\n", (float)getInstantaneousMetric(STATS_METRIC_NET_INPUT_REPLICATION)/1024,
@ -5696,11 +5696,11 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"unexpected_error_replies:%lld\r\n", server.stat_unexpected_error_replies,
"total_error_replies:%lld\r\n", server.stat_total_error_replies,
"dump_payload_sanitizations:%lld\r\n", server.stat_dump_payload_sanitizations,
"total_reads_processed:%lld\r\n", stat_total_reads_processed,
"total_writes_processed:%lld\r\n", stat_total_writes_processed,
"total_reads_processed:%lld\r\n", server.stat_total_reads_processed,
"total_writes_processed:%lld\r\n", server.stat_total_writes_processed,
"io_threaded_reads_processed:%lld\r\n", server.stat_io_reads_processed,
"io_threaded_writes_processed:%lld\r\n", server.stat_io_writes_processed,
"client_query_buffer_limit_disconnections:%lld\r\n", stat_client_qbuf_limit_disconnections,
"client_query_buffer_limit_disconnections:%lld\r\n", server.stat_client_qbuf_limit_disconnections,
"client_output_buffer_limit_disconnections:%lld\r\n", server.stat_client_outbuf_limit_disconnections,
"reply_buffer_shrinks:%lld\r\n", server.stat_reply_buffer_shrinks,
"reply_buffer_expands:%lld\r\n", server.stat_reply_buffer_expands,

View File

@ -1117,6 +1117,12 @@ typedef struct {
} clientReqResInfo;
#endif
typedef enum {
CLIENT_IDLE = 0, /* Initial state: client is idle. */
CLIENT_PENDING_IO = 1, /* Main-thread sets this state when client is sent to IO-thread for read/write. */
CLIENT_COMPLETED_IO = 2 /* IO-thread sets this state after completing IO operation. */
} clientIOState;
typedef struct ClientFlags {
uint64_t primary : 1; /* This client is a primary */
uint64_t replica : 1; /* This client is a replica */
@ -1141,6 +1147,7 @@ typedef struct ClientFlags {
uint64_t prevent_repl_prop : 1; /* Don't propagate to replicas. */
uint64_t prevent_prop : 1; /* Don't propagate to AOF or replicas. */
uint64_t pending_write : 1; /* Client has output to send but a write handler is yet not installed. */
uint64_t pending_read : 1; /* Client has output to send but a write handler is yet not installed. */
uint64_t reply_off : 1; /* Don't send replies to client. */
uint64_t reply_skip_next : 1; /* Set CLIENT_REPLY_SKIP for next cmd */
uint64_t reply_skip : 1; /* Don't send just this reply. */
@ -1173,7 +1180,7 @@ typedef struct ClientFlags {
uint64_t reprocessing_command : 1; /* The client is re-processing the command. */
uint64_t replication_done : 1; /* Indicate that replication has been done on the client */
uint64_t authenticated : 1; /* Indicate a client has successfully authenticated */
uint64_t reserved : 10; /* Reserved for future use */
uint64_t reserved : 9; /* Reserved for future use */
} ClientFlags;
typedef struct client {
@ -1198,6 +1205,13 @@ typedef struct client {
int original_argc; /* Num of arguments of original command if arguments were rewritten. */
robj **original_argv; /* Arguments of original command if arguments were rewritten. */
size_t argv_len_sum; /* Sum of lengths of objects in argv list. */
volatile uint8_t io_read_state; /* Indicate the IO read state of the client */
volatile uint8_t io_write_state; /* Indicate the IO write state of the client */
uint8_t cur_tid; /* ID of IO thread currently performing IO for this client */
int nread; /* Number of bytes of the last read. */
int nwritten; /* Number of bytes of the last write. */
int read_flags; /* Client Read flags - used to communicate the client read state. */
uint16_t write_flags; /* Client Write flags - used to communicate the client write state. */
struct serverCommand *cmd, *lastcmd; /* Last command executed. */
struct serverCommand *realcmd; /* The original command that was executed by the client,
Used to update error stats in case the c->cmd was modified
@ -1209,6 +1223,7 @@ typedef struct client {
int multibulklen; /* Number of multi bulk arguments left to read. */
long bulklen; /* Length of bulk argument in multi bulk request. */
list *reply; /* List of reply objects to send to the client. */
listNode *io_last_reply_block; /* Last client reply block when sent to IO thread */
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
list *deferred_reply_errors; /* Used for module thread safe contexts. */
size_t sentlen; /* Amount of bytes already sent in the current
@ -1253,7 +1268,6 @@ typedef struct client {
sds sockname; /* Cached connection target address. */
listNode *client_list_node; /* list node in client list */
listNode *postponed_list_node; /* list node within the postponed list */
listNode *pending_read_list_node; /* list node in clients pending read list */
void *module_blocked_client; /* Pointer to the ValkeyModuleBlockedClient associated with this
* client. This is set in case of module authentication before the
* unblocked client is reprocessed to handle reply callbacks. */
@ -1293,12 +1307,14 @@ typedef struct client {
size_t ref_block_pos; /* Access position of referenced buffer block,
* i.e. the next offset to send. */
/* list node in clients_pending_write list */
/* list node in clients_pending_write or in clients_pending_io_write list */
listNode clients_pending_write_node;
listNode pending_read_list_node; /* list node in clients_pending_io_read list */
/* Response buffer */
size_t buf_peak; /* Peak used size of buffer in last 5 sec interval. */
mstime_t buf_peak_last_reset_time; /* keeps the last time the buffer peak value was reset */
int bufpos;
size_t io_last_bufpos; /* The client's bufpos at the time it was sent to the IO thread */
size_t buf_usable_size; /* Usable size of buffer. */
char *buf;
#ifdef LOG_REQ_RES
@ -1629,7 +1645,8 @@ struct valkeyServer {
list *clients; /* List of active clients */
list *clients_to_close; /* Clients to close asynchronously */
list *clients_pending_write; /* There is to write or install handler. */
list *clients_pending_read; /* Client has pending read socket buffers. */
list *clients_pending_io_read; /* List of clients with pending read to be process by I/O threads. */
list *clients_pending_io_write; /* List of clients with pending write to be process by I/O threads. */
list *replicas, *monitors; /* List of replicas and MONITORs */
client *current_client; /* The client that triggered the command execution (External or AOF). */
client *executing_client; /* The client executing the current command (possibly script or module). */
@ -1657,7 +1674,8 @@ struct valkeyServer {
int protected_mode; /* Don't accept external connections. */
int io_threads_num; /* Number of IO threads to use. */
int io_threads_do_reads; /* Read and parse from IO threads? */
int io_threads_active; /* Is IO threads currently active? */
int active_io_threads_num; /* Current number of active IO threads, includes main thread. */
int events_per_io_thread; /* Number of events on the event loop to trigger IO threads activation. */
long long events_processed_while_blocked; /* processEventsWhileBlocked() */
int enable_protected_configs; /* Enable the modification of protected configs, see PROTECTED_ACTION_ALLOWED_* */
int enable_debug_cmd; /* Enable DEBUG commands, see PROTECTED_ACTION_ALLOWED_* */
@ -1710,15 +1728,14 @@ struct valkeyServer {
long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */
struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */
_Atomic long long stat_net_input_bytes; /* Bytes read from network. */
_Atomic long long stat_net_output_bytes; /* Bytes written to network. */
_Atomic long long
stat_net_repl_input_bytes; /* Bytes read during replication, added to stat_net_input_bytes in 'info'. */
_Atomic long long
stat_net_repl_output_bytes; /* Bytes written during replication, added to stat_net_output_bytes in 'info'. */
size_t stat_current_cow_peak; /* Peak size of copy on write bytes. */
size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */
monotime stat_current_cow_updated; /* Last update time of stat_current_cow_bytes */
long long stat_net_input_bytes; /* Bytes read from network. */
long long stat_net_output_bytes; /* Bytes written to network. */
long long stat_net_repl_input_bytes; /* Bytes read during replication, added to stat_net_input_bytes in 'info'. */
/* Bytes written during replication, added to stat_net_output_bytes in 'info'. */
long long stat_net_repl_output_bytes;
size_t stat_current_cow_peak; /* Peak size of copy on write bytes. */
size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */
monotime stat_current_cow_updated; /* Last update time of stat_current_cow_bytes */
size_t stat_current_save_keys_processed; /* Processed keys while child is active. */
size_t stat_current_save_keys_total; /* Number of keys when child started. */
size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */
@ -1730,12 +1747,12 @@ struct valkeyServer {
long long
stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to primary, etc.) error replies */
long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */
long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */
long long stat_io_reads_processed; /* Number of read events processed by IO / Main threads */
long long stat_io_writes_processed; /* Number of write events processed by IO / Main threads */
_Atomic long long stat_total_reads_processed; /* Total number of read events processed */
_Atomic long long stat_total_writes_processed; /* Total number of write events processed */
_Atomic long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */
long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */
long long stat_io_reads_processed; /* Number of read events processed by IO threads */
long long stat_io_writes_processed; /* Number of write events processed by IO threads */
long long stat_total_reads_processed; /* Total number of read events processed */
long long stat_total_writes_processed; /* Total number of write events processed */
long long stat_client_qbuf_limit_disconnections; /* Total number of clients reached query buf length limit */
long long stat_client_outbuf_limit_disconnections; /* Total number of clients reached output buf length limit */
/* The following two are used to track instantaneous metrics, like
* number of operations per second, network traffic. */
@ -1881,6 +1898,8 @@ struct valkeyServer {
int syslog_facility; /* Syslog facility */
int crashlog_enabled; /* Enable signal handler for crashlog.
* disable for clean core dumps. */
int crashed; /* True if the server has crashed, used in catClientInfoString
* to indicate that no wait for IO threads is needed. */
int memcheck_enabled; /* Enable memory check on crash. */
int use_exit_on_panic; /* Use exit() on panic and assert rather than
* abort(). useful for Valgrind. */
@ -2002,7 +2021,7 @@ struct valkeyServer {
int list_max_listpack_size;
int list_compress_depth;
/* time cache */
_Atomic time_t unixtime; /* Unix time sampled every cron cycle. */
time_t unixtime; /* Unix time sampled every cron cycle. */
time_t timezone; /* Cached timezone. As set by tzset(). */
int daylight_active; /* Currently in daylight saving time. */
mstime_t mstime; /* 'unixtime' in milliseconds. */
@ -2491,11 +2510,6 @@ typedef struct {
#define OBJ_HASH_KEY 1
#define OBJ_HASH_VALUE 2
#define IO_THREADS_OP_IDLE 0
#define IO_THREADS_OP_READ 1
#define IO_THREADS_OP_WRITE 2
extern int io_threads_op;
/*-----------------------------------------------------------------------------
* Extern declarations
*----------------------------------------------------------------------------*/
@ -2601,11 +2615,35 @@ void dictVanillaFree(dict *d, void *val);
(1ULL << 0) /* Indicating that we should not update \
error stats after sending error reply */
/* networking.c -- Networking and Client related operations */
/* Read flags for various read errors and states */
#define READ_FLAGS_QB_LIMIT_REACHED (1 << 0)
#define READ_FLAGS_ERROR_BIG_INLINE_REQUEST (1 << 1)
#define READ_FLAGS_ERROR_BIG_MULTIBULK (1 << 2)
#define READ_FLAGS_ERROR_INVALID_MULTIBULK_LEN (1 << 3)
#define READ_FLAGS_ERROR_UNAUTHENTICATED_MULTIBULK_LEN (1 << 4)
#define READ_FLAGS_ERROR_UNAUTHENTICATED_BULK_LEN (1 << 5)
#define READ_FLAGS_ERROR_BIG_BULK_COUNT (1 << 6)
#define READ_FLAGS_ERROR_MBULK_UNEXPECTED_CHARACTER (1 << 7)
#define READ_FLAGS_ERROR_MBULK_INVALID_BULK_LEN (1 << 8)
#define READ_FLAGS_ERROR_UNEXPECTED_INLINE_FROM_PRIMARY (1 << 9)
#define READ_FLAGS_ERROR_UNBALANCED_QUOTES (1 << 10)
#define READ_FLAGS_INLINE_ZERO_QUERY_LEN (1 << 11)
#define READ_FLAGS_PARSING_NEGATIVE_MBULK_LEN (1 << 12)
#define READ_FLAGS_PARSING_COMPLETED (1 << 13)
#define READ_FLAGS_PRIMARY (1 << 14)
#define READ_FLAGS_DONT_PARSE (1 << 15)
#define READ_FLAGS_AUTH_REQUIRED (1 << 16)
/* Write flags for various write errors and states */
#define WRITE_FLAGS_WRITE_ERROR (1 << 0)
client *createClient(connection *conn);
void freeClient(client *c);
void freeClientAsync(client *c);
void logInvalidUseAndFreeClientAsync(client *c, const char *fmt, ...);
int beforeNextClient(client *c);
void beforeNextClient(client *c);
void clearClientConnectionState(client *c);
void resetClient(client *c);
void freeClientOriginalArgv(client *c);
@ -2698,24 +2736,28 @@ void whileBlockedCron(void);
void blockingOperationStarts(void);
void blockingOperationEnds(void);
int handleClientsWithPendingWrites(void);
int handleClientsWithPendingWritesUsingThreads(void);
int handleClientsWithPendingReadsUsingThreads(void);
int stopThreadedIOIfNeeded(void);
void adjustThreadedIOIfNeeded(void);
int clientHasPendingReplies(client *c);
int updateClientMemUsageAndBucket(client *c);
void removeClientFromMemUsageBucket(client *c, int allow_eviction);
void unlinkClient(client *c);
int writeToClient(client *c, int handler_installed);
int writeToClient(client *c);
void linkClient(client *c);
void protectClient(client *c);
void unprotectClient(client *c);
void initThreadedIO(void);
void initSharedQueryBuf(void);
void freeSharedQueryBuf(void);
client *lookupClientByID(uint64_t id);
int authRequired(client *c);
void putClientInPendingWriteQueue(client *c);
client *createCachedResponseClient(int resp);
void deleteCachedResponseClient(client *recording_client);
void waitForClientIO(client *c);
void ioThreadReadQueryFromClient(void *data);
void ioThreadWriteToClient(void *data);
int canParseCommand(client *c);
int processIOThreadsReadDone(void);
int processIOThreadsWriteDone(void);
/* logreqres.c - logging of requests and responses */
void reqresReset(client *c, int free_buf);
@ -3834,7 +3876,6 @@ void xorDigest(unsigned char *digest, const void *ptr, size_t len);
sds catSubCommandFullname(const char *parent_name, const char *sub_name);
void commandAddSubcommand(struct serverCommand *parent, struct serverCommand *subcommand, const char *declared_name);
void debugDelay(int usec);
void killIOThreads(void);
void killThreads(void);
void makeThreadKillable(void);
void swapMainDbWithTempDb(serverDb *tempDb);

View File

@ -423,6 +423,8 @@ static ConnectionType CT_Socket = {
/* pending data */
.has_pending_data = NULL,
.process_pending_data = NULL,
.postpone_update_state = NULL,
.update_state = NULL,
};
int connBlock(connection *conn) {

View File

@ -442,6 +442,7 @@ typedef enum { WANT_READ = 1, WANT_WRITE } WantIOType;
#define TLS_CONN_FLAG_READ_WANT_WRITE (1 << 0)
#define TLS_CONN_FLAG_WRITE_WANT_READ (1 << 1)
#define TLS_CONN_FLAG_FD_SET (1 << 2)
#define TLS_CONN_FLAG_POSTPONE_UPDATE_STATE (1 << 3)
typedef struct tls_connection {
connection c;
@ -596,7 +597,34 @@ static void registerSSLEvent(tls_connection *conn, WantIOType want) {
}
}
static void postPoneUpdateSSLState(connection *conn_, int postpone) {
tls_connection *conn = (tls_connection *)conn_;
if (postpone) {
conn->flags |= TLS_CONN_FLAG_POSTPONE_UPDATE_STATE;
} else {
conn->flags &= ~TLS_CONN_FLAG_POSTPONE_UPDATE_STATE;
}
}
static void updatePendingData(tls_connection *conn) {
if (conn->flags & TLS_CONN_FLAG_POSTPONE_UPDATE_STATE) return;
/* If SSL has pending data, already read from the socket, we're at risk of not calling the read handler again, make
* sure to add it to a list of pending connection that should be handled anyway. */
if (SSL_pending(conn->ssl) > 0) {
if (!conn->pending_list_node) {
listAddNodeTail(pending_list, conn);
conn->pending_list_node = listLast(pending_list);
}
} else if (conn->pending_list_node) {
listDelNode(pending_list, conn->pending_list_node);
conn->pending_list_node = NULL;
}
}
static void updateSSLEvent(tls_connection *conn) {
if (conn->flags & TLS_CONN_FLAG_POSTPONE_UPDATE_STATE) return;
int mask = aeGetFileEvents(server.el, conn->c.fd);
int need_read = conn->c.read_handler || (conn->flags & TLS_CONN_FLAG_WRITE_WANT_READ);
int need_write = conn->c.write_handler || (conn->flags & TLS_CONN_FLAG_READ_WANT_WRITE);
@ -610,6 +638,12 @@ static void updateSSLEvent(tls_connection *conn) {
if (!need_write && (mask & AE_WRITABLE)) aeDeleteFileEvent(server.el, conn->c.fd, AE_WRITABLE);
}
static void updateSSLState(connection *conn_) {
tls_connection *conn = (tls_connection *)conn_;
updateSSLEvent(conn);
updatePendingData(conn);
}
static void tlsHandleEvent(tls_connection *conn, int mask) {
int ret, conn_error;
@ -711,19 +745,8 @@ static void tlsHandleEvent(tls_connection *conn, int mask) {
if (!callHandler((connection *)conn, conn->c.read_handler)) return;
}
/* If SSL has pending that, already read from the socket, we're at
* risk of not calling the read handler again, make sure to add it
* to a list of pending connection that should be handled anyway. */
if ((mask & AE_READABLE)) {
if (SSL_pending(conn->ssl) > 0) {
if (!conn->pending_list_node) {
listAddNodeTail(pending_list, conn);
conn->pending_list_node = listLast(pending_list);
}
} else if (conn->pending_list_node) {
listDelNode(pending_list, conn->pending_list_node);
conn->pending_list_node = NULL;
}
if (mask & AE_READABLE) {
updatePendingData(conn);
}
break;
@ -1051,11 +1074,13 @@ static int tlsProcessPendingData(void) {
listIter li;
listNode *ln;
int processed = listLength(pending_list);
int processed = 0;
listRewind(pending_list, &li);
while ((ln = listNext(&li))) {
tls_connection *conn = listNodeValue(ln);
if (conn->flags & TLS_CONN_FLAG_POSTPONE_UPDATE_STATE) continue;
tlsHandleEvent(conn, AE_READABLE);
processed++;
}
return processed;
}
@ -1125,6 +1150,8 @@ static ConnectionType CT_TLS = {
/* pending data */
.has_pending_data = tlsHasPendingData,
.process_pending_data = tlsProcessPendingData,
.postpone_update_state = postPoneUpdateSSLState,
.update_state = updateSSLState,
/* TLS specified methods */
.get_peer_cert = connTLSGetPeerCert,

View File

@ -198,6 +198,8 @@ static ConnectionType CT_Unix = {
/* pending data */
.has_pending_data = NULL,
.process_pending_data = NULL,
.postpone_update_state = NULL,
.update_state = NULL,
};
int RedisRegisterConnectionTypeUnix(void) {

View File

@ -257,6 +257,12 @@ start_server {overrides {save {}}} {
# during the pause. This write will not be interrupted.
pause_process [srv -1 pid]
set rd [valkey_deferring_client]
# wait for the client creation
wait_for_condition 50 100 {
[s connected_clients] == 2
} else {
fail "Client creation failed"
}
$rd SET FOO BAR
$node_0 failover to $node_1_host $node_1_port
resume_process [srv -1 pid]

View File

@ -167,6 +167,7 @@ start_server {tags {"repl external:skip"}} {
test {BLPOP followed by role change, issue #2473} {
set rd [valkey_deferring_client]
$rd blpop foo 0 ; # Block while B is a master
wait_for_blocked_clients_count 1
# Turn B into master of A
$A slaveof no one

View File

@ -156,6 +156,12 @@ test "Shutting down master waits for replica then fails" {
set rd2 [valkey_deferring_client -1]
$rd1 shutdown
$rd2 shutdown
wait_for_condition 50 100 {
[llength [lsearch -all [split [string trim [$master client list]] "\r\n"] *cmd=shutdown*]] == 2
} else {
fail "SHUTDOWN not called on all clients"
}
set info_clients [$master info clients]
assert_match "*connected_clients:3*" $info_clients
assert_match "*blocked_clients:2*" $info_clients
@ -209,6 +215,12 @@ test "Shutting down master waits for replica then aborted" {
set rd2 [valkey_deferring_client -1]
$rd1 shutdown
$rd2 shutdown
wait_for_condition 50 100 {
[llength [lsearch -all [split [string trim [$master client list]] "\r\n"] *cmd=shutdown*]] == 2
} else {
fail "SHUTDOWN not called on all clients"
}
set info_clients [$master info clients]
assert_match "*connected_clients:3*" $info_clients
assert_match "*blocked_clients:2*" $info_clients

View File

@ -65,6 +65,7 @@ start_server {tags {"cli"}} {
proc run_command {fd cmd} {
write_cli $fd $cmd
after 50
set _ [format_output [read_cli $fd]]
}

View File

@ -91,17 +91,31 @@ start_server {} {
lassign [gen_client] rr cname
# Attempt to fill the query buff with only half the percentage threshold verify we're not disconnected
set n [expr $maxmemory_clients_actual / 2]
$rr write [join [list "*1\r\n\$$n\r\n" [string repeat v $n]] ""]
# send incomplete command (n - 1) to make sure we don't use the shared qb
$rr write [join [list "*1\r\n\$$n\r\n" [string repeat v [expr {$n - 1}]]] ""]
$rr flush
# Wait for the client to start using a private query buffer.
wait_for_condition 10 10 {
[client_field $cname qbuf] > 0
} else {
fail "client should start using a private query buffer"
}
set tot_mem [client_field $cname tot-mem]
assert {$tot_mem >= $n && $tot_mem < $maxmemory_clients_actual}
# Attempt to fill the query buff with the percentage threshold of maxmemory and verify we're evicted
$rr close
lassign [gen_client] rr cname
# send incomplete command (maxmemory_clients_actual - 1) to make sure we don't use the shared qb
catch {
$rr write [join [list "*1\r\n\$$maxmemory_clients_actual\r\n" [string repeat v $maxmemory_clients_actual]] ""]
$rr write [join [list "*1\r\n\$$maxmemory_clients_actual\r\n" [string repeat v [expr {$maxmemory_clients_actual - 1}]]] ""]
$rr flush
# Wait for the client to start using a private query buffer.
wait_for_condition 10 10 {
[client_field $cname qbuf] > 0
} else {
fail "client should start using a private query buffer"
}
} e
assert {![client_exists $cname]}
$rr close
@ -399,6 +413,11 @@ start_server {} {
# Decrease maxmemory_clients and expect client eviction
r config set maxmemory-clients [expr $maxmemory_clients / 2]
wait_for_condition 50 10 {
[llength [lsearch -all [split [string trim [r client list]] "\r\n"] *name=client*]] < $client_count
} else {
fail "Failed to evict clients"
}
set connected_clients [llength [lsearch -all [split [string trim [r client list]] "\r\n"] *name=client*]]
assert {$connected_clients > 0 && $connected_clients < $client_count}

View File

@ -62,7 +62,13 @@ test "sunsubscribe without specifying any channel would unsubscribe all shard ch
set sub_res [ssubscribe $subscribeclient [list "\{channel.0\}1" "\{channel.0\}2" "\{channel.0\}3"]]
assert_equal [list 1 2 3] $sub_res
sunsubscribe $subscribeclient
# wait for the unsubscribe to take effect
wait_for_condition 50 10 {
[$publishclient spublish "\{channel.0\}1" hello] eq 0
} else {
fail "unsubscribe did not take effect as expected"
}
assert_equal 0 [$publishclient spublish "\{channel.0\}1" hello]
assert_equal 0 [$publishclient spublish "\{channel.0\}2" hello]
assert_equal 0 [$publishclient spublish "\{channel.0\}3" hello]

View File

@ -287,6 +287,7 @@ start_server {tags {"dump"}} {
set rd [valkey_deferring_client]
$rd debug sleep 1.0 ; # Make second server unable to reply.
after 100; # wait to make sure DEBUG command was executed.
set e {}
catch {r -1 migrate $second_host $second_port key 9 500} e
assert_match {IOERR*} $e

View File

@ -295,47 +295,50 @@ start_server {tags {"info" "external:skip"}} {
}
}
test {stats: eventloop metrics} {
set info1 [r info stats]
set cycle1 [getInfoProperty $info1 eventloop_cycles]
set el_sum1 [getInfoProperty $info1 eventloop_duration_sum]
set cmd_sum1 [getInfoProperty $info1 eventloop_duration_cmd_sum]
assert_morethan $cycle1 0
assert_morethan $el_sum1 0
assert_morethan $cmd_sum1 0
after 110 ;# default hz is 10, wait for a cron tick.
set info2 [r info stats]
set cycle2 [getInfoProperty $info2 eventloop_cycles]
set el_sum2 [getInfoProperty $info2 eventloop_duration_sum]
set cmd_sum2 [getInfoProperty $info2 eventloop_duration_cmd_sum]
if {$::verbose} { puts "eventloop metrics cycle1: $cycle1, cycle2: $cycle2" }
assert_morethan $cycle2 $cycle1
assert_lessthan $cycle2 [expr $cycle1+10] ;# we expect 2 or 3 cycles here, but allow some tolerance
if {$::verbose} { puts "eventloop metrics el_sum1: $el_sum1, el_sum2: $el_sum2" }
assert_morethan $el_sum2 $el_sum1
assert_lessthan $el_sum2 [expr $el_sum1+30000] ;# we expect roughly 100ms here, but allow some tolerance
if {$::verbose} { puts "eventloop metrics cmd_sum1: $cmd_sum1, cmd_sum2: $cmd_sum2" }
assert_morethan $cmd_sum2 $cmd_sum1
assert_lessthan $cmd_sum2 [expr $cmd_sum1+15000] ;# we expect about tens of ms here, but allow some tolerance
}
test {stats: instantaneous metrics} {
r config resetstat
set retries 0
for {set retries 1} {$retries < 4} {incr retries} {
after 1600 ;# hz is 10, wait for 16 cron tick so that sample array is fulfilled
set value [s instantaneous_eventloop_cycles_per_sec]
if {$value > 0} break
# skip the following 2 tests if we are running with io-threads as the eventloop metrics are different in that case.
if {[r config get io-threads] eq 0} {
test {stats: eventloop metrics} {
set info1 [r info stats]
set cycle1 [getInfoProperty $info1 eventloop_cycles]
set el_sum1 [getInfoProperty $info1 eventloop_duration_sum]
set cmd_sum1 [getInfoProperty $info1 eventloop_duration_cmd_sum]
assert_morethan $cycle1 0
assert_morethan $el_sum1 0
assert_morethan $cmd_sum1 0
after 110 ;# default hz is 10, wait for a cron tick.
set info2 [r info stats]
set cycle2 [getInfoProperty $info2 eventloop_cycles]
set el_sum2 [getInfoProperty $info2 eventloop_duration_sum]
set cmd_sum2 [getInfoProperty $info2 eventloop_duration_cmd_sum]
if {$::verbose} { puts "eventloop metrics cycle1: $cycle1, cycle2: $cycle2" }
assert_morethan $cycle2 $cycle1
assert_lessthan $cycle2 [expr $cycle1+10] ;# we expect 2 or 3 cycles here, but allow some tolerance
if {$::verbose} { puts "eventloop metrics el_sum1: $el_sum1, el_sum2: $el_sum2" }
assert_morethan $el_sum2 $el_sum1
assert_lessthan $el_sum2 [expr $el_sum1+30000] ;# we expect roughly 100ms here, but allow some tolerance
if {$::verbose} { puts "eventloop metrics cmd_sum1: $cmd_sum1, cmd_sum2: $cmd_sum2" }
assert_morethan $cmd_sum2 $cmd_sum1
assert_lessthan $cmd_sum2 [expr $cmd_sum1+15000] ;# we expect about tens of ms here, but allow some tolerance
}
test {stats: instantaneous metrics} {
r config resetstat
set retries 0
for {set retries 1} {$retries < 4} {incr retries} {
after 1600 ;# hz is 10, wait for 16 cron tick so that sample array is fulfilled
set value [s instantaneous_eventloop_cycles_per_sec]
if {$value > 0} break
}
assert_lessthan $retries 4
if {$::verbose} { puts "instantaneous metrics instantaneous_eventloop_cycles_per_sec: $value" }
assert_morethan $value 0
assert_lessthan $value [expr $retries*15] ;# default hz is 10
set value [s instantaneous_eventloop_duration_usec]
if {$::verbose} { puts "instantaneous metrics instantaneous_eventloop_duration_usec: $value" }
assert_morethan $value 0
assert_lessthan $value [expr $retries*22000] ;# default hz is 10, so duration < 1000 / 10, allow some tolerance
}
assert_lessthan $retries 4
if {$::verbose} { puts "instantaneous metrics instantaneous_eventloop_cycles_per_sec: $value" }
assert_morethan $value 0
assert_lessthan $value [expr $retries*15] ;# default hz is 10
set value [s instantaneous_eventloop_duration_usec]
if {$::verbose} { puts "instantaneous metrics instantaneous_eventloop_duration_usec: $value" }
assert_morethan $value 0
assert_lessthan $value [expr $retries*22000] ;# default hz is 10, so duration < 1000 / 10, allow some tolerance
}
test {stats: debug metrics} {

View File

@ -98,6 +98,7 @@ start_server {tags {"maxmemory" "external:skip"}} {
$rr write "\r\n"
$rr flush
}
after 100; # give the server some time to process the input buffer - this was added to make sure the test pass with io-threads active.
}]} {
lremove clients $rr
}

View File

@ -404,6 +404,8 @@ run_solo {defrag} {
r save ;# saving an rdb iterates over all the data / pointers
} {OK}
# Skip the following two tests if we are running with IO threads, as the IO threads allocate the command arguments in a different arena. As a result, fragmentation is not as expected.
if {[r config get io-threads] eq 0} {
test "Active defrag pubsub: $type" {
r flushdb
r config resetstat
@ -502,6 +504,7 @@ run_solo {defrag} {
}
$rd_pubsub close
}
} ;# io-threads
if {$type eq "standalone"} { ;# skip in cluster mode
test "Active defrag big list: $type" {

View File

@ -128,7 +128,7 @@ foreach call_type {nested normal} {
# send another command after the blocked one, to make sure we don't attempt to process it
$rd ping
$rd flush
after 100
# make sure we get BUSY error, and that we didn't get it too early
assert_error {*BUSY Slow module operation*} {r ping}
assert_morethan_equal [expr [clock clicks -milliseconds]-$start] $busy_time_limit

View File

@ -85,6 +85,12 @@ start_server {tags {"pubsub network"}} {
set rd1 [valkey_deferring_client]
assert_equal {1 2 3} [subscribe $rd1 {chan1 chan2 chan3}]
unsubscribe $rd1
# wait for the unsubscribe to take effect
wait_for_condition 50 100 {
[r publish chan1 hello] eq 0
} else {
fail "unsubscribe did not take effect"
}
assert_equal 0 [r publish chan1 hello]
assert_equal 0 [r publish chan2 hello]
assert_equal 0 [r publish chan3 hello]
@ -158,6 +164,12 @@ start_server {tags {"pubsub network"}} {
set rd1 [valkey_deferring_client]
assert_equal {1 2 3} [psubscribe $rd1 {chan1.* chan2.* chan3.*}]
punsubscribe $rd1
# wait for the unsubscribe to take effect
wait_for_condition 50 100 {
[r publish chan1.hi hello] eq 0
} else {
fail "unsubscribe did not take effect"
}
assert_equal 0 [r publish chan1.hi hello]
assert_equal 0 [r publish chan2.hi hello]
assert_equal 0 [r publish chan3.hi hello]

View File

@ -46,6 +46,14 @@ start_server {tags {"pubsubshard external:skip"}} {
assert_equal {2} [ssubscribe $rd1 {chan2}]
assert_equal {3} [ssubscribe $rd1 {chan3}]
sunsubscribe $rd1
# wait for the unsubscribe to take effect
wait_for_condition 50 100 {
[r spublish chan1 hello] eq 0
} else {
fail "unsubscribe did not take effect"
}
assert_equal 0 [r SPUBLISH chan1 hello]
assert_equal 0 [r SPUBLISH chan2 hello]
assert_equal 0 [r SPUBLISH chan3 hello]

View File

@ -92,7 +92,7 @@ start_server {tags {"querybuf slow"}} {
# Write something smaller, so query buf peak can shrink
$rd set x [string repeat A 100]
set new_test_client_qbuf [client_query_buffer test_client]
if {$new_test_client_qbuf < $orig_test_client_qbuf} { break }
if {$new_test_client_qbuf < $orig_test_client_qbuf && $new_test_client_qbuf > 0} { break }
if {[expr [clock milliseconds] - $t] > 1000} { break }
after 10
}

View File

@ -1100,6 +1100,13 @@ foreach {pop} {BLPOP BLMPOP_LEFT} {
$watching_client get somekey{t}
$watching_client read
$watching_client exec
# wait for exec to be called.
wait_for_condition 50 10 {
[llength [lsearch -all [split [string trim [r client list]] "\r\n"] *cmd=exec*]] == 1
} else {
fail "$cmd was not called"
}
# Blocked BLPOPLPUSH may create problems, unblock it.
r lpush srclist{t} element
set res [$watching_client read]

View File

@ -520,7 +520,7 @@ start_server {
# Before the fix in #13004, this time would have been 1200+ (i.e. more than 1200ms),
# now it should be 1000, but in order to avoid timing issues, we increase the range a bit.
assert_range [expr $end-$start] 1000 1150
assert_range [expr $end-$start] 1000 1199
$rd1 close
$rd2 close
@ -931,14 +931,14 @@ start_server {
set reply [r xinfo consumers mystream mygroup]
set consumer_info [lindex $reply 0]
assert_equal [lindex $consumer_info 1] "Alice" ;# consumer name
assert {[dict get $consumer_info idle] < 80} ;# consumer idle (seen-time)
assert {[dict get $consumer_info inactive] < 80} ;# consumer inactive (active-time)
assert {[dict get $consumer_info idle] < 300} ;# consumer idle (seen-time)
assert {[dict get $consumer_info inactive] < 300} ;# consumer inactive (active-time)
after 100
r XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
set reply [r xinfo consumers mystream mygroup]
set consumer_info [lindex $reply 0]
assert {[dict get $consumer_info idle] < 80} ;# consumer idle (seen-time)
assert {[dict get $consumer_info idle] < 300} ;# consumer idle (seen-time)
assert {[dict get $consumer_info inactive] >= 100} ;# consumer inactive (active-time)
@ -1324,6 +1324,9 @@ start_server {
assert_equal [dict get $group entries-read] 3
assert_equal [dict get $group lag] 0
# wait for replica offset
wait_for_ofs_sync $master $replica
set reply [$replica XINFO STREAM mystream FULL]
set group [lindex [dict get $reply groups] 0]
assert_equal [dict get $group entries-read] 3

View File

@ -2012,6 +2012,7 @@ start_server {tags {"zset"}} {
# Before the fix in #13004, this time would have been 1200+ (i.e. more than 1200ms),
# now it should be 1000, but in order to avoid timing issues, we increase the range a bit.
assert_range [expr $end-$start] 1000 1150
puts "Time: [expr $end-$start]"
r debug set-active-expire 1
$rd close

View File

@ -1288,9 +1288,8 @@ lazyfree-lazy-user-flush no
# to pipelining nor sharding of the instance.
#
# By default threading is disabled, we suggest enabling it only in machines
# that have at least 4 or more cores, leaving at least one spare core.
# Using more than 8 threads is unlikely to help much. We also recommend using
# threaded I/O only if you actually have performance problems, with
# that have at least 3 or more cores, leaving at least one spare core.
# We also recommend using threaded I/O only if you actually have performance problems, with
# instances being able to use a quite big percentage of CPU time, otherwise
# there is no point in using this feature.
#
@ -1301,19 +1300,9 @@ lazyfree-lazy-user-flush no
# io-threads 4
#
# Setting io-threads to 1 will just use the main thread as usual.
# When I/O threads are enabled, we only use threads for writes, that is
# to thread the write(2) syscall and transfer the client buffers to the
# socket. However it is also possible to enable threading of reads and
# protocol parsing using the following configuration directive, by setting
# it to yes:
#
# io-threads-do-reads no
#
# Usually threading reads doesn't help much.
#
# NOTE 1: This configuration directive cannot be changed at runtime via
# CONFIG SET. Also, this feature currently does not work when SSL is
# enabled.
# When I/O threads are enabled, we use threads for reads and writes, that is
# to thread the write and read syscall and transfer the client buffers to the
# socket and to enable threading of reads and protocol parsing.
#
# NOTE 2: If you want to test the server speedup using valkey-benchmark, make
# sure you also run the benchmark itself in threaded mode, using the