diff --git a/src/networking.c b/src/networking.c index 2355a376b..14e94b84c 100644 --- a/src/networking.c +++ b/src/networking.c @@ -333,7 +333,7 @@ void _addReplyProtoToList(client *c, const char *s, size_t len) { listAddNodeTail(c->reply, tail); c->reply_bytes += tail->size; - asyncCloseClientOnOutputBufferLimitReached(c); + closeClientOnOutputBufferLimitReached(c, 1); } } @@ -616,7 +616,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) { listNodeValue(ln) = buf; c->reply_bytes += buf->size; - asyncCloseClientOnOutputBufferLimitReached(c); + closeClientOnOutputBufferLimitReached(c, 1); } } @@ -949,7 +949,7 @@ void AddReplyFromClient(client *dst, client *src) { src->bufpos = 0; /* Check output buffer limits */ - asyncCloseClientOnOutputBufferLimitReached(dst); + closeClientOnOutputBufferLimitReached(dst, 1); } /* Copy 'src' client output buffers into 'dst' client output buffers. @@ -3223,18 +3223,33 @@ int checkClientOutputBufferLimits(client *c) { * * Note: we need to close the client asynchronously because this function is * called from contexts where the client can't be freed safely, i.e. from the - * lower level functions pushing data inside the client output buffers. */ -void asyncCloseClientOnOutputBufferLimitReached(client *c) { - if (!c->conn) return; /* It is unsafe to free fake clients. */ + * lower level functions pushing data inside the client output buffers. + * When `async` is set to 0, we close the client immediately, this is + * useful when called from cron. + * + * Returns 1 if client was (flagged) closed. */ +int closeClientOnOutputBufferLimitReached(client *c, int async) { + if (!c->conn) return 0; /* It is unsafe to free fake clients. */ serverAssert(c->reply_bytes < SIZE_MAX-(1024*64)); - if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return; + if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return 0; if (checkClientOutputBufferLimits(c)) { sds client = catClientInfoString(sdsempty(),c); - freeClientAsync(c); - serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client); + if (async) { + freeClientAsync(c); + serverLog(LL_WARNING, + "Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", + client); + } else { + freeClient(c); + serverLog(LL_WARNING, + "Client %s closed for overcoming of output buffer limits.", + client); + } sdsfree(client); + return 1; } + return 0; } /* Helper function used by performEvictions() in order to flush slaves diff --git a/src/server.c b/src/server.c index 1fe150b3b..9541aada7 100644 --- a/src/server.c +++ b/src/server.c @@ -1839,6 +1839,7 @@ void clientsCron(void) { if (clientsCronResizeQueryBuffer(c)) continue; if (clientsCronTrackExpansiveClients(c, curr_peak_mem_usage_slot)) continue; if (clientsCronTrackClientsMemUsage(c)) continue; + if (closeClientOnOutputBufferLimitReached(c, 0)) continue; } } diff --git a/src/server.h b/src/server.h index 9b7e16a0d..ee87ecacf 100644 --- a/src/server.h +++ b/src/server.h @@ -1867,7 +1867,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval); void replaceClientCommandVector(client *c, int argc, robj **argv); unsigned long getClientOutputBufferMemoryUsage(client *c); int freeClientsInAsyncFreeQueue(void); -void asyncCloseClientOnOutputBufferLimitReached(client *c); +int closeClientOnOutputBufferLimitReached(client *c, int async); int getClientType(client *c); int getClientTypeByName(char *name); char *getClientTypeName(int class); diff --git a/tests/unit/obuf-limits.tcl b/tests/unit/obuf-limits.tcl index 38a643385..3f26c53f9 100644 --- a/tests/unit/obuf-limits.tcl +++ b/tests/unit/obuf-limits.tcl @@ -18,65 +18,72 @@ start_server {tags {"obuf-limits"}} { assert {$omem >= 70000 && $omem < 200000} $rd1 close } - - test {Client output buffer soft limit is not enforced if time is not overreached} { - r config set client-output-buffer-limit {pubsub 0 100000 10} - set rd1 [redis_deferring_client] - - $rd1 subscribe foo - set reply [$rd1 read] - assert {$reply eq "subscribe foo 1"} - - set omem 0 - set start_time 0 - set time_elapsed 0 - while 1 { - if {$start_time != 0} { - # Slow down loop when omen has reached the limit. - after 10 - } - r publish foo [string repeat "x" 1000] - set clients [split [r client list] "\r\n"] - set c [split [lindex $clients 1] " "] - if {![regexp {omem=([0-9]+)} $c - omem]} break - if {$omem > 100000} { - if {$start_time == 0} {set start_time [clock seconds]} - set time_elapsed [expr {[clock seconds]-$start_time}] - if {$time_elapsed >= 5} break - } + + foreach {soft_limit_time wait_for_timeout} {3 yes + 4 no } { + if $wait_for_timeout { + set test_name "Client output buffer soft limit is enforced if time is overreached" + } else { + set test_name "Client output buffer soft limit is not enforced too early and is enforced when no traffic" } - assert {$omem >= 100000 && $time_elapsed >= 5 && $time_elapsed <= 10} - $rd1 close - } - test {Client output buffer soft limit is enforced if time is overreached} { - r config set client-output-buffer-limit {pubsub 0 100000 3} - set rd1 [redis_deferring_client] + test $test_name { + r config set client-output-buffer-limit "pubsub 0 100000 $soft_limit_time" + set soft_limit_time [expr $soft_limit_time*1000] + set rd1 [redis_deferring_client] - $rd1 subscribe foo - set reply [$rd1 read] - assert {$reply eq "subscribe foo 1"} + $rd1 client setname test_client + set reply [$rd1 read] + assert {$reply eq "OK"} - set omem 0 - set start_time 0 - set time_elapsed 0 - while 1 { - if {$start_time != 0} { - # Slow down loop when omen has reached the limit. - after 10 + $rd1 subscribe foo + set reply [$rd1 read] + assert {$reply eq "subscribe foo 1"} + + set omem 0 + set start_time 0 + set time_elapsed 0 + set last_under_limit_time [clock milliseconds] + while 1 { + r publish foo [string repeat "x" 1000] + set clients [split [r client list] "\r\n"] + set c [lsearch -inline $clients *name=test_client*] + if {$start_time != 0} { + set time_elapsed [expr {[clock milliseconds]-$start_time}] + # Make sure test isn't taking too long + assert {$time_elapsed <= [expr $soft_limit_time+3000]} + } + if {$wait_for_timeout && $c == ""} { + # Make sure we're disconnected when we reach the soft limit + assert {$omem >= 100000 && $time_elapsed >= $soft_limit_time} + break + } else { + assert {[regexp {omem=([0-9]+)} $c - omem]} + } + if {$omem > 100000} { + if {$start_time == 0} {set start_time $last_under_limit_time} + if {!$wait_for_timeout && $time_elapsed >= [expr $soft_limit_time-1000]} break + # Slow down loop when omem has reached the limit. + after 10 + } else { + # if the OS socket buffers swallowed what we previously filled, reset the start timer. + set start_time 0 + set last_under_limit_time [clock milliseconds] + } } - r publish foo [string repeat "x" 1000] - set clients [split [r client list] "\r\n"] - set c [split [lindex $clients 1] " "] - if {![regexp {omem=([0-9]+)} $c - omem]} break - if {$omem > 100000} { - if {$start_time == 0} {set start_time [clock seconds]} - set time_elapsed [expr {[clock seconds]-$start_time}] - if {$time_elapsed >= 10} break + + if {!$wait_for_timeout} { + # After we completely stopped the traffic, wait for soft limit to time out + set timeout [expr {$soft_limit_time+1500 - ([clock milliseconds]-$start_time)}] + wait_for_condition [expr $timeout/10] 10 { + [lsearch [split [r client list] "\r\n"] *name=test_client*] == -1 + } else { + fail "Soft limit timed out but client still connected" + } } + + $rd1 close } - assert {$omem >= 100000 && $time_elapsed < 6} - $rd1 close } test {No response for single command if client output buffer hard limit is enforced} {