diff --git a/src/blocked.c b/src/blocked.c index 2ada95760..ff915aab0 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -727,21 +727,6 @@ void unblockClientOnError(client *c, const char *err_str) { unblockClient(c, 1); } -/* sets blocking_keys to the total number of keys which has at least one client blocked on them - * sets blocking_keys_on_nokey to the total number of keys which has at least one client - * blocked on them to be written or deleted */ -void totalNumberOfBlockingKeys(unsigned long *blocking_keys, unsigned long *bloking_keys_on_nokey) { - unsigned long bkeys=0, bkeys_on_nokey=0; - for (int j = 0; j < server.dbnum; j++) { - bkeys += dictSize(server.db[j].blocking_keys); - bkeys_on_nokey += dictSize(server.db[j].blocking_keys_unblock_on_nokey); - } - if (blocking_keys) - *blocking_keys = bkeys; - if (bloking_keys_on_nokey) - *bloking_keys_on_nokey = bkeys_on_nokey; -} - void blockedBeforeSleep(void) { /* Handle precise timeouts of blocked clients. */ handleBlockedClientsTimeout(); diff --git a/src/multi.c b/src/multi.c index 1e331b36f..a51687363 100644 --- a/src/multi.c +++ b/src/multi.c @@ -303,6 +303,8 @@ void watchForKey(client *c, robj *key) { listNode *ln; watchedKey *wk; + if (listLength(c->watched_keys) == 0) server.watching_clients++; + /* Check if we are already watching for this key */ listRewind(c->watched_keys,&li); while((ln = listNext(&li))) { @@ -353,6 +355,7 @@ void unwatchAllKeys(client *c) { decrRefCount(wk->key); zfree(wk); } + server.watching_clients--; } /* Iterates over the watched_keys list and looks for an expired key. Keys which diff --git a/src/networking.c b/src/networking.c index 69c183d6e..d36d64dee 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2858,6 +2858,7 @@ sds catClientInfoString(sds s, client *client) { " psub=%i", (int) dictSize(client->pubsub_patterns), " ssub=%i", (int) dictSize(client->pubsubshard_channels), " multi=%i", (client->flags & CLIENT_MULTI) ? client->mstate.count : -1, + " watch=%i", (int) listLength(client->watched_keys), " qbuf=%U", (unsigned long long) sdslen(client->querybuf), " qbuf-free=%U", (unsigned long long) sdsavail(client->querybuf), " argv-mem=%U", (unsigned long long) client->argv_len_sum, diff --git a/src/server.c b/src/server.c index 1040c50c8..7ec315e8f 100644 --- a/src/server.c +++ b/src/server.c @@ -2675,6 +2675,7 @@ void initServer(void) { server.pubsub_patterns = dictCreate(&objToDictDictType); server.pubsubshard_channels = kvstoreCreate(&objToDictDictType, slot_count_bits, KVSTORE_ALLOCATE_DICTS_ON_DEMAND | KVSTORE_FREE_EMPTY_DICTS); server.pubsub_clients = 0; + server.watching_clients = 0; server.cronloops = 0; server.in_exec = 0; server.busy_module_yield_flags = BUSY_MODULE_YIELD_NONE; @@ -5475,6 +5476,25 @@ dict *genInfoSectionDict(robj **argv, int argc, char **defaults, int *out_all, i return section_dict; } +/* sets blocking_keys to the total number of keys which has at least one client blocked on them. + * sets blocking_keys_on_nokey to the total number of keys which has at least one client + * blocked on them to be written or deleted. + * sets watched_keys to the total number of keys which has at least on client watching on them. */ +void totalNumberOfStatefulKeys(unsigned long *blocking_keys, unsigned long *blocking_keys_on_nokey, unsigned long *watched_keys) { + unsigned long bkeys=0, bkeys_on_nokey=0, wkeys=0; + for (int j = 0; j < server.dbnum; j++) { + bkeys += dictSize(server.db[j].blocking_keys); + bkeys_on_nokey += dictSize(server.db[j].blocking_keys_unblock_on_nokey); + wkeys += dictSize(server.db[j].watched_keys); + } + if (blocking_keys) + *blocking_keys = bkeys; + if (blocking_keys_on_nokey) + *blocking_keys_on_nokey = bkeys_on_nokey; + if (watched_keys) + *watched_keys = wkeys; +} + /* Create the string returned by the INFO command. This is decoupled * by the INFO command itself as we need to report the same information * on memory corruption problems. */ @@ -5554,9 +5574,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { /* Clients */ if (all_sections || (dictFind(section_dict,"clients") != NULL)) { size_t maxin, maxout; - unsigned long blocking_keys, blocking_keys_on_nokey; + unsigned long blocking_keys, blocking_keys_on_nokey, watched_keys; getExpansiveClientsInfo(&maxin,&maxout); - totalNumberOfBlockingKeys(&blocking_keys, &blocking_keys_on_nokey); + totalNumberOfStatefulKeys(&blocking_keys, &blocking_keys_on_nokey, &watched_keys); if (sections++) info = sdscat(info,"\r\n"); info = sdscatprintf(info, "# Clients\r\n" FMTARGS( "connected_clients:%lu\r\n", listLength(server.clients) - listLength(server.slaves), @@ -5567,7 +5587,9 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) { "blocked_clients:%d\r\n", server.blocked_clients, "tracking_clients:%d\r\n", server.tracking_clients, "pubsub_clients:%d\r\n", server.pubsub_clients, + "watching_clients:%d\r\n", server.watching_clients, "clients_in_timeout_table:%llu\r\n", (unsigned long long) raxSize(server.clients_timeout_table), + "total_watched_keys:%lu\r\n", watched_keys, "total_blocking_keys:%lu\r\n", blocking_keys, "total_blocking_keys_on_nokey:%lu\r\n", blocking_keys_on_nokey)); } diff --git a/src/server.h b/src/server.h index 3b0e9e96e..f0be18fe2 100644 --- a/src/server.h +++ b/src/server.h @@ -1982,6 +1982,7 @@ struct redisServer { xor of NOTIFY_... flags. */ kvstore *pubsubshard_channels; /* Map shard channels in every slot to list of subscribed clients */ unsigned int pubsub_clients; /* # of clients in Pub/Sub mode */ + unsigned int watching_clients; /* # of clients are wathcing keys */ /* Cluster */ int cluster_enabled; /* Is cluster enabled? */ int cluster_port; /* Set the cluster port for a node. */ @@ -3420,7 +3421,7 @@ void blockForAofFsync(client *c, mstime_t timeout, long long offset, int numloca void signalDeletedKeyAsReady(redisDb *db, robj *key, int type); void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors); void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with); -void totalNumberOfBlockingKeys(unsigned long *blocking_keys, unsigned long *bloking_keys_on_nokey); +void totalNumberOfStatefulKeys(unsigned long *blocking_keys, unsigned long *bloking_keys_on_nokey, unsigned long *watched_keys); void blockedBeforeSleep(void); /* timeout.c -- Blocked clients timeout and connections timeout. */ diff --git a/tests/unit/info.tcl b/tests/unit/info.tcl index 05e4bbb07..1852717ab 100644 --- a/tests/unit/info.tcl +++ b/tests/unit/info.tcl @@ -401,5 +401,66 @@ start_server {tags {"info" "external:skip"}} { fail "pubsub clients did not clear" } } + + test {clients: watching clients} { + set r2 [redis_client] + assert_equal [s watching_clients] 0 + assert_equal [s total_watched_keys] 0 + assert_match {*watch=0*} [r client info] + assert_match {*watch=0*} [$r2 client info] + # count after watch key + $r2 watch key + assert_equal [s watching_clients] 1 + assert_equal [s total_watched_keys] 1 + assert_match {*watch=0*} [r client info] + assert_match {*watch=1*} [$r2 client info] + # the same client watch the same key has no effect + $r2 watch key + assert_equal [s watching_clients] 1 + assert_equal [s total_watched_keys] 1 + assert_match {*watch=0*} [r client info] + assert_match {*watch=1*} [$r2 client info] + # different client watch different key + r watch key2 + assert_equal [s watching_clients] 2 + assert_equal [s total_watched_keys] 2 + assert_match {*watch=1*} [$r2 client info] + assert_match {*watch=1*} [r client info] + # count after unwatch + r unwatch + assert_equal [s watching_clients] 1 + assert_equal [s total_watched_keys] 1 + assert_match {*watch=0*} [r client info] + assert_match {*watch=1*} [$r2 client info] + $r2 unwatch + assert_equal [s watching_clients] 0 + assert_equal [s total_watched_keys] 0 + assert_match {*watch=0*} [r client info] + assert_match {*watch=0*} [$r2 client info] + + # count after watch/multi/exec + $r2 watch key + assert_equal [s watching_clients] 1 + $r2 multi + $r2 exec + assert_equal [s watching_clients] 0 + # count after watch/multi/discard + $r2 watch key + assert_equal [s watching_clients] 1 + $r2 multi + $r2 discard + assert_equal [s watching_clients] 0 + # discard without multi has no effect + $r2 watch key + assert_equal [s watching_clients] 1 + catch {$r2 discard} e + assert_equal [s watching_clients] 1 + # unwatch without watch has no effect + r unwatch + assert_equal [s watching_clients] 1 + # after disconnect + $r2 close + assert_equal [s watching_clients] 0 + } } } diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 5e3528439..194ff09e0 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -7,7 +7,7 @@ start_server {tags {"introspection"}} { test {CLIENT LIST} { r client list - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|list user=* redir=-1 resp=*} test {CLIENT LIST with IDs} { set myid [r client id] @@ -17,7 +17,7 @@ start_server {tags {"introspection"}} { test {CLIENT INFO} { r client info - } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=*} + } {id=* addr=*:* laddr=*:* fd=* name=* age=* idle=* flags=N db=* sub=0 psub=0 ssub=0 multi=-1 watch=0 qbuf=26 qbuf-free=* argv-mem=* multi-mem=0 rbs=* rbp=* obl=0 oll=0 omem=0 tot-mem=* events=r cmd=client|info user=* redir=-1 resp=*} test {CLIENT KILL with illegal arguments} { assert_error "ERR wrong number of arguments for 'client|kill' command" {r client kill}