diff --git a/src/networking.c b/src/networking.c index d1714bb12..d839658e9 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1923,7 +1923,7 @@ int processMultibulkBuffer(client *c) { c->qb_pos = 0; /* Hint the sds library about the amount of bytes this string is * going to contain. */ - c->querybuf = sdsMakeRoomFor(c->querybuf,ll+2-sdslen(c->querybuf)); + c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf,ll+2-sdslen(c->querybuf)); } } c->bulklen = ll; @@ -2132,8 +2132,8 @@ void processInputBuffer(client *c) { void readQueryFromClient(connection *conn) { client *c = connGetPrivateData(conn); - int nread, readlen; - size_t qblen; + int nread, big_arg = 0; + size_t qblen, readlen; /* Check if we want to read from the client later when exiting from * the event loop. This is the case if threaded I/O is enabled. */ @@ -2153,15 +2153,28 @@ void readQueryFromClient(connection *conn) { && c->bulklen >= PROTO_MBULK_BIG_ARG) { ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf); + big_arg = 1; /* Note that the 'remaining' variable may be zero in some edge case, * for example once we resume a blocked client after CLIENT PAUSE. */ - if (remaining > 0 && remaining < readlen) readlen = remaining; + if (remaining > 0 && (size_t)remaining < readlen) readlen = remaining; } qblen = sdslen(c->querybuf); if (c->querybuf_peak < qblen) c->querybuf_peak = qblen; - c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); + if (big_arg || sdsalloc(c->querybuf) < PROTO_IOBUF_LEN) { + /* When reading a BIG_ARG we won't be reading more than that one arg + * into the query buffer, so we don't need to pre-allocate more than we + * need, so using the non-greedy growing. For an initial allocation of + * the query buffer, we also don't wanna use the greedy growth, in order + * to avoid collision with the RESIZE_THRESHOLD mechanism. 
*/ + c->querybuf = sdsMakeRoomForNonGreedy(c->querybuf, readlen); + } else { + c->querybuf = sdsMakeRoomFor(c->querybuf, readlen); + + /* Read as much as possible from the socket to save read(2) system calls. */ + readlen = sdsavail(c->querybuf); + } nread = connRead(c->conn, c->querybuf+qblen, readlen); if (nread == -1) { if (connGetState(conn) == CONN_STATE_CONNECTED) { diff --git a/src/sds.c b/src/sds.c index 17cd6ae69..31ab2ed3a 100644 --- a/src/sds.c +++ b/src/sds.c @@ -227,10 +227,16 @@ void sdsclear(sds s) { /* Enlarge the free space at the end of the sds string so that the caller * is sure that after calling this function can overwrite up to addlen * bytes after the end of the string, plus one more byte for nul term. + * If there's already sufficient free space, this function returns without any + * action, if there isn't sufficient free space, it'll allocate what's missing, + * and possibly more: + * When greedy is 1, enlarge more than needed, to avoid need for future reallocs + * on incremental growth. + * When greedy is 0, enlarge just enough so that there's free space for 'addlen'. * * Note: this does not change the *length* of the sds string as returned * by sdslen(), but only the free buffer space we have. 
*/ -sds sdsMakeRoomFor(sds s, size_t addlen) { +sds _sdsMakeRoomFor(sds s, size_t addlen, int greedy) { void *sh, *newsh; size_t avail = sdsavail(s); size_t len, newlen; @@ -245,10 +251,12 @@ sds sdsMakeRoomFor(sds s, size_t addlen) { sh = (char*)s-sdsHdrSize(oldtype); newlen = (len+addlen); assert(newlen > len); /* Catch size_t overflow */ - if (newlen < SDS_MAX_PREALLOC) - newlen *= 2; - else - newlen += SDS_MAX_PREALLOC; + if (greedy == 1) { + if (newlen < SDS_MAX_PREALLOC) + newlen *= 2; + else + newlen += SDS_MAX_PREALLOC; + } type = sdsReqType(newlen); @@ -281,6 +289,17 @@ sds sdsMakeRoomFor(sds s, size_t addlen) { return s; } +/* Enlarge the free space at the end of the sds string more than needed, + * This is useful to avoid repeated re-allocations when repeatedly appending to the sds. */ +sds sdsMakeRoomFor(sds s, size_t addlen) { + return _sdsMakeRoomFor(s, addlen, 1); +} + +/* Unlike sdsMakeRoomFor(), this one just grows to the necessary size. */ +sds sdsMakeRoomForNonGreedy(sds s, size_t addlen) { + return _sdsMakeRoomFor(s, addlen, 0); +} + /* Reallocate the sds string so that it has no free space at the end. The * contained string remains not altered, but next concatenation operations * will require a reallocation. 
diff --git a/src/sds.h b/src/sds.h index 7f8710745..8392c2064 100644 --- a/src/sds.h +++ b/src/sds.h @@ -263,6 +263,7 @@ sds sdstemplate(const char *template, sdstemplate_callback_t cb_func, void *cb_a /* Low level functions exposed to the user API */ sds sdsMakeRoomFor(sds s, size_t addlen); +sds sdsMakeRoomForNonGreedy(sds s, size_t addlen); void sdsIncrLen(sds s, ssize_t incr); sds sdsRemoveFreeSpace(sds s); size_t sdsAllocSize(sds s); diff --git a/src/server.c b/src/server.c index 9033a9f5f..a2fb20c0e 100644 --- a/src/server.c +++ b/src/server.c @@ -1693,9 +1693,9 @@ int clientsCronResizeQueryBuffer(client *c) { time_t idletime = server.unixtime - c->lastinteraction; /* There are two conditions to resize the query buffer: - * 1) Query buffer is > BIG_ARG and too big for latest peak. - * 2) Query buffer is > BIG_ARG and client is idle. */ - if (querybuf_size > PROTO_MBULK_BIG_ARG && + * 1) Query buffer is > PROTO_RESIZE_THRESHOLD and too big for latest peak. + * 2) Query buffer is > PROTO_RESIZE_THRESHOLD and client is idle. */ + if (querybuf_size > PROTO_RESIZE_THRESHOLD && ((querybuf_size/(c->querybuf_peak+1)) > 2 || idletime > 2)) { diff --git a/src/server.h b/src/server.h index 568e3ea53..23bd4a762 100644 --- a/src/server.h +++ b/src/server.h @@ -147,6 +147,7 @@ typedef long long ustime_t; /* microsecond time type. 
*/ #define PROTO_REPLY_CHUNK_BYTES (16*1024) /* 16k output buffer */ #define PROTO_INLINE_MAX_SIZE (1024*64) /* Max size of inline reads */ #define PROTO_MBULK_BIG_ARG (1024*32) +#define PROTO_RESIZE_THRESHOLD (1024*32) /* Threshold for determining whether to resize query buffer */ #define LONG_STR_SIZE 21 /* Bytes needed for long -> str + '\0' */ #define REDIS_AUTOSYNC_BYTES (1024*1024*32) /* fdatasync every 32MB */ diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 2846a24f0..3d86fa364 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -70,6 +70,7 @@ set ::all_tests { unit/hyperloglog unit/lazyfree unit/wait + unit/querybuf unit/pendingquerybuf unit/tls unit/tracking diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index 8e0a3ca53..dd986dd72 100644 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -143,6 +143,18 @@ start_server {tags {"maxmemory external:skip"}} { } } +# Calculate query buffer memory of slave +proc slave_query_buffer {srv} { + set clients [split [$srv client list] "\r\n"] + set c [lsearch -inline $clients *flags=S*] + if {[string length $c] > 0} { + assert {[regexp {qbuf=([0-9]+)} $c - qbuf]} + assert {[regexp {qbuf-free=([0-9]+)} $c - qbuf_free]} + return [expr $qbuf + $qbuf_free] + } + return 0 +} + proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline} { start_server {tags {"maxmemory external:skip"}} { start_server {} { @@ -155,6 +167,9 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline} set master_host [srv -1 host] set master_port [srv -1 port] + # Disable slow log for master to avoid memory growth in slow env. 
+ $master config set slowlog-log-slower-than -1 + # add 100 keys of 100k (10MB total) for {set j 0} {$j < 100} {incr j} { $master setrange "key:$j" 100000 asdf @@ -207,7 +222,13 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline} set slave_buf [s -1 mem_clients_slaves] set client_buf [s -1 mem_clients_normal] set mem_not_counted_for_evict [s -1 mem_not_counted_for_evict] - set used_no_repl [expr {$new_used - $mem_not_counted_for_evict}] + set used_no_repl [expr {$new_used - $mem_not_counted_for_evict - [slave_query_buffer $master]}] + # we need to exclude replies buffer and query buffer of replica from used memory. + # removing the replica (output) buffers is done so that we are able to measure any other + # changes to the used memory and see that they're insignificant (the test's purpose is to check that + # the replica buffers are counted correctly, so the used memory growth after deducting them + # should be nearly 0). + # we remove the query buffers because on slow test platforms, they can accumulate many ACKs. 
set delta [expr {($used_no_repl - $client_buf) - ($orig_used_no_repl - $orig_client_buf)}] assert {[$master dbsize] == 100} @@ -219,7 +240,8 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline} set killed_used [s -1 used_memory] set killed_slave_buf [s -1 mem_clients_slaves] set killed_mem_not_counted_for_evict [s -1 mem_not_counted_for_evict] - set killed_used_no_repl [expr {$killed_used - $killed_mem_not_counted_for_evict}] + # we need to exclude replies buffer and query buffer of slave from used memory after kill slave + set killed_used_no_repl [expr {$killed_used - $killed_mem_not_counted_for_evict - [slave_query_buffer $master]}] set delta_no_repl [expr {$killed_used_no_repl - $used_no_repl}] assert {$killed_slave_buf == 0} assert {$delta_no_repl > -$delta_max && $delta_no_repl < $delta_max} diff --git a/tests/unit/querybuf.tcl b/tests/unit/querybuf.tcl new file mode 100644 index 000000000..955e184db --- /dev/null +++ b/tests/unit/querybuf.tcl @@ -0,0 +1,48 @@ +proc client_idle_sec {name} { + set clients [split [r client list] "\r\n"] + set c [lsearch -inline $clients *name=$name*] + assert {[regexp {idle=([0-9]+)} $c - idle]} + return $idle +} + +# Calculate query buffer memory (used + free) of the client with the given name +proc client_query_buffer {name} { + set clients [split [r client list] "\r\n"] + set c [lsearch -inline $clients *name=$name*] + if {[string length $c] > 0} { + assert {[regexp {qbuf=([0-9]+)} $c - qbuf]} + assert {[regexp {qbuf-free=([0-9]+)} $c - qbuf_free]} + return [expr $qbuf + $qbuf_free] + } + return 0 +} + +start_server {tags {"querybuf slow"}} { + # The test will run at least 2s to check if client query + # buffer will be resized when client idle 2s. + test "query buffer resized correctly" { + # Memory will increase by more than 32k due to client query buffer. 
+ set rd [redis_deferring_client] + $rd client setname test_client + set orig_test_client_qbuf [client_query_buffer test_client] + assert {$orig_test_client_qbuf > 16384 && $orig_test_client_qbuf < 32768} + + # Check that the initial query buffer is not resized if it is idle for more than 2s + wait_for_condition 1000 10 { + [client_idle_sec test_client] > 3 && [client_query_buffer test_client] == $orig_test_client_qbuf + } else { + fail "query buffer was resized" + } + + # Fill query buffer to more than 32k + $rd set bigstring v ;# create bigstring in advance to avoid adding extra memory + $rd set bigstring [string repeat A 32768] nx + + # Wait for query buffer to be resized to 0. + wait_for_condition 1000 10 { + [client_query_buffer test_client] == 0 + } else { + fail "querybuf expected to be resized" + } + } +}