From 2753429c99425e3d0216cba79e0e61192975f252 Mon Sep 17 00:00:00 2001 From: yoav-steinberg Date: Thu, 23 Sep 2021 14:02:16 +0300 Subject: [PATCH] Client eviction (#8687) ### Description A mechanism for disconnecting clients when the sum of all connected clients is above a configured limit. This prevents eviction or OOM caused by accumulated used memory between all clients. It's a complimentary mechanism to the `client-output-buffer-limit` mechanism which takes into account not only a single client and not only output buffers but rather all memory used by all clients. #### Design The general design is as following: * We track memory usage of each client, taking into account all memory used by the client (query buffer, output buffer, parsed arguments, etc...). This is kept up to date after reading from the socket, after processing commands and after writing to the socket. * Based on the used memory we sort all clients into buckets. Each bucket contains all clients using up up to x2 memory of the clients in the bucket below it. For example up to 1m clients, up to 2m clients, up to 4m clients, ... * Before processing a command and before sleep we check if we're over the configured limit. If we are we start disconnecting clients from larger buckets downwards until we're under the limit. #### Config `maxmemory-clients` max memory all clients are allowed to consume, above this threshold we disconnect clients. This config can either be set to 0 (meaning no limit), a size in bytes (possibly with MB/GB suffix), or as a percentage of `maxmemory` by using the `%` suffix (e.g. setting it to `10%` would mean 10% of `maxmemory`). #### Important code changes * During the development I encountered yet more situations where our io-threads access global vars. And needed to fix them. I also had to handle keeps the clients sorted into the memory buckets (which are global) while their memory usage changes in the io-thread. To achieve this I decided to simplify how we check if we're in an io-thread and make it much more explicit. I removed the `CLIENT_PENDING_READ` flag used for checking if the client is in an io-thread (it wasn't used for anything else) and just used the global `io_threads_op` variable the same way to check during writes. * I optimized the cleanup of the client from the `clients_pending_read` list on client freeing. We now store a pointer in the `client` struct to this list so we don't need to search in it (`pending_read_list_node`). * Added `evicted_clients` stat to `INFO` command. * Added `CLIENT NO-EVICT ON|OFF` sub command to exclude a specific client from the client eviction mechanism. Added corrosponding 'e' flag in the client info string. * Added `multi-mem` field in the client info string to show how much memory is used up by buffered multi commands. * Client `tot-mem` now accounts for buffered multi-commands, pubsub patterns and channels (partially), tracking prefixes (partially). * CLIENT_CLOSE_ASAP flag is now handled in a new `beforeNextClient()` function so clients will be disconnected between processing different clients and not only before sleep. This new function can be used in the future for work we want to do outside the command processing loop but don't want to wait for all clients to be processed before we get to it. Specifically I wanted to handle output-buffer-limit related closing before we process client eviction in case the two race with each other. * Added a `DEBUG CLIENT-EVICTION` command to print out info about the client eviction buckets. * Each client now holds a pointer to the client eviction memory usage bucket it belongs to and listNode to itself in that bucket for quick removal. * Global `io_threads_op` variable now can contain a `IO_THREADS_OP_IDLE` value indicating no io-threading is currently being executed. * In order to track memory used by each clients in real-time we can't rely on updating these stats in `clientsCron()` alone anymore. So now I call `updateClientMemUsage()` (used to be `clientsCronTrackClientsMemUsage()`) after command processing, after writing data to pubsub clients, after writing the output buffer and after reading from the socket (and maybe other places too). The function is written to be fast. * Clients are evicted if needed (with appropriate log line) in `beforeSleep()` and before processing a command (before performing oom-checks and key-eviction). * All clients memory usage buckets are grouped as follows: * All clients using less than 64k. * 64K..128K * 128K..256K * ... * 2G..4G * All clients using 4g and up. * Added client-eviction.tcl with a bunch of tests for the new mechanism. * Extended maxmemory.tcl to test the interaction between maxmemory and maxmemory-clients settings. * Added an option to flag a numeric configuration variable as a "percent", this means that if we encounter a '%' after the number in the config file (or config set command) we consider it as valid. Such a number is store internally as a negative value. This way an integer value can be interpreted as either a percent (negative) or absolute value (positive). This is useful for example if some numeric configuration can optionally be set to a percentage of something else. Co-authored-by: Oran Agra --- redis.conf | 19 ++ src/blocked.c | 12 +- src/config.c | 154 ++++++---- src/debug.c | 19 ++ src/multi.c | 9 + src/networking.c | 236 ++++++++++++--- src/object.c | 2 +- src/pubsub.c | 2 + src/redis-cli.c | 2 + src/replication.c | 1 + src/server.c | 130 +++++++-- src/server.h | 51 +++- src/tracking.c | 1 + src/util.c | 5 +- tests/support/util.tcl | 11 +- tests/test_helper.tcl | 1 + tests/unit/client-eviction.tcl | 509 +++++++++++++++++++++++++++++++++ tests/unit/maxmemory.tcl | 137 +++++++++ 18 files changed, 1169 insertions(+), 132 deletions(-) create mode 100644 tests/unit/client-eviction.tcl diff --git a/redis.conf b/redis.conf index 3eb7374cc..bd7fa5271 100644 --- a/redis.conf +++ b/redis.conf @@ -1841,6 +1841,25 @@ client-output-buffer-limit pubsub 32mb 8mb 60 # # client-query-buffer-limit 1gb +# In some scenarios client connections can hog up memory leading to OOM +# errors or data eviction. To avoid this we can cap the accumulated memory +# used by all client connections (all pubsub and normal clients). Once we +# reach that limit connections will be dropped by the server freeing up +# memory. The server will attempt to drop the connections using the most +# memory first. We call this mechanism "client eviction". +# +# Client eviction is configured using the maxmemory-clients setting as follows: +# 0 - client eviction is disabled (default) +# +# A memory value can be used for the client eviction threshold, +# for example: +# maxmemory-clients 1g +# +# A percentage value (between 1% and 100%) means the client eviction threshold +# is based on a percentage of the maxmemory setting. For example to set client +# eviction at 5% of maxmemory: +# maxmemory-clients 5% + # In the Redis protocol, bulk requests, that are, elements representing single # strings, are normally limited to 512 mb. However you can change this limit # here, but must be 1mb or greater diff --git a/src/blocked.c b/src/blocked.c index 4898cdcbf..86aed2440 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -138,14 +138,14 @@ void processUnblockedClients(void) { * the code is conceptually more correct this way. */ if (!(c->flags & CLIENT_BLOCKED)) { /* If we have a queued command, execute it now. */ - if (processPendingCommandsAndResetClient(c) == C_ERR) { - continue; - } - /* Then process client if it has more data in it's buffer. */ - if (c->querybuf && sdslen(c->querybuf) > 0) { - processInputBuffer(c); + if (processPendingCommandsAndResetClient(c) == C_OK) { + /* Now process client if it has more data in it's buffer. */ + if (c->querybuf && sdslen(c->querybuf) > 0) { + processInputBuffer(c); + } } } + beforeNextClient(c); } } diff --git a/src/config.c b/src/config.c index 349021f9e..2a8697e09 100644 --- a/src/config.c +++ b/src/config.c @@ -198,6 +198,10 @@ typedef enum numericType { NUMERIC_TYPE_TIME_T, } numericType; +#define INTEGER_CONFIG 0 /* No flags means a simple integer configuration */ +#define MEMORY_CONFIG (1<<0) /* Indicates if this value can be loaded as a memory value */ +#define PERCENT_CONFIG (1<<1) /* Indicates if this value can be loaded as a percent (and stored as a negative int) */ + typedef struct numericConfigData { union { int *i; @@ -211,7 +215,7 @@ typedef struct numericConfigData { off_t *ot; time_t *tt; } config; /* The pointer to the numeric config this value is stored in */ - int is_memory; /* Indicates if this value can be loaded as a memory value */ + unsigned int flags; numericType numeric_type; /* An enum indicating the type of this value */ long long lower_bound; /* The lower bound of this numeric value */ long long upper_bound; /* The upper bound of this numeric value */ @@ -1347,6 +1351,14 @@ void rewriteConfigBytesOption(struct rewriteConfigState *state, const char *opti rewriteConfigRewriteLine(state,option,line,force); } +/* Rewrite a simple "option-name n%" configuration option. */ +void rewriteConfigPercentOption(struct rewriteConfigState *state, const char *option, long long value, long long defvalue) { + int force = value != defvalue; + sds line = sdscatprintf(sdsempty(),"%s %lld%%",option,value); + + rewriteConfigRewriteLine(state,option,line,force); +} + /* Rewrite a yes/no option. */ void rewriteConfigYesNoOption(struct rewriteConfigState *state, const char *option, int value, int defvalue) { int force = value != defvalue; @@ -2111,8 +2123,18 @@ static int numericBoundaryCheck(typeData data, long long ll, const char **err) { return 0; } } else { + /* Boundary check for percentages */ + if (data.numeric.flags & PERCENT_CONFIG && ll < 0) { + if (ll < data.numeric.lower_bound) { + snprintf(loadbuf, LOADBUF_SIZE, + "percentage argument must be less or equal to %lld", + -data.numeric.lower_bound); + *err = loadbuf; + return 0; + } + } /* Boundary check for signed types */ - if (ll > data.numeric.upper_bound || ll < data.numeric.lower_bound) { + else if (ll > data.numeric.upper_bound || ll < data.numeric.lower_bound) { snprintf(loadbuf, LOADBUF_SIZE, "argument must be between %lld and %lld inclusive", data.numeric.lower_bound, @@ -2124,22 +2146,46 @@ static int numericBoundaryCheck(typeData data, long long ll, const char **err) { return 1; } +static int numericParseString(typeData data, sds value, const char **err, long long *res) { + /* First try to parse as memory */ + if (data.numeric.flags & MEMORY_CONFIG) { + int memerr; + *res = memtoull(value, &memerr); + if (!memerr) + return 1; + } + + /* Attempt to parse as percent */ + if (data.numeric.flags & PERCENT_CONFIG && + sdslen(value) > 1 && value[sdslen(value)-1] == '%' && + string2ll(value, sdslen(value)-1, res) && + *res >= 0) { + /* We store percentage as negative value */ + *res = -*res; + return 1; + } + + /* Attempt a simple number (no special flags set) */ + if (!data.numeric.flags && string2ll(value, sdslen(value), res)) + return 1; + + /* Select appropriate error string */ + if (data.numeric.flags & MEMORY_CONFIG && + data.numeric.flags & PERCENT_CONFIG) + *err = "argument must be a memory or percent value" ; + else if (data.numeric.flags & MEMORY_CONFIG) + *err = "argument must be a memory value"; + else + *err = "argument couldn't be parsed into an integer"; + return 0; +} static int numericConfigSet(typeData data, sds value, int update, const char **err) { long long ll, prev = 0; - if (data.numeric.is_memory) { - int memerr; - ll = memtoull(value, &memerr); - if (memerr) { - *err = "argument must be a memory value"; - return 0; - } - } else { - if (!string2ll(value, sdslen(value), &ll)) { - *err = "argument couldn't be parsed into an integer" ; - return 0; - } - } + + if (!numericParseString(data, value, err, &ll)) + return 0; + if (!numericBoundaryCheck(data, ll, err)) return 0; @@ -2158,21 +2204,21 @@ static int numericConfigSet(typeData data, sds value, int update, const char **e static void numericConfigGet(client *c, typeData data) { char buf[128]; - if (data.numeric.is_memory) { - unsigned long long value = 0; - - GET_NUMERIC_TYPE(value) - ull2string(buf, sizeof(buf), value); - addReplyBulkCString(c, buf); - } else{ - long long value = 0; - - GET_NUMERIC_TYPE(value) + long long value = 0; + GET_NUMERIC_TYPE(value) - ll2string(buf, sizeof(buf), value); - addReplyBulkCString(c, buf); + if (data.numeric.flags & PERCENT_CONFIG && value < 0) { + int len = ll2string(buf, sizeof(buf), -value); + buf[len] = '%'; + buf[len+1] = '\0'; } + else if (data.numeric.flags & MEMORY_CONFIG) { + ull2string(buf, sizeof(buf), value); + } else { + ll2string(buf, sizeof(buf), value); + } + addReplyBulkCString(c, buf); } static void numericConfigRewrite(typeData data, const char *name, struct rewriteConfigState *state) { @@ -2180,18 +2226,17 @@ static void numericConfigRewrite(typeData data, const char *name, struct rewrite GET_NUMERIC_TYPE(value) - if (data.numeric.is_memory) { + if (data.numeric.flags & PERCENT_CONFIG && value < 0) { + rewriteConfigPercentOption(state, name, -value, data.numeric.default_value); + } else if (data.numeric.flags & MEMORY_CONFIG) { rewriteConfigBytesOption(state, name, value, data.numeric.default_value); } else { rewriteConfigNumericalOption(state, name, value, data.numeric.default_value); } } -#define INTEGER_CONFIG 0 -#define MEMORY_CONFIG 1 - -#define embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) { \ - embedCommonConfig(name, alias, flags) \ +#define embedCommonNumericalConfig(name, alias, _flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) { \ + embedCommonConfig(name, alias, _flags) \ embedConfigInterface(numericConfigInit, numericConfigSet, numericConfigGet, numericConfigRewrite) \ .data.numeric = { \ .lower_bound = (lower), \ @@ -2199,73 +2244,73 @@ static void numericConfigRewrite(typeData data, const char *name, struct rewrite .default_value = (default), \ .is_valid_fn = (is_valid), \ .update_fn = (update), \ - .is_memory = (memory), + .flags = (num_conf_flags), -#define createIntConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createIntConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_INT, \ .config.i = &(config_addr) \ } \ } -#define createUIntConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createUIntConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_UINT, \ .config.ui = &(config_addr) \ } \ } -#define createLongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createLongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_LONG, \ .config.l = &(config_addr) \ } \ } -#define createULongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createULongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_ULONG, \ .config.ul = &(config_addr) \ } \ } -#define createLongLongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createLongLongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_LONG_LONG, \ .config.ll = &(config_addr) \ } \ } -#define createULongLongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createULongLongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_ULONG_LONG, \ .config.ull = &(config_addr) \ } \ } -#define createSizeTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createSizeTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_SIZE_T, \ .config.st = &(config_addr) \ } \ } -#define createSSizeTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createSSizeTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_SSIZE_T, \ .config.sst = &(config_addr) \ } \ } -#define createTimeTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createTimeTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_TIME_T, \ .config.tt = &(config_addr) \ } \ } -#define createOffTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createOffTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_OFF_T, \ .config.ot = &(config_addr) \ } \ @@ -2653,6 +2698,7 @@ standardConfig configs[] = { createSizeTConfig("hll-sparse-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.hll_sparse_max_bytes, 3000, MEMORY_CONFIG, NULL, NULL), createSizeTConfig("tracking-table-max-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 1 million keys max. */ createSizeTConfig("client-query-buffer-limit", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1024*1024, LONG_MAX, server.client_max_querybuf_len, 1024*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Default: 1GB max query buffer. */ + createSSizeTConfig("maxmemory-clients", NULL, MODIFIABLE_CONFIG, -100, SSIZE_MAX, server.maxmemory_clients, 0, MEMORY_CONFIG | PERCENT_CONFIG, NULL, NULL), /* Other configs */ createTimeTConfig("repl-backlog-ttl", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.repl_backlog_time_limit, 60*60, INTEGER_CONFIG, NULL, NULL), /* Default: 1 hour */ diff --git a/src/debug.c b/src/debug.c index d29d48673..4e95e2dfd 100644 --- a/src/debug.c +++ b/src/debug.c @@ -469,6 +469,8 @@ void debugCommand(client *c) { " Return the size of different Redis core C structures.", "ZIPLIST ", " Show low level info about the ziplist encoding of .", +"CLIENT-EVICTION", +" Show low level client eviction pools info (maxmemory-clients).", NULL }; addReplyHelp(c, help); @@ -883,6 +885,23 @@ NULL addReplyError(c, "CONFIG-REWRITE-FORCE-ALL failed"); else addReply(c, shared.ok); + } else if(!strcasecmp(c->argv[1]->ptr,"client-eviction") && c->argc == 2) { + sds bucket_info = sdsempty(); + for (int j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) { + if (j == 0) + bucket_info = sdscatprintf(bucket_info, "bucket 0"); + else + bucket_info = sdscatprintf(bucket_info, "bucket %10zu", (size_t)1<<(j-1+CLIENT_MEM_USAGE_BUCKET_MIN_LOG)); + if (j == CLIENT_MEM_USAGE_BUCKETS-1) + bucket_info = sdscatprintf(bucket_info, "+ : "); + else + bucket_info = sdscatprintf(bucket_info, " - %10zu: ", ((size_t)1<<(j+CLIENT_MEM_USAGE_BUCKET_MIN_LOG))-1); + bucket_info = sdscatprintf(bucket_info, "tot-mem: %10zu, clients: %lu\n", + server.client_mem_usage_buckets[j].mem_usage_sum, + server.client_mem_usage_buckets[j].clients->len); + } + addReplyVerbatim(c,bucket_info,sdslen(bucket_info),"txt"); + sdsfree(bucket_info); #ifdef USE_JEMALLOC } else if(!strcasecmp(c->argv[1]->ptr,"mallctl") && c->argc >= 3) { mallctl_int(c, c->argv+2, c->argc-2); diff --git a/src/multi.c b/src/multi.c index e40d2a447..b02457bb9 100644 --- a/src/multi.c +++ b/src/multi.c @@ -37,6 +37,7 @@ void initClientMultiState(client *c) { c->mstate.count = 0; c->mstate.cmd_flags = 0; c->mstate.cmd_inv_flags = 0; + c->mstate.argv_len_sums = 0; } /* Release all the resources associated with MULTI/EXEC state */ @@ -78,6 +79,7 @@ void queueMultiCommand(client *c) { c->mstate.count++; c->mstate.cmd_flags |= c->cmd->flags; c->mstate.cmd_inv_flags |= ~c->cmd->flags; + c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj*)*c->argc; } void discardTransaction(client *c) { @@ -435,3 +437,10 @@ void unwatchCommand(client *c) { c->flags &= (~CLIENT_DIRTY_CAS); addReply(c,shared.ok); } + +size_t multiStateMemOverhead(client *c) { + size_t mem = c->mstate.argv_len_sums; + /* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. */ + mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey)); + return mem; +} diff --git a/src/networking.c b/src/networking.c index 3b522fb35..21278d783 100644 --- a/src/networking.c +++ b/src/networking.c @@ -180,15 +180,18 @@ client *createClient(connection *conn) { c->sockname = NULL; c->client_list_node = NULL; c->paused_list_node = NULL; + c->pending_read_list_node = NULL; c->client_tracking_redirection = 0; c->client_tracking_prefixes = NULL; - c->client_cron_last_memory_usage = 0; - c->client_cron_last_memory_type = CLIENT_TYPE_NORMAL; + c->last_memory_usage = c->last_memory_usage_on_bucket_update = 0; + c->last_memory_type = CLIENT_TYPE_NORMAL; c->auth_callback = NULL; c->auth_callback_privdata = NULL; c->auth_module = NULL; listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); + c->mem_usage_bucket = NULL; + c->mem_usage_bucket_node = NULL; if (conn) linkClient(c); initClientMultiState(c); return c; @@ -267,7 +270,7 @@ int prepareClientToWrite(client *c) { * not install a write handler. Instead, it will be done by * handleClientsWithPendingReadsUsingThreads() upon return. */ - if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ)) + if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE) clientInstallWriteHandler(c); /* Authorize the caller to queue in the output buffer of this client. */ @@ -1288,13 +1291,13 @@ void unlinkClient(client *c) { } /* Remove from the list of pending reads if needed. */ - if (c->flags & CLIENT_PENDING_READ) { - ln = listSearchKey(server.clients_pending_read,c); - serverAssert(ln != NULL); - listDelNode(server.clients_pending_read,ln); - c->flags &= ~CLIENT_PENDING_READ; + serverAssert(io_threads_op == IO_THREADS_OP_IDLE); + if (c->pending_read_list_node != NULL) { + listDelNode(server.clients_pending_read,c->pending_read_list_node); + c->pending_read_list_node = NULL; } + /* When client was just unblocked because of a blocking operation, * remove it from the list of unblocked clients. */ if (c->flags & CLIENT_UNBLOCKED) { @@ -1430,10 +1433,15 @@ void freeClient(client *c) { * we lost the connection with the master. */ if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection(); - /* Remove the contribution that this client gave to our + /* Remove the contribution that this client gave to our * incrementally computed memory usage. */ - server.stat_clients_type_memory[c->client_cron_last_memory_type] -= - c->client_cron_last_memory_usage; + server.stat_clients_type_memory[c->last_memory_type] -= + c->last_memory_usage; + /* Remove client from memory usage buckets */ + if (c->mem_usage_bucket) { + c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage; + listDelNode(c->mem_usage_bucket->clients, c->mem_usage_bucket_node); + } /* Release other dynamically allocated client structure fields, * and finally release the client structure itself. */ @@ -1470,6 +1478,27 @@ void freeClientAsync(client *c) { pthread_mutex_unlock(&async_free_queue_mutex); } +/* Perform processing of the client before moving on to processing the next client + * this is useful for performing operations that affect the global state but can't + * wait until we're done with all clients. In other words can't wait until beforeSleep() + * return C_ERR in case client is no longer valid after call. */ +int beforeNextClient(client *c) { + /* Skip the client processing if we're in an IO thread, in that case we'll perform + this operation later (this function is called again) in the fan-in stage of the threading mechanism */ + if (io_threads_op != IO_THREADS_OP_IDLE) + return C_OK; + /* Handle async frees */ + /* Note: this doesn't make the server.clients_to_close list redundant because of + * cases where we want an async free of a client other than myself. For example + * in ACL modifications we disconnect clients authenticated to non-existent + * users (see ACL LOAD). */ + if (c->flags & CLIENT_CLOSE_ASAP) { + freeClient(c); + return C_ERR; + } + return C_OK; +} + /* Free the clients marked as CLOSE_ASAP, return the number of clients * freed. */ int freeClientsInAsyncFreeQueue(void) { @@ -1594,7 +1623,10 @@ int writeToClient(client *c, int handler_installed) { * adDeleteFileEvent() is not thread safe: however writeToClient() * is always called with handler_installed set to 0 from threads * so we are fine. */ - if (handler_installed) connSetWriteHandler(c->conn, NULL); + if (handler_installed) { + serverAssert(io_threads_op == IO_THREADS_OP_IDLE); + connSetWriteHandler(c->conn, NULL); + } /* Close connection after entire reply has been sent. */ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { @@ -1602,6 +1634,7 @@ int writeToClient(client *c, int handler_installed) { return C_ERR; } } + updateClientMemUsage(c); return C_OK; } @@ -2036,7 +2069,11 @@ int processCommandAndResetClient(client *c) { server.current_client = c; if (processCommand(c) == C_OK) { commandProcessed(c); + /* Update the client's memory to include output buffer growth following the + * processed command. */ + updateClientMemUsage(c); } + if (server.current_client == NULL) deadclient = 1; /* * Restore the old client, this is needed because when a script @@ -2117,7 +2154,8 @@ void processInputBuffer(client *c) { /* If we are in the context of an I/O thread, we can't really * execute the command here. All we can do is to flag the client * as one that needs to process the command. */ - if (c->flags & CLIENT_PENDING_READ) { + if (io_threads_op != IO_THREADS_OP_IDLE) { + serverAssert(io_threads_op == IO_THREADS_OP_READ); c->flags |= CLIENT_PENDING_COMMAND; break; } @@ -2137,6 +2175,11 @@ void processInputBuffer(client *c) { sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } + + /* Update client memory usage after processing the query buffer, this is + * important in case the query buffer is big and wasn't drained during + * the above loop (because of partially sent big commands). */ + updateClientMemUsage(c); } void readQueryFromClient(connection *conn) { @@ -2190,7 +2233,7 @@ void readQueryFromClient(connection *conn) { } else { serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); freeClientAsync(c); - return; + goto done; } } else if (nread == 0) { if (server.verbosity <= LL_VERBOSE) { @@ -2199,7 +2242,7 @@ void readQueryFromClient(connection *conn) { sdsfree(info); } freeClientAsync(c); - return; + goto done; } else if (c->flags & CLIENT_MASTER) { /* Append the query buffer to the pending (not applied) buffer * of the master. We'll use this buffer later in order to have a @@ -2223,12 +2266,15 @@ void readQueryFromClient(connection *conn) { sdsfree(ci); sdsfree(bytes); freeClientAsync(c); - return; + goto done; } /* There is more data in the client input buffer, continue parsing it * in case to check if there is a full command to execute. */ processInputBuffer(c); + +done: + beforeNextClient(c); } /* A Redis "Address String" is a colon separated ip:port pair. @@ -2306,6 +2352,7 @@ sds catClientInfoString(sds s, client *client) { if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A'; if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U'; if (client->flags & CLIENT_READONLY) *p++ = 'r'; + if (client->flags & CLIENT_NO_EVICT) *p++ = 'e'; if (p == flags) *p++ = 'N'; *p++ = '\0'; @@ -2317,19 +2364,10 @@ sds catClientInfoString(sds s, client *client) { *p = '\0'; /* Compute the total memory consumed by this client. */ - size_t obufmem = getClientOutputBufferMemoryUsage(client); - size_t total_mem = obufmem; - total_mem += zmalloc_size(client); /* includes client->buf */ - total_mem += sdsZmallocSize(client->querybuf); - /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory - * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to - * spot problematic clients. */ - total_mem += client->argv_len_sum; - if (client->argv) - total_mem += zmalloc_size(client->argv); + size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem); return sdscatfmt(s, - "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i", + "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i", (unsigned long long) client->id, getClientPeerId(client), getClientSockname(client), @@ -2345,6 +2383,7 @@ sds catClientInfoString(sds s, client *client) { (unsigned long long) sdslen(client->querybuf), (unsigned long long) sdsavail(client->querybuf), (unsigned long long) client->argv_len_sum, + (unsigned long long) client->mstate.argv_len_sums, (unsigned long long) client->bufpos, (unsigned long long) listLength(client->reply), (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */ @@ -2565,6 +2604,18 @@ NULL addReplyErrorObject(c,shared.syntaxerr); return; } + } else if (!strcasecmp(c->argv[1]->ptr,"no-evict") && c->argc == 3) { + /* CLIENT PROTECT ON|OFF */ + if (!strcasecmp(c->argv[2]->ptr,"on")) { + c->flags |= CLIENT_NO_EVICT; + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[2]->ptr,"off")) { + c->flags &= ~CLIENT_NO_EVICT; + addReply(c,shared.ok); + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } } else if (!strcasecmp(c->argv[1]->ptr,"kill")) { /* CLIENT KILL * CLIENT KILL