From 47c51d0c7858dc8ce7747b78b73cf8cec2e59ff3 Mon Sep 17 00:00:00 2001
From: ranshid <88133677+ranshid@users.noreply.github.com>
Date: Tue, 22 Feb 2022 11:19:38 +0200
Subject: [PATCH] introduce dynamic client reply buffer size - save memory on idle clients (#9822)

With the current implementation, an idle client that serves no traffic still uses ~17KiB of memory, mainly because of a fixed-size reply buffer currently set to 16KiB. We have encountered cases in which the server operates in a low-memory environment; there, a user who creates a large connection pool to absorb potential traffic bursts ends up spending a large amount of memory just to keep idle clients connected. Some users may choose to "sacrifice" a bit of performance in order to save that memory.

This commit introduces a dynamic mechanism that shrinks and expands the client reply buffer based on a periodically observed peak. The algorithm works as follows (a simplified sketch of the resize decision follows this list):

1. Each time the client reply buffer is written to, the last recorded peak is updated: new peak = MAX(last peak, current written size).
2. During the clients cron we check, for each client, whether the last observed peak was:
   a. equal to the current buffer size - in which case we expand (resize) the buffer by 100%.
   b. less than half the buffer size - in which case we shrink the buffer by 50%.
3. In any case we will **not** resize the buffer when:
   a. the current buffer peak is smaller than the current usable buffer size but larger than half of it.
   b. the value of (current usable buffer size / 2) is less than 1KiB.
   c. the value of (current usable buffer size * 2) is larger than 16KiB.
4. The peak value is reset to the current buffer position once every **5** seconds. A new field in the client structure (buf_peak_last_reset_time) keeps track of how much time has passed since the last buffer peak reset.
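The shrink/expand decision described in steps 2-3 is implemented by the new clientsCronResizeOutputBuffer() added to server.c in the diff below. As a rough illustration only, here is a minimal, self-contained sketch of that decision; the mini_client struct, the reply_buffer_new_size() helper and the main() driver are illustrative stand-ins and are not part of this patch:

```c
#include <stddef.h>
#include <stdio.h>

/* Limits as defined by the patch: the reply buffer floats between 1KiB and 16KiB. */
#define PROTO_REPLY_MIN_BYTES   (1024)
#define PROTO_REPLY_CHUNK_BYTES (16*1024)

/* Illustrative stand-in for the reply-buffer fields kept per client. */
typedef struct {
    size_t buf_usable_size; /* current usable size of the reply buffer */
    size_t buf_peak;        /* highest fill level observed since the last peak reset */
} mini_client;

/* Returns the size the buffer should be resized to, or 0 when it should be left as is. */
static size_t reply_buffer_new_size(const mini_client *c) {
    size_t shrink_target = c->buf_usable_size / 2;
    size_t expand_target = c->buf_usable_size * 2;

    /* Shrink: the peak fits in less than half the buffer and the result stays >= 1KiB. */
    if (shrink_target >= PROTO_REPLY_MIN_BYTES && c->buf_peak < shrink_target)
        return (c->buf_peak + 1 > PROTO_REPLY_MIN_BYTES) ? c->buf_peak + 1 : PROTO_REPLY_MIN_BYTES;

    /* Expand: the buffer was filled to capacity and the doubled size is capped at 16KiB. */
    if (expand_target < PROTO_REPLY_CHUNK_BYTES * 2 && c->buf_peak == c->buf_usable_size)
        return (expand_target < PROTO_REPLY_CHUNK_BYTES) ? expand_target : PROTO_REPLY_CHUNK_BYTES;

    return 0; /* peak is between 50% and 100% of the buffer: no resize */
}

int main(void) {
    mini_client idle = { .buf_usable_size = 16*1024, .buf_peak = 0 };    /* idle since last peak reset */
    mini_client busy = { .buf_usable_size = 1024,    .buf_peak = 1024 }; /* filled its 1KiB buffer */
    printf("idle client: 16KiB buffer resized to %zu bytes\n", reply_buffer_new_size(&idle)); /* 1024 */
    printf("busy client: 1KiB buffer resized to %zu bytes\n", reply_buffer_new_size(&busy));  /* 2048 */
    return 0;
}
```

In the patch itself the resize is performed by allocating a new buffer with zmalloc_usable() and copying the currently buffered bytes over, and the peak is reset to the current buffer position every server.reply_buffer_peak_reset_time milliseconds (5000 by default).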
### **Interface changes:**

**CLIENT LIST** - now contains two new fields:
rbs=<the current size in bytes of the client reply buffer>
rbp=<the current value in bytes of the last observed buffer peak position>

**INFO STATS** - now contains two new statistics:
reply_buffer_shrinks=<total number of reply buffer shrinks performed>
reply_buffer_expands=<total number of reply buffer expands performed>

Co-authored-by: Oran Agra
Co-authored-by: Yoav Steinberg
---
src/debug.c | 14 ++++++++ src/networking.c | 15 +++++++-- src/server.c | 58 ++++++++++++++++++++++++++++++++-- src/server.h | 16 ++++++---- tests/test_helper.tcl | 1 + tests/unit/client-eviction.tcl | 17 +++++++--- tests/unit/introspection.tcl | 4 +-- tests/unit/replybufsize.tcl | 47 +++++++++++++++++++++++++++ 8 files changed, 155 insertions(+), 17 deletions(-) create mode 100644 tests/unit/replybufsize.tcl diff --git a/src/debug.c b/src/debug.c index a8261edb2..b95daaa36 100644 --- a/src/debug.c +++ b/src/debug.c @@ -482,6 +482,10 @@ void debugCommand(client *c) { " Show low level client eviction pools info (maxmemory-clients).", "PAUSE-CRON <0|1>", " Stop periodic cron job processing.", +"REPLYBUFFER-PEAK-RESET-TIME ",+" Sets the time (in milliseconds) to wait between client reply buffer peak resets.",+" In case NEVER is provided the last observed peak will never be reset",+" In case RESET is provided the peak reset time will be restored to the default value", NULL }; addReplyHelp(c, help); @@ -958,6 +962,16 @@ NULL { server.pause_cron = atoi(c->argv[2]->ptr); addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[1]->ptr,"replybuffer-peak-reset-time") && c->argc == 3 ) { + if (!strcasecmp(c->argv[2]->ptr, "never")) { + server.reply_buffer_peak_reset_time = -1; + } else if(!strcasecmp(c->argv[2]->ptr, "reset")) { + server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME; + } else { + if (getLongFromObjectOrReply(c, c->argv[2], &server.reply_buffer_peak_reset_time, NULL) != C_OK) + return; + } + addReply(c, shared.ok); } else { addReplySubcommandSyntaxError(c); return; diff --git a/src/networking.c b/src/networking.c index 8a89a88cd..2a8bcc0b9 100644 --- a/src/networking.c +++ b/src/networking.c @@ -131,7 +131,7 @@ client *createClient(connection *conn) { connSetReadHandler(conn, readQueryFromClient); connSetPrivateData(conn, c); } - + c->buf = zmalloc(PROTO_REPLY_CHUNK_BYTES); selectDb(c,0); uint64_t client_id; atomicGetIncr(server.next_client_id, client_id, 1); @@ -140,7 +140,9 @@ client *createClient(connection *conn) { c->conn = conn; c->name = NULL; c->bufpos = 0; - c->buf_usable_size = zmalloc_usable_size(c)-offsetof(client,buf); + c->buf_usable_size = zmalloc_usable_size(c->buf); + c->buf_peak = c->buf_usable_size; + c->buf_peak_last_reset_time = server.unixtime; c->ref_repl_buf_node = NULL; c->ref_block_pos = 0; c->qb_pos = 0; @@ -314,6 +316,9 @@ size_t _addReplyToBuffer(client *c, const char *s, size_t len) { size_t reply_len = len > available ? available : len; memcpy(c->buf+c->bufpos,s,reply_len); c->bufpos+=reply_len; + /* We update the buffer peak after appending the reply to the buffer */ + if(c->buf_peak < (size_t)c->bufpos) + c->buf_peak = (size_t)c->bufpos; return reply_len; } @@ -1525,6 +1530,7 @@ void freeClient(client *c) { /* Free data structures.
*/ listRelease(c->reply); + zfree(c->buf); freeReplicaReferencedReplBuffer(c); freeClientArgv(c); freeClientOriginalArgv(c); @@ -2593,7 +2599,7 @@ sds catClientInfoString(sds s, client *client) { } sds ret = sdscatfmt(s, - "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i", + "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U rbs=%U rbp=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i", (unsigned long long) client->id, getClientPeerId(client), getClientSockname(client), @@ -2610,6 +2616,8 @@ sds catClientInfoString(sds s, client *client) { (unsigned long long) sdsavail(client->querybuf), (unsigned long long) client->argv_len_sum, (unsigned long long) client->mstate.argv_len_sums, + (unsigned long long) client->buf_usable_size, + (unsigned long long) client->buf_peak, (unsigned long long) client->bufpos, (unsigned long long) listLength(client->reply) + used_blocks_of_repl_buf, (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */ @@ -3452,6 +3460,7 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) { *output_buffer_mem_usage = mem; mem += sdsZmallocSize(c->querybuf); mem += zmalloc_size(c); + mem += c->buf_usable_size; /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to * spot problematic clients. */ diff --git a/src/server.c b/src/server.c index 8362b6b92..b3a340aaf 100644 --- a/src/server.c +++ b/src/server.c @@ -706,6 +706,51 @@ int clientsCronResizeQueryBuffer(client *c) { return 0; } +/* The client output buffer can be adjusted to better fit the memory requirements. + * + * the logic is: + * in case the last observed peak size of the buffer equals the buffer size - we double the size + * in case the last observed peak size of the buffer is less than half the buffer size - we shrink by half. + * The buffer peak will be reset back to the buffer position every server.reply_buffer_peak_reset_time milliseconds + * The function always returns 0 as it never terminates the client. */ +int clientsCronResizeOutputBuffer(client *c, mstime_t now_ms) { + + size_t new_buffer_size = 0; + char *oldbuf = NULL; + const size_t buffer_target_shrink_size = c->buf_usable_size/2; + const size_t buffer_target_expand_size = c->buf_usable_size*2; + + if (buffer_target_shrink_size >= PROTO_REPLY_MIN_BYTES && + c->buf_peak < buffer_target_shrink_size ) + { + new_buffer_size = max(PROTO_REPLY_MIN_BYTES,c->buf_peak+1); + server.stat_reply_buffer_shrinks++; + } else if (buffer_target_expand_size < PROTO_REPLY_CHUNK_BYTES*2 && + c->buf_peak == c->buf_usable_size) + { + new_buffer_size = min(PROTO_REPLY_CHUNK_BYTES,buffer_target_expand_size); + server.stat_reply_buffer_expands++; + } + + /* reset the peak value each server.reply_buffer_peak_reset_time seconds. in case the client will be idle + * it will start to shrink. 
+ */ + if (server.reply_buffer_peak_reset_time >=0 && + now_ms - c->buf_peak_last_reset_time >= server.reply_buffer_peak_reset_time) + { + c->buf_peak = c->bufpos; + c->buf_peak_last_reset_time = now_ms; + } + + if (new_buffer_size) { + oldbuf = c->buf; + c->buf = zmalloc_usable(new_buffer_size, &c->buf_usable_size); + memcpy(c->buf,oldbuf,c->bufpos); + zfree(oldbuf); + } + return 0; +} + /* This function is used in order to track clients using the biggest amount * of memory in the latest few seconds. This way we can provide such information * in the INFO output (clients section), without having to do an O(N) scan for @@ -899,6 +944,8 @@ void clientsCron(void) { * terminated. */ if (clientsCronHandleTimeout(c,now)) continue; if (clientsCronResizeQueryBuffer(c)) continue; + if (clientsCronResizeOutputBuffer(c,now)) continue; + if (clientsCronTrackExpansiveClients(c, curr_peak_mem_usage_slot)) continue; /* Iterating all the clients in getMemoryOverheadData() is too slow and @@ -2290,6 +2337,8 @@ void resetServerStats(void) { server.stat_total_error_replies = 0; server.stat_dump_payload_sanitizations = 0; server.aof_delayed_fsync = 0; + server.stat_reply_buffer_shrinks = 0; + server.stat_reply_buffer_expands = 0; lazyfreeResetStats(); } @@ -2350,6 +2399,7 @@ void initServer(void) { server.blocking_op_nesting = 0; server.thp_enabled = 0; server.cluster_drop_packet_filter = -1; + server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME; resetReplicationBuffer(); if ((server.tls_port || server.tls_replication || server.tls_cluster) @@ -5448,7 +5498,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "total_reads_processed:%lld\r\n" "total_writes_processed:%lld\r\n" "io_threaded_reads_processed:%lld\r\n" - "io_threaded_writes_processed:%lld\r\n", + "io_threaded_writes_processed:%lld\r\n" + "reply_buffer_shrinks:%lld\r\n" + "reply_buffer_expands:%lld\r\n", server.stat_numconnections, server.stat_numcommands, getInstantaneousMetric(STATS_METRIC_COMMAND), @@ -5491,7 +5543,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { stat_total_reads_processed, stat_total_writes_processed, server.stat_io_reads_processed, - server.stat_io_writes_processed); + server.stat_io_writes_processed, + server.stat_reply_buffer_shrinks, + server.stat_reply_buffer_expands); } /* Replication */ diff --git a/src/server.h b/src/server.h index 75ccda1ba..3fe8686c5 100644 --- a/src/server.h +++ b/src/server.h @@ -163,11 +163,14 @@ typedef long long ustime_t; /* microsecond time type. */ #define PROTO_INLINE_MAX_SIZE (1024*64) /* Max size of inline reads */ #define PROTO_MBULK_BIG_ARG (1024*32) #define PROTO_RESIZE_THRESHOLD (1024*32) /* Threshold for determining whether to resize query buffer */ +#define PROTO_REPLY_MIN_BYTES (1024) /* the lower limit on reply buffer size */ #define LONG_STR_SIZE 21 /* Bytes needed for long -> str + '\0' */ #define REDIS_AUTOSYNC_BYTES (1024*1024*4) /* Sync file every 4MB. */ #define LIMIT_PENDING_QUERYBUF (4*1024*1024) /* 4mb */ +#define REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME 5000 /* 5 seconds */ + /* When configuring the server eventloop, we setup it so that the total number * of file descriptors we can handle are server.maxclients + RESERVED_FDS + * a few more to stay safe. Since RESERVED_FDS defaults to 32, we add 96 @@ -1176,14 +1179,11 @@ typedef struct client { * i.e. the next offset to send. */ /* Response buffer */ + size_t buf_peak; /* Peak used size of buffer in last 5 sec interval. 
*/ + mstime_t buf_peak_last_reset_time; /* keeps the last time the buffer peak value was reset */ int bufpos; size_t buf_usable_size; /* Usable size of buffer. */ - /* Note that 'buf' must be the last field of client struct, because memory - * allocator may give us more memory than our apply for reducing fragments, - * but we want to make full use of given memory, i.e. we may access the - * memory after 'buf'. To avoid make others fields corrupt, 'buf' must be - * the last one. */ - char buf[PROTO_REPLY_CHUNK_BYTES]; + char *buf; } client; struct saveparam { @@ -1595,6 +1595,9 @@ struct redisServer { long long samples[STATS_METRIC_SAMPLES]; int idx; } inst_metric[STATS_METRIC_COUNT]; + long long stat_reply_buffer_shrinks; /* Total number of output buffer shrinks */ + long long stat_reply_buffer_expands; /* Total number of output buffer expands */ + /* Configuration */ int verbosity; /* Loglevel in redis.conf */ int maxidletime; /* Client timeout in seconds */ @@ -1904,6 +1907,7 @@ struct redisServer { int failover_state; /* Failover state */ int cluster_allow_pubsubshard_when_down; /* Is pubsubshard allowed when the cluster is down, doesn't affect pubsub global. */ + long reply_buffer_peak_reset_time; /* The amount of time (in milliseconds) to wait between reply buffer peak resets */ }; #define MAX_KEYS_BUFFER 256 diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index c92ce06f0..277fa3803 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -93,6 +93,7 @@ set ::all_tests { unit/cluster unit/client-eviction unit/violations + unit/replybufsize } # Index to the next test to run in the ::all_tests list. set ::next_test 0 diff --git a/tests/unit/client-eviction.tcl b/tests/unit/client-eviction.tcl index 0b1b8b281..c60d2c95b 100644 --- a/tests/unit/client-eviction.tcl +++ b/tests/unit/client-eviction.tcl @@ -45,6 +45,10 @@ proc mb {v} { return [expr $v * 1024 * 1024] } +proc kb {v} { + return [expr $v * 1024] +} + start_server {} { set maxmemory_clients 3000000 r config set maxmemory-clients $maxmemory_clients @@ -441,11 +445,12 @@ start_server {} { test "evict clients in right order (large to small)" { # Note that each size step needs to be at least x2 larger than previous step # because of how the client-eviction size bucktting works - set sizes [list 100000 [mb 1] [mb 3]] + set sizes [list [kb 128] [mb 1] [mb 3]] set clients_per_size 3 r client setname control r client no-evict on r config set maxmemory-clients 0 + r debug replybuffer-peak-reset-time never # Run over all sizes and create some clients using up that size set total_client_mem 0 @@ -470,7 +475,6 @@ start_server {} { # Account total client memory usage incr total_mem [expr $clients_per_size * $client_mem] } - incr total_mem [client_field control tot-mem] # Make sure all clients are connected set clients [split [string trim [r client list]] "\r\n"] @@ -481,8 +485,9 @@ start_server {} { # For each size reduce maxmemory-clients so relevant clients should be evicted # do this from largest to smallest foreach size [lreverse $sizes] { + set control_mem [client_field control tot-mem] set total_mem [expr $total_mem - $clients_per_size * $size] - r config set maxmemory-clients $total_mem + r config set maxmemory-clients [expr $total_mem + $control_mem] set clients [split [string trim [r client list]] "\r\n"] # Verify only relevant clients were evicted for {set i 0} {$i < [llength $sizes]} {incr i} { @@ -495,8 +500,12 @@ start_server {} { } } } + + # Restore the peak reset time to default + r debug 
replybuffer-peak-reset-time reset + foreach rr $rrs {$rr close} - } + } {} {needs:debug} } } diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 33a41ee50..59c96b5ad 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -7,7 +7,7 @@ start_server {tags {"introspection"}} { test {CLIENT LIST} { r client list - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=2*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=2*} test {CLIENT LIST with IDs} { set myid [r client id] @@ -17,7 +17,7 @@ start_server {tags {"introspection"}} { test {CLIENT INFO} { r client info - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=2*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=2*} test {CLIENT KILL with illegal arguments} { assert_error "ERR wrong number of arguments for 'client|kill' command" {r client kill} diff --git a/tests/unit/replybufsize.tcl b/tests/unit/replybufsize.tcl new file mode 100644 index 000000000..9377a8fd3 --- /dev/null +++ b/tests/unit/replybufsize.tcl @@ -0,0 +1,47 @@ +proc get_reply_buffer_size {cname} { + + set clients [split [string trim [r client list]] "\r\n"] + set c [lsearch -inline $clients *name=$cname*] + if {![regexp rbs=(\[a-zA-Z0-9-\]+) $c - rbufsize]} { + error "field rbus not found in $c" + } + return $rbufsize +} + +start_server {tags {"replybufsize"}} { + + test {verify reply buffer limits} { + # In order to reduce test time we can set the peak reset time very low + r debug replybuffer-peak-reset-time 100 + + # Create a simple idle test client + variable tc [redis_client] + $tc client setname test_client + + # make sure the client is idle for 1 seconds to make it shrink the reply buffer + wait_for_condition 10 100 { + [get_reply_buffer_size test_client] >= 1024 && [get_reply_buffer_size test_client] < 2046 + } else { + set rbs [get_reply_buffer_size test_client] + fail "reply buffer of idle client is $rbs after 1 seconds" + } + + r set bigval [string repeat x 32768] + + # In order to reduce test time we can set the peak reset time very low + r debug replybuffer-peak-reset-time never + + wait_for_condition 10 100 { + [$tc get bigval ; get_reply_buffer_size test_client] >= 16384 && [get_reply_buffer_size test_client] < 32768 + } else { + set rbs [get_reply_buffer_size test_client] + fail "reply buffer of busy client is $rbs after 1 seconds" + } + + # Restore the peak reset time to default + r debug replybuffer-peak-reset-time reset + + $tc close + } {0} {needs:debug} +} + \ No newline at end of file