introduce dynamic client reply buffer size - save memory on idle clients (#9822)
With the current implementation, a simple idle client that serves no traffic still uses ~17KB of memory. This is mainly due to a fixed-size reply buffer, currently set to 16KB. We have encountered cases in which the server operates in low-memory environments. In such cases, a user who wishes to create a large connection pool to support potential burst periods will exhaust a large amount of memory just to maintain connected idle clients. Some users may choose to "sacrifice" performance in order to save memory.

This commit introduces a dynamic mechanism to shrink and expand the client reply buffer based on a periodically observed peak. The algorithm works as follows:

1. Each time a client reply buffer has been fully written, the last recorded peak is updated: new peak = MAX(last peak, current written size).
2. During the clients cron we check, for each client, whether the last observed peak was:
   a. equal to the current buffer size - in which case we expand (resize) the buffer by 100%.
   b. less than half the buffer size - in which case we shrink the buffer by 50%.
3. In any case we will **not** resize the buffer when:
   a. the current buffer peak is less than the current buffer usable size but higher than 1/2 of it.
   b. the value of (current buffer usable size / 2) is less than 1KiB.
   c. the value of (current buffer usable size * 2) is larger than 16KiB.
4. The peak value is reset to the current buffer position once every **5** seconds. We maintain a new field in the client structure (buf_peak_last_reset_time) which keeps track of how much time has passed since the last buffer peak reset.

A distilled sketch of this resize decision appears below.

### **Interface changes:**

**CLIENT LIST** - now contains 2 new extra fields:
rbs=<the current size in bytes of the client reply buffer>
rbp=<the current value in bytes of the last observed buffer peak position>

**INFO STATS** - now contains 2 new statistics:
reply_buffer_shrinks = <total number of buffer shrinks performed>
reply_buffer_expands = <total number of buffer expands performed>

Co-authored-by: Oran Agra <oran@redislabs.com>
Co-authored-by: Yoav Steinberg <yoav@redislabs.com>
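The following is a minimal sketch (in C, not code from the patch) of the per-client decision applied on each cron cycle; `next_reply_buffer_size` is a hypothetical helper, and the two constants mirror PROTO_REPLY_MIN_BYTES (1KiB) and PROTO_REPLY_CHUNK_BYTES (16KiB) from src/server.h:

#include <stddef.h>

#define REPLY_MIN_BYTES   1024       /* floor: PROTO_REPLY_MIN_BYTES */
#define REPLY_CHUNK_BYTES (16*1024)  /* default/ceiling: PROTO_REPLY_CHUNK_BYTES */

/* Returns the buffer size the cron would pick, given the current usable
 * size and the last observed peak; returns the current size unchanged
 * when none of the resize rules apply. */
static size_t next_reply_buffer_size(size_t usable_size, size_t peak) {
    if (usable_size/2 >= REPLY_MIN_BYTES && peak < usable_size/2) {
        /* Peak below half the buffer: shrink to the peak plus one byte,
         * but never below the 1KiB floor. */
        size_t target = peak + 1;
        return target > REPLY_MIN_BYTES ? target : REPLY_MIN_BYTES;
    }
    if (usable_size*2 < REPLY_CHUNK_BYTES*2 && peak == usable_size) {
        /* Buffer was filled to capacity: double it, capped at 16KiB. */
        size_t target = usable_size * 2;
        return target < REPLY_CHUNK_BYTES ? target : REPLY_CHUNK_BYTES;
    }
    return usable_size;
}

For example, an idle client whose 16KiB buffer saw a 300-byte peak shrinks straight to the 1KiB floor on the next cron run, while a client that filled a 4KiB buffer to capacity doubles to 8KiB.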
parent 71204f9632
commit 47c51d0c78
src/debug.c (14 changed lines)
@@ -482,6 +482,10 @@ void debugCommand(client *c) {
 "    Show low level client eviction pools info (maxmemory-clients).",
 "PAUSE-CRON <0|1>",
 "    Stop periodic cron job processing.",
+"REPLYBUFFER-PEAK-RESET-TIME <NEVER|RESET|time>",
+"    Sets the time (in milliseconds) to wait between client reply buffer peak resets.",
+"    In case NEVER is provided the last observed peak will never be reset",
+"    In case RESET is provided the peak reset time will be restored to the default value",
 NULL
         };
         addReplyHelp(c, help);
@@ -958,6 +962,16 @@ NULL
     {
         server.pause_cron = atoi(c->argv[2]->ptr);
         addReply(c,shared.ok);
+    } else if (!strcasecmp(c->argv[1]->ptr,"replybuffer-peak-reset-time") && c->argc == 3) {
+        if (!strcasecmp(c->argv[2]->ptr, "never")) {
+            server.reply_buffer_peak_reset_time = -1;
+        } else if (!strcasecmp(c->argv[2]->ptr, "reset")) {
+            server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME;
+        } else {
+            if (getLongFromObjectOrReply(c, c->argv[2], &server.reply_buffer_peak_reset_time, NULL) != C_OK)
+                return;
+        }
+        addReply(c, shared.ok);
     } else {
         addReplySubcommandSyntaxError(c);
         return;
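For reference, the three argument forms handled by the new DEBUG subcommand above can be exercised as follows (illustrative commands; the millisecond value is just an example):

DEBUG REPLYBUFFER-PEAK-RESET-TIME 100     (reset the observed peak every 100 ms)
DEBUG REPLYBUFFER-PEAK-RESET-TIME NEVER   (never reset the peak; stored internally as -1)
DEBUG REPLYBUFFER-PEAK-RESET-TIME RESET   (restore the default of 5000 ms)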
src/networking.c

@@ -131,7 +131,7 @@ client *createClient(connection *conn) {
         connSetReadHandler(conn, readQueryFromClient);
         connSetPrivateData(conn, c);
     }
-
+    c->buf = zmalloc(PROTO_REPLY_CHUNK_BYTES);
     selectDb(c,0);
     uint64_t client_id;
     atomicGetIncr(server.next_client_id, client_id, 1);
@@ -140,7 +140,9 @@ client *createClient(connection *conn) {
     c->conn = conn;
     c->name = NULL;
     c->bufpos = 0;
-    c->buf_usable_size = zmalloc_usable_size(c)-offsetof(client,buf);
+    c->buf_usable_size = zmalloc_usable_size(c->buf);
+    c->buf_peak = c->buf_usable_size;
+    c->buf_peak_last_reset_time = server.unixtime;
     c->ref_repl_buf_node = NULL;
     c->ref_block_pos = 0;
     c->qb_pos = 0;
@@ -314,6 +316,9 @@ size_t _addReplyToBuffer(client *c, const char *s, size_t len) {
     size_t reply_len = len > available ? available : len;
     memcpy(c->buf+c->bufpos,s,reply_len);
     c->bufpos+=reply_len;
+    /* We update the buffer peak after appending the reply to the buffer */
+    if (c->buf_peak < (size_t)c->bufpos)
+        c->buf_peak = (size_t)c->bufpos;
     return reply_len;
 }

@@ -1525,6 +1530,7 @@ void freeClient(client *c) {

     /* Free data structures. */
     listRelease(c->reply);
+    zfree(c->buf);
     freeReplicaReferencedReplBuffer(c);
     freeClientArgv(c);
     freeClientOriginalArgv(c);
@@ -2593,7 +2599,7 @@ sds catClientInfoString(sds s, client *client) {
     }

     sds ret = sdscatfmt(s,
-        "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i",
+        "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U rbs=%U rbp=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i",
         (unsigned long long) client->id,
         getClientPeerId(client),
         getClientSockname(client),
@@ -2610,6 +2616,8 @@ sds catClientInfoString(sds s, client *client) {
         (unsigned long long) sdsavail(client->querybuf),
         (unsigned long long) client->argv_len_sum,
         (unsigned long long) client->mstate.argv_len_sums,
+        (unsigned long long) client->buf_usable_size,
+        (unsigned long long) client->buf_peak,
         (unsigned long long) client->bufpos,
         (unsigned long long) listLength(client->reply) + used_blocks_of_repl_buf,
         (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */
@@ -3452,6 +3460,7 @@ size_t getClientMemoryUsage(client *c, size_t *output_buffer_mem_usage) {
     *output_buffer_mem_usage = mem;
     mem += sdsZmallocSize(c->querybuf);
     mem += zmalloc_size(c);
+    mem += c->buf_usable_size;
     /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory
      * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to
      * spot problematic clients. */
src/server.c (58 changed lines)
@@ -706,6 +706,51 @@ int clientsCronResizeQueryBuffer(client *c) {
     return 0;
 }

+/* The client output buffer can be adjusted to better fit the memory requirements.
+ *
+ * the logic is:
+ * in case the last observed peak size of the buffer equals the buffer size - we double the size
+ * in case the last observed peak size of the buffer is less than half the buffer size - we shrink by half.
+ * The buffer peak will be reset back to the buffer position every server.reply_buffer_peak_reset_time milliseconds
+ * The function always returns 0 as it never terminates the client. */
+int clientsCronResizeOutputBuffer(client *c, mstime_t now_ms) {
+
+    size_t new_buffer_size = 0;
+    char *oldbuf = NULL;
+    const size_t buffer_target_shrink_size = c->buf_usable_size/2;
+    const size_t buffer_target_expand_size = c->buf_usable_size*2;
+
+    if (buffer_target_shrink_size >= PROTO_REPLY_MIN_BYTES &&
+        c->buf_peak < buffer_target_shrink_size)
+    {
+        new_buffer_size = max(PROTO_REPLY_MIN_BYTES,c->buf_peak+1);
+        server.stat_reply_buffer_shrinks++;
+    } else if (buffer_target_expand_size < PROTO_REPLY_CHUNK_BYTES*2 &&
+               c->buf_peak == c->buf_usable_size)
+    {
+        new_buffer_size = min(PROTO_REPLY_CHUNK_BYTES,buffer_target_expand_size);
+        server.stat_reply_buffer_expands++;
+    }
+
+    /* Reset the peak value every server.reply_buffer_peak_reset_time milliseconds,
+     * so that an idle client will start to shrink. */
+    if (server.reply_buffer_peak_reset_time >= 0 &&
+        now_ms - c->buf_peak_last_reset_time >= server.reply_buffer_peak_reset_time)
+    {
+        c->buf_peak = c->bufpos;
+        c->buf_peak_last_reset_time = now_ms;
+    }
+
+    if (new_buffer_size) {
+        oldbuf = c->buf;
+        c->buf = zmalloc_usable(new_buffer_size, &c->buf_usable_size);
+        memcpy(c->buf,oldbuf,c->bufpos);
+        zfree(oldbuf);
+    }
+    return 0;
+}
+
 /* This function is used in order to track clients using the biggest amount
  * of memory in the latest few seconds. This way we can provide such information
  * in the INFO output (clients section), without having to do an O(N) scan for
@@ -899,6 +944,8 @@ void clientsCron(void) {
          * terminated. */
         if (clientsCronHandleTimeout(c,now)) continue;
         if (clientsCronResizeQueryBuffer(c)) continue;
+        if (clientsCronResizeOutputBuffer(c,now)) continue;
+
         if (clientsCronTrackExpansiveClients(c, curr_peak_mem_usage_slot)) continue;

         /* Iterating all the clients in getMemoryOverheadData() is too slow and
@@ -2290,6 +2337,8 @@ void resetServerStats(void) {
     server.stat_total_error_replies = 0;
     server.stat_dump_payload_sanitizations = 0;
     server.aof_delayed_fsync = 0;
+    server.stat_reply_buffer_shrinks = 0;
+    server.stat_reply_buffer_expands = 0;
     lazyfreeResetStats();
 }

@@ -2350,6 +2399,7 @@ void initServer(void) {
     server.blocking_op_nesting = 0;
     server.thp_enabled = 0;
     server.cluster_drop_packet_filter = -1;
+    server.reply_buffer_peak_reset_time = REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME;
     resetReplicationBuffer();

     if ((server.tls_port || server.tls_replication || server.tls_cluster)
@@ -5448,7 +5498,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
             "total_reads_processed:%lld\r\n"
             "total_writes_processed:%lld\r\n"
             "io_threaded_reads_processed:%lld\r\n"
-            "io_threaded_writes_processed:%lld\r\n",
+            "io_threaded_writes_processed:%lld\r\n"
+            "reply_buffer_shrinks:%lld\r\n"
+            "reply_buffer_expands:%lld\r\n",
             server.stat_numconnections,
             server.stat_numcommands,
             getInstantaneousMetric(STATS_METRIC_COMMAND),
@@ -5491,7 +5543,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
             stat_total_reads_processed,
             stat_total_writes_processed,
             server.stat_io_reads_processed,
-            server.stat_io_writes_processed);
+            server.stat_io_writes_processed,
+            server.stat_reply_buffer_shrinks,
+            server.stat_reply_buffer_expands);
     }

     /* Replication */
src/server.h (16 changed lines)
@@ -163,11 +163,14 @@ typedef long long ustime_t; /* microsecond time type. */
 #define PROTO_INLINE_MAX_SIZE (1024*64) /* Max size of inline reads */
 #define PROTO_MBULK_BIG_ARG (1024*32)
 #define PROTO_RESIZE_THRESHOLD (1024*32) /* Threshold for determining whether to resize query buffer */
+#define PROTO_REPLY_MIN_BYTES (1024) /* the lower limit on reply buffer size */
 #define LONG_STR_SIZE 21 /* Bytes needed for long -> str + '\0' */
 #define REDIS_AUTOSYNC_BYTES (1024*1024*4) /* Sync file every 4MB. */

 #define LIMIT_PENDING_QUERYBUF (4*1024*1024) /* 4mb */

+#define REPLY_BUFFER_DEFAULT_PEAK_RESET_TIME 5000 /* 5 seconds */
+
 /* When configuring the server eventloop, we setup it so that the total number
  * of file descriptors we can handle are server.maxclients + RESERVED_FDS +
  * a few more to stay safe. Since RESERVED_FDS defaults to 32, we add 96
@@ -1176,14 +1179,11 @@ typedef struct client {
                                          * i.e. the next offset to send. */

     /* Response buffer */
+    size_t buf_peak; /* Peak used size of buffer in last 5 sec interval. */
+    mstime_t buf_peak_last_reset_time; /* keeps the last time the buffer peak value was reset */
     int bufpos;
     size_t buf_usable_size; /* Usable size of buffer. */
-    /* Note that 'buf' must be the last field of client struct, because memory
-     * allocator may give us more memory than our apply for reducing fragments,
-     * but we want to make full use of given memory, i.e. we may access the
-     * memory after 'buf'. To avoid make others fields corrupt, 'buf' must be
-     * the last one. */
-    char buf[PROTO_REPLY_CHUNK_BYTES];
+    char *buf;
 } client;

 struct saveparam {
@@ -1595,6 +1595,9 @@ struct redisServer {
         long long samples[STATS_METRIC_SAMPLES];
         int idx;
     } inst_metric[STATS_METRIC_COUNT];
+    long long stat_reply_buffer_shrinks; /* Total number of output buffer shrinks */
+    long long stat_reply_buffer_expands; /* Total number of output buffer expands */

     /* Configuration */
     int verbosity; /* Loglevel in redis.conf */
     int maxidletime; /* Client timeout in seconds */
@@ -1904,6 +1907,7 @@ struct redisServer {
     int failover_state; /* Failover state */
     int cluster_allow_pubsubshard_when_down; /* Is pubsubshard allowed when the cluster
                                                 is down, doesn't affect pubsub global. */
+    long reply_buffer_peak_reset_time; /* The amount of time (in milliseconds) to wait between reply buffer peak resets */
 };

 #define MAX_KEYS_BUFFER 256
tests/test_helper.tcl

@@ -93,6 +93,7 @@ set ::all_tests {
     unit/cluster
     unit/client-eviction
     unit/violations
+    unit/replybufsize
 }
 # Index to the next test to run in the ::all_tests list.
 set ::next_test 0
tests/unit/client-eviction.tcl

@@ -45,6 +45,10 @@ proc mb {v} {
     return [expr $v * 1024 * 1024]
 }

+proc kb {v} {
+    return [expr $v * 1024]
+}
+
 start_server {} {
     set maxmemory_clients 3000000
     r config set maxmemory-clients $maxmemory_clients
@@ -441,11 +445,12 @@ start_server {} {
     test "evict clients in right order (large to small)" {
         # Note that each size step needs to be at least x2 larger than previous step
         # because of how the client-eviction size bucketing works
-        set sizes [list 100000 [mb 1] [mb 3]]
+        set sizes [list [kb 128] [mb 1] [mb 3]]
         set clients_per_size 3
         r client setname control
         r client no-evict on
         r config set maxmemory-clients 0
+        r debug replybuffer-peak-reset-time never

         # Run over all sizes and create some clients using up that size
         set total_client_mem 0
@@ -470,7 +475,6 @@ start_server {} {
             # Account total client memory usage
             incr total_mem [expr $clients_per_size * $client_mem]
         }
-        incr total_mem [client_field control tot-mem]

         # Make sure all clients are connected
         set clients [split [string trim [r client list]] "\r\n"]
@@ -481,8 +485,9 @@ start_server {} {
         # For each size reduce maxmemory-clients so relevant clients should be evicted
         # do this from largest to smallest
         foreach size [lreverse $sizes] {
+            set control_mem [client_field control tot-mem]
             set total_mem [expr $total_mem - $clients_per_size * $size]
-            r config set maxmemory-clients $total_mem
+            r config set maxmemory-clients [expr $total_mem + $control_mem]
             set clients [split [string trim [r client list]] "\r\n"]
             # Verify only relevant clients were evicted
             for {set i 0} {$i < [llength $sizes]} {incr i} {
@@ -495,8 +500,12 @@ start_server {} {
                 }
             }
         }
+
+        # Restore the peak reset time to default
+        r debug replybuffer-peak-reset-time reset
+
         foreach rr $rrs {$rr close}
     }
 } {} {needs:debug}
 }
 }
tests/unit/introspection.tcl

@@ -7,7 +7,7 @@ start_server {tags {"introspection"}} {

     test {CLIENT LIST} {
         r client list
-    } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=2*}
+    } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=2*}

     test {CLIENT LIST with IDs} {
         set myid [r client id]
@@ -17,7 +17,7 @@ start_server {tags {"introspection"}} {

     test {CLIENT INFO} {
         r client info
-    } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=2*}
+    } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=2*}

     test {CLIENT KILL with illegal arguments} {
         assert_error "ERR wrong number of arguments for 'client|kill' command" {r client kill}
tests/unit/replybufsize.tcl (new file, 47 lines)
@@ -0,0 +1,47 @@
+proc get_reply_buffer_size {cname} {
+
+    set clients [split [string trim [r client list]] "\r\n"]
+    set c [lsearch -inline $clients *name=$cname*]
+    if {![regexp rbs=(\[a-zA-Z0-9-\]+) $c - rbufsize]} {
+        error "field rbs not found in $c"
+    }
+    return $rbufsize
+}
+
+start_server {tags {"replybufsize"}} {
+
+    test {verify reply buffer limits} {
+        # In order to reduce test time we can set the peak reset time very low
+        r debug replybuffer-peak-reset-time 100
+
+        # Create a simple idle test client
+        variable tc [redis_client]
+        $tc client setname test_client
+
+        # Make sure the client is idle for 1 second so the reply buffer shrinks
+        wait_for_condition 10 100 {
+            [get_reply_buffer_size test_client] >= 1024 && [get_reply_buffer_size test_client] < 2046
+        } else {
+            set rbs [get_reply_buffer_size test_client]
+            fail "reply buffer of idle client is $rbs after 1 second"
+        }
+
+        r set bigval [string repeat x 32768]
+
+        # Disable peak resets so the buffer will not shrink while we check it
+        r debug replybuffer-peak-reset-time never
+
+        wait_for_condition 10 100 {
+            [$tc get bigval ; get_reply_buffer_size test_client] >= 16384 && [get_reply_buffer_size test_client] < 32768
+        } else {
+            set rbs [get_reply_buffer_size test_client]
+            fail "reply buffer of busy client is $rbs after 1 second"
+        }
+
+        # Restore the peak reset time to default
+        r debug replybuffer-peak-reset-time reset
+
+        $tc close
+    } {0} {needs:debug}
+}
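Assuming the standard Redis test runner, the new suite can be run on its own with `./runtest --single unit/replybufsize`.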