From 303465af35c13691f989b3400b028a94df1235d4 Mon Sep 17 00:00:00 2001 From: Harkrishn Patro <30795839+hpatro@users.noreply.github.com> Date: Wed, 17 Feb 2021 23:13:50 +0100 Subject: [PATCH] Remove redundant pubsub list to store the patterns. (#8472) Remove redundant pubsub list to store the patterns. --- src/cluster.c | 2 +- src/dict.c | 2 +- src/pubsub.c | 36 ++++++------------------------------ src/server.c | 7 ++----- src/server.h | 10 +--------- 5 files changed, 11 insertions(+), 46 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index e3fab02e9..efe2f652d 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -2126,7 +2126,7 @@ int clusterProcessPacket(clusterLink *link) { /* Don't bother creating useless objects if there are no * Pub/Sub subscribers. */ if (dictSize(server.pubsub_channels) || - listLength(server.pubsub_patterns)) + dictSize(server.pubsub_patterns)) { channel_len = ntohl(hdr->data.publish.msg.channel_len); message_len = ntohl(hdr->data.publish.msg.message_len); diff --git a/src/dict.c b/src/dict.c index 6c203b850..9ae066f33 100644 --- a/src/dict.c +++ b/src/dict.c @@ -301,7 +301,7 @@ int dictAdd(dict *d, void *key, void *val) /* Low level add or find: * This function adds the entry but instead of setting a value returns the * dictEntry structure to the user, that will make sure to fill the value - * field as he wishes. + * field as they wish. * * This function is also directly exposed to the user API to be called * mainly in order to store non-pointers inside the hash value, example: diff --git a/src/pubsub.c b/src/pubsub.c index 7355e10b9..a7b370d5d 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -124,20 +124,6 @@ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) { * Pubsub low level API *----------------------------------------------------------------------------*/ -void freePubsubPattern(void *p) { - pubsubPattern *pat = p; - - decrRefCount(pat->pattern); - zfree(pat); -} - -int listMatchPubsubPattern(void *a, void *b) { - pubsubPattern *pa = a, *pb = b; - - return (pa->client == pb->client) && - (equalStringObjects(pa->pattern,pb->pattern)); -} - /* Return the number of channels + patterns a client is subscribed to. */ int clientSubscriptionsCount(client *c) { return dictSize(c->pubsub_channels)+ @@ -212,18 +198,13 @@ int pubsubSubscribePattern(client *c, robj *pattern) { if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { retval = 1; - pubsubPattern *pat; listAddNodeTail(c->pubsub_patterns,pattern); incrRefCount(pattern); - pat = zmalloc(sizeof(*pat)); - pat->pattern = getDecodedObject(pattern); - pat->client = c; - listAddNodeTail(server.pubsub_patterns,pat); /* Add the client to the pattern -> list of clients hash table */ - de = dictFind(server.pubsub_patterns_dict,pattern); + de = dictFind(server.pubsub_patterns,pattern); if (de == NULL) { clients = listCreate(); - dictAdd(server.pubsub_patterns_dict,pattern,clients); + dictAdd(server.pubsub_patterns,pattern,clients); incrRefCount(pattern); } else { clients = dictGetVal(de); @@ -241,19 +222,14 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { dictEntry *de; list *clients; listNode *ln; - pubsubPattern pat; int retval = 0; incrRefCount(pattern); /* Protect the object. May be the same we remove */ if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) { retval = 1; listDelNode(c->pubsub_patterns,ln); - pat.client = c; - pat.pattern = pattern; - ln = listSearchKey(server.pubsub_patterns,&pat); - listDelNode(server.pubsub_patterns,ln); /* Remove the client from the pattern -> clients list hash table */ - de = dictFind(server.pubsub_patterns_dict,pattern); + de = dictFind(server.pubsub_patterns,pattern); serverAssertWithInfo(c,NULL,de != NULL); clients = dictGetVal(de); ln = listSearchKey(clients,c); @@ -262,7 +238,7 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { if (listLength(clients) == 0) { /* Free the list and associated hash entry at all if this was * the latest client. */ - dictDelete(server.pubsub_patterns_dict,pattern); + dictDelete(server.pubsub_patterns,pattern); } } /* Notify the client */ @@ -329,7 +305,7 @@ int pubsubPublishMessage(robj *channel, robj *message) { } } /* Send to clients listening to matching channels */ - di = dictGetIterator(server.pubsub_patterns_dict); + di = dictGetIterator(server.pubsub_patterns); if (di) { channel = getDecodedObject(channel); while((de = dictNext(di)) != NULL) { @@ -502,7 +478,7 @@ NULL } } else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) { /* PUBSUB NUMPAT */ - addReplyLongLong(c,listLength(server.pubsub_patterns)); + addReplyLongLong(c,dictSize(server.pubsub_patterns)); } else { addReplySubcommandSyntaxError(c); } diff --git a/src/server.c b/src/server.c index 993f3cb8b..17a25168f 100644 --- a/src/server.c +++ b/src/server.c @@ -3206,10 +3206,7 @@ void initServer(void) { } evictionPoolAlloc(); /* Initialize the LRU keys pool. */ server.pubsub_channels = dictCreate(&keylistDictType,NULL); - server.pubsub_patterns = listCreate(); - server.pubsub_patterns_dict = dictCreate(&keylistDictType,NULL); - listSetFreeMethod(server.pubsub_patterns,freePubsubPattern); - listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern); + server.pubsub_patterns = dictCreate(&keylistDictType,NULL); server.cronloops = 0; server.in_eval = 0; server.in_exec = 0; @@ -4959,7 +4956,7 @@ sds genRedisInfoString(const char *section) { server.stat_keyspace_hits, server.stat_keyspace_misses, dictSize(server.pubsub_channels), - listLength(server.pubsub_patterns), + dictSize(server.pubsub_patterns), server.stat_fork_time, server.stat_total_forks, dictSize(server.migrate_cached_sockets), diff --git a/src/server.h b/src/server.h index 2bfcc015d..8f7e70261 100644 --- a/src/server.h +++ b/src/server.h @@ -1521,8 +1521,7 @@ struct redisServer { long long blocked_last_cron; /* Indicate the mstime of the last time we did cron jobs from a blocking operation */ /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ - list *pubsub_patterns; /* A list of pubsub_patterns */ - dict *pubsub_patterns_dict; /* A dict of pubsub_patterns */ + dict *pubsub_patterns; /* A dict of pubsub_patterns */ int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an xor of NOTIFY_... flags. */ /* Cluster */ @@ -1609,11 +1608,6 @@ struct redisServer { int failover_state; /* Failover state */ }; -typedef struct pubsubPattern { - client *client; - robj *pattern; -} pubsubPattern; - #define MAX_KEYS_BUFFER 256 /* A result structure for the various getkeys function calls. It lists the @@ -2280,8 +2274,6 @@ int hashZiplistValidateIntegrity(unsigned char *zl, size_t size, int deep); /* Pub / Sub */ int pubsubUnsubscribeAllChannels(client *c, int notify); int pubsubUnsubscribeAllPatterns(client *c, int notify); -void freePubsubPattern(void *p); -int listMatchPubsubPattern(void *a, void *b); int pubsubPublishMessage(robj *channel, robj *message); void addReplyPubsubMessage(client *c, robj *channel, robj *msg);