Remove redundant pubsub list to store the patterns. (#8472)

Remove redundant pubsub list to store the patterns.
This commit is contained in:
Harkrishn Patro 2021-02-17 23:13:50 +01:00 committed by GitHub
parent 71ab81ec69
commit 303465af35
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 11 additions and 46 deletions

View File

@ -2126,7 +2126,7 @@ int clusterProcessPacket(clusterLink *link) {
/* Don't bother creating useless objects if there are no
* Pub/Sub subscribers. */
if (dictSize(server.pubsub_channels) ||
listLength(server.pubsub_patterns))
dictSize(server.pubsub_patterns))
{
channel_len = ntohl(hdr->data.publish.msg.channel_len);
message_len = ntohl(hdr->data.publish.msg.message_len);

View File

@ -301,7 +301,7 @@ int dictAdd(dict *d, void *key, void *val)
/* Low level add or find:
* This function adds the entry but instead of setting a value returns the
* dictEntry structure to the user, that will make sure to fill the value
* field as he wishes.
* field as they wish.
*
* This function is also directly exposed to the user API to be called
* mainly in order to store non-pointers inside the hash value, example:

View File

@ -124,20 +124,6 @@ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
* Pubsub low level API
*----------------------------------------------------------------------------*/
void freePubsubPattern(void *p) {
pubsubPattern *pat = p;
decrRefCount(pat->pattern);
zfree(pat);
}
int listMatchPubsubPattern(void *a, void *b) {
pubsubPattern *pa = a, *pb = b;
return (pa->client == pb->client) &&
(equalStringObjects(pa->pattern,pb->pattern));
}
/* Return the number of channels + patterns a client is subscribed to. */
int clientSubscriptionsCount(client *c) {
return dictSize(c->pubsub_channels)+
@ -212,18 +198,13 @@ int pubsubSubscribePattern(client *c, robj *pattern) {
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
retval = 1;
pubsubPattern *pat;
listAddNodeTail(c->pubsub_patterns,pattern);
incrRefCount(pattern);
pat = zmalloc(sizeof(*pat));
pat->pattern = getDecodedObject(pattern);
pat->client = c;
listAddNodeTail(server.pubsub_patterns,pat);
/* Add the client to the pattern -> list of clients hash table */
de = dictFind(server.pubsub_patterns_dict,pattern);
de = dictFind(server.pubsub_patterns,pattern);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_patterns_dict,pattern,clients);
dictAdd(server.pubsub_patterns,pattern,clients);
incrRefCount(pattern);
} else {
clients = dictGetVal(de);
@ -241,19 +222,14 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
dictEntry *de;
list *clients;
listNode *ln;
pubsubPattern pat;
int retval = 0;
incrRefCount(pattern); /* Protect the object. May be the same we remove */
if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
retval = 1;
listDelNode(c->pubsub_patterns,ln);
pat.client = c;
pat.pattern = pattern;
ln = listSearchKey(server.pubsub_patterns,&pat);
listDelNode(server.pubsub_patterns,ln);
/* Remove the client from the pattern -> clients list hash table */
de = dictFind(server.pubsub_patterns_dict,pattern);
de = dictFind(server.pubsub_patterns,pattern);
serverAssertWithInfo(c,NULL,de != NULL);
clients = dictGetVal(de);
ln = listSearchKey(clients,c);
@ -262,7 +238,7 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
if (listLength(clients) == 0) {
/* Free the list and associated hash entry at all if this was
* the latest client. */
dictDelete(server.pubsub_patterns_dict,pattern);
dictDelete(server.pubsub_patterns,pattern);
}
}
/* Notify the client */
@ -329,7 +305,7 @@ int pubsubPublishMessage(robj *channel, robj *message) {
}
}
/* Send to clients listening to matching channels */
di = dictGetIterator(server.pubsub_patterns_dict);
di = dictGetIterator(server.pubsub_patterns);
if (di) {
channel = getDecodedObject(channel);
while((de = dictNext(di)) != NULL) {
@ -502,7 +478,7 @@ NULL
}
} else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) {
/* PUBSUB NUMPAT */
addReplyLongLong(c,listLength(server.pubsub_patterns));
addReplyLongLong(c,dictSize(server.pubsub_patterns));
} else {
addReplySubcommandSyntaxError(c);
}

View File

@ -3206,10 +3206,7 @@ void initServer(void) {
}
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
server.pubsub_patterns = listCreate();
server.pubsub_patterns_dict = dictCreate(&keylistDictType,NULL);
listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
server.pubsub_patterns = dictCreate(&keylistDictType,NULL);
server.cronloops = 0;
server.in_eval = 0;
server.in_exec = 0;
@ -4959,7 +4956,7 @@ sds genRedisInfoString(const char *section) {
server.stat_keyspace_hits,
server.stat_keyspace_misses,
dictSize(server.pubsub_channels),
listLength(server.pubsub_patterns),
dictSize(server.pubsub_patterns),
server.stat_fork_time,
server.stat_total_forks,
dictSize(server.migrate_cached_sockets),

View File

@ -1521,8 +1521,7 @@ struct redisServer {
long long blocked_last_cron; /* Indicate the mstime of the last time we did cron jobs from a blocking operation */
/* Pubsub */
dict *pubsub_channels; /* Map channels to list of subscribed clients */
list *pubsub_patterns; /* A list of pubsub_patterns */
dict *pubsub_patterns_dict; /* A dict of pubsub_patterns */
dict *pubsub_patterns; /* A dict of pubsub_patterns */
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
xor of NOTIFY_... flags. */
/* Cluster */
@ -1609,11 +1608,6 @@ struct redisServer {
int failover_state; /* Failover state */
};
typedef struct pubsubPattern {
client *client;
robj *pattern;
} pubsubPattern;
#define MAX_KEYS_BUFFER 256
/* A result structure for the various getkeys function calls. It lists the
@ -2280,8 +2274,6 @@ int hashZiplistValidateIntegrity(unsigned char *zl, size_t size, int deep);
/* Pub / Sub */
int pubsubUnsubscribeAllChannels(client *c, int notify);
int pubsubUnsubscribeAllPatterns(client *c, int notify);
void freePubsubPattern(void *p);
int listMatchPubsubPattern(void *a, void *b);
int pubsubPublishMessage(robj *channel, robj *message);
void addReplyPubsubMessage(client *c, robj *channel, robj *msg);