From b784c5375e2a649b7ffcafcd913937d40006a671 Mon Sep 17 00:00:00 2001 From: Harkrishn Patro Date: Thu, 12 Oct 2023 20:48:27 -0700 Subject: [PATCH] Unsubscribe all clients from replica for shard channel if the master ownership changes (#12577) Unsubscribe all clients from replica for shard channel if the master ownership changes --- src/cluster.c | 12 +++++ .../tests/25-pubsubshard-slot-migration.tcl | 51 +++++++++++++++++-- 2 files changed, 58 insertions(+), 5 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index d7f6e3c36..332d76572 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -5147,6 +5147,17 @@ int verifyClusterConfigWithData(void) { return C_OK; } +/* Remove all the shard channel related information not owned by the current shard. */ +static inline void removeAllNotOwnedShardChannelSubscriptions(void) { + if (!dictSize(server.pubsubshard_channels)) return; + clusterNode *currmaster = nodeIsMaster(myself) ? myself : myself->slaveof; + for (int j = 0; j < CLUSTER_SLOTS; j++) { + if (server.cluster->slots[j] != currmaster) { + removeChannelsInSlot(j); + } + } +} + /* ----------------------------------------------------------------------------- * SLAVE nodes handling * -------------------------------------------------------------------------- */ @@ -5169,6 +5180,7 @@ void clusterSetMaster(clusterNode *n) { updateShardId(myself, n->shard_id); clusterNodeAddSlave(n,myself); replicationSetMaster(n->ip, getNodeDefaultReplicationPort(n)); + removeAllNotOwnedShardChannelSubscriptions(); resetManualFailover(); } diff --git a/tests/cluster/tests/25-pubsubshard-slot-migration.tcl b/tests/cluster/tests/25-pubsubshard-slot-migration.tcl index 0f59ffef2..fd774a8d7 100644 --- a/tests/cluster/tests/25-pubsubshard-slot-migration.tcl +++ b/tests/cluster/tests/25-pubsubshard-slot-migration.tcl @@ -10,6 +10,18 @@ test "Cluster is up" { set cluster [redis_cluster 127.0.0.1:[get_instance_attrib redis 0 port]] +proc get_addr_replica_serving_slot slot { + set cluster [redis_cluster 127.0.0.1:[get_instance_attrib redis 0 port]] + array set node [$cluster masternode_for_slot $slot] + + set replicanodeinfo [$cluster cluster replicas $node(id)] + set args [split $replicanodeinfo " "] + set addr [lindex [split [lindex $args 1] @] 0] + set replicahost [lindex [split $addr :] 0] + set replicaport [lindex [split $addr :] 1] + return [list $replicahost $replicaport] +} + test "Migrate a slot, verify client receives sunsubscribe on primary serving the slot." { # Setup the to and from node @@ -98,11 +110,9 @@ test "Migrate a slot, verify client receives sunsubscribe on replica serving the array set nodeto [$cluster masternode_notfor_slot $slot] # Get replica node serving slot (mychannel) to connect a client. - set replicanodeinfo [$cluster cluster replicas $nodefrom(id)] - set args [split $replicanodeinfo " "] - set addr [lindex [split [lindex $args 1] @] 0] - set replicahost [lindex [split $addr :] 0] - set replicaport [lindex [split $addr :] 1] + set replica_addr [get_addr_replica_serving_slot $slot] + set replicahost [lindex $replica_addr 0] + set replicaport [lindex $replica_addr 1] set subscribeclient [redis_deferring_client_by_addr $replicahost $replicaport] $subscribeclient deferred 1 @@ -127,6 +137,37 @@ test "Migrate a slot, verify client receives sunsubscribe on replica serving the $subscribeclient close } +test "Move a replica to another primary, verify client receives sunsubscribe on replica serving the slot." { + # Setup the to and from node + set channelname mychannel2 + set slot [$cluster cluster keyslot $channelname] + + array set nodefrom [$cluster masternode_for_slot $slot] + array set nodeto [$cluster masternode_notfor_slot $slot] + set replica_addr [get_addr_replica_serving_slot $slot] + set replica_host [lindex $replica_addr 0] + set replica_port [lindex $replica_addr 1] + set replica_client [redis_client_by_addr $replica_host $replica_port] + set subscribeclient [redis_deferring_client_by_addr $replica_host $replica_port] + + $subscribeclient deferred 1 + $subscribeclient ssubscribe $channelname + $subscribeclient read + + # Verify subscribe is still valid, able to receive messages. + $nodefrom(link) spublish $channelname hello + assert_equal {smessage mychannel2 hello} [$subscribeclient read] + + assert_equal {OK} [$replica_client cluster replicate $nodeto(id)] + + set msg [$subscribeclient read] + assert {"sunsubscribe" eq [lindex $msg 0]} + assert {$channelname eq [lindex $msg 1]} + assert {"0" eq [lindex $msg 2]} + + $subscribeclient close +} + test "Delete a slot, verify sunsubscribe message" { set channelname ch2 set slot [$cluster cluster keyslot $channelname]