Enforce client output buffer soft limit when no traffic. (#8833)

When client breached the output buffer soft limit but then went idle,
we didn't disconnect on soft limit timeout, now we do.
Note this also resolves some sporadic test failures in due to Linux
buffering data which caused tests to fail if during the test we went
back under the soft COB limit.

Co-authored-by: Oran Agra <oran@redislabs.com>
Co-authored-by: sundb <sundbcn@gmail.com>
(cherry picked from commit 152fce5e2cbf947a389da414a431f7331981a374)
This commit is contained in:
yoav-steinberg 2021-05-04 13:45:08 +03:00 committed by Oran Agra
parent 439c356fe6
commit 7059cfb432
4 changed files with 85 additions and 62 deletions

View File

@ -333,7 +333,7 @@ void _addReplyProtoToList(client *c, const char *s, size_t len) {
listAddNodeTail(c->reply, tail);
c->reply_bytes += tail->size;
asyncCloseClientOnOutputBufferLimitReached(c);
closeClientOnOutputBufferLimitReached(c, 1);
}
}
@ -616,7 +616,7 @@ void setDeferredReply(client *c, void *node, const char *s, size_t length) {
listNodeValue(ln) = buf;
c->reply_bytes += buf->size;
asyncCloseClientOnOutputBufferLimitReached(c);
closeClientOnOutputBufferLimitReached(c, 1);
}
}
@ -949,7 +949,7 @@ void AddReplyFromClient(client *dst, client *src) {
src->bufpos = 0;
/* Check output buffer limits */
asyncCloseClientOnOutputBufferLimitReached(dst);
closeClientOnOutputBufferLimitReached(dst, 1);
}
/* Copy 'src' client output buffers into 'dst' client output buffers.
@ -3223,18 +3223,33 @@ int checkClientOutputBufferLimits(client *c) {
*
* Note: we need to close the client asynchronously because this function is
* called from contexts where the client can't be freed safely, i.e. from the
* lower level functions pushing data inside the client output buffers. */
void asyncCloseClientOnOutputBufferLimitReached(client *c) {
if (!c->conn) return; /* It is unsafe to free fake clients. */
* lower level functions pushing data inside the client output buffers.
* When `async` is set to 0, we close the client immediately, this is
* useful when called from cron.
*
* Returns 1 if client was (flagged) closed. */
int closeClientOnOutputBufferLimitReached(client *c, int async) {
if (!c->conn) return 0; /* It is unsafe to free fake clients. */
serverAssert(c->reply_bytes < SIZE_MAX-(1024*64));
if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return;
if (c->reply_bytes == 0 || c->flags & CLIENT_CLOSE_ASAP) return 0;
if (checkClientOutputBufferLimits(c)) {
sds client = catClientInfoString(sdsempty(),c);
freeClientAsync(c);
serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", client);
if (async) {
freeClientAsync(c);
serverLog(LL_WARNING,
"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.",
client);
} else {
freeClient(c);
serverLog(LL_WARNING,
"Client %s closed for overcoming of output buffer limits.",
client);
}
sdsfree(client);
return 1;
}
return 0;
}
/* Helper function used by performEvictions() in order to flush slaves

View File

@ -1839,6 +1839,7 @@ void clientsCron(void) {
if (clientsCronResizeQueryBuffer(c)) continue;
if (clientsCronTrackExpansiveClients(c, curr_peak_mem_usage_slot)) continue;
if (clientsCronTrackClientsMemUsage(c)) continue;
if (closeClientOnOutputBufferLimitReached(c, 0)) continue;
}
}

View File

@ -1867,7 +1867,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval);
void replaceClientCommandVector(client *c, int argc, robj **argv);
unsigned long getClientOutputBufferMemoryUsage(client *c);
int freeClientsInAsyncFreeQueue(void);
void asyncCloseClientOnOutputBufferLimitReached(client *c);
int closeClientOnOutputBufferLimitReached(client *c, int async);
int getClientType(client *c);
int getClientTypeByName(char *name);
char *getClientTypeName(int class);

View File

@ -18,65 +18,72 @@ start_server {tags {"obuf-limits"}} {
assert {$omem >= 70000 && $omem < 200000}
$rd1 close
}
test {Client output buffer soft limit is not enforced if time is not overreached} {
r config set client-output-buffer-limit {pubsub 0 100000 10}
set rd1 [redis_deferring_client]
$rd1 subscribe foo
set reply [$rd1 read]
assert {$reply eq "subscribe foo 1"}
set omem 0
set start_time 0
set time_elapsed 0
while 1 {
if {$start_time != 0} {
# Slow down loop when omen has reached the limit.
after 10
}
r publish foo [string repeat "x" 1000]
set clients [split [r client list] "\r\n"]
set c [split [lindex $clients 1] " "]
if {![regexp {omem=([0-9]+)} $c - omem]} break
if {$omem > 100000} {
if {$start_time == 0} {set start_time [clock seconds]}
set time_elapsed [expr {[clock seconds]-$start_time}]
if {$time_elapsed >= 5} break
}
foreach {soft_limit_time wait_for_timeout} {3 yes
4 no } {
if $wait_for_timeout {
set test_name "Client output buffer soft limit is enforced if time is overreached"
} else {
set test_name "Client output buffer soft limit is not enforced too early and is enforced when no traffic"
}
assert {$omem >= 100000 && $time_elapsed >= 5 && $time_elapsed <= 10}
$rd1 close
}
test {Client output buffer soft limit is enforced if time is overreached} {
r config set client-output-buffer-limit {pubsub 0 100000 3}
set rd1 [redis_deferring_client]
test $test_name {
r config set client-output-buffer-limit "pubsub 0 100000 $soft_limit_time"
set soft_limit_time [expr $soft_limit_time*1000]
set rd1 [redis_deferring_client]
$rd1 subscribe foo
set reply [$rd1 read]
assert {$reply eq "subscribe foo 1"}
$rd1 client setname test_client
set reply [$rd1 read]
assert {$reply eq "OK"}
set omem 0
set start_time 0
set time_elapsed 0
while 1 {
if {$start_time != 0} {
# Slow down loop when omen has reached the limit.
after 10
$rd1 subscribe foo
set reply [$rd1 read]
assert {$reply eq "subscribe foo 1"}
set omem 0
set start_time 0
set time_elapsed 0
set last_under_limit_time [clock milliseconds]
while 1 {
r publish foo [string repeat "x" 1000]
set clients [split [r client list] "\r\n"]
set c [lsearch -inline $clients *name=test_client*]
if {$start_time != 0} {
set time_elapsed [expr {[clock milliseconds]-$start_time}]
# Make sure test isn't taking too long
assert {$time_elapsed <= [expr $soft_limit_time+3000]}
}
if {$wait_for_timeout && $c == ""} {
# Make sure we're disconnected when we reach the soft limit
assert {$omem >= 100000 && $time_elapsed >= $soft_limit_time}
break
} else {
assert {[regexp {omem=([0-9]+)} $c - omem]}
}
if {$omem > 100000} {
if {$start_time == 0} {set start_time $last_under_limit_time}
if {!$wait_for_timeout && $time_elapsed >= [expr $soft_limit_time-1000]} break
# Slow down loop when omem has reached the limit.
after 10
} else {
# if the OS socket buffers swallowed what we previously filled, reset the start timer.
set start_time 0
set last_under_limit_time [clock milliseconds]
}
}
r publish foo [string repeat "x" 1000]
set clients [split [r client list] "\r\n"]
set c [split [lindex $clients 1] " "]
if {![regexp {omem=([0-9]+)} $c - omem]} break
if {$omem > 100000} {
if {$start_time == 0} {set start_time [clock seconds]}
set time_elapsed [expr {[clock seconds]-$start_time}]
if {$time_elapsed >= 10} break
if {!$wait_for_timeout} {
# After we completely stopped the traffic, wait for soft limit to time out
set timeout [expr {$soft_limit_time+1500 - ([clock milliseconds]-$start_time)}]
wait_for_condition [expr $timeout/10] 10 {
[lsearch [split [r client list] "\r\n"] *name=test_client*] == -1
} else {
fail "Soft limit timed out but client still connected"
}
}
$rd1 close
}
assert {$omem >= 100000 && $time_elapsed < 6}
$rd1 close
}
test {No response for single command if client output buffer hard limit is enforced} {