From 8ad8f0f9d8b0f1633a673bd470eed09aeccb53cf Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Tue, 20 Jun 2023 18:41:41 +0100 Subject: [PATCH] Fix broken protocol when PUBLISH emits local push inside MULTI (#12326) When a connection that's subscribe to a channel emits PUBLISH inside MULTI-EXEC, the push notification messes up the EXEC response. e.g. MULTI, PING, PUSH foo bar, PING, EXEC the EXEC's response will contain: PONG, {message foo bar}, 1. and the second PONG will be delivered outside the EXEC's response. Additionally, this PR changes the order of responses in case of a plain PUBLISH (when the current client also subscribed to it), by delivering the push after the command's response instead of before it. This also affects modules calling RM_PublishMessage in a similar way, so that we don't run the risk of getting that push mixed together with the module command's response. --- src/networking.c | 30 +++++++++++++++--- src/server.c | 9 ++++++ src/server.h | 1 + tests/modules/publish.c | 15 +++++++++ tests/unit/moduleapi/misc.tcl | 13 ++++++++ tests/unit/moduleapi/publish.tcl | 17 ++++++++++ tests/unit/pubsub.tcl | 54 +++++++++++++++++++++++++++++++- 7 files changed, 134 insertions(+), 5 deletions(-) diff --git a/src/networking.c b/src/networking.c index 7d796456c..56273fc7e 100644 --- a/src/networking.c +++ b/src/networking.c @@ -345,8 +345,8 @@ size_t _addReplyToBuffer(client *c, const char *s, size_t len) { /* Adds the reply to the reply linked list. * Note: some edits to this function need to be relayed to AddReplyFromClient. */ -void _addReplyProtoToList(client *c, const char *s, size_t len) { - listNode *ln = listLast(c->reply); +void _addReplyProtoToList(client *c, list *reply_list, const char *s, size_t len) { + listNode *ln = listLast(reply_list); clientReplyBlock *tail = ln? listNodeValue(ln): NULL; /* Note that 'tail' may be NULL even if we have a tail node, because when @@ -374,13 +374,23 @@ void _addReplyProtoToList(client *c, const char *s, size_t len) { tail->size = usable_size - sizeof(clientReplyBlock); tail->used = len; memcpy(tail->buf, s, len); - listAddNodeTail(c->reply, tail); + listAddNodeTail(reply_list, tail); c->reply_bytes += tail->size; closeClientOnOutputBufferLimitReached(c, 1); } } +/* The subscribe / unsubscribe command family has a push as a reply, + * or in other words, it responds with a push (or several of them + * depending on how many arguments it got), and has no reply. */ +int cmdHasPushAsReply(struct redisCommand *cmd) { + if (!cmd) return 0; + return cmd->proc == subscribeCommand || cmd->proc == unsubscribeCommand || + cmd->proc == psubscribeCommand || cmd->proc == punsubscribeCommand || + cmd->proc == ssubscribeCommand || cmd->proc == sunsubscribeCommand; +} + void _addReplyToBufferOrList(client *c, const char *s, size_t len) { if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return; @@ -399,8 +409,20 @@ void _addReplyToBufferOrList(client *c, const char *s, size_t len) { * buffer offset (see function comment) */ reqresSaveClientReplyOffset(c); + /* If we're processing a push message into the current client (i.e. executing PUBLISH + * to a channel which we are subscribed to, then we wanna postpone that message to be added + * after the command's reply (specifically important during multi-exec). the exception is + * the SUBSCRIBE command family, which (currently) have a push message instead of a proper reply. + * The check for executing_client also avoids affecting push messages that are part of eviction. */ + if (c == server.current_client && (c->flags & CLIENT_PUSHING) && + server.executing_client && !cmdHasPushAsReply(server.executing_client->cmd)) + { + _addReplyProtoToList(c,server.pending_push_messages,s,len); + return; + } + size_t reply_len = _addReplyToBuffer(c,s,len); - if (len > reply_len) _addReplyProtoToList(c,s+reply_len,len-reply_len); + if (len > reply_len) _addReplyProtoToList(c,c->reply,s+reply_len,len-reply_len); } /* ----------------------------------------------------------------------------- diff --git a/src/server.c b/src/server.c index 84fb1aa48..82257db30 100644 --- a/src/server.c +++ b/src/server.c @@ -1717,6 +1717,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * we have to flush them after each command, so when we get here, the list * must be empty. */ serverAssert(listLength(server.tracking_pending_keys) == 0); + serverAssert(listLength(server.pending_push_messages) == 0); /* Send the invalidation messages to clients participating to the * client side caching protocol in broadcasting (BCAST) mode. */ @@ -2611,6 +2612,7 @@ void initServer(void) { server.unblocked_clients = listCreate(); server.ready_keys = listCreate(); server.tracking_pending_keys = listCreate(); + server.pending_push_messages = listCreate(); server.clients_waiting_acks = listCreate(); server.get_ack_from_slaves = 0; server.paused_actions = 0; @@ -3758,7 +3760,14 @@ void afterCommand(client *c) { /* Should be done before trackingHandlePendingKeyInvalidations so that we * reply to client before invalidating cache (makes more sense) */ postExecutionUnitOperations(); + + /* Flush pending tracking invalidations. */ trackingHandlePendingKeyInvalidations(); + + /* Flush other pending push messages. only when we are not in nested call. + * So the messages are not interleaved with transaction response. */ + if (!server.execution_nesting) + listJoin(c->reply, server.pending_push_messages); } /* Check if c->cmd exists, fills `err` with details in case it doesn't. diff --git a/src/server.h b/src/server.h index f262d7766..1397437d4 100644 --- a/src/server.h +++ b/src/server.h @@ -1928,6 +1928,7 @@ struct redisServer { unsigned int tracking_clients; /* # of clients with tracking enabled.*/ size_t tracking_table_max_keys; /* Max number of keys in tracking table. */ list *tracking_pending_keys; /* tracking invalidation keys pending to flush */ + list *pending_push_messages; /* pending publish or other push messages to flush */ /* Sort parameters - qsort_r() is only available under BSD so we * have to take this state global, in order to pass it to sortCompare() */ int sort_desc; diff --git a/tests/modules/publish.c b/tests/modules/publish.c index eee96d689..ff276d8c8 100644 --- a/tests/modules/publish.c +++ b/tests/modules/publish.c @@ -5,6 +5,18 @@ #define UNUSED(V) ((void) V) +int cmd_publish_classic_multi(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + if (argc < 3) + return RedisModule_WrongArity(ctx); + RedisModule_ReplyWithArray(ctx, argc-2); + for (int i = 2; i < argc; i++) { + int receivers = RedisModule_PublishMessage(ctx, argv[1], argv[i]); + RedisModule_ReplyWithLongLong(ctx, receivers); + } + return REDISMODULE_OK; +} + int cmd_publish_classic(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc != 3) @@ -35,6 +47,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_CreateCommand(ctx,"publish.classic",cmd_publish_classic,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"publish.classic_multi",cmd_publish_classic_multi,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"publish.shard",cmd_publish_shard,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; diff --git a/tests/unit/moduleapi/misc.tcl b/tests/unit/moduleapi/misc.tcl index 9b0989149..cf205462e 100644 --- a/tests/unit/moduleapi/misc.tcl +++ b/tests/unit/moduleapi/misc.tcl @@ -148,6 +148,19 @@ start_server {overrides {save {900 1}} tags {"modules"}} { $rd_trk close } + test {publish to self inside rm_call} { + r hello 3 + r subscribe foo + + # published message comes after the response of the command that issued it. + assert_equal [r test.rm_call publish foo bar] {1} + assert_equal [r read] {message foo bar} + + r unsubscribe foo + r hello 2 + set _ "" + } {} {resp3} + test {test module get/set client name by id api} { catch { r test.getname } e assert_equal "-ERR No name" $e diff --git a/tests/unit/moduleapi/publish.tcl b/tests/unit/moduleapi/publish.tcl index ccc966a43..a6304ea52 100644 --- a/tests/unit/moduleapi/publish.tcl +++ b/tests/unit/moduleapi/publish.tcl @@ -13,5 +13,22 @@ start_server {tags {"modules"}} { assert_equal 1 [r publish.classic chan1 world] assert_equal {smessage chan1 hello} [$rd1 read] assert_equal {message chan1 world} [$rd2 read] + $rd1 close + $rd2 close } + + test {module publish to self with multi message} { + r hello 3 + r subscribe foo + + # published message comes after the response of the command that issued it. + assert_equal [r publish.classic_multi foo bar vaz] {1 1} + assert_equal [r read] {message foo bar} + assert_equal [r read] {message foo vaz} + + r unsubscribe foo + r hello 2 + set _ "" + } {} {resp3} + } diff --git a/tests/unit/pubsub.tcl b/tests/unit/pubsub.tcl index 0dd32c397..3797b00c7 100644 --- a/tests/unit/pubsub.tcl +++ b/tests/unit/pubsub.tcl @@ -188,7 +188,7 @@ start_server {tags {"pubsub network"}} { assert_equal {0} [punsubscribe $rd ch*] $rd close - } + } {0} {resp3} test "PUNSUBSCRIBE from non-subscribed channels" { set rd1 [redis_deferring_client] @@ -451,4 +451,56 @@ start_server {tags {"pubsub network"}} { assert_equal "pmessage * __keyevent@${db}__:new bar" [$rd1 read] $rd1 close } + + test "publish to self inside multi" { + r hello 3 + r subscribe foo + r multi + r ping abc + r publish foo bar + r publish foo vaz + r ping def + assert_equal [r exec] {abc 1 1 def} + assert_equal [r read] {message foo bar} + assert_equal [r read] {message foo vaz} + } {} {resp3} + + test "publish to self inside script" { + r hello 3 + r subscribe foo + set res [r eval { + redis.call("ping","abc") + redis.call("publish","foo","bar") + redis.call("publish","foo","vaz") + redis.call("ping","def") + return "bla"} 0] + assert_equal $res {bla} + assert_equal [r read] {message foo bar} + assert_equal [r read] {message foo vaz} + } {} {resp3} + + test "unsubscribe inside multi, and publish to self" { + r hello 3 + + # Note: SUBSCRIBE and UNSUBSCRIBE with multiple channels in the same command, + # breaks the multi response, see https://github.com/redis/redis/issues/12207 + # this is just a temporary sanity test to detect unintended breakage. + + # subscribe for 3 channels actually emits 3 "responses" + assert_equal "subscribe foo 1" [r subscribe foo bar baz] + assert_equal "subscribe bar 2" [r read] + assert_equal "subscribe baz 3" [r read] + + r multi + r ping abc + r unsubscribe bar + r unsubscribe baz + r ping def + assert_equal [r exec] {abc {unsubscribe bar 2} {unsubscribe baz 1} def} + + # published message comes after the publish command's response. + assert_equal [r publish foo vaz] {1} + assert_equal [r read] {message foo vaz} + } {} {resp3} + }