Unsubscribe all clients from replica for shard channel if the master ownership changes (#12577)
Unsubscribe all clients from replica for shard channel if the master ownership changes
This commit is contained in:
parent
b705049a7a
commit
b784c5375e
@ -5147,6 +5147,17 @@ int verifyClusterConfigWithData(void) {
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
/* Remove all the shard channel related information not owned by the current shard. */
|
||||
static inline void removeAllNotOwnedShardChannelSubscriptions(void) {
|
||||
if (!dictSize(server.pubsubshard_channels)) return;
|
||||
clusterNode *currmaster = nodeIsMaster(myself) ? myself : myself->slaveof;
|
||||
for (int j = 0; j < CLUSTER_SLOTS; j++) {
|
||||
if (server.cluster->slots[j] != currmaster) {
|
||||
removeChannelsInSlot(j);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------------
|
||||
* SLAVE nodes handling
|
||||
* -------------------------------------------------------------------------- */
|
||||
@ -5169,6 +5180,7 @@ void clusterSetMaster(clusterNode *n) {
|
||||
updateShardId(myself, n->shard_id);
|
||||
clusterNodeAddSlave(n,myself);
|
||||
replicationSetMaster(n->ip, getNodeDefaultReplicationPort(n));
|
||||
removeAllNotOwnedShardChannelSubscriptions();
|
||||
resetManualFailover();
|
||||
}
|
||||
|
||||
|
@ -10,6 +10,18 @@ test "Cluster is up" {
|
||||
|
||||
set cluster [redis_cluster 127.0.0.1:[get_instance_attrib redis 0 port]]
|
||||
|
||||
proc get_addr_replica_serving_slot slot {
|
||||
set cluster [redis_cluster 127.0.0.1:[get_instance_attrib redis 0 port]]
|
||||
array set node [$cluster masternode_for_slot $slot]
|
||||
|
||||
set replicanodeinfo [$cluster cluster replicas $node(id)]
|
||||
set args [split $replicanodeinfo " "]
|
||||
set addr [lindex [split [lindex $args 1] @] 0]
|
||||
set replicahost [lindex [split $addr :] 0]
|
||||
set replicaport [lindex [split $addr :] 1]
|
||||
return [list $replicahost $replicaport]
|
||||
}
|
||||
|
||||
test "Migrate a slot, verify client receives sunsubscribe on primary serving the slot." {
|
||||
|
||||
# Setup the to and from node
|
||||
@ -98,11 +110,9 @@ test "Migrate a slot, verify client receives sunsubscribe on replica serving the
|
||||
array set nodeto [$cluster masternode_notfor_slot $slot]
|
||||
|
||||
# Get replica node serving slot (mychannel) to connect a client.
|
||||
set replicanodeinfo [$cluster cluster replicas $nodefrom(id)]
|
||||
set args [split $replicanodeinfo " "]
|
||||
set addr [lindex [split [lindex $args 1] @] 0]
|
||||
set replicahost [lindex [split $addr :] 0]
|
||||
set replicaport [lindex [split $addr :] 1]
|
||||
set replica_addr [get_addr_replica_serving_slot $slot]
|
||||
set replicahost [lindex $replica_addr 0]
|
||||
set replicaport [lindex $replica_addr 1]
|
||||
set subscribeclient [redis_deferring_client_by_addr $replicahost $replicaport]
|
||||
|
||||
$subscribeclient deferred 1
|
||||
@ -127,6 +137,37 @@ test "Migrate a slot, verify client receives sunsubscribe on replica serving the
|
||||
$subscribeclient close
|
||||
}
|
||||
|
||||
test "Move a replica to another primary, verify client receives sunsubscribe on replica serving the slot." {
|
||||
# Setup the to and from node
|
||||
set channelname mychannel2
|
||||
set slot [$cluster cluster keyslot $channelname]
|
||||
|
||||
array set nodefrom [$cluster masternode_for_slot $slot]
|
||||
array set nodeto [$cluster masternode_notfor_slot $slot]
|
||||
set replica_addr [get_addr_replica_serving_slot $slot]
|
||||
set replica_host [lindex $replica_addr 0]
|
||||
set replica_port [lindex $replica_addr 1]
|
||||
set replica_client [redis_client_by_addr $replica_host $replica_port]
|
||||
set subscribeclient [redis_deferring_client_by_addr $replica_host $replica_port]
|
||||
|
||||
$subscribeclient deferred 1
|
||||
$subscribeclient ssubscribe $channelname
|
||||
$subscribeclient read
|
||||
|
||||
# Verify subscribe is still valid, able to receive messages.
|
||||
$nodefrom(link) spublish $channelname hello
|
||||
assert_equal {smessage mychannel2 hello} [$subscribeclient read]
|
||||
|
||||
assert_equal {OK} [$replica_client cluster replicate $nodeto(id)]
|
||||
|
||||
set msg [$subscribeclient read]
|
||||
assert {"sunsubscribe" eq [lindex $msg 0]}
|
||||
assert {$channelname eq [lindex $msg 1]}
|
||||
assert {"0" eq [lindex $msg 2]}
|
||||
|
||||
$subscribeclient close
|
||||
}
|
||||
|
||||
test "Delete a slot, verify sunsubscribe message" {
|
||||
set channelname ch2
|
||||
set slot [$cluster cluster keyslot $channelname]
|
||||
|
Loading…
x
Reference in New Issue
Block a user