Add metrics for WATCH (#12966)
Redis has some special commands that mark the client's state, such as `subscribe` and `blpop`, which mark the client as `CLIENT_PUBSUB` or `CLIENT_BLOCKED`, and we have metrics for the special use cases. However, there are also other special commands, like `WATCH`, which although do not have a specific flags, and should also be considered stateful client types. For stateful clients, in many scenarios, the connections cannot be shared in "connection pool", meaning connection pool cannot be used. For example, whenever the `WATCH` command is executed, a new connection is required to put the client into the "watch state" because the watched keys are stored in the client. If different business logic requires watching different keys, separate connections must be used; otherwise, there will be contamination. This also means that if a user's business heavily relies on the `WATCH` command, a large number of connections will be required. Recently we have encountered this situation in our platform, where some users consume a significant number of connections when using Redis because of `WATCH`. I hope we can have a way to observe these special use cases and special client connections. Here I add a few monitoring metrics: 1. `watching_clients` in `INFO` reply: The number of clients currently in the "watching" state. 2. `total_watched_keys` in `INFO` reply: The total number of keys being watched. 3. `watch` in `CLIENT LIST` reply: The number of keys each client is currently watching.
This commit is contained in:
parent
c854873746
commit
50d6fe8c4b
@ -727,21 +727,6 @@ void unblockClientOnError(client *c, const char *err_str) {
|
||||
unblockClient(c, 1);
|
||||
}
|
||||
|
||||
/* sets blocking_keys to the total number of keys which has at least one client blocked on them
|
||||
* sets blocking_keys_on_nokey to the total number of keys which has at least one client
|
||||
* blocked on them to be written or deleted */
|
||||
void totalNumberOfBlockingKeys(unsigned long *blocking_keys, unsigned long *bloking_keys_on_nokey) {
|
||||
unsigned long bkeys=0, bkeys_on_nokey=0;
|
||||
for (int j = 0; j < server.dbnum; j++) {
|
||||
bkeys += dictSize(server.db[j].blocking_keys);
|
||||
bkeys_on_nokey += dictSize(server.db[j].blocking_keys_unblock_on_nokey);
|
||||
}
|
||||
if (blocking_keys)
|
||||
*blocking_keys = bkeys;
|
||||
if (bloking_keys_on_nokey)
|
||||
*bloking_keys_on_nokey = bkeys_on_nokey;
|
||||
}
|
||||
|
||||
void blockedBeforeSleep(void) {
|
||||
/* Handle precise timeouts of blocked clients. */
|
||||
handleBlockedClientsTimeout();
|
||||
|
@ -303,6 +303,8 @@ void watchForKey(client *c, robj *key) {
|
||||
listNode *ln;
|
||||
watchedKey *wk;
|
||||
|
||||
if (listLength(c->watched_keys) == 0) server.watching_clients++;
|
||||
|
||||
/* Check if we are already watching for this key */
|
||||
listRewind(c->watched_keys,&li);
|
||||
while((ln = listNext(&li))) {
|
||||
@ -353,6 +355,7 @@ void unwatchAllKeys(client *c) {
|
||||
decrRefCount(wk->key);
|
||||
zfree(wk);
|
||||
}
|
||||
server.watching_clients--;
|
||||
}
|
||||
|
||||
/* Iterates over the watched_keys list and looks for an expired key. Keys which
|
||||
|
@ -2858,6 +2858,7 @@ sds catClientInfoString(sds s, client *client) {
|
||||
" psub=%i", (int) dictSize(client->pubsub_patterns),
|
||||
" ssub=%i", (int) dictSize(client->pubsubshard_channels),
|
||||
" multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
|
||||
" watch=%i", (int) listLength(client->watched_keys),
|
||||
" qbuf=%U", (unsigned long long) sdslen(client->querybuf),
|
||||
" qbuf-free=%U", (unsigned long long) sdsavail(client->querybuf),
|
||||
" argv-mem=%U", (unsigned long long) client->argv_len_sum,
|
||||
|
26
src/server.c
26
src/server.c
@ -2675,6 +2675,7 @@ void initServer(void) {
|
||||
server.pubsub_patterns = dictCreate(&objToDictDictType);
|
||||
server.pubsubshard_channels = kvstoreCreate(&objToDictDictType, slot_count_bits, KVSTORE_ALLOCATE_DICTS_ON_DEMAND | KVSTORE_FREE_EMPTY_DICTS);
|
||||
server.pubsub_clients = 0;
|
||||
server.watching_clients = 0;
|
||||
server.cronloops = 0;
|
||||
server.in_exec = 0;
|
||||
server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE;
|
||||
@ -5475,6 +5476,25 @@ dict *genInfoSectionDict(robj **argv, int argc, char **defaults, int *out_all, i
|
||||
return section_dict;
|
||||
}
|
||||
|
||||
/* sets blocking_keys to the total number of keys which has at least one client blocked on them.
|
||||
* sets blocking_keys_on_nokey to the total number of keys which has at least one client
|
||||
* blocked on them to be written or deleted.
|
||||
* sets watched_keys to the total number of keys which has at least on client watching on them. */
|
||||
void totalNumberOfStatefulKeys(unsigned long *blocking_keys, unsigned long *blocking_keys_on_nokey, unsigned long *watched_keys) {
|
||||
unsigned long bkeys=0, bkeys_on_nokey=0, wkeys=0;
|
||||
for (int j = 0; j < server.dbnum; j++) {
|
||||
bkeys += dictSize(server.db[j].blocking_keys);
|
||||
bkeys_on_nokey += dictSize(server.db[j].blocking_keys_unblock_on_nokey);
|
||||
wkeys += dictSize(server.db[j].watched_keys);
|
||||
}
|
||||
if (blocking_keys)
|
||||
*blocking_keys = bkeys;
|
||||
if (blocking_keys_on_nokey)
|
||||
*blocking_keys_on_nokey = bkeys_on_nokey;
|
||||
if (watched_keys)
|
||||
*watched_keys = wkeys;
|
||||
}
|
||||
|
||||
/* Create the string returned by the INFO command. This is decoupled
|
||||
* by the INFO command itself as we need to report the same information
|
||||
* on memory corruption problems. */
|
||||
@ -5554,9 +5574,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
|
||||
/* Clients */
|
||||
if (all_sections || (dictFind(section_dict,"clients") != NULL)) {
|
||||
size_t maxin, maxout;
|
||||
unsigned long blocking_keys, blocking_keys_on_nokey;
|
||||
unsigned long blocking_keys, blocking_keys_on_nokey, watched_keys;
|
||||
getExpansiveClientsInfo(&maxin,&maxout);
|
||||
totalNumberOfBlockingKeys(&blocking_keys, &blocking_keys_on_nokey);
|
||||
totalNumberOfStatefulKeys(&blocking_keys, &blocking_keys_on_nokey, &watched_keys);
|
||||
if (sections++) info = sdscat(info,"\r\n");
|
||||
info = sdscatprintf(info, "# Clients\r\n" FMTARGS(
|
||||
"connected_clients:%lu\r\n", listLength(server.clients) - listLength(server.slaves),
|
||||
@ -5567,7 +5587,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
|
||||
"blocked_clients:%d\r\n", server.blocked_clients,
|
||||
"tracking_clients:%d\r\n", server.tracking_clients,
|
||||
"pubsub_clients:%d\r\n", server.pubsub_clients,
|
||||
"watching_clients:%d\r\n", server.watching_clients,
|
||||
"clients_in_timeout_table:%llu\r\n", (unsigned long long) raxSize(server.clients_timeout_table),
|
||||
"total_watched_keys:%lu\r\n", watched_keys,
|
||||
"total_blocking_keys:%lu\r\n", blocking_keys,
|
||||
"total_blocking_keys_on_nokey:%lu\r\n", blocking_keys_on_nokey));
|
||||
}
|
||||
|
@ -1982,6 +1982,7 @@ struct redisServer {
|
||||
xor of NOTIFY_... flags. */
|
||||
kvstore *pubsubshard_channels; /* Map shard channels in every slot to list of subscribed clients */
|
||||
unsigned int pubsub_clients; /* # of clients in Pub/Sub mode */
|
||||
unsigned int watching_clients; /* # of clients are wathcing keys */
|
||||
/* Cluster */
|
||||
int cluster_enabled; /* Is cluster enabled? */
|
||||
int cluster_port; /* Set the cluster port for a node. */
|
||||
@ -3420,7 +3421,7 @@ void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numloca
|
||||
void signalDeletedKeyAsReady(redisDb *db, robj *key, int type);
|
||||
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors);
|
||||
void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with);
|
||||
void totalNumberOfBlockingKeys(unsigned long *blocking_keys, unsigned long *bloking_keys_on_nokey);
|
||||
void totalNumberOfStatefulKeys(unsigned long *blocking_keys, unsigned long *bloking_keys_on_nokey, unsigned long *watched_keys);
|
||||
void blockedBeforeSleep(void);
|
||||
|
||||
/* timeout.c -- Blocked clients timeout and connections timeout. */
|
||||
|
@ -401,5 +401,66 @@ start_server {tags {"info" "external:skip"}} {
|
||||
fail "pubsub clients did not clear"
|
||||
}
|
||||
}
|
||||
|
||||
test {clients: watching clients} {
|
||||
set r2 [redis_client]
|
||||
assert_equal [s watching_clients] 0
|
||||
assert_equal [s total_watched_keys] 0
|
||||
assert_match {*watch=0*} [r client info]
|
||||
assert_match {*watch=0*} [$r2 client info]
|
||||
# count after watch key
|
||||
$r2 watch key
|
||||
assert_equal [s watching_clients] 1
|
||||
assert_equal [s total_watched_keys] 1
|
||||
assert_match {*watch=0*} [r client info]
|
||||
assert_match {*watch=1*} [$r2 client info]
|
||||
# the same client watch the same key has no effect
|
||||
$r2 watch key
|
||||
assert_equal [s watching_clients] 1
|
||||
assert_equal [s total_watched_keys] 1
|
||||
assert_match {*watch=0*} [r client info]
|
||||
assert_match {*watch=1*} [$r2 client info]
|
||||
# different client watch different key
|
||||
r watch key2
|
||||
assert_equal [s watching_clients] 2
|
||||
assert_equal [s total_watched_keys] 2
|
||||
assert_match {*watch=1*} [$r2 client info]
|
||||
assert_match {*watch=1*} [r client info]
|
||||
# count after unwatch
|
||||
r unwatch
|
||||
assert_equal [s watching_clients] 1
|
||||
assert_equal [s total_watched_keys] 1
|
||||
assert_match {*watch=0*} [r client info]
|
||||
assert_match {*watch=1*} [$r2 client info]
|
||||
$r2 unwatch
|
||||
assert_equal [s watching_clients] 0
|
||||
assert_equal [s total_watched_keys] 0
|
||||
assert_match {*watch=0*} [r client info]
|
||||
assert_match {*watch=0*} [$r2 client info]
|
||||
|
||||
# count after watch/multi/exec
|
||||
$r2 watch key
|
||||
assert_equal [s watching_clients] 1
|
||||
$r2 multi
|
||||
$r2 exec
|
||||
assert_equal [s watching_clients] 0
|
||||
# count after watch/multi/discard
|
||||
$r2 watch key
|
||||
assert_equal [s watching_clients] 1
|
||||
$r2 multi
|
||||
$r2 discard
|
||||
assert_equal [s watching_clients] 0
|
||||
# discard without multi has no effect
|
||||
$r2 watch key
|
||||
assert_equal [s watching_clients] 1
|
||||
catch {$r2 discard} e
|
||||
assert_equal [s watching_clients] 1
|
||||
# unwatch without watch has no effect
|
||||
r unwatch
|
||||
assert_equal [s watching_clients] 1
|
||||
# after disconnect
|
||||
$r2 close
|
||||
assert_equal [s watching_clients] 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -7,7 +7,7 @@ start_server {tags {"introspection"}} {
|
||||
|
||||
test {CLIENT LIST} {
|
||||
r client list
|
||||
} {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=*}
|
||||
} {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=*}
|
||||
|
||||
test {CLIENT LIST with IDs} {
|
||||
set myid [r client id]
|
||||
@ -17,7 +17,7 @@ start_server {tags {"introspection"}} {
|
||||
|
||||
test {CLIENT INFO} {
|
||||
r client info
|
||||
} {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=*}
|
||||
} {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=*}
|
||||
|
||||
test {CLIENT KILL with illegal arguments} {
|
||||
assert_error "ERR wrong number of arguments for 'client|kill' command" {r client kill}
|
||||
|
Loading…
x
Reference in New Issue
Block a user