From fd3975684a8e9e45c77d3b14cb1883572956efe7 Mon Sep 17 00:00:00 2001 From: Harkrishn Patro Date: Thu, 2 Feb 2023 09:06:24 -0800 Subject: [PATCH] Propagate message to a node only if the cluster link is healthy. (#11752) Currently while a sharded pubsub message publish tries to propagate the message across the cluster, a NULL check is missing for clusterLink. clusterLink could be NULL if the link is causing memory beyond the set threshold cluster-link-sendbuf-limit and server terminates the link. This change introduces two things: Avoids the engine crashes on the publishing node if a message is tried to be sent to a node and the link is NULL. Adds a debugging tool CLUSTERLINK KILL to terminate the clusterLink between two nodes. --- src/cluster.c | 10 ++++--- src/cluster.h | 1 + src/debug.c | 29 +++++++++++++++++++ tests/unit/cluster/links.tcl | 56 +++++++++++++++++++++++++++++++++++- 4 files changed, 91 insertions(+), 5 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 05ae2bedd..0791d52b3 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -3198,6 +3198,9 @@ void clusterReadHandler(connection *conn) { * the link to be invalidated, so it is safe to call this function * from event handlers that will do stuff with the same link later. */ void clusterSendMessage(clusterLink *link, clusterMsgSendBlock *msgblock) { + if (!link) { + return; + } if (listLength(link->send_msg_queue) == 0 && msgblock->msg.totlen != 0) connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1); @@ -3228,7 +3231,6 @@ void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) { while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); - if (!node->link) continue; if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE)) continue; clusterSendMessage(node->link,msgblock); @@ -3622,9 +3624,9 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) { msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD); while((ln = listNext(&li))) { clusterNode *node = listNodeValue(ln); - if (node != myself) { - clusterSendMessage(node->link,msgblock); - } + if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE)) + continue; + clusterSendMessage(node->link,msgblock); } clusterMsgSendBlockDecrRefCount(msgblock); } diff --git a/src/cluster.h b/src/cluster.h index f3e8e32c6..cabc9273f 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -424,5 +424,6 @@ void clusterUpdateMyselfHostname(void); void clusterUpdateMyselfAnnouncedPorts(void); sds clusterGenNodesDescription(int filter, int use_pport); sds genClusterInfoString(); +void freeClusterLink(clusterLink *link); #endif /* __CLUSTER_H */ diff --git a/src/debug.c b/src/debug.c index fe46b5c62..e00263d8e 100644 --- a/src/debug.c +++ b/src/debug.c @@ -492,6 +492,8 @@ void debugCommand(client *c) { " In case RESET is provided the peak reset time will be restored to the default value", "REPLYBUFFER RESIZING <0|1>", " Enable or disable the reply buffer resize cron job", +"CLUSTERLINK KILL ", +" Kills the link based on the direction to/from (both) with the provided node." , NULL }; addReplyHelp(c, help); @@ -997,6 +999,33 @@ NULL return; } addReply(c, shared.ok); + } else if(!strcasecmp(c->argv[1]->ptr,"CLUSTERLINK") && + !strcasecmp(c->argv[2]->ptr,"KILL") && + c->argc == 5) { + if (!server.cluster_enabled) { + addReplyError(c, "Debug option only available for cluster mode enabled setup!"); + return; + } + + /* Find the node. */ + clusterNode *n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr)); + if (!n) { + addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[4]->ptr); + return; + } + + /* Terminate the link based on the direction or all. */ + if (!strcasecmp(c->argv[3]->ptr,"from")) { + freeClusterLink(n->inbound_link); + } else if (!strcasecmp(c->argv[3]->ptr,"to")) { + freeClusterLink(n->link); + } else if (!strcasecmp(c->argv[3]->ptr,"all")) { + freeClusterLink(n->link); + freeClusterLink(n->inbound_link); + } else { + addReplyErrorFormat(c, "Unknown direction %s", (char*) c->argv[3]->ptr); + } + addReply(c,shared.ok); } else { addReplySubcommandSyntaxError(c); return; diff --git a/tests/unit/cluster/links.tcl b/tests/unit/cluster/links.tcl index 95c034f62..63c2b143c 100644 --- a/tests/unit/cluster/links.tcl +++ b/tests/unit/cluster/links.tcl @@ -67,6 +67,61 @@ proc publish_messages {server num_msgs msg_size} { } } +start_cluster 1 2 {tags {external:skip cluster}} { + set primary_id 0 + set replica1_id 1 + + set primary [Rn $primary_id] + set replica1 [Rn $replica1_id] + + test "Broadcast message across a cluster shard while a cluster link is down" { + set replica1_node_id [$replica1 CLUSTER MYID] + + set channelname ch3 + + # subscribe on replica1 + set subscribeclient1 [redis_deferring_client -1] + $subscribeclient1 deferred 1 + $subscribeclient1 SSUBSCRIBE $channelname + $subscribeclient1 read + + # subscribe on replica2 + set subscribeclient2 [redis_deferring_client -2] + $subscribeclient2 deferred 1 + $subscribeclient2 SSUBSCRIBE $channelname + $subscribeclient2 read + + # Verify number of links with cluster stable state + assert_equal [expr [number_of_peers $primary_id]*2] [number_of_links $primary_id] + + # Disconnect the cluster between primary and replica1 and publish a message. + $primary MULTI + $primary DEBUG CLUSTERLINK KILL TO $replica1_node_id + $primary SPUBLISH $channelname hello + set res [$primary EXEC] + + # Verify no client exists on the primary to receive the published message. + assert_equal $res {OK 0} + + # Wait for all the cluster links are healthy + wait_for_condition 50 100 { + [number_of_peers $primary_id]*2 == [number_of_links $primary_id] + } else { + fail "All peer links couldn't be established" + } + + # Publish a message afterwards. + $primary SPUBLISH $channelname world + + # Verify replica1 has received only (world) / hello is lost. + assert_equal "smessage ch3 world" [$subscribeclient1 read] + + # Verify replica2 has received both messages (hello/world) + assert_equal "smessage ch3 hello" [$subscribeclient2 read] + assert_equal "smessage ch3 world" [$subscribeclient2 read] + } {} {needs:debug} +} + start_cluster 3 0 {tags {external:skip cluster}} { test "Each node has two links with each peer" { for {set id 0} {$id < [llength $::servers]} {incr id} { @@ -161,7 +216,6 @@ start_cluster 3 0 {tags {external:skip cluster}} { } else { fail "Cluster link not freed as expected" } - puts -nonewline "$i 128KB messages needed to overflow 256KB buffer limit. " # A new link to primary2 should have been recreated set new_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]