From b51062497825f0c6cfd8201702f3d2178b528456 Mon Sep 17 00:00:00 2001 From: Binbin Date: Mon, 19 Jun 2023 21:31:18 +0800 Subject: [PATCH] Optimize PSUBSCRIBE and PUNSUBSCRIBE from O(N*M) to O(N) (#12298) In the original implementation, the time complexity of the commands is actually O(N*M), where N is the number of patterns the client is already subscribed to and M is the number of patterns to subscribe to. The docs are all wrong about this. Specifically, because the original client->pubsub_patterns is a list, we need to call listSearchKey, which is O(N), for each pattern. In this PR, we change it to a dict, so the search becomes O(1). At the same time, both pubsub_channels and pubsubshard_channels are dicts. Changing pubsub_patterns to a dictionary improves the readability and maintainability of the code. --- src/acl.c | 17 +++++++++-------- src/commands.def | 8 ++++---- src/commands/psubscribe.json | 2 +- src/commands/punsubscribe.json | 2 +- src/commands/sunsubscribe.json | 2 +- src/commands/unsubscribe.json | 2 +- src/networking.c | 12 +++--------- src/pubsub.c | 26 ++++++++++++++------------ src/server.h | 2 +- 9 files changed, 35 insertions(+), 38 deletions(-) diff --git a/src/acl.c b/src/acl.c index f518dd1cb..aa42c58dc 100644 --- a/src/acl.c +++ b/src/acl.c @@ -1923,26 +1923,28 @@ void ACLKillPubsubClientsIfNeeded(user *new, user *original) { if (c->user == original && getClientType(c) == CLIENT_TYPE_PUBSUB) { /* Check for pattern violations. */ - listRewind(c->pubsub_patterns,&lpi); - while (!kill && ((lpn = listNext(&lpi)) != NULL)) { - - o = lpn->value; + dictIterator *di = dictGetIterator(c->pubsub_patterns); + dictEntry *de; + while (!kill && ((de = dictNext(di)) != NULL)) { + o = dictGetKey(de); int res = ACLCheckChannelAgainstList(upcoming, o->ptr, sdslen(o->ptr), 1); kill = (res == ACL_DENIED_CHANNEL); } + dictReleaseIterator(di); + /* Check for channel violations. */ if (!kill) { /* Check for global channels violation. 
*/ - dictIterator *di = dictGetIterator(c->pubsub_channels); - - dictEntry *de; + di = dictGetIterator(c->pubsub_channels); while (!kill && ((de = dictNext(di)) != NULL)) { o = dictGetKey(de); int res = ACLCheckChannelAgainstList(upcoming, o->ptr, sdslen(o->ptr), 0); kill = (res == ACL_DENIED_CHANNEL); } dictReleaseIterator(di); + } + if (!kill) { /* Check for shard channels violation. */ di = dictGetIterator(c->pubsubshard_channels); while (!kill && ((de = dictNext(di)) != NULL)) { @@ -1950,7 +1952,6 @@ void ACLKillPubsubClientsIfNeeded(user *new, user *original) { int res = ACLCheckChannelAgainstList(upcoming, o->ptr, sdslen(o->ptr), 0); kill = (res == ACL_DENIED_CHANNEL); } - dictReleaseIterator(di); } diff --git a/src/commands.def b/src/commands.def index 95bdacbad..7e575648c 100644 --- a/src/commands.def +++ b/src/commands.def @@ -10692,15 +10692,15 @@ struct COMMAND_STRUCT redisCommandTable[] = { {MAKE_CMD("rpush","Appends one or more elements to a list. Creates the key if it doesn't exist.","O(1) for each element added, so O(N) to add N elements when the command is called with multiple arguments.","1.0.0",CMD_DOC_NONE,NULL,NULL,"list",COMMAND_GROUP_LIST,RPUSH_History,1,RPUSH_Tips,0,rpushCommand,-3,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_LIST,RPUSH_Keyspecs,1,NULL,2),.args=RPUSH_Args}, {MAKE_CMD("rpushx","Appends an element to a list only when the list exists.","O(1) for each element added, so O(N) to add N elements when the command is called with multiple arguments.","2.2.0",CMD_DOC_NONE,NULL,NULL,"list",COMMAND_GROUP_LIST,RPUSHX_History,1,RPUSHX_Tips,0,rpushxCommand,-3,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_LIST,RPUSHX_Keyspecs,1,NULL,2),.args=RPUSHX_Args}, /* pubsub */ -{MAKE_CMD("psubscribe","Listens for messages published to channels that match one or more patterns.","O(N) where N is the number of patterns the client is already subscribed 
to.","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PSUBSCRIBE_History,0,PSUBSCRIBE_Tips,0,psubscribeCommand,-2,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,PSUBSCRIBE_Keyspecs,0,NULL,1),.args=PSUBSCRIBE_Args}, +{MAKE_CMD("psubscribe","Listens for messages published to channels that match one or more patterns.","O(N) where N is the number of patterns to subscribe to.","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PSUBSCRIBE_History,0,PSUBSCRIBE_Tips,0,psubscribeCommand,-2,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,PSUBSCRIBE_Keyspecs,0,NULL,1),.args=PSUBSCRIBE_Args}, {MAKE_CMD("publish","Posts a message to a channel.","O(N+M) where N is the number of clients subscribed to the receiving channel and M is the total number of subscribed patterns (by any client).","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PUBLISH_History,0,PUBLISH_Tips,0,publishCommand,3,CMD_PUBSUB|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_MAY_REPLICATE|CMD_SENTINEL,0,PUBLISH_Keyspecs,0,NULL,2),.args=PUBLISH_Args}, {MAKE_CMD("pubsub","A container for Pub/Sub commands.","Depends on subcommand.","2.8.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PUBSUB_History,0,PUBSUB_Tips,0,NULL,-2,0,0,PUBSUB_Keyspecs,0,NULL,0),.subcommands=PUBSUB_Subcommands}, -{MAKE_CMD("punsubscribe","Stops listening to messages published to channels that match one or more patterns.","O(N+M) where N is the number of patterns the client is already subscribed and M is the number of total patterns subscribed in the system (by any client).","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PUNSUBSCRIBE_History,0,PUNSUBSCRIBE_Tips,0,punsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,PUNSUBSCRIBE_Keyspecs,0,NULL,1),.args=PUNSUBSCRIBE_Args}, +{MAKE_CMD("punsubscribe","Stops listening to messages published to channels that match one or more patterns.","O(N) where N is the number of patterns to 
unsubscribe.","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PUNSUBSCRIBE_History,0,PUNSUBSCRIBE_Tips,0,punsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,PUNSUBSCRIBE_Keyspecs,0,NULL,1),.args=PUNSUBSCRIBE_Args}, {MAKE_CMD("spublish","Post a message to a shard channel","O(N) where N is the number of clients subscribed to the receiving shard channel.","7.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SPUBLISH_History,0,SPUBLISH_Tips,0,spublishCommand,3,CMD_PUBSUB|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_MAY_REPLICATE,0,SPUBLISH_Keyspecs,1,NULL,2),.args=SPUBLISH_Args}, {MAKE_CMD("ssubscribe","Listens for messages published to shard channels.","O(N) where N is the number of shard channels to subscribe to.","7.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SSUBSCRIBE_History,0,SSUBSCRIBE_Tips,0,ssubscribeCommand,-2,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0,SSUBSCRIBE_Keyspecs,1,NULL,1),.args=SSUBSCRIBE_Args}, {MAKE_CMD("subscribe","Listens for messages published to channels.","O(N) where N is the number of channels to subscribe to.","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SUBSCRIBE_History,0,SUBSCRIBE_Tips,0,subscribeCommand,-2,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,SUBSCRIBE_Keyspecs,0,NULL,1),.args=SUBSCRIBE_Args}, -{MAKE_CMD("sunsubscribe","Stops listening to messages posted to shard channels.","O(N) where N is the number of clients already subscribed to a shard channel.","7.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SUNSUBSCRIBE_History,0,SUNSUBSCRIBE_Tips,0,sunsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0,SUNSUBSCRIBE_Keyspecs,1,NULL,1),.args=SUNSUBSCRIBE_Args}, -{MAKE_CMD("unsubscribe","Stops listening to messages posted to channels.","O(N) where N is the number of clients already subscribed to a 
channel.","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,UNSUBSCRIBE_History,0,UNSUBSCRIBE_Tips,0,unsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,UNSUBSCRIBE_Keyspecs,0,NULL,1),.args=UNSUBSCRIBE_Args}, +{MAKE_CMD("sunsubscribe","Stops listening to messages posted to shard channels.","O(N) where N is the number of shard channels to unsubscribe.","7.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SUNSUBSCRIBE_History,0,SUNSUBSCRIBE_Tips,0,sunsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0,SUNSUBSCRIBE_Keyspecs,1,NULL,1),.args=SUNSUBSCRIBE_Args}, +{MAKE_CMD("unsubscribe","Stops listening to messages posted to channels.","O(N) where N is the number of channels to unsubscribe.","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,UNSUBSCRIBE_History,0,UNSUBSCRIBE_Tips,0,unsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,UNSUBSCRIBE_Keyspecs,0,NULL,1),.args=UNSUBSCRIBE_Args}, /* scripting */ {MAKE_CMD("eval","Executes a server-side Lua script.","Depends on the script that is executed.","2.6.0",CMD_DOC_NONE,NULL,NULL,"scripting",COMMAND_GROUP_SCRIPTING,EVAL_History,0,EVAL_Tips,0,evalCommand,-3,CMD_NOSCRIPT|CMD_SKIP_MONITOR|CMD_MAY_REPLICATE|CMD_NO_MANDATORY_KEYS|CMD_STALE,ACL_CATEGORY_SCRIPTING,EVAL_Keyspecs,1,evalGetKeys,4),.args=EVAL_Args}, {MAKE_CMD("evalsha","Executes a server-side Lua script by SHA1 digest.","Depends on the script that is executed.","2.6.0",CMD_DOC_NONE,NULL,NULL,"scripting",COMMAND_GROUP_SCRIPTING,EVALSHA_History,0,EVALSHA_Tips,0,evalShaCommand,-3,CMD_NOSCRIPT|CMD_SKIP_MONITOR|CMD_MAY_REPLICATE|CMD_NO_MANDATORY_KEYS|CMD_STALE,ACL_CATEGORY_SCRIPTING,EVALSHA_Keyspecs,1,evalGetKeys,4),.args=EVALSHA_Args}, diff --git a/src/commands/psubscribe.json b/src/commands/psubscribe.json index db9592139..cab5d14ef 100644 --- a/src/commands/psubscribe.json +++ b/src/commands/psubscribe.json @@ -1,7 +1,7 @@ { "PSUBSCRIBE": { "summary": "Listens for 
messages published to channels that match one or more patterns.", - "complexity": "O(N) where N is the number of patterns the client is already subscribed to.", + "complexity": "O(N) where N is the number of patterns to subscribe to.", "group": "pubsub", "since": "2.0.0", "arity": -2, diff --git a/src/commands/punsubscribe.json b/src/commands/punsubscribe.json index 3a074517b..cb977d8ca 100644 --- a/src/commands/punsubscribe.json +++ b/src/commands/punsubscribe.json @@ -1,7 +1,7 @@ { "PUNSUBSCRIBE": { "summary": "Stops listening to messages published to channels that match one or more patterns.", - "complexity": "O(N+M) where N is the number of patterns the client is already subscribed and M is the number of total patterns subscribed in the system (by any client).", + "complexity": "O(N) where N is the number of patterns to unsubscribe.", "group": "pubsub", "since": "2.0.0", "arity": -1, diff --git a/src/commands/sunsubscribe.json b/src/commands/sunsubscribe.json index 7965a05ea..2d68fc7b1 100644 --- a/src/commands/sunsubscribe.json +++ b/src/commands/sunsubscribe.json @@ -1,7 +1,7 @@ { "SUNSUBSCRIBE": { "summary": "Stops listening to messages posted to shard channels.", - "complexity": "O(N) where N is the number of clients already subscribed to a shard channel.", + "complexity": "O(N) where N is the number of shard channels to unsubscribe.", "group": "pubsub", "since": "7.0.0", "arity": -1, diff --git a/src/commands/unsubscribe.json b/src/commands/unsubscribe.json index 530facde4..e8586bc7a 100644 --- a/src/commands/unsubscribe.json +++ b/src/commands/unsubscribe.json @@ -1,7 +1,7 @@ { "UNSUBSCRIBE": { "summary": "Stops listening to messages posted to channels.", - "complexity": "O(N) where N is the number of clients already subscribed to a channel.", + "complexity": "O(N) where N is the number of channels to unsubscribe.", "group": "pubsub", "since": "2.0.0", "arity": -1, diff --git a/src/networking.c b/src/networking.c index 7f59c958e..7d796456c 100644 --- 
a/src/networking.c +++ b/src/networking.c @@ -85,10 +85,6 @@ void freeClientReplyValue(void *o) { zfree(o); } -int listMatchObjects(void *a, void *b) { - return equalStringObjects(a,b); -} - /* This function links the client to the global linked list of clients. * unlinkClient() does the opposite, among other things. */ void linkClient(client *c) { @@ -197,7 +193,7 @@ client *createClient(connection *conn) { c->woff = 0; c->watched_keys = listCreate(); c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType); - c->pubsub_patterns = listCreate(); + c->pubsub_patterns = dictCreate(&objectKeyPointerValueDictType); c->pubsubshard_channels = dictCreate(&objectKeyPointerValueDictType); c->peerid = NULL; c->sockname = NULL; @@ -214,8 +210,6 @@ client *createClient(connection *conn) { c->auth_callback_privdata = NULL; c->auth_module = NULL; listInitNode(&c->clients_pending_write_node, c); - listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid); - listSetMatchMethod(c->pubsub_patterns,listMatchObjects); c->mem_usage_bucket = NULL; c->mem_usage_bucket_node = NULL; if (conn) linkClient(c); @@ -1599,7 +1593,7 @@ void freeClient(client *c) { pubsubUnsubscribeShardAllChannels(c, 0); pubsubUnsubscribeAllPatterns(c,0); dictRelease(c->pubsub_channels); - listRelease(c->pubsub_patterns); + dictRelease(c->pubsub_patterns); dictRelease(c->pubsubshard_channels); /* Free data structures. */ @@ -2810,7 +2804,7 @@ sds catClientInfoString(sds s, client *client) { flags, client->db->id, (int) dictSize(client->pubsub_channels), - (int) listLength(client->pubsub_patterns), + (int) dictSize(client->pubsub_patterns), (int) dictSize(client->pubsubshard_channels), (client->flags & CLIENT_MULTI) ? 
client->mstate.count : -1, (unsigned long long) sdslen(client->querybuf), diff --git a/src/pubsub.c b/src/pubsub.c index ed7350843..a13c5a61f 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -219,7 +219,7 @@ int serverPubsubShardSubscriptionCount(void) { /* Return the number of channels + patterns a client is subscribed to. */ int clientSubscriptionsCount(client *c) { - return dictSize(c->pubsub_channels) + listLength(c->pubsub_patterns); + return dictSize(c->pubsub_channels) + dictSize(c->pubsub_patterns); } /* Return the number of shard level channels a client is subscribed to. */ @@ -345,9 +345,8 @@ int pubsubSubscribePattern(client *c, robj *pattern) { list *clients; int retval = 0; - if (listSearchKey(c->pubsub_patterns,pattern) == NULL) { + if (dictAdd(c->pubsub_patterns, pattern, NULL) == DICT_OK) { retval = 1; - listAddNodeTail(c->pubsub_patterns,pattern); incrRefCount(pattern); /* Add the client to the pattern -> list of clients hash table */ de = dictFind(server.pubsub_patterns,pattern); @@ -374,9 +373,8 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) { int retval = 0; incrRefCount(pattern); /* Protect the object. May be the same we remove */ - if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) { + if (dictDelete(c->pubsub_patterns, pattern) == DICT_OK) { retval = 1; - listDelNode(c->pubsub_patterns,ln); /* Remove the client from the pattern -> clients list hash table */ de = dictFind(server.pubsub_patterns,pattern); serverAssertWithInfo(c,NULL,de != NULL); @@ -448,16 +446,20 @@ void pubsubUnsubscribeShardChannels(robj **channels, unsigned int count) { /* Unsubscribe from all the patterns. Return the number of patterns the * client was subscribed from. 
*/ int pubsubUnsubscribeAllPatterns(client *c, int notify) { - listNode *ln; - listIter li; int count = 0; - listRewind(c->pubsub_patterns,&li); - while ((ln = listNext(&li)) != NULL) { - robj *pattern = ln->value; + if (dictSize(c->pubsub_patterns) > 0) { + dictIterator *di = dictGetSafeIterator(c->pubsub_patterns); + dictEntry *de; - count += pubsubUnsubscribePattern(c,pattern,notify); + while ((de = dictNext(di)) != NULL) { + robj *pattern = dictGetKey(de); + count += pubsubUnsubscribePattern(c, pattern, notify); + } + dictReleaseIterator(di); } + + /* We were subscribed to nothing? Still reply to the client. */ if (notify && count == 0) addReplyPubsubPatUnsubscribed(c,NULL); return count; } @@ -743,7 +745,7 @@ void sunsubscribeCommand(client *c) { size_t pubsubMemOverhead(client *c) { /* PubSub patterns */ - size_t mem = listLength(c->pubsub_patterns) * sizeof(listNode); + size_t mem = dictMemUsage(c->pubsub_patterns); /* Global PubSub channels */ mem += dictMemUsage(c->pubsub_channels); /* Sharded PubSub channels */ diff --git a/src/server.h b/src/server.h index 7b7a32ac9..86ae85e58 100644 --- a/src/server.h +++ b/src/server.h @@ -1215,7 +1215,7 @@ typedef struct client { long long woff; /* Last write global replication offset. */ list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */ dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */ - list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */ + dict *pubsub_patterns; /* patterns a client is interested in (PSUBSCRIBE) */ dict *pubsubshard_channels; /* shard level channels a client is interested in (SSUBSCRIBE) */ sds peerid; /* Cached peer ID. */ sds sockname; /* Cached connection target address. */