
This PR utilizes the IO threads to execute commands in batches, allowing us to prefetch the dictionary data in advance.

After making the IO threads asynchronous and offloading more work to them in the first two PRs, the `lookupKey` function became the main bottleneck, taking about 50% of the main-thread time (tested with the SET command). This is because the Valkey dictionary is a straightforward but inefficient chained hash implementation: while traversing the hash linked lists, every access to a dictEntry structure, a pointer to a key, or a value object requires, with high probability, an expensive external memory access.

### Memory Access Amortization

Memory Access Amortization (MAA) is a technique designed to optimize the performance of dynamic data structures by reducing the impact of memory access latency. It is applicable when multiple operations need to be executed concurrently. The principle behind it is that for certain dynamic data structures, executing operations in a batch is more efficient than executing each one separately.

Rather than executing operations sequentially, this approach interleaves the execution of all operations. Whenever a memory access is required during an operation, the program prefetches the necessary memory and transitions to another operation. This ensures that while one operation is blocked awaiting memory access, other memory accesses proceed in parallel, thereby reducing the average access latency.

We applied this method in the development of `dictPrefetch`, which takes a vector of keys and dictionaries as parameters. It ensures that all memory addresses required to execute dictionary operations for these keys are loaded into the L1-L3 caches before the commands are executed. Essentially, `dictPrefetch` is an interleaved execution of `dictFind` for all the keys.

**Implementation details**

When the main thread iterates over `clients-pending-io-read`, for clients with ready-to-execute commands (i.e., clients for which the IO thread has parsed the commands), a batch of up to 16 commands is created. First, the commands' argv, which were allocated by the IO threads, are prefetched into the main thread's L1 cache. Next, all the dict entries and values required for the commands are prefetched from the dictionary. Only then are the commands executed.

---------

Signed-off-by: Uri Yagelnik <uriy@amazon.com>
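To make the interleaving concrete, here is a minimal, self-contained C sketch of the MAA idea over a toy chained hash table. It is not Valkey's `dict.c`: the names (`toy_dict`, `toy_entry`, `batched_find`) and the fixed cap of 16 lookups are illustrative assumptions that mirror the description above (hash all keys up front, then advance each lookup one node per round, prefetching the next node so the cache misses of the different lookups overlap). It assumes a GCC/Clang compiler for `__builtin_prefetch`.

```c
/* Minimal MAA sketch over a toy chained hash table (NOT Valkey's dict.c).
 * All names here are hypothetical; the point is the interleaving: instead
 * of walking each chain to completion, every lookup advances one node per
 * round while prefetching its next node, so the memory stalls of the
 * different lookups overlap. */
#include <stddef.h>
#include <string.h>

typedef struct toy_entry {
    const char *key;
    void *val;
    struct toy_entry *next;
} toy_entry;

typedef struct toy_dict {
    toy_entry **buckets;
    size_t mask; /* table size is a power of two; mask = size - 1 */
} toy_dict;

static size_t toy_hash(const char *key) {
    size_t h = 5381;
    while (*key) h = h * 33 + (unsigned char)*key++;
    return h;
}

/* Look up at most 16 keys at once (mirroring the batch size used in the PR).
 * out[i] receives the matching entry, or NULL if the key is absent. */
void batched_find(toy_dict *d, const char **keys, toy_entry **out, size_t n) {
    toy_entry *cur[16];
    size_t batch = n > 16 ? 16 : n;

    /* Step 1: hash every key and prefetch the head of its bucket chain. */
    for (size_t i = 0; i < batch; i++) {
        cur[i] = d->buckets[toy_hash(keys[i]) & d->mask];
        out[i] = NULL;
        if (cur[i]) __builtin_prefetch(cur[i]);
    }

    /* Step 2: round-robin over the lookups, one chain node per pass,
     * prefetching the next node before switching to the other lookups. */
    int pending = 1;
    while (pending) {
        pending = 0;
        for (size_t i = 0; i < batch; i++) {
            if (cur[i] == NULL || out[i] != NULL) continue;
            toy_entry *e = cur[i];
            if (strcmp(e->key, keys[i]) == 0) {
                out[i] = e;                             /* found */
            } else {
                cur[i] = e->next;                       /* keep walking... */
                if (cur[i]) __builtin_prefetch(cur[i]); /* ...next hop already in flight */
                pending = 1;
            }
        }
    }
}
```

In the flow described above, the same pattern sits between parsing and execution: the main thread collects up to 16 parsed commands, prefetches their argv, runs the `dictPrefetch`-style interleaved pass over all their keys, and only then executes the commands against the now-warm caches.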
source tests/support/cli.tcl

test {CONFIG SET port number} {
    start_server {} {
        if {$::tls} { set port_cfg tls-port} else { set port_cfg port }

        # available port
        set avail_port [find_available_port $::baseport $::portcount]
        set rd [valkey [srv 0 host] [srv 0 port] 0 $::tls]
        $rd CONFIG SET $port_cfg $avail_port
        $rd close
        set rd [valkey [srv 0 host] $avail_port 0 $::tls]
        $rd PING

        # already inuse port
        catch {$rd CONFIG SET $port_cfg $::test_server_port} e
        assert_match {*Unable to listen on this port*} $e
        $rd close

        # make sure server still listening on the previous port
        set rd [valkey [srv 0 host] $avail_port 0 $::tls]
        $rd PING
        $rd close
    }
} {} {external:skip}

test {CONFIG SET bind address} {
    start_server {} {
        # non-valid address
        catch {r CONFIG SET bind "999.999.999.999"} e
        assert_match {*Failed to bind to specified addresses*} $e

        # make sure server still bound to the previous address
        set rd [valkey [srv 0 host] [srv 0 port] 0 $::tls]
        $rd PING
        $rd close
    }
} {} {external:skip}

# Attempt to connect to host using a client bound to bindaddr,
# and return a non-zero value if successful within specified
# millisecond timeout, or zero otherwise.
proc test_loopback {host bindaddr timeout} {
    if {[exec uname] != {Linux}} {
        return 0
    }

    after $timeout set ::test_loopback_state timeout
    if {[catch {
        set server_sock [socket -server accept 0]
        set port [lindex [fconfigure $server_sock -sockname] 2] } err]} {
        return 0
    }

    proc accept {channel clientaddr clientport} {
        set ::test_loopback_state "connected"
        close $channel
    }

    if {[catch {set client_sock [socket -async -myaddr $bindaddr $host $port]} err]} {
        puts "test_loopback: Client connect failed: $err"
    } else {
        close $client_sock
    }

    vwait ::test_loopback_state
    close $server_sock

    return [expr {$::test_loopback_state == {connected}}]
}

test {CONFIG SET bind-source-addr} {
    if {[test_loopback 127.0.0.1 127.0.0.2 1000]} {
        start_server {} {
            start_server {} {
                set replica [srv 0 client]
                set master [srv -1 client]

                $master config set protected-mode no

                $replica config set bind-source-addr 127.0.0.2
                $replica replicaof [srv -1 host] [srv -1 port]

                wait_for_condition 50 100 {
                    [s 0 master_link_status] eq {up}
                } else {
                    fail "Replication not started."
                }

                assert_match {*ip=127.0.0.2*} [s -1 slave0]
            }
        }
    } else {
        if {$::verbose} { puts "Skipping bind-source-addr test." }
    }
} {} {external:skip}

start_server {config "minimal.conf" tags {"external:skip"}} {
|
|
test {Default bind address configuration handling} {
|
|
# Default is explicit and sane
|
|
assert_equal "* -::*" [lindex [r CONFIG GET bind] 1]
|
|
|
|
# CONFIG REWRITE acknowledges this as a default
|
|
r CONFIG REWRITE
|
|
assert_equal 0 [count_message_lines [srv 0 config_file] bind]
|
|
|
|
# Removing the bind address works
|
|
r CONFIG SET bind ""
|
|
assert_equal "" [lindex [r CONFIG GET bind] 1]
|
|
|
|
# No additional clients can connect
|
|
catch {valkey_client} err
|
|
assert_match {*connection refused*} $err
|
|
|
|
# CONFIG REWRITE handles empty bindaddr
|
|
r CONFIG REWRITE
|
|
assert_equal 1 [count_message_lines [srv 0 config_file] bind]
|
|
|
|
# Make sure we're able to restart
|
|
restart_server 0 0 0 0
|
|
|
|
# Make sure bind parameter is as expected and server handles binding
|
|
# accordingly.
|
|
# (it seems that valkeycli_exec behaves differently in RESP3, possibly
|
|
# because CONFIG GET returns a dict instead of a list so valkey-cli emits
|
|
# it in a single line)
|
|
if {$::force_resp3} {
|
|
assert_equal {{bind }} [valkeycli_exec 0 config get bind]
|
|
} else {
|
|
assert_equal {bind {}} [valkeycli_exec 0 config get bind]
|
|
}
|
|
catch {reconnect 0} err
|
|
assert_match {*connection refused*} $err
|
|
|
|
assert_equal {OK} [valkeycli_exec 0 config set bind *]
|
|
reconnect 0
|
|
r ping
|
|
} {PONG}
|
|
|
|
    test {Protected mode works as expected} {
        # Get a non-loopback address of this instance for this test.
        set myaddr [get_nonloopback_addr]
        if {$myaddr != "" && ![string match {127.*} $myaddr]} {
            # Non-loopback client should fail by default
            set r2 [get_nonloopback_client]
            catch {$r2 ping} err
            assert_match {*DENIED*} $err

            # Bind configuration should not matter
            assert_equal {OK} [r config set bind "*"]
            set r2 [get_nonloopback_client]
            catch {$r2 ping} err
            assert_match {*DENIED*} $err

            # Setting a password should disable protected mode
            assert_equal {OK} [r config set requirepass "secret"]
            set r2 [valkey $myaddr [srv 0 "port"] 0 $::tls]
            assert_equal {OK} [$r2 auth secret]
            assert_equal {PONG} [$r2 ping]

            # Clearing the password re-enables protected mode
            assert_equal {OK} [r config set requirepass ""]
            set r2 [valkey $myaddr [srv 0 "port"] 0 $::tls]
            catch {$r2 ping} err
            assert_match {*DENIED*} $err

            # Explicitly disabling protected-mode works
            assert_equal {OK} [r config set protected-mode no]
            set r2 [valkey $myaddr [srv 0 "port"] 0 $::tls]
            assert_equal {PONG} [$r2 ping]
        }
    }
}

start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-debug-command {yes}}} {
|
|
set server_pid [s process_id]
|
|
# Skip if non io-threads mode - as it is relevant only for io-threads mode
|
|
if {[r config get io-threads] ne "io-threads 1"} {
|
|
        test {prefetch works as expected when killing a client from the middle of prefetch commands batch} {
            # Create 16 clients (the prefetch batch size)
            for {set i 0} {$i < 16} {incr i} {
                set rd$i [valkey_deferring_client]
            }

            # set a key that will later be prefetched
            r set a 0

            # Get the client ID of rd4
            $rd4 client id
            set rd4_id [$rd4 read]

            # Create a batch of commands by suspending the server for a while
            # before responding to the first command
            pause_process $server_pid

            # The first client will kill the fourth client
            $rd0 client kill id $rd4_id

            # Send set commands for all clients except the first
            for {set i 1} {$i < 16} {incr i} {
                [set rd$i] set a $i
                [set rd$i] flush
            }

            # Resume the server
            resume_process $server_pid

            # Read the results
            assert_equal {1} [$rd0 read]
            catch {$rd4 read} err
            assert_match {I/O error reading reply} $err

            # verify the prefetch stats are as expected
            set info [r info stats]
            set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
            assert_range $prefetch_entries 2 15; # With slower machines, the number of prefetch entries can be lower
            set prefetch_batches [getInfoProperty $info io_threaded_total_prefetch_batches]
            assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher

            # Verify the final state
            $rd15 get a
            assert_equal {OK} [$rd15 read]
            assert_equal {15} [$rd15 read]
        }

        test {prefetch works as expected when changing the batch size while executing the commands batch} {
            # Create 16 (default prefetch batch size) clients
            for {set i 0} {$i < 16} {incr i} {
                set rd$i [valkey_deferring_client]
            }

            # Create a batch of commands by suspending the server for a while
            # before responding to the first command
            pause_process $server_pid

            # Send set commands for all clients; the 5th client also changes the prefetch batch size
            for {set i 0} {$i < 16} {incr i} {
                if {$i == 4} {
                    [set rd$i] config set prefetch-batch-max-size 1
                }
                [set rd$i] set a $i
                [set rd$i] flush
            }

            # Resume the server
            resume_process $server_pid

            # Read the results
            for {set i 0} {$i < 16} {incr i} {
                assert_equal {OK} [[set rd$i] read]
            }

            # assert the configured prefetch batch size was changed
            assert {[r config get prefetch-batch-max-size] eq "prefetch-batch-max-size 1"}
        }

        test {no prefetch when the batch size is set to 0} {
            # set the batch size to 0
            r config set prefetch-batch-max-size 0
            # save the current value of prefetch entries
            set info [r info stats]
            set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]

            # Create 16 (default prefetch batch size) clients
            for {set i 0} {$i < 16} {incr i} {
                set rd$i [valkey_deferring_client]
            }

            # Create a batch of commands by suspending the server for a while
            # before responding to the first command
            pause_process $server_pid

            # Send set commands for all clients
            for {set i 0} {$i < 16} {incr i} {
                [set rd$i] set a $i
                [set rd$i] flush
            }

            # Resume the server
            resume_process $server_pid

            # Read the results
            for {set i 0} {$i < 16} {incr i} {
                assert_equal {OK} [[set rd$i] read]
            }

            # assert the prefetch entries did not change
            set info [r info stats]
            set new_prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
            assert_equal $prefetch_entries $new_prefetch_entries
        }
    }
}