Propagate message to a node only if the cluster link is healthy. (#11752)

Currently while a sharded pubsub message publish tries to propagate the message across the cluster, a NULL check is missing for clusterLink. clusterLink could be NULL if the link is causing memory beyond the set threshold cluster-link-sendbuf-limit and server terminates the link.

This change introduces two things:

Avoids the engine crashes on the publishing node if a message is tried to be sent to a node and the link is NULL.
Adds a debugging tool CLUSTERLINK KILL to terminate the clusterLink between two nodes.
This commit is contained in:
Harkrishn Patro 2023-02-02 09:06:24 -08:00 committed by GitHub
parent 023ff42f98
commit fd3975684a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 91 additions and 5 deletions

View File

@ -3198,6 +3198,9 @@ void clusterReadHandler(connection *conn) {
* the link to be invalidated, so it is safe to call this function
* from event handlers that will do stuff with the same link later. */
void clusterSendMessage(clusterLink *link, clusterMsgSendBlock *msgblock) {
if (!link) {
return;
}
if (listLength(link->send_msg_queue) == 0 && msgblock->msg.totlen != 0)
connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);
@ -3228,7 +3231,6 @@ void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) {
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (!node->link) continue;
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
continue;
clusterSendMessage(node->link,msgblock);
@ -3622,9 +3624,9 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD);
while((ln = listNext(&li))) {
clusterNode *node = listNodeValue(ln);
if (node != myself) {
clusterSendMessage(node->link,msgblock);
}
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
continue;
clusterSendMessage(node->link,msgblock);
}
clusterMsgSendBlockDecrRefCount(msgblock);
}

View File

@ -424,5 +424,6 @@ void clusterUpdateMyselfHostname(void);
void clusterUpdateMyselfAnnouncedPorts(void);
sds clusterGenNodesDescription(int filter, int use_pport);
sds genClusterInfoString();
void freeClusterLink(clusterLink *link);
#endif /* __CLUSTER_H */

View File

@ -492,6 +492,8 @@ void debugCommand(client *c) {
" In case RESET is provided the peak reset time will be restored to the default value",
"REPLYBUFFER RESIZING <0|1>",
" Enable or disable the reply buffer resize cron job",
"CLUSTERLINK KILL <to|from|all> <node-id>",
" Kills the link based on the direction to/from (both) with the provided node." ,
NULL
};
addReplyHelp(c, help);
@ -997,6 +999,33 @@ NULL
return;
}
addReply(c, shared.ok);
} else if(!strcasecmp(c->argv[1]->ptr,"CLUSTERLINK") &&
!strcasecmp(c->argv[2]->ptr,"KILL") &&
c->argc == 5) {
if (!server.cluster_enabled) {
addReplyError(c, "Debug option only available for cluster mode enabled setup!");
return;
}
/* Find the node. */
clusterNode *n = clusterLookupNode(c->argv[4]->ptr, sdslen(c->argv[4]->ptr));
if (!n) {
addReplyErrorFormat(c,"Unknown node %s", (char*)c->argv[4]->ptr);
return;
}
/* Terminate the link based on the direction or all. */
if (!strcasecmp(c->argv[3]->ptr,"from")) {
freeClusterLink(n->inbound_link);
} else if (!strcasecmp(c->argv[3]->ptr,"to")) {
freeClusterLink(n->link);
} else if (!strcasecmp(c->argv[3]->ptr,"all")) {
freeClusterLink(n->link);
freeClusterLink(n->inbound_link);
} else {
addReplyErrorFormat(c, "Unknown direction %s", (char*) c->argv[3]->ptr);
}
addReply(c,shared.ok);
} else {
addReplySubcommandSyntaxError(c);
return;

View File

@ -67,6 +67,61 @@ proc publish_messages {server num_msgs msg_size} {
}
}
start_cluster 1 2 {tags {external:skip cluster}} {
set primary_id 0
set replica1_id 1
set primary [Rn $primary_id]
set replica1 [Rn $replica1_id]
test "Broadcast message across a cluster shard while a cluster link is down" {
set replica1_node_id [$replica1 CLUSTER MYID]
set channelname ch3
# subscribe on replica1
set subscribeclient1 [redis_deferring_client -1]
$subscribeclient1 deferred 1
$subscribeclient1 SSUBSCRIBE $channelname
$subscribeclient1 read
# subscribe on replica2
set subscribeclient2 [redis_deferring_client -2]
$subscribeclient2 deferred 1
$subscribeclient2 SSUBSCRIBE $channelname
$subscribeclient2 read
# Verify number of links with cluster stable state
assert_equal [expr [number_of_peers $primary_id]*2] [number_of_links $primary_id]
# Disconnect the cluster between primary and replica1 and publish a message.
$primary MULTI
$primary DEBUG CLUSTERLINK KILL TO $replica1_node_id
$primary SPUBLISH $channelname hello
set res [$primary EXEC]
# Verify no client exists on the primary to receive the published message.
assert_equal $res {OK 0}
# Wait for all the cluster links are healthy
wait_for_condition 50 100 {
[number_of_peers $primary_id]*2 == [number_of_links $primary_id]
} else {
fail "All peer links couldn't be established"
}
# Publish a message afterwards.
$primary SPUBLISH $channelname world
# Verify replica1 has received only (world) / hello is lost.
assert_equal "smessage ch3 world" [$subscribeclient1 read]
# Verify replica2 has received both messages (hello/world)
assert_equal "smessage ch3 hello" [$subscribeclient2 read]
assert_equal "smessage ch3 world" [$subscribeclient2 read]
} {} {needs:debug}
}
start_cluster 3 0 {tags {external:skip cluster}} {
test "Each node has two links with each peer" {
for {set id 0} {$id < [llength $::servers]} {incr id} {
@ -161,7 +216,6 @@ start_cluster 3 0 {tags {external:skip cluster}} {
} else {
fail "Cluster link not freed as expected"
}
puts -nonewline "$i 128KB messages needed to overflow 256KB buffer limit. "
# A new link to primary2 should have been recreated
set new_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]