futriix/tests/unit/networking.tcl
uriyage 04d76d8b02
Improve multithreaded performance with memory prefetching (#861)
This PR utilizes the IO threads to execute commands in batches, allowing
us to prefetch the dictionary data in advance.

After making the IO threads asynchronous and offloading more work to
them in the first two PRs, the `lookupKey` function becomes the main
bottleneck, taking about 50% of the main-thread time (tested with the
SET command). This is because the Valkey dictionary is a straightforward
but inefficient chained hash implementation. While traversing the hash
linked lists, every access to a `dictEntry` structure, key pointer, or
value object requires, with high probability, an expensive external
memory access.
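
To make the cost concrete, a lookup in a chained hash table is a series of dependent pointer loads, each of which can miss every cache level. The sketch below is a generic chained lookup written only for illustration; the `entry`/`table` structs and `chained_lookup` are not the actual Valkey dict code.

```c
#include <string.h>

/* Generic chained-hash lookup (illustration only, not the real dict).
 * Each dereference below is a dependent load that can miss the caches:
 * bucket slot -> entry struct -> key bytes -> value object. The next
 * load cannot start until the previous one returns, so a cold lookup
 * is essentially a chain of DRAM round-trips. */
struct entry { struct entry *next; char *key; void *val; };
struct table { struct entry **buckets; unsigned long mask; };

void *chained_lookup(struct table *t, const char *key, unsigned long hash) {
    struct entry *e = t->buckets[hash & t->mask];  /* load bucket slot  */
    while (e) {                                    /* load entry struct */
        if (strcmp(e->key, key) == 0)              /* load key bytes    */
            return e->val;                         /* caller then loads the value */
        e = e->next;
    }
    return NULL;
}
```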

### Memory Access Amortization

Memory Access Amortization (MAA) is a technique designed to optimize the
performance of dynamic data structures by reducing the impact of memory
access latency. It is applicable when multiple operations need to be
executed concurrently. The principle behind it is that for certain
dynamic data structures, executing operations in a batch is more
efficient than executing each one separately.

Rather than executing operations sequentially, this approach interleaves
the execution of all operations. This is done in such a way that
whenever a memory access is required during an operation, the program
prefetches the necessary memory and transitions to another operation.
This ensures that when one operation is blocked awaiting memory access,
other memory accesses are executed in parallel, thereby reducing the
average access latency.

We applied this method in the development of `dictPrefetch`, which takes
a vector of keys and dictionaries as parameters. It ensures that all
memory addresses required to execute the dictionary operations for these
keys are already loaded into the L1-L3 caches by the time the commands
are executed. Essentially, `dictPrefetch` is an interleaved execution of
`dictFind` for all the keys.
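
A rough sketch of that interleaving, under the simplifying assumptions of the chained structure shown earlier and a caller-supplied batch of pre-hashed keys; `batched_lookup`, `BATCH_MAX`, and the phase split are illustrative and do not mirror the real `dictPrefetch` implementation.

```c
#include <stddef.h>
#include <string.h>

struct entry { struct entry *next; char *key; void *val; };
struct table { struct entry **buckets; unsigned long mask; };

#define BATCH_MAX 16   /* callers pass at most this many keys */

/* Look up a batch of keys with their traversals interleaved: issue a
 * prefetch for every key's next memory access, then advance all lookups
 * one step, so cache misses from different keys overlap instead of
 * being paid one after another. */
void batched_lookup(struct table *t, const char **keys,
                    const unsigned long *hashes, void **out, size_t n) {
    struct entry *cur[BATCH_MAX];

    /* Phase 1: start one memory access per key (the bucket slot). */
    for (size_t i = 0; i < n; i++)
        __builtin_prefetch(&t->buckets[hashes[i] & t->mask]);

    /* Phase 2: read the chain heads and prefetch the first entries. */
    for (size_t i = 0; i < n; i++) {
        out[i] = NULL;
        cur[i] = t->buckets[hashes[i] & t->mask];
        if (cur[i]) __builtin_prefetch(cur[i]);
    }

    /* Phase 3: walk all chains in lock-step; each pass compares the
     * entry prefetched on a previous pass and prefetches the next one. */
    for (int pending = 1; pending; ) {
        pending = 0;
        for (size_t i = 0; i < n; i++) {
            if (!cur[i]) continue;
            struct entry *next = cur[i]->next;
            if (next) __builtin_prefetch(next);
            if (strcmp(cur[i]->key, keys[i]) == 0) {
                out[i] = cur[i]->val;   /* found: record value, stop this lookup */
                next = NULL;
            } else if (next) {
                pending = 1;            /* keep walking this chain on the next pass */
            }
            cur[i] = next;
        }
    }
}
```

The actual `dictPrefetch` additionally covers the key and value objects, as described in the implementation details below; the point here is only the lock-step pattern.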


### Implementation details

When the main thread iterates over `clients-pending-io-read`, it builds
a batch of up to 16 commands from clients with ready-to-execute commands
(i.e., clients whose commands have already been parsed by an IO thread).
First, each command's argv, which was allocated by the IO thread, is
prefetched into the main thread's L1 cache. Next, all the dict entries
and values required by the commands are prefetched from the dictionary.
Only then are the commands executed.
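
As a hedged sketch of that ordering (not the actual Valkey functions: the `client`/`command` shapes, `process_command_batch`, and the callbacks are invented for illustration):

```c
#include <stddef.h>

#define CMD_BATCH_MAX 16    /* "up to 16 commands" per batch, as described above */

/* Minimal stand-ins for the real structures (illustration only). */
typedef struct { int argc; char **argv; } command;
typedef struct { command cmd; } client;

/* Processing order for one batch on the main thread (n <= CMD_BATCH_MAX):
 *   1. prefetch each command's argv, which was allocated by an IO thread
 *      and is therefore cold in the main thread's caches;
 *   2. run the interleaved dictionary prefetch over the batch's keys,
 *      supplied here as a callback;
 *   3. only then execute the commands, whose lookups should now mostly
 *      hit in cache. */
void process_command_batch(client **batch, size_t n,
                           void (*prefetch_dict)(char **keys, size_t nkeys),
                           void (*execute)(client *c)) {
    for (size_t i = 0; i < n; i++)                           /* step 1 */
        for (int j = 0; j < batch[i]->cmd.argc; j++)
            __builtin_prefetch(batch[i]->cmd.argv[j]);

    for (size_t i = 0; i < n; i++)                           /* step 2 */
        prefetch_dict(batch[i]->cmd.argv, (size_t)batch[i]->cmd.argc);

    for (size_t i = 0; i < n; i++)                           /* step 3 */
        execute(batch[i]);
}
```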

---------

Signed-off-by: Uri Yagelnik <uriy@amazon.com>
2024-08-26 21:10:44 -07:00


source tests/support/cli.tcl

test {CONFIG SET port number} {
    start_server {} {
        if {$::tls} { set port_cfg tls-port } else { set port_cfg port }

        # available port
        set avail_port [find_available_port $::baseport $::portcount]
        set rd [valkey [srv 0 host] [srv 0 port] 0 $::tls]
        $rd CONFIG SET $port_cfg $avail_port
        $rd close
        set rd [valkey [srv 0 host] $avail_port 0 $::tls]
        $rd PING

        # already in-use port
        catch {$rd CONFIG SET $port_cfg $::test_server_port} e
        assert_match {*Unable to listen on this port*} $e
        $rd close

        # make sure server still listening on the previous port
        set rd [valkey [srv 0 host] $avail_port 0 $::tls]
        $rd PING
        $rd close
    }
} {} {external:skip}

test {CONFIG SET bind address} {
    start_server {} {
        # invalid address
        catch {r CONFIG SET bind "999.999.999.999"} e
        assert_match {*Failed to bind to specified addresses*} $e

        # make sure server still bound to the previous address
        set rd [valkey [srv 0 host] [srv 0 port] 0 $::tls]
        $rd PING
        $rd close
    }
} {} {external:skip}

# Attempt to connect to host using a client bound to bindaddr,
# and return a non-zero value if successful within specified
# millisecond timeout, or zero otherwise.
proc test_loopback {host bindaddr timeout} {
    if {[exec uname] != {Linux}} {
        return 0
    }

    after $timeout set ::test_loopback_state timeout

    if {[catch {
        set server_sock [socket -server accept 0]
        set port [lindex [fconfigure $server_sock -sockname] 2] } err]} {
        return 0
    }

    proc accept {channel clientaddr clientport} {
        set ::test_loopback_state "connected"
        close $channel
    }

    if {[catch {set client_sock [socket -async -myaddr $bindaddr $host $port]} err]} {
        puts "test_loopback: Client connect failed: $err"
    } else {
        close $client_sock
    }

    vwait ::test_loopback_state
    close $server_sock

    return [expr {$::test_loopback_state == {connected}}]
}

test {CONFIG SET bind-source-addr} {
    if {[test_loopback 127.0.0.1 127.0.0.2 1000]} {
        start_server {} {
            start_server {} {
                set replica [srv 0 client]
                set master [srv -1 client]

                $master config set protected-mode no

                $replica config set bind-source-addr 127.0.0.2
                $replica replicaof [srv -1 host] [srv -1 port]

                wait_for_condition 50 100 {
                    [s 0 master_link_status] eq {up}
                } else {
                    fail "Replication not started."
                }

                assert_match {*ip=127.0.0.2*} [s -1 slave0]
            }
        }
    } else {
        if {$::verbose} { puts "Skipping bind-source-addr test." }
    }
} {} {external:skip}

start_server {config "minimal.conf" tags {"external:skip"}} {
    test {Default bind address configuration handling} {
        # Default is explicit and sane
        assert_equal "* -::*" [lindex [r CONFIG GET bind] 1]

        # CONFIG REWRITE acknowledges this as a default
        r CONFIG REWRITE
        assert_equal 0 [count_message_lines [srv 0 config_file] bind]

        # Removing the bind address works
        r CONFIG SET bind ""
        assert_equal "" [lindex [r CONFIG GET bind] 1]

        # No additional clients can connect
        catch {valkey_client} err
        assert_match {*connection refused*} $err

        # CONFIG REWRITE handles empty bindaddr
        r CONFIG REWRITE
        assert_equal 1 [count_message_lines [srv 0 config_file] bind]

        # Make sure we're able to restart
        restart_server 0 0 0 0

        # Make sure bind parameter is as expected and server handles binding
        # accordingly.
        # (it seems that valkeycli_exec behaves differently in RESP3, possibly
        # because CONFIG GET returns a dict instead of a list so valkey-cli emits
        # it in a single line)
        if {$::force_resp3} {
            assert_equal {{bind }} [valkeycli_exec 0 config get bind]
        } else {
            assert_equal {bind {}} [valkeycli_exec 0 config get bind]
        }

        catch {reconnect 0} err
        assert_match {*connection refused*} $err

        assert_equal {OK} [valkeycli_exec 0 config set bind *]
        reconnect 0
        r ping
    } {PONG}

    test {Protected mode works as expected} {
        # Get a non-loopback address of this instance for this test.
        set myaddr [get_nonloopback_addr]
        if {$myaddr != "" && ![string match {127.*} $myaddr]} {
            # Non-loopback client should fail by default
            set r2 [get_nonloopback_client]
            catch {$r2 ping} err
            assert_match {*DENIED*} $err

            # Bind configuration should not matter
            assert_equal {OK} [r config set bind "*"]
            set r2 [get_nonloopback_client]
            catch {$r2 ping} err
            assert_match {*DENIED*} $err

            # Setting a password should disable protected mode
            assert_equal {OK} [r config set requirepass "secret"]
            set r2 [valkey $myaddr [srv 0 "port"] 0 $::tls]
            assert_equal {OK} [$r2 auth secret]
            assert_equal {PONG} [$r2 ping]

            # Clearing the password re-enables protected mode
            assert_equal {OK} [r config set requirepass ""]
            set r2 [valkey $myaddr [srv 0 "port"] 0 $::tls]
            catch {$r2 ping} err
            assert_match {*DENIED*} $err

            # Explicitly disabling protected-mode works
            assert_equal {OK} [r config set protected-mode no]
            set r2 [valkey $myaddr [srv 0 "port"] 0 $::tls]
            assert_equal {PONG} [$r2 ping]
        }
    }
}

start_server {config "minimal.conf" tags {"external:skip"} overrides {enable-debug-command {yes}}} {
    set server_pid [s process_id]

    # Skip if not in io-threads mode - these tests are only relevant with io-threads enabled
    if {[r config get io-threads] ne "io-threads 1"} {
        test {prefetch works as expected when killing a client from the middle of prefetch commands batch} {
            # Create 16 (prefetch batch size) +1 clients
            for {set i 0} {$i < 16} {incr i} {
                set rd$i [valkey_deferring_client]
            }

            # Set a key that will later be prefetched
            r set a 0

            # Get the client ID of rd4
            $rd4 client id
            set rd4_id [$rd4 read]

            # Create a batch of commands by suspending the server for a while
            # before responding to the first command
            pause_process $server_pid

            # The first client will kill the fourth client
            $rd0 client kill id $rd4_id

            # Send set commands for all clients except the first
            for {set i 1} {$i < 16} {incr i} {
                [set rd$i] set a $i
                [set rd$i] flush
            }

            # Resume the server
            resume_process $server_pid

            # Read the results
            assert_equal {1} [$rd0 read]
            catch {$rd4 read} err
            assert_match {I/O error reading reply} $err

            # Verify the prefetch stats are as expected
            set info [r info stats]
            set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
            assert_range $prefetch_entries 2 15; # With slower machines, the number of prefetch entries can be lower
            set prefetch_batches [getInfoProperty $info io_threaded_total_prefetch_batches]
            assert_range $prefetch_batches 1 7; # With slower machines, the number of batches can be higher

            # Verify the final state
            $rd15 get a
            assert_equal {OK} [$rd15 read]
            assert_equal {15} [$rd15 read]
        }

        test {prefetch works as expected when changing the batch size while executing the commands batch} {
            # Create 16 (default prefetch batch size) clients
            for {set i 0} {$i < 16} {incr i} {
                set rd$i [valkey_deferring_client]
            }

            # Create a batch of commands by suspending the server for a while
            # before responding to the first command
            pause_process $server_pid

            # Send set commands for all clients; the 5th client will change the prefetch batch size
            for {set i 0} {$i < 16} {incr i} {
                if {$i == 4} {
                    [set rd$i] config set prefetch-batch-max-size 1
                }
                [set rd$i] set a $i
                [set rd$i] flush
            }

            # Resume the server
            resume_process $server_pid

            # Read the results
            for {set i 0} {$i < 16} {incr i} {
                assert_equal {OK} [[set rd$i] read]
            }

            # Assert the configured prefetch batch size was changed
            assert {[r config get prefetch-batch-max-size] eq "prefetch-batch-max-size 1"}
        }

        test {no prefetch when the batch size is set to 0} {
            # set the batch size to 0
            r config set prefetch-batch-max-size 0

            # save the current value of prefetch entries
            set info [r info stats]
            set prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]

            # Create 16 (default prefetch batch size) clients
            for {set i 0} {$i < 16} {incr i} {
                set rd$i [valkey_deferring_client]
            }

            # Create a batch of commands by suspending the server for a while
            # before responding to the first command
            pause_process $server_pid

            # Send set commands for all clients
            for {set i 0} {$i < 16} {incr i} {
                [set rd$i] set a $i
                [set rd$i] flush
            }

            # Resume the server
            resume_process $server_pid

            # Read the results
            for {set i 0} {$i < 16} {incr i} {
                assert_equal {OK} [[set rd$i] read]
            }

            # assert the prefetch entries did not change
            set info [r info stats]
            set new_prefetch_entries [getInfoProperty $info io_threaded_total_prefetch_entries]
            assert_equal $prefetch_entries $new_prefetch_entries
        }
    }
}