Optimize PSUBSCRIBE and PUNSUBSCRIBE from O(N*M) to O(N) (#12298)

In the original implementation, the time complexity of the commands
is actually O(N*M), where N is the number of patterns the client is
already subscribed and M is the number of patterns to subscribe to.
The docs are all wrong about this.

Specifically, because the original client->pubsub_patterns is a list,
so we need to do listSearchKey which is O(N). In this PR, we change it
to a dict, so the search becomes O(1).

At the same time, both pubsub_channels and pubsubshard_channels are dicts.
Changing pubsub_patterns to a dictionary improves the readability and
maintainability of the code.
This commit is contained in:
Binbin 2023-06-19 21:31:18 +08:00 committed by GitHub
parent 07c14672bf
commit b510624978
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 35 additions and 38 deletions

View File

@ -1923,26 +1923,28 @@ void ACLKillPubsubClientsIfNeeded(user *new, user *original) {
if (c->user == original && getClientType(c) == CLIENT_TYPE_PUBSUB) {
/* Check for pattern violations. */
listRewind(c->pubsub_patterns,&lpi);
while (!kill && ((lpn = listNext(&lpi)) != NULL)) {
o = lpn->value;
dictIterator *di = dictGetIterator(c->pubsub_patterns);
dictEntry *de;
while (!kill && ((de = dictNext(di)) != NULL)) {
o = dictGetKey(de);
int res = ACLCheckChannelAgainstList(upcoming, o->ptr, sdslen(o->ptr), 1);
kill = (res == ACL_DENIED_CHANNEL);
}
dictReleaseIterator(di);
/* Check for channel violations. */
if (!kill) {
/* Check for global channels violation. */
dictIterator *di = dictGetIterator(c->pubsub_channels);
dictEntry *de;
di = dictGetIterator(c->pubsub_channels);
while (!kill && ((de = dictNext(di)) != NULL)) {
o = dictGetKey(de);
int res = ACLCheckChannelAgainstList(upcoming, o->ptr, sdslen(o->ptr), 0);
kill = (res == ACL_DENIED_CHANNEL);
}
dictReleaseIterator(di);
}
if (!kill) {
/* Check for shard channels violation. */
di = dictGetIterator(c->pubsubshard_channels);
while (!kill && ((de = dictNext(di)) != NULL)) {
@ -1950,7 +1952,6 @@ void ACLKillPubsubClientsIfNeeded(user *new, user *original) {
int res = ACLCheckChannelAgainstList(upcoming, o->ptr, sdslen(o->ptr), 0);
kill = (res == ACL_DENIED_CHANNEL);
}
dictReleaseIterator(di);
}

View File

@ -10692,15 +10692,15 @@ struct COMMAND_STRUCT redisCommandTable[] = {
{MAKE_CMD("rpush","Appends one or more elements to a list. Creates the key if it doesn't exist.","O(1) for each element added, so O(N) to add N elements when the command is called with multiple arguments.","1.0.0",CMD_DOC_NONE,NULL,NULL,"list",COMMAND_GROUP_LIST,RPUSH_History,1,RPUSH_Tips,0,rpushCommand,-3,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_LIST,RPUSH_Keyspecs,1,NULL,2),.args=RPUSH_Args},
{MAKE_CMD("rpushx","Appends an element to a list only when the list exists.","O(1) for each element added, so O(N) to add N elements when the command is called with multiple arguments.","2.2.0",CMD_DOC_NONE,NULL,NULL,"list",COMMAND_GROUP_LIST,RPUSHX_History,1,RPUSHX_Tips,0,rpushxCommand,-3,CMD_WRITE|CMD_DENYOOM|CMD_FAST,ACL_CATEGORY_LIST,RPUSHX_Keyspecs,1,NULL,2),.args=RPUSHX_Args},
/* pubsub */
{MAKE_CMD("psubscribe","Listens for messages published to channels that match one or more patterns.","O(N) where N is the number of patterns the client is already subscribed to.","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PSUBSCRIBE_History,0,PSUBSCRIBE_Tips,0,psubscribeCommand,-2,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,PSUBSCRIBE_Keyspecs,0,NULL,1),.args=PSUBSCRIBE_Args},
{MAKE_CMD("psubscribe","Listens for messages published to channels that match one or more patterns.","O(N) where N is the number of patterns to subscribe to.","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PSUBSCRIBE_History,0,PSUBSCRIBE_Tips,0,psubscribeCommand,-2,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,PSUBSCRIBE_Keyspecs,0,NULL,1),.args=PSUBSCRIBE_Args},
{MAKE_CMD("publish","Posts a message to a channel.","O(N+M) where N is the number of clients subscribed to the receiving channel and M is the total number of subscribed patterns (by any client).","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PUBLISH_History,0,PUBLISH_Tips,0,publishCommand,3,CMD_PUBSUB|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_MAY_REPLICATE|CMD_SENTINEL,0,PUBLISH_Keyspecs,0,NULL,2),.args=PUBLISH_Args},
{MAKE_CMD("pubsub","A container for Pub/Sub commands.","Depends on subcommand.","2.8.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PUBSUB_History,0,PUBSUB_Tips,0,NULL,-2,0,0,PUBSUB_Keyspecs,0,NULL,0),.subcommands=PUBSUB_Subcommands},
{MAKE_CMD("punsubscribe","Stops listening to messages published to channels that match one or more patterns.","O(N+M) where N is the number of patterns the client is already subscribed and M is the number of total patterns subscribed in the system (by any client).","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PUNSUBSCRIBE_History,0,PUNSUBSCRIBE_Tips,0,punsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,PUNSUBSCRIBE_Keyspecs,0,NULL,1),.args=PUNSUBSCRIBE_Args},
{MAKE_CMD("punsubscribe","Stops listening to messages published to channels that match one or more patterns.","O(N) where N is the number of patterns to unsubscribe.","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,PUNSUBSCRIBE_History,0,PUNSUBSCRIBE_Tips,0,punsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,PUNSUBSCRIBE_Keyspecs,0,NULL,1),.args=PUNSUBSCRIBE_Args},
{MAKE_CMD("spublish","Post a message to a shard channel","O(N) where N is the number of clients subscribed to the receiving shard channel.","7.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SPUBLISH_History,0,SPUBLISH_Tips,0,spublishCommand,3,CMD_PUBSUB|CMD_LOADING|CMD_STALE|CMD_FAST|CMD_MAY_REPLICATE,0,SPUBLISH_Keyspecs,1,NULL,2),.args=SPUBLISH_Args},
{MAKE_CMD("ssubscribe","Listens for messages published to shard channels.","O(N) where N is the number of shard channels to subscribe to.","7.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SSUBSCRIBE_History,0,SSUBSCRIBE_Tips,0,ssubscribeCommand,-2,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0,SSUBSCRIBE_Keyspecs,1,NULL,1),.args=SSUBSCRIBE_Args},
{MAKE_CMD("subscribe","Listens for messages published to channels.","O(N) where N is the number of channels to subscribe to.","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SUBSCRIBE_History,0,SUBSCRIBE_Tips,0,subscribeCommand,-2,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,SUBSCRIBE_Keyspecs,0,NULL,1),.args=SUBSCRIBE_Args},
{MAKE_CMD("sunsubscribe","Stops listening to messages posted to shard channels.","O(N) where N is the number of clients already subscribed to a shard channel.","7.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SUNSUBSCRIBE_History,0,SUNSUBSCRIBE_Tips,0,sunsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0,SUNSUBSCRIBE_Keyspecs,1,NULL,1),.args=SUNSUBSCRIBE_Args},
{MAKE_CMD("unsubscribe","Stops listening to messages posted to channels.","O(N) where N is the number of clients already subscribed to a channel.","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,UNSUBSCRIBE_History,0,UNSUBSCRIBE_Tips,0,unsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,UNSUBSCRIBE_Keyspecs,0,NULL,1),.args=UNSUBSCRIBE_Args},
{MAKE_CMD("sunsubscribe","Stops listening to messages posted to shard channels.","O(N) where N is the number of shard channels to unsubscribe.","7.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,SUNSUBSCRIBE_History,0,SUNSUBSCRIBE_Tips,0,sunsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE,0,SUNSUBSCRIBE_Keyspecs,1,NULL,1),.args=SUNSUBSCRIBE_Args},
{MAKE_CMD("unsubscribe","Stops listening to messages posted to channels.","O(N) where N is the number of channels to unsubscribe.","2.0.0",CMD_DOC_NONE,NULL,NULL,"pubsub",COMMAND_GROUP_PUBSUB,UNSUBSCRIBE_History,0,UNSUBSCRIBE_Tips,0,unsubscribeCommand,-1,CMD_PUBSUB|CMD_NOSCRIPT|CMD_LOADING|CMD_STALE|CMD_SENTINEL,0,UNSUBSCRIBE_Keyspecs,0,NULL,1),.args=UNSUBSCRIBE_Args},
/* scripting */
{MAKE_CMD("eval","Executes a server-side Lua script.","Depends on the script that is executed.","2.6.0",CMD_DOC_NONE,NULL,NULL,"scripting",COMMAND_GROUP_SCRIPTING,EVAL_History,0,EVAL_Tips,0,evalCommand,-3,CMD_NOSCRIPT|CMD_SKIP_MONITOR|CMD_MAY_REPLICATE|CMD_NO_MANDATORY_KEYS|CMD_STALE,ACL_CATEGORY_SCRIPTING,EVAL_Keyspecs,1,evalGetKeys,4),.args=EVAL_Args},
{MAKE_CMD("evalsha","Executes a server-side Lua script by SHA1 digest.","Depends on the script that is executed.","2.6.0",CMD_DOC_NONE,NULL,NULL,"scripting",COMMAND_GROUP_SCRIPTING,EVALSHA_History,0,EVALSHA_Tips,0,evalShaCommand,-3,CMD_NOSCRIPT|CMD_SKIP_MONITOR|CMD_MAY_REPLICATE|CMD_NO_MANDATORY_KEYS|CMD_STALE,ACL_CATEGORY_SCRIPTING,EVALSHA_Keyspecs,1,evalGetKeys,4),.args=EVALSHA_Args},

View File

@ -1,7 +1,7 @@
{
"PSUBSCRIBE": {
"summary": "Listens for messages published to channels that match one or more patterns.",
"complexity": "O(N) where N is the number of patterns the client is already subscribed to.",
"complexity": "O(N) where N is the number of patterns to subscribe to.",
"group": "pubsub",
"since": "2.0.0",
"arity": -2,

View File

@ -1,7 +1,7 @@
{
"PUNSUBSCRIBE": {
"summary": "Stops listening to messages published to channels that match one or more patterns.",
"complexity": "O(N+M) where N is the number of patterns the client is already subscribed and M is the number of total patterns subscribed in the system (by any client).",
"complexity": "O(N) where N is the number of patterns to unsubscribe.",
"group": "pubsub",
"since": "2.0.0",
"arity": -1,

View File

@ -1,7 +1,7 @@
{
"SUNSUBSCRIBE": {
"summary": "Stops listening to messages posted to shard channels.",
"complexity": "O(N) where N is the number of clients already subscribed to a shard channel.",
"complexity": "O(N) where N is the number of shard channels to unsubscribe.",
"group": "pubsub",
"since": "7.0.0",
"arity": -1,

View File

@ -1,7 +1,7 @@
{
"UNSUBSCRIBE": {
"summary": "Stops listening to messages posted to channels.",
"complexity": "O(N) where N is the number of clients already subscribed to a channel.",
"complexity": "O(N) where N is the number of channels to unsubscribe.",
"group": "pubsub",
"since": "2.0.0",
"arity": -1,

View File

@ -85,10 +85,6 @@ void freeClientReplyValue(void *o) {
zfree(o);
}
int listMatchObjects(void *a, void *b) {
return equalStringObjects(a,b);
}
/* This function links the client to the global linked list of clients.
* unlinkClient() does the opposite, among other things. */
void linkClient(client *c) {
@ -197,7 +193,7 @@ client *createClient(connection *conn) {
c->woff = 0;
c->watched_keys = listCreate();
c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType);
c->pubsub_patterns = listCreate();
c->pubsub_patterns = dictCreate(&objectKeyPointerValueDictType);
c->pubsubshard_channels = dictCreate(&objectKeyPointerValueDictType);
c->peerid = NULL;
c->sockname = NULL;
@ -214,8 +210,6 @@ client *createClient(connection *conn) {
c->auth_callback_privdata = NULL;
c->auth_module = NULL;
listInitNode(&c->clients_pending_write_node, c);
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
c->mem_usage_bucket = NULL;
c->mem_usage_bucket_node = NULL;
if (conn) linkClient(c);
@ -1599,7 +1593,7 @@ void freeClient(client *c) {
pubsubUnsubscribeShardAllChannels(c, 0);
pubsubUnsubscribeAllPatterns(c,0);
dictRelease(c->pubsub_channels);
listRelease(c->pubsub_patterns);
dictRelease(c->pubsub_patterns);
dictRelease(c->pubsubshard_channels);
/* Free data structures. */
@ -2810,7 +2804,7 @@ sds catClientInfoString(sds s, client *client) {
flags,
client->db->id,
(int) dictSize(client->pubsub_channels),
(int) listLength(client->pubsub_patterns),
(int) dictSize(client->pubsub_patterns),
(int) dictSize(client->pubsubshard_channels),
(client->flags & CLIENT_MULTI) ? client->mstate.count : -1,
(unsigned long long) sdslen(client->querybuf),

View File

@ -219,7 +219,7 @@ int serverPubsubShardSubscriptionCount(void) {
/* Return the number of channels + patterns a client is subscribed to. */
int clientSubscriptionsCount(client *c) {
return dictSize(c->pubsub_channels) + listLength(c->pubsub_patterns);
return dictSize(c->pubsub_channels) + dictSize(c->pubsub_patterns);
}
/* Return the number of shard level channels a client is subscribed to. */
@ -345,9 +345,8 @@ int pubsubSubscribePattern(client *c, robj *pattern) {
list *clients;
int retval = 0;
if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
if (dictAdd(c->pubsub_patterns, pattern, NULL) == DICT_OK) {
retval = 1;
listAddNodeTail(c->pubsub_patterns,pattern);
incrRefCount(pattern);
/* Add the client to the pattern -> list of clients hash table */
de = dictFind(server.pubsub_patterns,pattern);
@ -374,9 +373,8 @@ int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
int retval = 0;
incrRefCount(pattern); /* Protect the object. May be the same we remove */
if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
if (dictDelete(c->pubsub_patterns, pattern) == DICT_OK) {
retval = 1;
listDelNode(c->pubsub_patterns,ln);
/* Remove the client from the pattern -> clients list hash table */
de = dictFind(server.pubsub_patterns,pattern);
serverAssertWithInfo(c,NULL,de != NULL);
@ -448,16 +446,20 @@ void pubsubUnsubscribeShardChannels(robj **channels, unsigned int count) {
/* Unsubscribe from all the patterns. Return the number of patterns the
* client was subscribed from. */
int pubsubUnsubscribeAllPatterns(client *c, int notify) {
listNode *ln;
listIter li;
int count = 0;
listRewind(c->pubsub_patterns,&li);
while ((ln = listNext(&li)) != NULL) {
robj *pattern = ln->value;
if (dictSize(c->pubsub_patterns) > 0) {
dictIterator *di = dictGetSafeIterator(c->pubsub_patterns);
dictEntry *de;
count += pubsubUnsubscribePattern(c,pattern,notify);
while ((de = dictNext(di)) != NULL) {
robj *pattern = dictGetKey(de);
count += pubsubUnsubscribePattern(c, pattern, notify);
}
dictReleaseIterator(di);
}
/* We were subscribed to nothing? Still reply to the client. */
if (notify && count == 0) addReplyPubsubPatUnsubscribed(c,NULL);
return count;
}
@ -743,7 +745,7 @@ void sunsubscribeCommand(client *c) {
size_t pubsubMemOverhead(client *c) {
/* PubSub patterns */
size_t mem = listLength(c->pubsub_patterns) * sizeof(listNode);
size_t mem = dictMemUsage(c->pubsub_patterns);
/* Global PubSub channels */
mem += dictMemUsage(c->pubsub_channels);
/* Sharded PubSub channels */

View File

@ -1215,7 +1215,7 @@ typedef struct client {
long long woff; /* Last write global replication offset. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
dict *pubsub_patterns; /* patterns a client is interested in (PSUBSCRIBE) */
dict *pubsubshard_channels; /* shard level channels a client is interested in (SSUBSCRIBE) */
sds peerid; /* Cached peer ID. */
sds sockname; /* Cached connection target address. */