Fix the wrong reisze of querybuf (#9003)
The initialize memory of `querybuf` is `PROTO_IOBUF_LEN(1024*16) * 2` (due to sdsMakeRoomFor being greedy), under `jemalloc`, the allocated memory will be 40k. This will most likely result in the `querybuf` being resized when call `clientsCronResizeQueryBuffer` unless the client requests it fast enough. Note that this bug existed even before #7875, since the condition for resizing includes the sds headers (32k+6). ## Changes 1. Use non-greedy sdsMakeRoomFor when allocating the initial query buffer (of 16k). 1. Also use non-greedy allocation when working with BIG_ARG (we won't use that extra space anyway) 2. in case we did use a greedy allocation, read as much as we can into the buffer we got (including internal frag), to reduce system calls. 3. introduce a dedicated constant for the shrinking (same value as before) 3. Add test for querybuf. 4. improve a maxmemory test by ignoring the effect of replica query buffers (can accumulate many ACKs on slow env) 5. improve a maxmemory by disabling slowlog (it will cause slight memory growth on slow env).
This commit is contained in:
parent
eb15c456c9
commit
e5d8a5eb85
@ -1923,7 +1923,7 @@ int processMultibulkBuffer(client *c) {
|
||||
c->qb_pos = 0;
|
||||
/* Hint the sds library about the amount of bytes this string is
|
||||
* going to contain. */
|
||||
c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-sdslen(c->querybuf));
|
||||
c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf,ll+2-sdslen(c->querybuf));
|
||||
}
|
||||
}
|
||||
c->bulklen = ll;
|
||||
@ -2132,8 +2132,8 @@ void processInputBuffer(client *c) {
|
||||
|
||||
void readQueryFromClient(connection *conn) {
|
||||
client *c = connGetPrivateData(conn);
|
||||
int nread, readlen;
|
||||
size_t qblen;
|
||||
int nread, big_arg = 0;
|
||||
size_t qblen, readlen;
|
||||
|
||||
/* Check if we want to read from the client later when exiting from
|
||||
* the event loop. This is the case if threaded I/O is enabled. */
|
||||
@ -2153,15 +2153,28 @@ void readQueryFromClient(connection *conn) {
|
||||
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
|
||||
{
|
||||
ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
|
||||
big_arg = 1;
|
||||
|
||||
/* Note that the 'remaining' variable may be zero in some edge case,
|
||||
* for example once we resume a blocked client after CLIENT PAUSE. */
|
||||
if (remaining > 0 && remaining < readlen) readlen = remaining;
|
||||
if (remaining > 0 && (size_t)remaining < readlen) readlen = remaining;
|
||||
}
|
||||
|
||||
qblen = sdslen(c->querybuf);
|
||||
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
|
||||
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
|
||||
if (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN) {
|
||||
/* When reading a BIG_ARG we won't be reading more than that one arg
|
||||
* into the query buffer, so we don't need to pre-allocate more than we
|
||||
* need, so using the non-greedy growing. For an initial allocation of
|
||||
* the query buffer, we also don't wanna use the greedy growth, in order
|
||||
* to avoid collision with the RESIZE_THRESHOLD mechanism. */
|
||||
c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf, readlen);
|
||||
} else {
|
||||
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
|
||||
|
||||
/* Read as much as possible from the socket to save read(2) system calls. */
|
||||
readlen = sdsavail(c->querybuf);
|
||||
}
|
||||
nread = connRead(c->conn, c->querybuf+qblen, readlen);
|
||||
if (nread == -1) {
|
||||
if (connGetState(conn) == CONN_STATE_CONNECTED) {
|
||||
|
29
src/sds.c
29
src/sds.c
@ -227,10 +227,16 @@ void sdsclear(sds s) {
|
||||
/* Enlarge the free space at the end of the sds string so that the caller
|
||||
* is sure that after calling this function can overwrite up to addlen
|
||||
* bytes after the end of the string, plus one more byte for nul term.
|
||||
* If there's already sufficient free space, this function returns without any
|
||||
* action, if there isn't sufficient free space, it'll allocate what's missing,
|
||||
* and possibly more:
|
||||
* When greedy is 1, enlarge more than needed, to avoid need for future reallocs
|
||||
* on incremental growth.
|
||||
* When greedy is 0, enlarge just enough so that there's free space for 'addlen'.
|
||||
*
|
||||
* Note: this does not change the *length* of the sds string as returned
|
||||
* by sdslen(), but only the free buffer space we have. */
|
||||
sds sdsMakeRoomFor(sds s, size_t addlen) {
|
||||
sds _sdsMakeRoomFor(sds s, size_t addlen, int greedy) {
|
||||
void *sh, *newsh;
|
||||
size_t avail = sdsavail(s);
|
||||
size_t len, newlen;
|
||||
@ -245,10 +251,12 @@ sds sdsMakeRoomFor(sds s, size_t addlen) {
|
||||
sh = (char*)s-sdsHdrSize(oldtype);
|
||||
newlen = (len+addlen);
|
||||
assert(newlen > len); /* Catch size_t overflow */
|
||||
if (newlen < SDS_MAX_PREALLOC)
|
||||
newlen *= 2;
|
||||
else
|
||||
newlen += SDS_MAX_PREALLOC;
|
||||
if (greedy == 1) {
|
||||
if (newlen < SDS_MAX_PREALLOC)
|
||||
newlen *= 2;
|
||||
else
|
||||
newlen += SDS_MAX_PREALLOC;
|
||||
}
|
||||
|
||||
type = sdsReqType(newlen);
|
||||
|
||||
@ -281,6 +289,17 @@ sds sdsMakeRoomFor(sds s, size_t addlen) {
|
||||
return s;
|
||||
}
|
||||
|
||||
/* Enlarge the free space at the end of the sds string more than needed,
|
||||
* This is useful to avoid repeated re-allocations when repeatedly appending to the sds. */
|
||||
sds sdsMakeRoomFor(sds s, size_t addlen) {
|
||||
return _sdsMakeRoomFor(s, addlen, 1);
|
||||
}
|
||||
|
||||
/* Unlike sdsMakeRoomFor(), this one just grows to the necessary size. */
|
||||
sds sdsMakeRoomForNonGreedy(sds s, size_t addlen) {
|
||||
return _sdsMakeRoomFor(s, addlen, 0);
|
||||
}
|
||||
|
||||
/* Reallocate the sds string so that it has no free space at the end. The
|
||||
* contained string remains not altered, but next concatenation operations
|
||||
* will require a reallocation.
|
||||
|
@ -263,6 +263,7 @@ sds sdstemplate(const char *template, sdstemplate_callback_t cb_func, void *cb_a
|
||||
|
||||
/* Low level functions exposed to the user API */
|
||||
sds sdsMakeRoomFor(sds s, size_t addlen);
|
||||
sds sdsMakeRoomForNonGreedy(sds s, size_t addlen);
|
||||
void sdsIncrLen(sds s, ssize_t incr);
|
||||
sds sdsRemoveFreeSpace(sds s);
|
||||
size_t sdsAllocSize(sds s);
|
||||
|
@ -1693,9 +1693,9 @@ int clientsCronResizeQueryBuffer(client *c) {
|
||||
time_t idletime = server.unixtime - c->lastinteraction;
|
||||
|
||||
/* There are two conditions to resize the query buffer:
|
||||
* 1) Query buffer is > BIG_ARG and too big for latest peak.
|
||||
* 2) Query buffer is > BIG_ARG and client is idle. */
|
||||
if (querybuf_size > PROTO_MBULK_BIG_ARG &&
|
||||
* 1) Query buffer is > PROTO_RESIZE_THRESHOLD and too big for latest peak.
|
||||
* 2) Query buffer is > PROTO_RESIZE_THRESHOLD and client is idle. */
|
||||
if (querybuf_size > PROTO_RESIZE_THRESHOLD &&
|
||||
((querybuf_size/(c->querybuf_peak+1)) > 2 ||
|
||||
idletime > 2))
|
||||
{
|
||||
|
@ -147,6 +147,7 @@ typedef long long ustime_t; /* microsecond time type. */
|
||||
#define PROTO_REPLY_CHUNK_BYTES (16*1024) /* 16k output buffer */
|
||||
#define PROTO_INLINE_MAX_SIZE (1024*64) /* Max size of inline reads */
|
||||
#define PROTO_MBULK_BIG_ARG (1024*32)
|
||||
#define PROTO_RESIZE_THRESHOLD (1024*32) /* Threshold for determining whether to resize query buffer */
|
||||
#define LONG_STR_SIZE 21 /* Bytes needed for long -> str + '\0' */
|
||||
#define REDIS_AUTOSYNC_BYTES (1024*1024*32) /* fdatasync every 32MB */
|
||||
|
||||
|
@ -70,6 +70,7 @@ set ::all_tests {
|
||||
unit/hyperloglog
|
||||
unit/lazyfree
|
||||
unit/wait
|
||||
unit/querybuf
|
||||
unit/pendingquerybuf
|
||||
unit/tls
|
||||
unit/tracking
|
||||
|
@ -143,6 +143,18 @@ start_server {tags {"maxmemory external:skip"}} {
|
||||
}
|
||||
}
|
||||
|
||||
# Calculate query buffer memory of slave
|
||||
proc slave_query_buffer {srv} {
|
||||
set clients [split [$srv client list] "\r\n"]
|
||||
set c [lsearch -inline $clients *flags=S*]
|
||||
if {[string length $c] > 0} {
|
||||
assert {[regexp {qbuf=([0-9]+)} $c - qbuf]}
|
||||
assert {[regexp {qbuf-free=([0-9]+)} $c - qbuf_free]}
|
||||
return [expr $qbuf + $qbuf_free]
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline} {
|
||||
start_server {tags {"maxmemory external:skip"}} {
|
||||
start_server {} {
|
||||
@ -155,6 +167,9 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline}
|
||||
set master_host [srv -1 host]
|
||||
set master_port [srv -1 port]
|
||||
|
||||
# Disable slow log for master to avoid memory growth in slow env.
|
||||
$master config set slowlog-log-slower-than -1
|
||||
|
||||
# add 100 keys of 100k (10MB total)
|
||||
for {set j 0} {$j < 100} {incr j} {
|
||||
$master setrange "key:$j" 100000 asdf
|
||||
@ -207,7 +222,13 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline}
|
||||
set slave_buf [s -1 mem_clients_slaves]
|
||||
set client_buf [s -1 mem_clients_normal]
|
||||
set mem_not_counted_for_evict [s -1 mem_not_counted_for_evict]
|
||||
set used_no_repl [expr {$new_used - $mem_not_counted_for_evict}]
|
||||
set used_no_repl [expr {$new_used - $mem_not_counted_for_evict - [slave_query_buffer $master]}]
|
||||
# we need to exclude replies buffer and query buffer of replica from used memory.
|
||||
# removing the replica (output) buffers is done so that we are able to measure any other
|
||||
# changes to the used memory and see that they're insignificant (the test's purpose is to check that
|
||||
# the replica buffers are counted correctly, so the used memory growth after deducting them
|
||||
# should be nearly 0).
|
||||
# we remove the query buffers because on slow test platforms, they can accumulate many ACKs.
|
||||
set delta [expr {($used_no_repl - $client_buf) - ($orig_used_no_repl - $orig_client_buf)}]
|
||||
|
||||
assert {[$master dbsize] == 100}
|
||||
@ -219,7 +240,8 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline}
|
||||
set killed_used [s -1 used_memory]
|
||||
set killed_slave_buf [s -1 mem_clients_slaves]
|
||||
set killed_mem_not_counted_for_evict [s -1 mem_not_counted_for_evict]
|
||||
set killed_used_no_repl [expr {$killed_used - $killed_mem_not_counted_for_evict}]
|
||||
# we need to exclude replies buffer and query buffer of slave from used memory after kill slave
|
||||
set killed_used_no_repl [expr {$killed_used - $killed_mem_not_counted_for_evict - [slave_query_buffer $master]}]
|
||||
set delta_no_repl [expr {$killed_used_no_repl - $used_no_repl}]
|
||||
assert {$killed_slave_buf == 0}
|
||||
assert {$delta_no_repl > -$delta_max && $delta_no_repl < $delta_max}
|
||||
|
48
tests/unit/querybuf.tcl
Normal file
48
tests/unit/querybuf.tcl
Normal file
@ -0,0 +1,48 @@
|
||||
proc client_idle_sec {name} {
|
||||
set clients [split [r client list] "\r\n"]
|
||||
set c [lsearch -inline $clients *name=$name*]
|
||||
assert {[regexp {idle=([0-9]+)} $c - idle]}
|
||||
return $idle
|
||||
}
|
||||
|
||||
# Calculate query buffer memory of slave
|
||||
proc client_query_buffer {name} {
|
||||
set clients [split [r client list] "\r\n"]
|
||||
set c [lsearch -inline $clients *name=$name*]
|
||||
if {[string length $c] > 0} {
|
||||
assert {[regexp {qbuf=([0-9]+)} $c - qbuf]}
|
||||
assert {[regexp {qbuf-free=([0-9]+)} $c - qbuf_free]}
|
||||
return [expr $qbuf + $qbuf_free]
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
start_server {tags {"querybuf slow"}} {
|
||||
# The test will run at least 2s to check if client query
|
||||
# buffer will be resized when client idle 2s.
|
||||
test "query buffer resized correctly" {
|
||||
# Memory will increase by more than 32k due to client query buffer.
|
||||
set rd [redis_deferring_client]
|
||||
$rd client setname test_client
|
||||
set orig_test_client_qbuf [client_query_buffer test_client]
|
||||
assert {$orig_test_client_qbuf > 16384 && $orig_test_client_qbuf < 32768}
|
||||
|
||||
# Check that the initial query buffer is not resized if it is idle for more than 2s
|
||||
wait_for_condition 1000 10 {
|
||||
[client_idle_sec test_client] > 3 && [client_query_buffer test_client] == $orig_test_client_qbuf
|
||||
} else {
|
||||
fail "query buffer was resized"
|
||||
}
|
||||
|
||||
# Fill query buffer to more than 32k
|
||||
$rd set bigstring v ;# create bigstring in advance to avoid adding extra memory
|
||||
$rd set bigstring [string repeat A 32768] nx
|
||||
|
||||
# Wait for query buffer to be resized to 0.
|
||||
wait_for_condition 1000 10 {
|
||||
[client_query_buffer test_client] == 0
|
||||
} else {
|
||||
fail "querybuf expected to be resized"
|
||||
}
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user