Client eviction ci issues (#9549)
Fixing CI test issues introduced in #8687 - valgrind warnings in readQueryFromClient when client was freed by processInputBuffer - adding DEBUG pause-cron for tests not to be time dependent. - skipping a test that depends on socket buffers / events not compatible with TLS - making sure client got subscribed by not using deferring client
This commit is contained in:
parent
0af7fe2cab
commit
6600253046
@ -141,8 +141,10 @@ void processUnblockedClients(void) {
|
||||
if (processPendingCommandsAndResetClient(c) == C_OK) {
|
||||
/* Now process client if it has more data in it's buffer. */
|
||||
if (c->querybuf && sdslen(c->querybuf) > 0) {
|
||||
processInputBuffer(c);
|
||||
if (processInputBuffer(c) == C_ERR) c = NULL;
|
||||
}
|
||||
} else {
|
||||
c = NULL;
|
||||
}
|
||||
}
|
||||
beforeNextClient(c);
|
||||
|
@ -471,6 +471,8 @@ void debugCommand(client *c) {
|
||||
" Show low level info about the ziplist encoding of <key>.",
|
||||
"CLIENT-EVICTION",
|
||||
" Show low level client eviction pools info (maxmemory-clients).",
|
||||
"PAUSE-CRON <0|1>",
|
||||
" Stop periodic cron job processing.",
|
||||
NULL
|
||||
};
|
||||
addReplyHelp(c, help);
|
||||
@ -910,6 +912,10 @@ NULL
|
||||
mallctl_string(c, c->argv+2, c->argc-2);
|
||||
return;
|
||||
#endif
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"pause-cron") && c->argc == 3)
|
||||
{
|
||||
server.pause_cron = atoi(c->argv[2]->ptr);
|
||||
addReply(c,shared.ok);
|
||||
} else {
|
||||
addReplySubcommandSyntaxError(c);
|
||||
return;
|
||||
|
@ -1481,7 +1481,9 @@ void freeClientAsync(client *c) {
|
||||
/* Perform processing of the client before moving on to processing the next client
|
||||
* this is useful for performing operations that affect the global state but can't
|
||||
* wait until we're done with all clients. In other words can't wait until beforeSleep()
|
||||
* return C_ERR in case client is no longer valid after call. */
|
||||
* return C_ERR in case client is no longer valid after call.
|
||||
* The input client argument: c, may be NULL in case the previous client was
|
||||
* freed before the call. */
|
||||
int beforeNextClient(client *c) {
|
||||
/* Skip the client processing if we're in an IO thread, in that case we'll perform
|
||||
this operation later (this function is called again) in the fan-in stage of the threading mechanism */
|
||||
@ -1492,7 +1494,7 @@ int beforeNextClient(client *c) {
|
||||
* cases where we want an async free of a client other than myself. For example
|
||||
* in ACL modifications we disconnect clients authenticated to non-existent
|
||||
* users (see ACL LOAD). */
|
||||
if (c->flags & CLIENT_CLOSE_ASAP) {
|
||||
if (c && (c->flags & CLIENT_CLOSE_ASAP)) {
|
||||
freeClient(c);
|
||||
return C_ERR;
|
||||
}
|
||||
@ -2106,8 +2108,9 @@ int processPendingCommandsAndResetClient(client *c) {
|
||||
/* This function is called every time, in the client structure 'c', there is
|
||||
* more query buffer to process, because we read more data from the socket
|
||||
* or because a client was blocked and later reactivated, so there could be
|
||||
* pending query buffer, already representing a full command, to process. */
|
||||
void processInputBuffer(client *c) {
|
||||
* pending query buffer, already representing a full command, to process.
|
||||
* return C_ERR in case the client was freed during the processing */
|
||||
int processInputBuffer(client *c) {
|
||||
/* Keep processing while there is something in the input buffer */
|
||||
while(c->qb_pos < sdslen(c->querybuf)) {
|
||||
/* Immediately abort if the client is in the middle of something. */
|
||||
@ -2165,7 +2168,7 @@ void processInputBuffer(client *c) {
|
||||
/* If the client is no longer valid, we avoid exiting this
|
||||
* loop and trimming the client buffer later. So we return
|
||||
* ASAP in that case. */
|
||||
return;
|
||||
return C_ERR;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -2180,6 +2183,8 @@ void processInputBuffer(client *c) {
|
||||
* important in case the query buffer is big and wasn't drained during
|
||||
* the above loop (because of partially sent big commands). */
|
||||
updateClientMemUsage(c);
|
||||
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
void readQueryFromClient(connection *conn) {
|
||||
@ -2270,8 +2275,9 @@ void readQueryFromClient(connection *conn) {
|
||||
}
|
||||
|
||||
/* There is more data in the client input buffer, continue parsing it
|
||||
* in case to check if there is a full command to execute. */
|
||||
processInputBuffer(c);
|
||||
* and check if there is a full command to execute. */
|
||||
if (processInputBuffer(c) == C_ERR)
|
||||
c = NULL;
|
||||
|
||||
done:
|
||||
beforeNextClient(c);
|
||||
@ -3855,7 +3861,12 @@ int handleClientsWithPendingReadsUsingThreads(void) {
|
||||
continue;
|
||||
}
|
||||
|
||||
processInputBuffer(c);
|
||||
if (processInputBuffer(c) == C_ERR) {
|
||||
/* If the client is no longer valid, we avoid
|
||||
* processing the client later. So we just go
|
||||
* to the next. */
|
||||
continue;
|
||||
}
|
||||
|
||||
/* We may have pending replies if a thread readQueryFromClient() produced
|
||||
* replies and did not install a write handler (it can't).
|
||||
|
@ -2564,6 +2564,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
}
|
||||
}
|
||||
|
||||
/* for debug purposes: skip actual cron work if pause_cron is on */
|
||||
if (server.pause_cron) return 1000/server.hz;
|
||||
|
||||
run_with_period(100) {
|
||||
long long stat_net_input_bytes, stat_net_output_bytes;
|
||||
atomicGet(server.stat_net_input_bytes, stat_net_input_bytes);
|
||||
@ -3187,6 +3190,7 @@ void initServerConfig(void) {
|
||||
server.next_client_id = 1; /* Client IDs, start from 1 .*/
|
||||
server.loading_process_events_interval_bytes = (1024*1024*2);
|
||||
server.page_size = sysconf(_SC_PAGESIZE);
|
||||
server.pause_cron = 0;
|
||||
|
||||
unsigned int lruclock = getLRUClock();
|
||||
atomicSet(server.lruclock,lruclock);
|
||||
|
@ -1417,6 +1417,7 @@ struct redisServer {
|
||||
int set_proc_title; /* True if change proc title */
|
||||
char *proc_title_template; /* Process title template format */
|
||||
clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT];
|
||||
int pause_cron; /* Don't run cron tasks (debug) */
|
||||
/* AOF persistence */
|
||||
int aof_enabled; /* AOF configuration */
|
||||
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
|
||||
@ -2005,7 +2006,7 @@ void setDeferredMapLen(client *c, void *node, long length);
|
||||
void setDeferredSetLen(client *c, void *node, long length);
|
||||
void setDeferredAttributeLen(client *c, void *node, long length);
|
||||
void setDeferredPushLen(client *c, void *node, long length);
|
||||
void processInputBuffer(client *c);
|
||||
int processInputBuffer(client *c);
|
||||
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||
void acceptTLSHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||
void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask);
|
||||
|
@ -36,6 +36,7 @@ The following compatibility and capability tags are currently used:
|
||||
| --------------------- | --------- |
|
||||
| `external:skip` | Not compatible with external servers. |
|
||||
| `cluster:skip` | Not compatible with `--cluster-mode`. |
|
||||
| `tls:skip` | Not campatible with `--tls`. |
|
||||
| `needs:repl` | Uses replication and needs to be able to `SYNC` from server. |
|
||||
| `needs:debug` | Uses the `DEBUG` command or other debugging focused commands (like `OBJECT`). |
|
||||
| `needs:pfdebug` | Uses the `PFDEBUG` command. |
|
||||
|
@ -202,6 +202,11 @@ proc tags_acceptable {tags err_return} {
|
||||
return 0
|
||||
}
|
||||
|
||||
if {$::tls && [lsearch $tags "tls:skip"] >= 0} {
|
||||
set err "Not supported in tls mode"
|
||||
return 0
|
||||
}
|
||||
|
||||
return 1
|
||||
}
|
||||
|
||||
|
@ -184,11 +184,6 @@ start_server {} {
|
||||
|
||||
test "client evicted due to tracking redirection" {
|
||||
r flushdb
|
||||
# Use slow hz to avoid clientsCron from updating memory usage frequently since
|
||||
# we're testing the update logic when writing tracking redirection output
|
||||
set backup_hz [lindex [r config get hz] 1]
|
||||
r config set hz 1
|
||||
|
||||
set rr [redis_client]
|
||||
set redirected_c [redis_client]
|
||||
$redirected_c client setname redirected_client
|
||||
@ -196,12 +191,16 @@ start_server {} {
|
||||
$redirected_c SUBSCRIBE __redis__:invalidate
|
||||
$rr client tracking on redirect $redir_id bcast
|
||||
# Use a big key name to fill the redirected tracking client's buffer quickly
|
||||
set key_length [expr 1024*10]
|
||||
set key_length [expr 1024*200]
|
||||
set long_key [string repeat k $key_length]
|
||||
# Use a script so we won't need to pass the long key name when dirtying it in the loop
|
||||
set script_sha [$rr script load "redis.call('incr', '$long_key')"]
|
||||
|
||||
# Pause serverCron so it won't update memory usage since we're testing the update logic when
|
||||
# writing tracking redirection output
|
||||
r debug pause-cron 1
|
||||
|
||||
# Read and write to same (long) key until redirected_client's buffers cause it to be evicted
|
||||
set t [clock milliseconds]
|
||||
catch {
|
||||
while true {
|
||||
set mem [client_field redirected_client tot-mem]
|
||||
@ -210,13 +209,8 @@ start_server {} {
|
||||
}
|
||||
} e
|
||||
assert_match {no client named redirected_client found*} $e
|
||||
|
||||
# Make sure eviction happened in less than 1sec, this means, statistically eviction
|
||||
# wasn't caused by clientCron accounting
|
||||
set t [expr [clock milliseconds] - $t]
|
||||
assert {$t < 1000}
|
||||
|
||||
r config set hz $backup_hz
|
||||
|
||||
r debug pause-cron 0
|
||||
$rr close
|
||||
$redirected_c close
|
||||
}
|
||||
|
@ -74,7 +74,11 @@ start_server {tags {"maxmemory" "external:skip"}} {
|
||||
}
|
||||
|
||||
verify_test $client_eviction
|
||||
}
|
||||
|
||||
# This test relies on SIGSTOP/CONT to handle all sent commands in a single event loop.
|
||||
# In TLS multiple event loops are needed to receive all sent commands, so the test breaks.
|
||||
# Mark it to be skipped when running in TLS mode.
|
||||
} {} {tls:skip}
|
||||
foreach rr $clients {
|
||||
$rr close
|
||||
}
|
||||
@ -113,13 +117,12 @@ start_server {tags {"maxmemory" "external:skip"}} {
|
||||
init_test $client_eviction
|
||||
|
||||
for {set j 0} {$j < 10} {incr j} {
|
||||
set rr [redis_deferring_client]
|
||||
set rr [redis_client]
|
||||
lappend clients $rr
|
||||
}
|
||||
|
||||
foreach rr $clients {
|
||||
$rr subscribe bla
|
||||
$rr flush
|
||||
}
|
||||
|
||||
for {set j 0} {$j < 40} {incr j} {
|
||||
|
Loading…
x
Reference in New Issue
Block a user