Merge branch 'pubsub_patterns_boost' of https://github.com/leeyiw/redis into leeyiw-pubsub_patterns_boost
This commit is contained in:
commit
1b4bc60999
52
src/pubsub.c
52
src/pubsub.c
@ -206,6 +206,8 @@ int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
|
|||||||
|
|
||||||
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
|
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or 0 if the client was already subscribed to that pattern. */
|
||||||
int pubsubSubscribePattern(client *c, robj *pattern) {
|
int pubsubSubscribePattern(client *c, robj *pattern) {
|
||||||
|
dictEntry *de;
|
||||||
|
list *clients;
|
||||||
int retval = 0;
|
int retval = 0;
|
||||||
|
|
||||||
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
|
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
|
||||||
@ -217,6 +219,16 @@ int pubsubSubscribePattern(client *c, robj *pattern) {
|
|||||||
pat->pattern = getDecodedObject(pattern);
|
pat->pattern = getDecodedObject(pattern);
|
||||||
pat->client = c;
|
pat->client = c;
|
||||||
listAddNodeTail(server.pubsub_patterns,pat);
|
listAddNodeTail(server.pubsub_patterns,pat);
|
||||||
|
/* Add the client to the pattern -> list of clients hash table */
|
||||||
|
de = dictFind(server.pubsub_patterns_dict,pattern);
|
||||||
|
if (de == NULL) {
|
||||||
|
clients = listCreate();
|
||||||
|
dictAdd(server.pubsub_patterns_dict,pattern,clients);
|
||||||
|
incrRefCount(pattern);
|
||||||
|
} else {
|
||||||
|
clients = dictGetVal(de);
|
||||||
|
}
|
||||||
|
listAddNodeTail(clients,c);
|
||||||
}
|
}
|
||||||
/* Notify the client */
|
/* Notify the client */
|
||||||
addReplyPubsubPatSubscribed(c,pattern);
|
addReplyPubsubPatSubscribed(c,pattern);
|
||||||
@ -226,6 +238,8 @@ int pubsubSubscribePattern(client *c, robj *pattern) {
|
|||||||
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
|
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
|
||||||
* 0 if the client was not subscribed to the specified channel. */
|
* 0 if the client was not subscribed to the specified channel. */
|
||||||
int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
|
int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
|
||||||
|
dictEntry *de;
|
||||||
|
list *clients;
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
pubsubPattern pat;
|
pubsubPattern pat;
|
||||||
int retval = 0;
|
int retval = 0;
|
||||||
@ -238,6 +252,18 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
|
|||||||
pat.pattern = pattern;
|
pat.pattern = pattern;
|
||||||
ln = listSearchKey(server.pubsub_patterns,&pat);
|
ln = listSearchKey(server.pubsub_patterns,&pat);
|
||||||
listDelNode(server.pubsub_patterns,ln);
|
listDelNode(server.pubsub_patterns,ln);
|
||||||
|
/* Remove the client from the pattern -> clients list hash table */
|
||||||
|
de = dictFind(server.pubsub_patterns_dict,pattern);
|
||||||
|
serverAssertWithInfo(c,NULL,de != NULL);
|
||||||
|
clients = dictGetVal(de);
|
||||||
|
ln = listSearchKey(clients,c);
|
||||||
|
serverAssertWithInfo(c,NULL,ln != NULL);
|
||||||
|
listDelNode(clients,ln);
|
||||||
|
if (listLength(clients) == 0) {
|
||||||
|
/* Free the list and associated hash entry at all if this was
|
||||||
|
* the latest client. */
|
||||||
|
dictDelete(server.pubsub_patterns_dict,pattern);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
/* Notify the client */
|
/* Notify the client */
|
||||||
if (notify) addReplyPubsubPatUnsubscribed(c,pattern);
|
if (notify) addReplyPubsubPatUnsubscribed(c,pattern);
|
||||||
@ -284,6 +310,7 @@ int pubsubUnsubscribeAllPatterns(client *c, int notify) {
|
|||||||
int pubsubPublishMessage(robj *channel, robj *message) {
|
int pubsubPublishMessage(robj *channel, robj *message) {
|
||||||
int receivers = 0;
|
int receivers = 0;
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
|
dictIterator *di;
|
||||||
listNode *ln;
|
listNode *ln;
|
||||||
listIter li;
|
listIter li;
|
||||||
|
|
||||||
@ -302,23 +329,26 @@ int pubsubPublishMessage(robj *channel, robj *message) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
/* Send to clients listening to matching channels */
|
/* Send to clients listening to matching channels */
|
||||||
if (listLength(server.pubsub_patterns)) {
|
di = dictGetIterator(server.pubsub_patterns_dict);
|
||||||
listRewind(server.pubsub_patterns,&li);
|
if (di) {
|
||||||
channel = getDecodedObject(channel);
|
channel = getDecodedObject(channel);
|
||||||
while ((ln = listNext(&li)) != NULL) {
|
while((de = dictNext(di)) != NULL) {
|
||||||
pubsubPattern *pat = ln->value;
|
robj *pattern = dictGetKey(de);
|
||||||
|
list *clients = dictGetVal(de);
|
||||||
if (stringmatchlen((char*)pat->pattern->ptr,
|
if (!stringmatchlen((char*)pattern->ptr,
|
||||||
sdslen(pat->pattern->ptr),
|
sdslen(pattern->ptr),
|
||||||
(char*)channel->ptr,
|
(char*)channel->ptr,
|
||||||
sdslen(channel->ptr),0))
|
sdslen(channel->ptr),0)) continue;
|
||||||
{
|
|
||||||
addReplyPubsubPatMessage(pat->client,
|
listRewind(clients,&li);
|
||||||
pat->pattern,channel,message);
|
while ((ln = listNext(&li)) != NULL) {
|
||||||
|
client *c = listNodeValue(ln);
|
||||||
|
addReplyPubsubPatMessage(c,pattern,channel,message);
|
||||||
receivers++;
|
receivers++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
decrRefCount(channel);
|
decrRefCount(channel);
|
||||||
|
dictReleaseIterator(di);
|
||||||
}
|
}
|
||||||
return receivers;
|
return receivers;
|
||||||
}
|
}
|
||||||
|
@ -2756,6 +2756,7 @@ void initServer(void) {
|
|||||||
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
|
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
|
||||||
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
|
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
|
||||||
server.pubsub_patterns = listCreate();
|
server.pubsub_patterns = listCreate();
|
||||||
|
server.pubsub_patterns_dict = dictCreate(&keylistDictType,NULL);
|
||||||
listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
|
listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
|
||||||
listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
|
listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
|
||||||
server.cronloops = 0;
|
server.cronloops = 0;
|
||||||
|
@ -1348,6 +1348,7 @@ struct redisServer {
|
|||||||
/* Pubsub */
|
/* Pubsub */
|
||||||
dict *pubsub_channels; /* Map channels to list of subscribed clients */
|
dict *pubsub_channels; /* Map channels to list of subscribed clients */
|
||||||
list *pubsub_patterns; /* A list of pubsub_patterns */
|
list *pubsub_patterns; /* A list of pubsub_patterns */
|
||||||
|
dict *pubsub_patterns_dict; /* A dict of pubsub_patterns */
|
||||||
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
|
int notify_keyspace_events; /* Events to propagate via Pub/Sub. This is an
|
||||||
xor of NOTIFY_... flags. */
|
xor of NOTIFY_... flags. */
|
||||||
/* Cluster */
|
/* Cluster */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user