Optimize IO thread offload for modified argv (#1360)

### Improve expired commands performance with IO threads

#### Background
In our IO threads architecture, IO threads allocate each client's argv,
and after processCommand finishes we offload freeing that argv back to
the IO threads.
With jemalloc, it's crucial that the same thread that allocates memory
also frees it.

For some commands we modify the client's argv in the main thread during
command processing (for example in `SET EX` command we rewrite the
command to use absolute time for replication propagation).

#### Current issues
1. When commands are rewritten (e.g., expire commands), we store the
   original argv in `c->original_argv`. However, we're currently:
   - Freeing new argv (allocated by main thread) in IO threads
   - Freeing original argv (allocated by IO threads) in main thread
2. Currently, `c->original_argv` points to a new array holding the old
   objects, while `c->argv` keeps the old array holding the new objects,
   which complicates memory management when freeing.

#### Changes
1. Refactored argv modification handling code to ensure consistency -
both array and objects are now either all new or all old
2. Moved original_argv cleanup to happen in resetClient after argv
cleanup
3. Modified IO threads code to properly handle original argv cleanup
when argv are modified.

#### Performance Impact
Benchmark with `SET EX` commands (650 clients, 512 byte value, 8 IO
threads):
- New implementation: **729,548 ops/sec**
- Old implementation: **633,243 ops/sec**

This represents a **~15%** performance improvement due to more efficient
memory handling.

---------

Signed-off-by: Uri Yagelnik <uriy@amazon.com>
Signed-off-by: ranshid <88133677+ranshid@users.noreply.github.com>
Co-authored-by: ranshid <88133677+ranshid@users.noreply.github.com>
This commit is contained in:
uriyage 2024-12-03 19:20:31 +02:00 committed by GitHub
parent 397201c48f
commit 9f8b174c2e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 235 additions and 45 deletions

View File

@ -206,7 +206,6 @@ void unblockClient(client *c, int queue_for_reprocessing) {
/* Reset the client for a new query, unless the client has pending command to process
* or in case a shutdown operation was canceled and we are still in the processCommand sequence */
if (!c->flag.pending_command && c->bstate.btype != BLOCKED_SHUTDOWN) {
freeClientOriginalArgv(c);
/* Clients that are not blocked on keys are not reprocessed so we must
* call reqresAppendResponse here (for clients blocked on key,
* unblockClientOnKey is called, which eventually calls processCommand,

View File

@ -441,8 +441,8 @@ void IOThreadFreeArgv(void *data) {
/* This function attempts to offload the client's argv to an IO thread.
* Returns C_OK if the client's argv were successfully offloaded to an IO thread,
* C_ERR otherwise. */
int tryOffloadFreeArgvToIOThreads(client *c) {
if (server.active_io_threads_num <= 1 || c->argc == 0) {
int tryOffloadFreeArgvToIOThreads(client *c, int argc, robj **argv) {
if (server.active_io_threads_num <= 1 || argc == 0) {
return C_ERR;
}
@ -456,11 +456,11 @@ int tryOffloadFreeArgvToIOThreads(client *c) {
int last_arg_to_free = -1;
/* Prepare the argv */
for (int j = 0; j < c->argc; j++) {
if (c->argv[j]->refcount > 1) {
decrRefCount(c->argv[j]);
for (int j = 0; j < argc; j++) {
if (argv[j]->refcount > 1) {
decrRefCount(argv[j]);
/* Set argv[j] to NULL to avoid double free */
c->argv[j] = NULL;
argv[j] = NULL;
} else {
last_arg_to_free = j;
}
@ -468,17 +468,17 @@ int tryOffloadFreeArgvToIOThreads(client *c) {
/* If no argv to free, free the argv array at the main thread */
if (last_arg_to_free == -1) {
zfree(c->argv);
zfree(argv);
return C_OK;
}
/* We set the refcount of the last arg to free to 0 to indicate that
* this is the last argument to free. With this approach, we don't need to
* send the argc to the IO thread and we can send just the argv ptr. */
c->argv[last_arg_to_free]->refcount = 0;
argv[last_arg_to_free]->refcount = 0;
/* Must succeed as we checked the free space before. */
IOJobQueue_push(jq, IOThreadFreeArgv, c->argv);
IOJobQueue_push(jq, IOThreadFreeArgv, argv);
return C_OK;
}

View File

@ -9,7 +9,7 @@ int inMainThread(void);
int trySendReadToIOThreads(client *c);
int trySendWriteToIOThreads(client *c);
int tryOffloadFreeObjToIOThreads(robj *o);
int tryOffloadFreeArgvToIOThreads(client *c);
int tryOffloadFreeArgvToIOThreads(client *c, int argc, robj **argv);
void adjustIOThreadsByEventLoad(int numevents, int increase_only);
void drainIOThreadsQueue(void);
void trySendPollJobToIOThreads(void);

View File

@ -238,6 +238,10 @@ void execCommand(client *c) {
c->mstate.commands[j].argv = c->argv;
c->mstate.commands[j].argv_len = c->argv_len;
c->mstate.commands[j].cmd = c->cmd;
/* The original argv has already been processed for slowlog and monitor,
* so we can safely free it before proceeding to the next command. */
freeClientOriginalArgv(c);
}
// restore old DENY_BLOCKING value

View File

@ -1488,14 +1488,19 @@ void freeClientOriginalArgv(client *c) {
/* We didn't rewrite this client */
if (!c->original_argv) return;
for (int j = 0; j < c->original_argc; j++) decrRefCount(c->original_argv[j]);
zfree(c->original_argv);
if (tryOffloadFreeArgvToIOThreads(c, c->original_argc, c->original_argv) == C_ERR) {
for (int j = 0; j < c->original_argc; j++) decrRefCount(c->original_argv[j]);
zfree(c->original_argv);
}
c->original_argv = NULL;
c->original_argc = 0;
}
void freeClientArgv(client *c) {
if (tryOffloadFreeArgvToIOThreads(c) == C_ERR) {
/* If original_argv exists, 'c->argv' was allocated by the main thread,
* so it's more efficient to free it directly here rather than offloading to IO threads */
if (c->original_argv || tryOffloadFreeArgvToIOThreads(c, c->argc, c->argv) == C_ERR) {
for (int j = 0; j < c->argc; j++) decrRefCount(c->argv[j]);
zfree(c->argv);
}
@ -2545,6 +2550,7 @@ void resetClient(client *c) {
serverCommandProc *prevcmd = c->cmd ? c->cmd->proc : NULL;
freeClientArgv(c);
freeClientOriginalArgv(c);
c->cur_script = NULL;
c->reqtype = 0;
c->multibulklen = 0;
@ -4248,16 +4254,53 @@ void securityWarningCommand(client *c) {
freeClientAsync(c);
}
/* Keep track of the original command arguments so that we can generate
* an accurate slowlog entry after the command has been executed. */
static void retainOriginalCommandVector(client *c) {
/* We already rewrote this command, so don't rewrite it again */
if (c->original_argv) return;
c->original_argc = c->argc;
c->original_argv = zmalloc(sizeof(robj *) * (c->argc));
for (int j = 0; j < c->argc; j++) {
c->original_argv[j] = c->argv[j];
incrRefCount(c->argv[j]);
/* This function preserves the original command arguments for accurate slowlog recording
 * and installs the argument vector the client keeps using afterwards.
 *
 * It performs the following operations:
 * - Stores the initial command vector in c->original_argv if not already saved.
 *   The current array itself is handed over (no copy, no refcount change).
 * - Installs 'new_argv' as c->argv, or, when 'new_argv' is NULL, makes sure
 *   c->argv is an array distinct from c->original_argv that can hold
 *   'new_argc' arguments.
 * - Releases the previous c->argv (objects and array) when it is neither kept
 *   as c->argv nor retained as c->original_argv.
 *
 * new_argc - The number of arguments c->argv holds after the call.
 * new_argv - Optional pointer to a new argument vector. If non-NULL it is
 *            installed as-is, without touching the refcount of its objects.
 *            If NULL, space is made for new_argc arguments, preserving the
 *            existing arguments (slots past the old argc are set to NULL).
 *
 * On return c->argc and c->argv_len both equal new_argc. Slots of c->argv may
 * be NULL; c->argv_len_sum is not updated here. */
static void backupAndUpdateClientArgv(client *c, int new_argc, robj **new_argv) {
    robj **old_argv = c->argv;
    int old_argc = c->argc;

    /* Store original arguments if not already saved */
    if (!c->original_argv) {
        c->original_argc = old_argc;
        c->original_argv = old_argv;
    }

    if (new_argv) {
        /* Handle direct argv replacement */
        c->argv = new_argv;
    } else if (c->original_argv == old_argv || new_argc > old_argc) {
        /* Allocate a new array: the old one is either now referenced by
         * original_argv or holds fewer than new_argc arguments. */
        c->argv = zmalloc(sizeof(robj *) * new_argc);
        for (int i = 0; i < old_argc && i < new_argc; i++) {
            c->argv[i] = old_argv[i];
            /* A slot may be NULL if an earlier call extended the vector and
             * the caller never filled it; there is no reference to take then. */
            if (c->argv[i]) incrRefCount(c->argv[i]);
        }
        /* Initialize new argument slots to NULL */
        for (int i = old_argc; i < new_argc; i++) {
            c->argv[i] = NULL;
        }
    } else {
        /* The existing array is reused in place. Slots at or beyond new_argc
         * become unreachable once argc shrinks, so drop their references now. */
        for (int i = new_argc; i < old_argc; i++) {
            if (old_argv[i]) {
                decrRefCount(old_argv[i]);
                old_argv[i] = NULL;
            }
        }
    }

    c->argc = new_argc;
    c->argv_len = new_argc;

    /* Clean up old argv if it was replaced and is not kept as original_argv */
    if (c->argv != old_argv && c->original_argv != old_argv) {
        for (int i = 0; i < old_argc; i++) {
            if (old_argv[i]) decrRefCount(old_argv[i]);
        }
        zfree(old_argv);
    }
}
@ -4265,7 +4308,7 @@ static void retainOriginalCommandVector(client *c) {
* in the slowlog. This information is stored in the
* original_argv array. */
void redactClientCommandArgument(client *c, int argc) {
retainOriginalCommandVector(c);
backupAndUpdateClientArgv(c, c->argc, NULL);
if (c->original_argv[argc] == shared.redacted) {
/* This argument has already been redacted */
return;
@ -4298,10 +4341,7 @@ void rewriteClientCommandVector(client *c, int argc, ...) {
/* Completely replace the client command vector with the provided one. */
void replaceClientCommandVector(client *c, int argc, robj **argv) {
int j;
retainOriginalCommandVector(c);
freeClientArgv(c);
c->argv = argv;
c->argc = argc;
backupAndUpdateClientArgv(c, argc, argv);
c->argv_len_sum = 0;
for (j = 0; j < c->argc; j++)
if (c->argv[j]) c->argv_len_sum += getStringObjectLen(c->argv[j]);
@ -4322,19 +4362,9 @@ void replaceClientCommandVector(client *c, int argc, robj **argv) {
* free the no longer used objects on c->argv. */
void rewriteClientCommandArgument(client *c, int i, robj *newval) {
robj *oldval;
retainOriginalCommandVector(c);
int new_argc = (i >= c->argc) ? i + 1 : c->argc;
backupAndUpdateClientArgv(c, new_argc, NULL);
/* We need to handle both extending beyond argc (just update it and
* initialize the new element) or beyond argv_len (realloc is needed).
*/
if (i >= c->argc) {
if (i >= c->argv_len) {
c->argv = zrealloc(c->argv, sizeof(robj *) * (i + 1));
c->argv_len = i + 1;
}
c->argc = i + 1;
c->argv[i] = NULL;
}
oldval = c->argv[i];
if (oldval) c->argv_len_sum -= getStringObjectLen(oldval);
if (newval) c->argv_len_sum += getStringObjectLen(newval);

View File

@ -3659,10 +3659,6 @@ void call(client *c, int flags) {
replicationFeedMonitors(c, server.monitors, c->db->id, argv, argc);
}
/* Clear the original argv.
* If the client is blocked we will handle slowlog when it is unblocked. */
if (!c->flag.blocked) freeClientOriginalArgv(c);
/* Populate the per-command and per-slot statistics that we show in INFO commandstats and CLUSTER SLOT-STATS,
* respectively. If the client is blocked we will handle latency stats and duration when it is unblocked. */
if (update_command_stats && !c->flag.blocked) {

File diff suppressed because one or more lines are too long

131
src/unit/test_networking.c Normal file
View File

@ -0,0 +1,131 @@
#include <stdatomic.h>
#include "../networking.c"
#include "../server.c"
#include "test_help.h"
/* Unit test for backupAndUpdateClientArgv().
 *
 * Drives one client through three consecutive calls (first backup with
 * extension, direct replacement, second extension) and checks after each
 * step which array is c->argv, which is c->original_argv, and what the
 * object refcounts are. The steps build on each other, so their order matters.
 *
 * The argc/argv/flags parameters are unused. Returns 0; failures are reported
 * through TEST_ASSERT. */
int test_backupAndUpdateClientArgv(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
/* Only the client fields assigned below are initialised by this test. */
client *c = zmalloc(sizeof(client));
/* Test 1: Initial backup of arguments.
 * Asking for 3 slots on a 2-argument client with no backup yet must move the
 * initial array to original_argv and give c->argv a fresh 3-slot array. */
c->argc = 2;
robj **initial_argv = zmalloc(sizeof(robj *) * 2);
c->argv = initial_argv;
c->argv[0] = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test"));
c->argv[1] = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test2"));
c->original_argv = NULL;
backupAndUpdateClientArgv(c, 3, NULL);
TEST_ASSERT(c->argv != initial_argv);
TEST_ASSERT(c->original_argv == initial_argv);
TEST_ASSERT(c->original_argc == 2);
TEST_ASSERT(c->argc == 3);
TEST_ASSERT(c->argv_len == 3);
/* Each object is now referenced from both arrays, hence refcount 2. */
TEST_ASSERT(c->argv[0]->refcount == 2);
TEST_ASSERT(c->argv[1]->refcount == 2);
/* The added slot is left empty for the caller to fill. */
TEST_ASSERT(c->argv[2] == NULL);
/* Test 2: Direct argv replacement.
 * The supplied array becomes c->argv; the backup stays untouched. */
robj **new_argv = zmalloc(sizeof(robj *) * 2);
new_argv[0] = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test"));
new_argv[1] = createObject(OBJ_STRING, sdscatfmt(sdsempty(), "test2"));
backupAndUpdateClientArgv(c, 2, new_argv);
TEST_ASSERT(c->argv == new_argv);
TEST_ASSERT(c->argc == 2);
TEST_ASSERT(c->argv_len == 2);
TEST_ASSERT(c->original_argv != c->argv);
TEST_ASSERT(c->original_argv == initial_argv);
TEST_ASSERT(c->original_argc == 2);
/* The array from Test 1 was released, so only original_argv still
 * references the initial objects. */
TEST_ASSERT(c->original_argv[0]->refcount == 1);
TEST_ASSERT(c->original_argv[1]->refcount == 1);
/* Test 3: Expanding argc.
 * Growing from 2 to 4 keeps the two existing arguments and adds two NULL
 * slots; the backup still points at the initial array. */
backupAndUpdateClientArgv(c, 4, NULL);
TEST_ASSERT(c->argc == 4);
TEST_ASSERT(c->argv_len == 4);
TEST_ASSERT(c->argv[0] != NULL);
TEST_ASSERT(c->argv[1] != NULL);
TEST_ASSERT(c->argv[2] == NULL);
TEST_ASSERT(c->argv[3] == NULL);
TEST_ASSERT(c->original_argv == initial_argv);
/* Cleanup: release both vectors by hand; c->argv may contain NULL slots. */
for (int i = 0; i < c->original_argc; i++) {
decrRefCount(c->original_argv[i]);
}
zfree(c->original_argv);
for (int i = 0; i < c->argc; i++) {
if (c->argv[i]) decrRefCount(c->argv[i]);
}
zfree(c->argv);
zfree(c);
return 0;
}
/* Unit test for rewriteClientCommandArgument().
 *
 * Starts from a client holding "SET key value", rewrites an existing argument
 * and then writes one position past the end. After each step it checks
 * c->argv, c->argv_len_sum and that c->original_argv still holds the
 * untouched initial vector.
 *
 * The argc/argv/flags parameters are unused. Returns 0; failures are reported
 * through TEST_ASSERT. */
int test_rewriteClientCommandArgument(int argc, char **argv, int flags) {
UNUSED(argc);
UNUSED(argv);
UNUSED(flags);
/* Only the client fields assigned below are initialised by this test. */
client *c = zmalloc(sizeof(client));
c->argc = 3;
robj **initial_argv = zmalloc(sizeof(robj *) * 3);
c->argv = initial_argv;
c->original_argv = NULL;
c->argv_len_sum = 0;
/* Initialize client with command "SET key value" */
c->argv[0] = createStringObject("SET", 3);
robj *original_key = createStringObject("key", 3);
c->argv[1] = original_key;
c->argv[2] = createStringObject("value", 5);
c->argv_len_sum = 11; // 3 + 3 + 5
/* Test 1: Rewrite existing argument */
robj *newval = createStringObject("newkey", 6);
rewriteClientCommandArgument(c, 1, newval);
TEST_ASSERT(c->argv[1] == newval);
/* One reference is held by this test (released in cleanup), one by c->argv. */
TEST_ASSERT(c->argv[1]->refcount == 2);
TEST_ASSERT(c->argv_len_sum == 14); // 3 + 6 + 5
/* The backup keeps the initial array and the original key object, which is
 * now referenced only from there. */
TEST_ASSERT(c->original_argv == initial_argv);
TEST_ASSERT(c->original_argv[1] == original_key);
TEST_ASSERT(c->original_argv[1]->refcount == 1);
/* Test 2: Extend argument vector (index 3 is one past the current argc) */
robj *extraval = createStringObject("extra", 5);
rewriteClientCommandArgument(c, 3, extraval);
TEST_ASSERT(c->argc == 4);
TEST_ASSERT(c->argv[3] == extraval);
TEST_ASSERT(c->argv_len_sum == 19); // 3 + 6 + 5 + 5
TEST_ASSERT(c->original_argv == initial_argv);
/* Cleanup: drop the references held by both vectors, then the references
 * this test still holds on newval and extraval. */
for (int i = 0; i < c->argc; i++) {
if (c->argv[i]) decrRefCount(c->argv[i]);
}
zfree(c->argv);
for (int i = 0; i < c->original_argc; i++) {
if (c->original_argv[i]) decrRefCount(c->original_argv[i]);
}
zfree(c->original_argv);
decrRefCount(newval);
decrRefCount(extraval);
zfree(c);
return 0;
}

View File

@ -376,6 +376,32 @@ start_server {tags {"introspection"}} {
$rd close
}
# This test verifies that MONITOR correctly records overwritten commands
# when executed within a MULTI-EXEC block. Specifically, it checks that even if
# the original SET-EX command arguments are overwritten for replica propagation, the MONITOR output
# still shows the original command. Two SET EX commands are queued so that
# each command of the transaction is checked, not only a single one.
test {MONITOR correctly records SET EX in MULTI-EXEC} {
# Start monitoring client (a separate deferring connection, so its replies
# are read explicitly below)
set rd [valkey_deferring_client]
$rd monitor
$rd read ; # Discard the OK
# Execute multi-exec block with SET EX commands
# NOTE(review): both keys carry the same "{slot}" tag, presumably so they
# map to one slot when the suite runs in cluster mode; confirm.
r multi
r set "{slot}key1" value1 ex 3600
r set "{slot}key2" value2 ex 1800
r exec
# Verify monitor output shows the original commands, in the order they were
# issued and with the "ex <seconds>" arguments exactly as the client sent them:
assert_match {*"multi"*} [$rd read]
assert_match {*"set"*"{slot}key1"*"value1"*"ex"*"3600"*} [$rd read]
assert_match {*"set"*"{slot}key2"*"value2"*"ex"*"1800"*} [$rd read]
assert_match {*"exec"*} [$rd read]
# Clean up monitoring client
$rd close
}
test {MONITOR log blocked command only once} {
# need to reconnect in order to reset the clients state
reconnect