Optimization: remove updateClientMemUsage from i/o threads. (#10401)

In a benchmark we noticed we spend a relatively long time updating the client
memory usage leading to performance degradation.
Before #8687 this was performed in the client's cron and didn't affect performance.
But since introducing client eviction we need to perform this after filling the input
buffers and after processing commands. This also lead me to write this code to be
thread safe and perform it in the i/o threads.

It turns out that the main performance issue here is related to atomic operations
being performed while updating the total clients memory usage stats used for client
eviction (`server.stat_clients_type_memory[]`). This update needed to be atomic
because `updateClientMemUsage()` was called from the IO threads.

In this commit I make sure to call `updateClientMemUsage()` only from the main thread.
In case of threaded IO I call it for each client during the "fan-in" phase of the read/write
operation. This also means I could chuck the `updateClientMemUsageBucket()` function
which was called during this phase and embed it into `updateClientMemUsage()`.

Profiling shows this makes `updateClientMemUsage()` (on my x86_64 linux) roughly x4 faster.
This commit is contained in:
yoav-steinberg 2022-03-15 14:18:23 +02:00 committed by GitHub
parent 871fa12fec
commit cf6dcb7bf1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 36 additions and 41 deletions

View File

@ -201,7 +201,7 @@ client *createClient(connection *conn) {
c->pending_read_list_node = NULL;
c->client_tracking_redirection = 0;
c->client_tracking_prefixes = NULL;
c->last_memory_usage = c->last_memory_usage_on_bucket_update = 0;
c->last_memory_usage = 0;
c->last_memory_type = CLIENT_TYPE_NORMAL;
c->auth_callback = NULL;
c->auth_callback_privdata = NULL;
@ -1954,7 +1954,11 @@ int writeToClient(client *c, int handler_installed) {
return C_ERR;
}
}
updateClientMemUsage(c);
/* Update client's memory usage after writing.
* Since this isn't thread safe we do this conditionally. In case of threaded writes this is done in
* handleClientsWithPendingWritesUsingThreads(). */
if (io_threads_op == IO_THREADS_OP_IDLE)
updateClientMemUsage(c);
return C_OK;
}
@ -2519,7 +2523,8 @@ int processInputBuffer(client *c) {
/* Update client memory usage after processing the query buffer, this is
* important in case the query buffer is big and wasn't drained during
* the above loop (because of partially sent big commands). */
updateClientMemUsage(c);
if (io_threads_op == IO_THREADS_OP_IDLE)
updateClientMemUsage(c);
return C_OK;
}
@ -4167,8 +4172,8 @@ int handleClientsWithPendingWritesUsingThreads(void) {
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
/* Update the client in the mem usage buckets after we're done processing it in the io-threads */
updateClientMemUsageBucket(c);
/* Update the client in the mem usage after we're done processing it in the io-threads */
updateClientMemUsage(c);
/* Install the write handler if there are pending writes in some
* of the clients. */
@ -4276,8 +4281,8 @@ int handleClientsWithPendingReadsUsingThreads(void) {
continue;
}
/* Once io-threads are idle we can update the client in the mem usage buckets */
updateClientMemUsageBucket(c);
/* Once io-threads are idle we can update the client in the mem usage */
updateClientMemUsage(c);
if (processPendingCommandsAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid

View File

@ -814,7 +814,7 @@ int clientsCronTrackExpansiveClients(client *c, int time_idx) {
* client's memory usage doubles it's moved up to the next bucket, if it's
* halved we move it down a bucket.
* For more details see CLIENT_MEM_USAGE_BUCKETS documentation in server.h. */
clientMemUsageBucket *getMemUsageBucket(size_t mem) {
static inline clientMemUsageBucket *getMemUsageBucket(size_t mem) {
int size_in_bits = 8*(int)sizeof(mem);
int clz = mem > 0 ? __builtin_clzl(mem) : size_in_bits;
int bucket_idx = size_in_bits - clz;
@ -831,46 +831,34 @@ clientMemUsageBucket *getMemUsageBucket(size_t mem) {
* and also from the clientsCron. We call it from the cron so we have updated
* stats for non CLIENT_TYPE_NORMAL/PUBSUB clients and in case a configuration
* change requires us to evict a non-active client.
*
* This also adds the client to the correct memory usage bucket. Each bucket contains
* all clients with roughly the same amount of memory. This way we group
* together clients consuming about the same amount of memory and can quickly
* free them in case we reach maxmemory-clients (client eviction).
*/
int updateClientMemUsage(client *c) {
serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
size_t mem = getClientMemoryUsage(c, NULL);
int type = getClientType(c);
/* Remove the old value of the memory used by the client from the old
* category, and add it back. */
atomicDecr(server.stat_clients_type_memory[c->last_memory_type], c->last_memory_usage);
atomicIncr(server.stat_clients_type_memory[type], mem);
if (type != c->last_memory_type) {
server.stat_clients_type_memory[c->last_memory_type] -= c->last_memory_usage;
server.stat_clients_type_memory[type] += mem;
c->last_memory_type = type;
} else {
server.stat_clients_type_memory[type] += mem - c->last_memory_usage;
}
/* Remember what we added and where, to remove it next time. */
c->last_memory_usage = mem;
c->last_memory_type = type;
/* Update client mem usage bucket only when we're not in the context of an
* IO thread. See updateClientMemUsageBucket() for details. */
if (io_threads_op == IO_THREADS_OP_IDLE)
updateClientMemUsageBucket(c);
return 0;
}
/* Adds the client to the correct memory usage bucket. Each bucket contains
* all clients with roughly the same amount of memory. This way we group
* together clients consuming about the same amount of memory and can quickly
* free them in case we reach maxmemory-clients (client eviction).
* Note that in case of io-threads enabled we have to call this function only
* after the fan-in phase (when no io-threads are working) because the bucket
* lists are global. The io-threads themselves track per-client memory usage in
* updateClientMemUsage(). Here we update the clients to each bucket when all
* io-threads are done (both for read and write io-threading). */
void updateClientMemUsageBucket(client *c) {
serverAssert(io_threads_op == IO_THREADS_OP_IDLE);
int allow_eviction =
(c->last_memory_type == CLIENT_TYPE_NORMAL || c->last_memory_type == CLIENT_TYPE_PUBSUB) &&
(type == CLIENT_TYPE_NORMAL || type == CLIENT_TYPE_PUBSUB) &&
!(c->flags & CLIENT_NO_EVICT);
/* Update the client in the mem usage buckets */
if (c->mem_usage_bucket) {
c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage_on_bucket_update;
c->mem_usage_bucket->mem_usage_sum -= c->last_memory_usage;
/* If this client can't be evicted then remove it from the mem usage
* buckets */
if (!allow_eviction) {
@ -880,8 +868,8 @@ void updateClientMemUsageBucket(client *c) {
}
}
if (allow_eviction) {
clientMemUsageBucket *bucket = getMemUsageBucket(c->last_memory_usage);
bucket->mem_usage_sum += c->last_memory_usage;
clientMemUsageBucket *bucket = getMemUsageBucket(mem);
bucket->mem_usage_sum += mem;
if (bucket != c->mem_usage_bucket) {
if (c->mem_usage_bucket)
listDelNode(c->mem_usage_bucket->clients,
@ -892,7 +880,10 @@ void updateClientMemUsageBucket(client *c) {
}
}
c->last_memory_usage_on_bucket_update = c->last_memory_usage;
/* Remember what we added, to remove it next time. */
c->last_memory_usage = mem;
return 0;
}
/* Return the max samples in the memory usage of clients tracked by

View File

@ -1172,7 +1172,6 @@ typedef struct client {
size_t last_memory_usage;
int last_memory_type;
size_t last_memory_usage_on_bucket_update;
listNode *mem_usage_bucket_node;
clientMemUsageBucket *mem_usage_bucket;
@ -1583,8 +1582,8 @@ struct redisServer {
size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */
size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
double stat_module_progress; /* Module save progress. */
redisAtomic size_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */
size_t stat_cluster_links_memory;/* Mem usage by cluster links */
size_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */
size_t stat_cluster_links_memory; /* Mem usage by cluster links */
long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */
long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */
long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */