From f9cc25c1dde7da159819380d8397aef25797409b Mon Sep 17 00:00:00 2001 From: Chen Tianjie Date: Wed, 13 Dec 2023 13:44:13 +0800 Subject: [PATCH] Add metric to INFO CLIENTS: pubsub_clients. (#12849) In INFO CLIENTS section, we already have blocked_clients and tracking_clients. We should add a new metric showing the number of pubsub connections, which helps performance monitoring and trouble shooting. --- src/networking.c | 4 +++- src/pubsub.c | 35 +++++++++++++++++++++++++++-------- src/server.c | 2 ++ src/server.h | 2 ++ tests/unit/info.tcl | 32 ++++++++++++++++++++++++++++++++ 5 files changed, 66 insertions(+), 9 deletions(-) diff --git a/src/networking.c b/src/networking.c index 847eee3d5..4d8daecb3 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1546,6 +1546,7 @@ void clearClientConnectionState(client *c) { pubsubUnsubscribeAllChannels(c,0); pubsubUnsubscribeShardAllChannels(c, 0); pubsubUnsubscribeAllPatterns(c,0); + unmarkClientAsPubSub(c); if (c->name) { decrRefCount(c->name); @@ -1556,7 +1557,7 @@ void clearClientConnectionState(client *c) { * represent the client library behind the connection. */ /* Selectively clear state flags not covered above */ - c->flags &= ~(CLIENT_ASKING|CLIENT_READONLY|CLIENT_PUBSUB|CLIENT_REPLY_OFF| + c->flags &= ~(CLIENT_ASKING|CLIENT_READONLY|CLIENT_REPLY_OFF| CLIENT_REPLY_SKIP_NEXT|CLIENT_NO_TOUCH|CLIENT_NO_EVICT); } @@ -1631,6 +1632,7 @@ void freeClient(client *c) { pubsubUnsubscribeAllChannels(c,0); pubsubUnsubscribeShardAllChannels(c, 0); pubsubUnsubscribeAllPatterns(c,0); + unmarkClientAsPubSub(c); dictRelease(c->pubsub_channels); dictRelease(c->pubsub_patterns); dictRelease(c->pubsubshard_channels); diff --git a/src/pubsub.c b/src/pubsub.c index a13c5a61f..2fe7a3ff5 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -241,6 +241,20 @@ int clientTotalPubSubSubscriptionCount(client *c) { return clientSubscriptionsCount(c) + clientShardSubscriptionsCount(c); } +void markClientAsPubSub(client *c) { + if (!(c->flags & CLIENT_PUBSUB)) { + c->flags |= CLIENT_PUBSUB; + server.pubsub_clients++; + } +} + +void unmarkClientAsPubSub(client *c) { + if (c->flags & CLIENT_PUBSUB) { + c->flags &= ~CLIENT_PUBSUB; + server.pubsub_clients--; + } +} + /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or * 0 if the client was already subscribed to that channel. */ int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) { @@ -326,7 +340,7 @@ void pubsubShardUnsubscribeAllClients(robj *channel) { /* If the client has no other pubsub subscription, * move out of pubsub mode. */ if (clientTotalPubSubSubscriptionCount(c) == 0) { - c->flags &= ~CLIENT_PUBSUB; + unmarkClientAsPubSub(c); } } } @@ -546,7 +560,7 @@ void subscribeCommand(client *c) { } for (j = 1; j < c->argc; j++) pubsubSubscribeChannel(c,c->argv[j],pubSubType); - c->flags |= CLIENT_PUBSUB; + markClientAsPubSub(c); } /* UNSUBSCRIBE [channel ...] */ @@ -559,7 +573,9 @@ void unsubscribeCommand(client *c) { for (j = 1; j < c->argc; j++) pubsubUnsubscribeChannel(c,c->argv[j],1,pubSubType); } - if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; + if (clientTotalPubSubSubscriptionCount(c) == 0) { + unmarkClientAsPubSub(c); + } } /* PSUBSCRIBE pattern [pattern ...] */ @@ -579,7 +595,7 @@ void psubscribeCommand(client *c) { for (j = 1; j < c->argc; j++) pubsubSubscribePattern(c,c->argv[j]); - c->flags |= CLIENT_PUBSUB; + markClientAsPubSub(c); } /* PUNSUBSCRIBE [pattern [pattern ...]] */ @@ -592,7 +608,9 @@ void punsubscribeCommand(client *c) { for (j = 1; j < c->argc; j++) pubsubUnsubscribePattern(c,c->argv[j],1); } - if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; + if (clientTotalPubSubSubscriptionCount(c) == 0) { + unmarkClientAsPubSub(c); + } } /* This function wraps pubsubPublishMessage and also propagates the message to cluster. @@ -727,10 +745,9 @@ void ssubscribeCommand(client *c) { } pubsubSubscribeChannel(c, c->argv[j], pubSubShardType); } - c->flags |= CLIENT_PUBSUB; + markClientAsPubSub(c); } - /* SUNSUBSCRIBE [shardchannel [shardchannel ...]] */ void sunsubscribeCommand(client *c) { if (c->argc == 1) { @@ -740,7 +757,9 @@ void sunsubscribeCommand(client *c) { pubsubUnsubscribeChannel(c, c->argv[j], 1, pubSubShardType); } } - if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; + if (clientTotalPubSubSubscriptionCount(c) == 0) { + unmarkClientAsPubSub(c); + } } size_t pubsubMemOverhead(client *c) { diff --git a/src/server.c b/src/server.c index 9205cf6d4..29282958d 100644 --- a/src/server.c +++ b/src/server.c @@ -2758,6 +2758,7 @@ void initServer(void) { server.pubsub_channels = dictCreate(&keylistDictType); server.pubsub_patterns = dictCreate(&keylistDictType); server.pubsubshard_channels = dictCreate(&keylistDictType); + server.pubsub_clients = 0; server.cronloops = 0; server.in_exec = 0; server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE; @@ -5650,6 +5651,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "client_recent_max_output_buffer:%zu\r\n", maxout, "blocked_clients:%d\r\n", server.blocked_clients, "tracking_clients:%d\r\n", server.tracking_clients, + "pubsub_clients:%d\r\n", server.pubsub_clients, "clients_in_timeout_table:%llu\r\n", (unsigned long long) raxSize(server.clients_timeout_table), "total_blocking_keys:%lu\r\n", blocking_keys, "total_blocking_keys_on_nokey:%lu\r\n", blocking_keys_on_nokey)); diff --git a/src/server.h b/src/server.h index 77ebb0f5b..a0b028f00 100644 --- a/src/server.h +++ b/src/server.h @@ -1990,6 +1990,7 @@ struct redisServer { int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an xor of NOTIFY_... flags. */ dict *pubsubshard_channels; /* Map shard channels to list of subscribed clients */ + unsigned int pubsub_clients; /* # of clients in Pub/Sub mode */ /* Cluster */ int cluster_enabled; /* Is cluster enabled? */ int cluster_port; /* Set the cluster port for a node. */ @@ -3199,6 +3200,7 @@ void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bu int serverPubsubSubscriptionCount(void); int serverPubsubShardSubscriptionCount(void); size_t pubsubMemOverhead(client *c); +void unmarkClientAsPubSub(client *c); /* Keyspace events notification */ void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid); diff --git a/tests/unit/info.tcl b/tests/unit/info.tcl index 0459676ae..05e4bbb07 100644 --- a/tests/unit/info.tcl +++ b/tests/unit/info.tcl @@ -369,5 +369,37 @@ start_server {tags {"info" "external:skip"}} { assert_equal [getInfoProperty $info client_output_buffer_limit_disconnections] {1} r config set client-output-buffer-limit $org_outbuf_limit } {OK} {logreqres:skip} ;# same as obuf-limits.tcl, skip logreqres + + test {clients: pubsub clients} { + set info [r info clients] + assert_equal [getInfoProperty $info pubsub_clients] {0} + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + # basic count + assert_equal {1} [ssubscribe $rd1 {chan1}] + assert_equal {1} [subscribe $rd2 {chan2}] + set info [r info clients] + assert_equal [getInfoProperty $info pubsub_clients] {2} + # unsubscribe non existing channel + assert_equal {1} [unsubscribe $rd2 {non-exist-chan}] + set info [r info clients] + assert_equal [getInfoProperty $info pubsub_clients] {2} + # count change when client unsubscribe all channels + assert_equal {0} [unsubscribe $rd2 {chan2}] + set info [r info clients] + assert_equal [getInfoProperty $info pubsub_clients] {1} + # non-pubsub clients should not be involved + assert_equal {0} [unsubscribe $rd2 {non-exist-chan}] + set info [r info clients] + assert_equal [getInfoProperty $info pubsub_clients] {1} + # close all clients + $rd1 close + $rd2 close + wait_for_condition 100 50 { + [getInfoProperty [r info clients] pubsub_clients] eq {0} + } else { + fail "pubsub clients did not clear" + } + } } }