Add metric to INFO CLIENTS: pubsub_clients. (#12849)

In INFO CLIENTS section, we already have blocked_clients and
tracking_clients. We should add a new metric showing the number of
pubsub connections, which helps performance monitoring and trouble
shooting.
This commit is contained in:
Chen Tianjie 2023-12-13 13:44:13 +08:00 committed by GitHub
parent c85a9b7896
commit f9cc25c1dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 66 additions and 9 deletions

View File

@ -1546,6 +1546,7 @@ void clearClientConnectionState(client *c) {
pubsubUnsubscribeAllChannels(c,0); pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeShardAllChannels(c, 0); pubsubUnsubscribeShardAllChannels(c, 0);
pubsubUnsubscribeAllPatterns(c,0); pubsubUnsubscribeAllPatterns(c,0);
unmarkClientAsPubSub(c);
if (c->name) { if (c->name) {
decrRefCount(c->name); decrRefCount(c->name);
@ -1556,7 +1557,7 @@ void clearClientConnectionState(client *c) {
* represent the client library behind the connection. */ * represent the client library behind the connection. */
/* Selectively clear state flags not covered above */ /* Selectively clear state flags not covered above */
c->flags &= ~(CLIENT_ASKING|CLIENT_READONLY|CLIENT_PUBSUB|CLIENT_REPLY_OFF| c->flags &= ~(CLIENT_ASKING|CLIENT_READONLY|CLIENT_REPLY_OFF|
CLIENT_REPLY_SKIP_NEXT|CLIENT_NO_TOUCH|CLIENT_NO_EVICT); CLIENT_REPLY_SKIP_NEXT|CLIENT_NO_TOUCH|CLIENT_NO_EVICT);
} }
@ -1631,6 +1632,7 @@ void freeClient(client *c) {
pubsubUnsubscribeAllChannels(c,0); pubsubUnsubscribeAllChannels(c,0);
pubsubUnsubscribeShardAllChannels(c, 0); pubsubUnsubscribeShardAllChannels(c, 0);
pubsubUnsubscribeAllPatterns(c,0); pubsubUnsubscribeAllPatterns(c,0);
unmarkClientAsPubSub(c);
dictRelease(c->pubsub_channels); dictRelease(c->pubsub_channels);
dictRelease(c->pubsub_patterns); dictRelease(c->pubsub_patterns);
dictRelease(c->pubsubshard_channels); dictRelease(c->pubsubshard_channels);

View File

@ -241,6 +241,20 @@ int clientTotalPubSubSubscriptionCount(client *c) {
return clientSubscriptionsCount(c) + clientShardSubscriptionsCount(c); return clientSubscriptionsCount(c) + clientShardSubscriptionsCount(c);
} }
void markClientAsPubSub(client *c) {
if (!(c->flags & CLIENT_PUBSUB)) {
c->flags |= CLIENT_PUBSUB;
server.pubsub_clients++;
}
}
void unmarkClientAsPubSub(client *c) {
if (c->flags & CLIENT_PUBSUB) {
c->flags &= ~CLIENT_PUBSUB;
server.pubsub_clients--;
}
}
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or /* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
* 0 if the client was already subscribed to that channel. */ * 0 if the client was already subscribed to that channel. */
int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) { int pubsubSubscribeChannel(client *c, robj *channel, pubsubtype type) {
@ -326,7 +340,7 @@ void pubsubShardUnsubscribeAllClients(robj *channel) {
/* If the client has no other pubsub subscription, /* If the client has no other pubsub subscription,
* move out of pubsub mode. */ * move out of pubsub mode. */
if (clientTotalPubSubSubscriptionCount(c) == 0) { if (clientTotalPubSubSubscriptionCount(c) == 0) {
c->flags &= ~CLIENT_PUBSUB; unmarkClientAsPubSub(c);
} }
} }
} }
@ -546,7 +560,7 @@ void subscribeCommand(client *c) {
} }
for (j = 1; j < c->argc; j++) for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j],pubSubType); pubsubSubscribeChannel(c,c->argv[j],pubSubType);
c->flags |= CLIENT_PUBSUB; markClientAsPubSub(c);
} }
/* UNSUBSCRIBE [channel ...] */ /* UNSUBSCRIBE [channel ...] */
@ -559,7 +573,9 @@ void unsubscribeCommand(client *c) {
for (j = 1; j < c->argc; j++) for (j = 1; j < c->argc; j++)
pubsubUnsubscribeChannel(c,c->argv[j],1,pubSubType); pubsubUnsubscribeChannel(c,c->argv[j],1,pubSubType);
} }
if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; if (clientTotalPubSubSubscriptionCount(c) == 0) {
unmarkClientAsPubSub(c);
}
} }
/* PSUBSCRIBE pattern [pattern ...] */ /* PSUBSCRIBE pattern [pattern ...] */
@ -579,7 +595,7 @@ void psubscribeCommand(client *c) {
for (j = 1; j < c->argc; j++) for (j = 1; j < c->argc; j++)
pubsubSubscribePattern(c,c->argv[j]); pubsubSubscribePattern(c,c->argv[j]);
c->flags |= CLIENT_PUBSUB; markClientAsPubSub(c);
} }
/* PUNSUBSCRIBE [pattern [pattern ...]] */ /* PUNSUBSCRIBE [pattern [pattern ...]] */
@ -592,7 +608,9 @@ void punsubscribeCommand(client *c) {
for (j = 1; j < c->argc; j++) for (j = 1; j < c->argc; j++)
pubsubUnsubscribePattern(c,c->argv[j],1); pubsubUnsubscribePattern(c,c->argv[j],1);
} }
if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; if (clientTotalPubSubSubscriptionCount(c) == 0) {
unmarkClientAsPubSub(c);
}
} }
/* This function wraps pubsubPublishMessage and also propagates the message to cluster. /* This function wraps pubsubPublishMessage and also propagates the message to cluster.
@ -727,10 +745,9 @@ void ssubscribeCommand(client *c) {
} }
pubsubSubscribeChannel(c, c->argv[j], pubSubShardType); pubsubSubscribeChannel(c, c->argv[j], pubSubShardType);
} }
c->flags |= CLIENT_PUBSUB; markClientAsPubSub(c);
} }
/* SUNSUBSCRIBE [shardchannel [shardchannel ...]] */ /* SUNSUBSCRIBE [shardchannel [shardchannel ...]] */
void sunsubscribeCommand(client *c) { void sunsubscribeCommand(client *c) {
if (c->argc == 1) { if (c->argc == 1) {
@ -740,7 +757,9 @@ void sunsubscribeCommand(client *c) {
pubsubUnsubscribeChannel(c, c->argv[j], 1, pubSubShardType); pubsubUnsubscribeChannel(c, c->argv[j], 1, pubSubShardType);
} }
} }
if (clientTotalPubSubSubscriptionCount(c) == 0) c->flags &= ~CLIENT_PUBSUB; if (clientTotalPubSubSubscriptionCount(c) == 0) {
unmarkClientAsPubSub(c);
}
} }
size_t pubsubMemOverhead(client *c) { size_t pubsubMemOverhead(client *c) {

View File

@ -2758,6 +2758,7 @@ void initServer(void) {
server.pubsub_channels = dictCreate(&keylistDictType); server.pubsub_channels = dictCreate(&keylistDictType);
server.pubsub_patterns = dictCreate(&keylistDictType); server.pubsub_patterns = dictCreate(&keylistDictType);
server.pubsubshard_channels = dictCreate(&keylistDictType); server.pubsubshard_channels = dictCreate(&keylistDictType);
server.pubsub_clients = 0;
server.cronloops = 0; server.cronloops = 0;
server.in_exec = 0; server.in_exec = 0;
server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE; server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
@ -5650,6 +5651,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"client_recent_max_output_buffer:%zu\r\n", maxout, "client_recent_max_output_buffer:%zu\r\n", maxout,
"blocked_clients:%d\r\n", server.blocked_clients, "blocked_clients:%d\r\n", server.blocked_clients,
"tracking_clients:%d\r\n", server.tracking_clients, "tracking_clients:%d\r\n", server.tracking_clients,
"pubsub_clients:%d\r\n", server.pubsub_clients,
"clients_in_timeout_table:%llu\r\n", (unsigned long long) raxSize(server.clients_timeout_table), "clients_in_timeout_table:%llu\r\n", (unsigned long long) raxSize(server.clients_timeout_table),
"total_blocking_keys:%lu\r\n", blocking_keys, "total_blocking_keys:%lu\r\n", blocking_keys,
"total_blocking_keys_on_nokey:%lu\r\n", blocking_keys_on_nokey)); "total_blocking_keys_on_nokey:%lu\r\n", blocking_keys_on_nokey));

View File

@ -1990,6 +1990,7 @@ struct redisServer {
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of NOTIFY_... flags. */ xor of NOTIFY_... flags. */
dict *pubsubshard_channels; /* Map shard channels to list of subscribed clients */ dict *pubsubshard_channels; /* Map shard channels to list of subscribed clients */
unsigned int pubsub_clients; /* # of clients in Pub/Sub mode */
/* Cluster */ /* Cluster */
int cluster_enabled; /* Is cluster enabled? */ int cluster_enabled; /* Is cluster enabled? */
int cluster_port; /* Set the cluster port for a node. */ int cluster_port; /* Set the cluster port for a node. */
@ -3199,6 +3200,7 @@ void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bu
int serverPubsubSubscriptionCount(void); int serverPubsubSubscriptionCount(void);
int serverPubsubShardSubscriptionCount(void); int serverPubsubShardSubscriptionCount(void);
size_t pubsubMemOverhead(client *c); size_t pubsubMemOverhead(client *c);
void unmarkClientAsPubSub(client *c);
/* Keyspace events notification */ /* Keyspace events notification */
void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid); void notifyKeyspaceEvent(int type, char *event, robj *key, int dbid);

View File

@ -369,5 +369,37 @@ start_server {tags {"info" "external:skip"}} {
assert_equal [getInfoProperty $info client_output_buffer_limit_disconnections] {1} assert_equal [getInfoProperty $info client_output_buffer_limit_disconnections] {1}
r config set client-output-buffer-limit $org_outbuf_limit r config set client-output-buffer-limit $org_outbuf_limit
} {OK} {logreqres:skip} ;# same as obuf-limits.tcl, skip logreqres } {OK} {logreqres:skip} ;# same as obuf-limits.tcl, skip logreqres
test {clients: pubsub clients} {
set info [r info clients]
assert_equal [getInfoProperty $info pubsub_clients] {0}
set rd1 [redis_deferring_client]
set rd2 [redis_deferring_client]
# basic count
assert_equal {1} [ssubscribe $rd1 {chan1}]
assert_equal {1} [subscribe $rd2 {chan2}]
set info [r info clients]
assert_equal [getInfoProperty $info pubsub_clients] {2}
# unsubscribe non existing channel
assert_equal {1} [unsubscribe $rd2 {non-exist-chan}]
set info [r info clients]
assert_equal [getInfoProperty $info pubsub_clients] {2}
# count change when client unsubscribe all channels
assert_equal {0} [unsubscribe $rd2 {chan2}]
set info [r info clients]
assert_equal [getInfoProperty $info pubsub_clients] {1}
# non-pubsub clients should not be involved
assert_equal {0} [unsubscribe $rd2 {non-exist-chan}]
set info [r info clients]
assert_equal [getInfoProperty $info pubsub_clients] {1}
# close all clients
$rd1 close
$rd2 close
wait_for_condition 100 50 {
[getInfoProperty [r info clients] pubsub_clients] eq {0}
} else {
fail "pubsub clients did not clear"
}
}
} }
} }