diff --git a/redis.conf b/redis.conf index 3eb7374cc..bd7fa5271 100644 --- a/redis.conf +++ b/redis.conf @@ -1841,6 +1841,25 @@ client-output-buffer-limit pubsub 32mb 8mb 60 # # client-query-buffer-limit 1gb +# In some scenarios client connections can hog up memory leading to OOM +# errors or data eviction. To avoid this we can cap the accumulated memory +# used by all client connections (all pubsub and normal clients). Once we +# reach that limit connections will be dropped by the server freeing up +# memory. The server will attempt to drop the connections using the most +# memory first. We call this mechanism "client eviction". +# +# Client eviction is configured using the maxmemory-clients setting as follows: +# 0 - client eviction is disabled (default) +# +# A memory value can be used for the client eviction threshold, +# for example: +# maxmemory-clients 1g +# +# A percentage value (between 1% and 100%) means the client eviction threshold +# is based on a percentage of the maxmemory setting. For example to set client +# eviction at 5% of maxmemory: +# maxmemory-clients 5% + # In the Redis protocol, bulk requests, that are, elements representing single # strings, are normally limited to 512 mb. However you can change this limit # here, but must be 1mb or greater diff --git a/src/blocked.c b/src/blocked.c index 4898cdcbf..86aed2440 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -138,14 +138,14 @@ void processUnblockedClients(void) { * the code is conceptually more correct this way. */ if (!(c->flags & CLIENT_BLOCKED)) { /* If we have a queued command, execute it now. */ - if (processPendingCommandsAndResetClient(c) == C_ERR) { - continue; - } - /* Then process client if it has more data in it's buffer. */ - if (c->querybuf && sdslen(c->querybuf) > 0) { - processInputBuffer(c); + if (processPendingCommandsAndResetClient(c) == C_OK) { + /* Now process client if it has more data in it's buffer. */ + if (c->querybuf && sdslen(c->querybuf) > 0) { + processInputBuffer(c); + } } } + beforeNextClient(c); } } diff --git a/src/config.c b/src/config.c index 349021f9e..2a8697e09 100644 --- a/src/config.c +++ b/src/config.c @@ -198,6 +198,10 @@ typedef enum numericType { NUMERIC_TYPE_TIME_T, } numericType; +#define INTEGER_CONFIG 0 /* No flags means a simple integer configuration */ +#define MEMORY_CONFIG (1<<0) /* Indicates if this value can be loaded as a memory value */ +#define PERCENT_CONFIG (1<<1) /* Indicates if this value can be loaded as a percent (and stored as a negative int) */ + typedef struct numericConfigData { union { int *i; @@ -211,7 +215,7 @@ typedef struct numericConfigData { off_t *ot; time_t *tt; } config; /* The pointer to the numeric config this value is stored in */ - int is_memory; /* Indicates if this value can be loaded as a memory value */ + unsigned int flags; numericType numeric_type; /* An enum indicating the type of this value */ long long lower_bound; /* The lower bound of this numeric value */ long long upper_bound; /* The upper bound of this numeric value */ @@ -1347,6 +1351,14 @@ void rewriteConfigBytesOption(struct rewriteConfigState *state, const char *opti rewriteConfigRewriteLine(state,option,line,force); } +/* Rewrite a simple "option-name n%" configuration option. */ +void rewriteConfigPercentOption(struct rewriteConfigState *state, const char *option, long long value, long long defvalue) { + int force = value != defvalue; + sds line = sdscatprintf(sdsempty(),"%s %lld%%",option,value); + + rewriteConfigRewriteLine(state,option,line,force); +} + /* Rewrite a yes/no option. */ void rewriteConfigYesNoOption(struct rewriteConfigState *state, const char *option, int value, int defvalue) { int force = value != defvalue; @@ -2111,8 +2123,18 @@ static int numericBoundaryCheck(typeData data, long long ll, const char **err) { return 0; } } else { + /* Boundary check for percentages */ + if (data.numeric.flags & PERCENT_CONFIG && ll < 0) { + if (ll < data.numeric.lower_bound) { + snprintf(loadbuf, LOADBUF_SIZE, + "percentage argument must be less or equal to %lld", + -data.numeric.lower_bound); + *err = loadbuf; + return 0; + } + } /* Boundary check for signed types */ - if (ll > data.numeric.upper_bound || ll < data.numeric.lower_bound) { + else if (ll > data.numeric.upper_bound || ll < data.numeric.lower_bound) { snprintf(loadbuf, LOADBUF_SIZE, "argument must be between %lld and %lld inclusive", data.numeric.lower_bound, @@ -2124,22 +2146,46 @@ static int numericBoundaryCheck(typeData data, long long ll, const char **err) { return 1; } +static int numericParseString(typeData data, sds value, const char **err, long long *res) { + /* First try to parse as memory */ + if (data.numeric.flags & MEMORY_CONFIG) { + int memerr; + *res = memtoull(value, &memerr); + if (!memerr) + return 1; + } + + /* Attempt to parse as percent */ + if (data.numeric.flags & PERCENT_CONFIG && + sdslen(value) > 1 && value[sdslen(value)-1] == '%' && + string2ll(value, sdslen(value)-1, res) && + *res >= 0) { + /* We store percentage as negative value */ + *res = -*res; + return 1; + } + + /* Attempt a simple number (no special flags set) */ + if (!data.numeric.flags && string2ll(value, sdslen(value), res)) + return 1; + + /* Select appropriate error string */ + if (data.numeric.flags & MEMORY_CONFIG && + data.numeric.flags & PERCENT_CONFIG) + *err = "argument must be a memory or percent value" ; + else if (data.numeric.flags & MEMORY_CONFIG) + *err = "argument must be a memory value"; + else + *err = "argument couldn't be parsed into an integer"; + return 0; +} static int numericConfigSet(typeData data, sds value, int update, const char **err) { long long ll, prev = 0; - if (data.numeric.is_memory) { - int memerr; - ll = memtoull(value, &memerr); - if (memerr) { - *err = "argument must be a memory value"; - return 0; - } - } else { - if (!string2ll(value, sdslen(value), &ll)) { - *err = "argument couldn't be parsed into an integer" ; - return 0; - } - } + + if (!numericParseString(data, value, err, &ll)) + return 0; + if (!numericBoundaryCheck(data, ll, err)) return 0; @@ -2158,21 +2204,21 @@ static int numericConfigSet(typeData data, sds value, int update, const char **e static void numericConfigGet(client *c, typeData data) { char buf[128]; - if (data.numeric.is_memory) { - unsigned long long value = 0; - - GET_NUMERIC_TYPE(value) - ull2string(buf, sizeof(buf), value); - addReplyBulkCString(c, buf); - } else{ - long long value = 0; - - GET_NUMERIC_TYPE(value) + long long value = 0; + GET_NUMERIC_TYPE(value) - ll2string(buf, sizeof(buf), value); - addReplyBulkCString(c, buf); + if (data.numeric.flags & PERCENT_CONFIG && value < 0) { + int len = ll2string(buf, sizeof(buf), -value); + buf[len] = '%'; + buf[len+1] = '\0'; } + else if (data.numeric.flags & MEMORY_CONFIG) { + ull2string(buf, sizeof(buf), value); + } else { + ll2string(buf, sizeof(buf), value); + } + addReplyBulkCString(c, buf); } static void numericConfigRewrite(typeData data, const char *name, struct rewriteConfigState *state) { @@ -2180,18 +2226,17 @@ static void numericConfigRewrite(typeData data, const char *name, struct rewrite GET_NUMERIC_TYPE(value) - if (data.numeric.is_memory) { + if (data.numeric.flags & PERCENT_CONFIG && value < 0) { + rewriteConfigPercentOption(state, name, -value, data.numeric.default_value); + } else if (data.numeric.flags & MEMORY_CONFIG) { rewriteConfigBytesOption(state, name, value, data.numeric.default_value); } else { rewriteConfigNumericalOption(state, name, value, data.numeric.default_value); } } -#define INTEGER_CONFIG 0 -#define MEMORY_CONFIG 1 - -#define embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) { \ - embedCommonConfig(name, alias, flags) \ +#define embedCommonNumericalConfig(name, alias, _flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) { \ + embedCommonConfig(name, alias, _flags) \ embedConfigInterface(numericConfigInit, numericConfigSet, numericConfigGet, numericConfigRewrite) \ .data.numeric = { \ .lower_bound = (lower), \ @@ -2199,73 +2244,73 @@ static void numericConfigRewrite(typeData data, const char *name, struct rewrite .default_value = (default), \ .is_valid_fn = (is_valid), \ .update_fn = (update), \ - .is_memory = (memory), + .flags = (num_conf_flags), -#define createIntConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createIntConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_INT, \ .config.i = &(config_addr) \ } \ } -#define createUIntConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createUIntConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_UINT, \ .config.ui = &(config_addr) \ } \ } -#define createLongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createLongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_LONG, \ .config.l = &(config_addr) \ } \ } -#define createULongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createULongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_ULONG, \ .config.ul = &(config_addr) \ } \ } -#define createLongLongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createLongLongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_LONG_LONG, \ .config.ll = &(config_addr) \ } \ } -#define createULongLongConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createULongLongConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_ULONG_LONG, \ .config.ull = &(config_addr) \ } \ } -#define createSizeTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createSizeTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_SIZE_T, \ .config.st = &(config_addr) \ } \ } -#define createSSizeTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createSSizeTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_SSIZE_T, \ .config.sst = &(config_addr) \ } \ } -#define createTimeTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createTimeTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_TIME_T, \ .config.tt = &(config_addr) \ } \ } -#define createOffTConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, memory, is_valid, update) \ +#define createOffTConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ + embedCommonNumericalConfig(name, alias, flags, lower, upper, config_addr, default, num_conf_flags, is_valid, update) \ .numeric_type = NUMERIC_TYPE_OFF_T, \ .config.ot = &(config_addr) \ } \ @@ -2653,6 +2698,7 @@ standardConfig configs[] = { createSizeTConfig("hll-sparse-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.hll_sparse_max_bytes, 3000, MEMORY_CONFIG, NULL, NULL), createSizeTConfig("tracking-table-max-keys", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.tracking_table_max_keys, 1000000, INTEGER_CONFIG, NULL, NULL), /* Default: 1 million keys max. */ createSizeTConfig("client-query-buffer-limit", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, 1024*1024, LONG_MAX, server.client_max_querybuf_len, 1024*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Default: 1GB max query buffer. */ + createSSizeTConfig("maxmemory-clients", NULL, MODIFIABLE_CONFIG, -100, SSIZE_MAX, server.maxmemory_clients, 0, MEMORY_CONFIG | PERCENT_CONFIG, NULL, NULL), /* Other configs */ createTimeTConfig("repl-backlog-ttl", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.repl_backlog_time_limit, 60*60, INTEGER_CONFIG, NULL, NULL), /* Default: 1 hour */ diff --git a/src/debug.c b/src/debug.c index d29d48673..4e95e2dfd 100644 --- a/src/debug.c +++ b/src/debug.c @@ -469,6 +469,8 @@ void debugCommand(client *c) { " Return the size of different Redis core C structures.", "ZIPLIST ", " Show low level info about the ziplist encoding of .", +"CLIENT-EVICTION", +" Show low level client eviction pools info (maxmemory-clients).", NULL }; addReplyHelp(c, help); @@ -883,6 +885,23 @@ NULL addReplyError(c, "CONFIG-REWRITE-FORCE-ALL failed"); else addReply(c, shared.ok); + } else if(!strcasecmp(c->argv[1]->ptr,"client-eviction") && c->argc == 2) { + sds bucket_info = sdsempty(); + for (int j = 0; j < CLIENT_MEM_USAGE_BUCKETS; j++) { + if (j == 0) + bucket_info = sdscatprintf(bucket_info, "bucket 0"); + else + bucket_info = sdscatprintf(bucket_info, "bucket %10zu", (size_t)1<<(j-1+CLIENT_MEM_USAGE_BUCKET_MIN_LOG)); + if (j == CLIENT_MEM_USAGE_BUCKETS-1) + bucket_info = sdscatprintf(bucket_info, "+ : "); + else + bucket_info = sdscatprintf(bucket_info, " - %10zu: ", ((size_t)1<<(j+CLIENT_MEM_USAGE_BUCKET_MIN_LOG))-1); + bucket_info = sdscatprintf(bucket_info, "tot-mem: %10zu, clients: %lu\n", + server.client_mem_usage_buckets[j].mem_usage_sum, + server.client_mem_usage_buckets[j].clients->len); + } + addReplyVerbatim(c,bucket_info,sdslen(bucket_info),"txt"); + sdsfree(bucket_info); #ifdef USE_JEMALLOC } else if(!strcasecmp(c->argv[1]->ptr,"mallctl") && c->argc >= 3) { mallctl_int(c, c->argv+2, c->argc-2); diff --git a/src/multi.c b/src/multi.c index e40d2a447..b02457bb9 100644 --- a/src/multi.c +++ b/src/multi.c @@ -37,6 +37,7 @@ void initClientMultiState(client *c) { c->mstate.count = 0; c->mstate.cmd_flags = 0; c->mstate.cmd_inv_flags = 0; + c->mstate.argv_len_sums = 0; } /* Release all the resources associated with MULTI/EXEC state */ @@ -78,6 +79,7 @@ void queueMultiCommand(client *c) { c->mstate.count++; c->mstate.cmd_flags |= c->cmd->flags; c->mstate.cmd_inv_flags |= ~c->cmd->flags; + c->mstate.argv_len_sums += c->argv_len_sum + sizeof(robj*)*c->argc; } void discardTransaction(client *c) { @@ -435,3 +437,10 @@ void unwatchCommand(client *c) { c->flags &= (~CLIENT_DIRTY_CAS); addReply(c,shared.ok); } + +size_t multiStateMemOverhead(client *c) { + size_t mem = c->mstate.argv_len_sums; + /* Add watched keys overhead, Note: this doesn't take into account the watched keys themselves, because they aren't managed per-client. */ + mem += listLength(c->watched_keys) * (sizeof(listNode) + sizeof(watchedKey)); + return mem; +} diff --git a/src/networking.c b/src/networking.c index 3b522fb35..21278d783 100644 --- a/src/networking.c +++ b/src/networking.c @@ -180,15 +180,18 @@ client *createClient(connection *conn) { c->sockname = NULL; c->client_list_node = NULL; c->paused_list_node = NULL; + c->pending_read_list_node = NULL; c->client_tracking_redirection = 0; c->client_tracking_prefixes = NULL; - c->client_cron_last_memory_usage = 0; - c->client_cron_last_memory_type = CLIENT_TYPE_NORMAL; + c->last_memory_usage = c->last_memory_usage_on_bucket_update = 0; + c->last_memory_type = CLIENT_TYPE_NORMAL; c->auth_callback = NULL; c->auth_callback_privdata = NULL; c->auth_module = NULL; listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); listSetMatchMethod(c->pubsub_patterns,listMatchObjects); + c->mem_usage_bucket = NULL; + c->mem_usage_bucket_node = NULL; if (conn) linkClient(c); initClientMultiState(c); return c; @@ -267,7 +270,7 @@ int prepareClientToWrite(client *c) { * not install a write handler. Instead, it will be done by * handleClientsWithPendingReadsUsingThreads() upon return. */ - if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ)) + if (!clientHasPendingReplies(c) && io_threads_op == IO_THREADS_OP_IDLE) clientInstallWriteHandler(c); /* Authorize the caller to queue in the output buffer of this client. */ @@ -1288,13 +1291,13 @@ void unlinkClient(client *c) { } /* Remove from the list of pending reads if needed. */ - if (c->flags & CLIENT_PENDING_READ) { - ln = listSearchKey(server.clients_pending_read,c); - serverAssert(ln != NULL); - listDelNode(server.clients_pending_read,ln); - c->flags &= ~CLIENT_PENDING_READ; + serverAssert(io_threads_op == IO_THREADS_OP_IDLE); + if (c->pending_read_list_node != NULL) { + listDelNode(server.clients_pending_read,c->pending_read_list_node); + c->pending_read_list_node = NULL; } + /* When client was just unblocked because of a blocking operation, * remove it from the list of unblocked clients. */ if (c->flags & CLIENT_UNBLOCKED) { @@ -1430,10 +1433,15 @@ void freeClient(client *c) { * we lost the connection with the master. */ if (c->flags & CLIENT_MASTER) replicationHandleMasterDisconnection(); - /* Remove the contribution that this client gave to our + /* Remove the contribution that this client gave to our * incrementally computed memory usage. */ - server.stat_clients_type_memory[c->client_cron_last_memory_type] -= - c->client_cron_last_memory_usage; + server.stat_clients_type_memory[c->last_memory_type] -= + c->last_memory_usage; + /* Remove client from memory usage buckets */ + if (c->mem_usage_bucket) { + c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage; + listDelNode(c->mem_usage_bucket->clients, c->mem_usage_bucket_node); + } /* Release other dynamically allocated client structure fields, * and finally release the client structure itself. */ @@ -1470,6 +1478,27 @@ void freeClientAsync(client *c) { pthread_mutex_unlock(&async_free_queue_mutex); } +/* Perform processing of the client before moving on to processing the next client + * this is useful for performing operations that affect the global state but can't + * wait until we're done with all clients. In other words can't wait until beforeSleep() + * return C_ERR in case client is no longer valid after call. */ +int beforeNextClient(client *c) { + /* Skip the client processing if we're in an IO thread, in that case we'll perform + this operation later (this function is called again) in the fan-in stage of the threading mechanism */ + if (io_threads_op != IO_THREADS_OP_IDLE) + return C_OK; + /* Handle async frees */ + /* Note: this doesn't make the server.clients_to_close list redundant because of + * cases where we want an async free of a client other than myself. For example + * in ACL modifications we disconnect clients authenticated to non-existent + * users (see ACL LOAD). */ + if (c->flags & CLIENT_CLOSE_ASAP) { + freeClient(c); + return C_ERR; + } + return C_OK; +} + /* Free the clients marked as CLOSE_ASAP, return the number of clients * freed. */ int freeClientsInAsyncFreeQueue(void) { @@ -1594,7 +1623,10 @@ int writeToClient(client *c, int handler_installed) { * adDeleteFileEvent() is not thread safe: however writeToClient() * is always called with handler_installed set to 0 from threads * so we are fine. */ - if (handler_installed) connSetWriteHandler(c->conn, NULL); + if (handler_installed) { + serverAssert(io_threads_op == IO_THREADS_OP_IDLE); + connSetWriteHandler(c->conn, NULL); + } /* Close connection after entire reply has been sent. */ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { @@ -1602,6 +1634,7 @@ int writeToClient(client *c, int handler_installed) { return C_ERR; } } + updateClientMemUsage(c); return C_OK; } @@ -2036,7 +2069,11 @@ int processCommandAndResetClient(client *c) { server.current_client = c; if (processCommand(c) == C_OK) { commandProcessed(c); + /* Update the client's memory to include output buffer growth following the + * processed command. */ + updateClientMemUsage(c); } + if (server.current_client == NULL) deadclient = 1; /* * Restore the old client, this is needed because when a script @@ -2117,7 +2154,8 @@ void processInputBuffer(client *c) { /* If we are in the context of an I/O thread, we can't really * execute the command here. All we can do is to flag the client * as one that needs to process the command. */ - if (c->flags & CLIENT_PENDING_READ) { + if (io_threads_op != IO_THREADS_OP_IDLE) { + serverAssert(io_threads_op == IO_THREADS_OP_READ); c->flags |= CLIENT_PENDING_COMMAND; break; } @@ -2137,6 +2175,11 @@ void processInputBuffer(client *c) { sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } + + /* Update client memory usage after processing the query buffer, this is + * important in case the query buffer is big and wasn't drained during + * the above loop (because of partially sent big commands). */ + updateClientMemUsage(c); } void readQueryFromClient(connection *conn) { @@ -2190,7 +2233,7 @@ void readQueryFromClient(connection *conn) { } else { serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn)); freeClientAsync(c); - return; + goto done; } } else if (nread == 0) { if (server.verbosity <= LL_VERBOSE) { @@ -2199,7 +2242,7 @@ void readQueryFromClient(connection *conn) { sdsfree(info); } freeClientAsync(c); - return; + goto done; } else if (c->flags & CLIENT_MASTER) { /* Append the query buffer to the pending (not applied) buffer * of the master. We'll use this buffer later in order to have a @@ -2223,12 +2266,15 @@ void readQueryFromClient(connection *conn) { sdsfree(ci); sdsfree(bytes); freeClientAsync(c); - return; + goto done; } /* There is more data in the client input buffer, continue parsing it * in case to check if there is a full command to execute. */ processInputBuffer(c); + +done: + beforeNextClient(c); } /* A Redis "Address String" is a colon separated ip:port pair. @@ -2306,6 +2352,7 @@ sds catClientInfoString(sds s, client *client) { if (client->flags & CLIENT_CLOSE_ASAP) *p++ = 'A'; if (client->flags & CLIENT_UNIX_SOCKET) *p++ = 'U'; if (client->flags & CLIENT_READONLY) *p++ = 'r'; + if (client->flags & CLIENT_NO_EVICT) *p++ = 'e'; if (p == flags) *p++ = 'N'; *p++ = '\0'; @@ -2317,19 +2364,10 @@ sds catClientInfoString(sds s, client *client) { *p = '\0'; /* Compute the total memory consumed by this client. */ - size_t obufmem = getClientOutputBufferMemoryUsage(client); - size_t total_mem = obufmem; - total_mem += zmalloc_size(client); /* includes client->buf */ - total_mem += sdsZmallocSize(client->querybuf); - /* For efficiency (less work keeping track of the argv memory), it doesn't include the used memory - * i.e. unused sds space and internal fragmentation, just the string length. but this is enough to - * spot problematic clients. */ - total_mem += client->argv_len_sum; - if (client->argv) - total_mem += zmalloc_size(client->argv); + size_t obufmem, total_mem = getClientMemoryUsage(client, &obufmem); return sdscatfmt(s, - "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i", + "id=%U addr=%s laddr=%s %s name=%s age=%I idle=%I flags=%s db=%i sub=%i psub=%i multi=%i qbuf=%U qbuf-free=%U argv-mem=%U multi-mem=%U obl=%U oll=%U omem=%U tot-mem=%U events=%s cmd=%s user=%s redir=%I resp=%i", (unsigned long long) client->id, getClientPeerId(client), getClientSockname(client), @@ -2345,6 +2383,7 @@ sds catClientInfoString(sds s, client *client) { (unsigned long long) sdslen(client->querybuf), (unsigned long long) sdsavail(client->querybuf), (unsigned long long) client->argv_len_sum, + (unsigned long long) client->mstate.argv_len_sums, (unsigned long long) client->bufpos, (unsigned long long) listLength(client->reply), (unsigned long long) obufmem, /* should not include client->buf since we want to see 0 for static clients. */ @@ -2565,6 +2604,18 @@ NULL addReplyErrorObject(c,shared.syntaxerr); return; } + } else if (!strcasecmp(c->argv[1]->ptr,"no-evict") && c->argc == 3) { + /* CLIENT PROTECT ON|OFF */ + if (!strcasecmp(c->argv[2]->ptr,"on")) { + c->flags |= CLIENT_NO_EVICT; + addReply(c,shared.ok); + } else if (!strcasecmp(c->argv[2]->ptr,"off")) { + c->flags &= ~CLIENT_NO_EVICT; + addReply(c,shared.ok); + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } } else if (!strcasecmp(c->argv[1]->ptr,"kill")) { /* CLIENT KILL * CLIENT KILL