# Test PUBSUB shard propagation in a cluster slot.
source tests/support/cluster.tcl

# Start a cluster with 3 masters and 3 replicas.
start_cluster 3 3 {tags {external:skip cluster}} {
set cluster [valkey_cluster 127.0.0.1:[srv 0 port]]
test "Pub/Sub shard basics" {
    # Resolve the slot owning "channel.0", the master that serves it,
    # and a master that does NOT serve it (for redirection checks).
    set slot [$cluster cluster keyslot "channel.0"]
    array set publishnode [$cluster masternode_for_slot $slot]
    array set notshardnode [$cluster masternode_notfor_slot $slot]

    set publishclient [valkey_client_by_addr $publishnode(host) $publishnode(port)]
    set subscribeclient [valkey_deferring_client_by_addr $publishnode(host) $publishnode(port)]
    set subscribeclient2 [valkey_deferring_client_by_addr $publishnode(host) $publishnode(port)]
    set anotherclient [valkey_deferring_client_by_addr $notshardnode(host) $notshardnode(port)]

    # Shard subscriptions on the slot owner succeed.
    $subscribeclient ssubscribe channel.0
    $subscribeclient read

    $subscribeclient2 ssubscribe channel.0
    $subscribeclient2 read

    # Subscribing on a node that does not own the slot gets a MOVED redirect.
    $anotherclient ssubscribe channel.0
    catch {$anotherclient read} err
    assert_match {MOVED *} $err

    # A shard publish reaches every shard subscriber on the owning node.
    set data [randomValue]
    $publishclient spublish channel.0 $data

    set msg [$subscribeclient read]
    assert_equal $data [lindex $msg 2]

    set msg [$subscribeclient2 read]
    assert_equal $data [lindex $msg 2]

    $publishclient close
    $subscribeclient close
    $subscribeclient2 close
    $anotherclient close
}
test "client can't subscribe to multiple shard channels across different slots in same call" {
    # channel.0 and channel.1 hash to different slots, so a single
    # SSUBSCRIBE call covering both must be rejected with CROSSSLOT.
    catch {$cluster ssubscribe channel.0 channel.1} err
    assert_match {CROSSSLOT Keys*} $err
}
test "client can subscribe to multiple shard channels across different slots in separate call" {
    # Separate SSUBSCRIBE calls may each target a different slot;
    # the cluster client routes each call to the proper node.
    $cluster ssubscribe ch3
    $cluster ssubscribe ch7

    $cluster sunsubscribe ch3
    $cluster sunsubscribe ch7
}
test "sunsubscribe without specifying any channel would unsubscribe all shard channels subscribed" {
    # Uses publishnode(host)/publishnode(port) resolved in the earlier
    # "Pub/Sub shard basics" test (test bodies share the outer scope).
    set publishclient [valkey_client_by_addr $publishnode(host) $publishnode(port)]
    set subscribeclient [valkey_deferring_client_by_addr $publishnode(host) $publishnode(port)]

    # All three channels share the {channel.0} hash tag, i.e. the same slot.
    set sub_res [ssubscribe $subscribeclient [list "\{channel.0\}1" "\{channel.0\}2" "\{channel.0\}3"]]
    assert_equal [list 1 2 3] $sub_res
    sunsubscribe $subscribeclient

    # With no subscribers left, SPUBLISH reports 0 receivers per channel.
    assert_equal 0 [$publishclient spublish "\{channel.0\}1" hello]
    assert_equal 0 [$publishclient spublish "\{channel.0\}2" hello]
    assert_equal 0 [$publishclient spublish "\{channel.0\}3" hello]

    $publishclient close
    $subscribeclient close
}
test "Verify Pub/Sub and Pub/Sub shard no overlap" {
    # Resolve the slot owner for "channel.0" again so this test is
    # self-contained with respect to node selection.
    set slot [$cluster cluster keyslot "channel.0"]
    array set publishnode [$cluster masternode_for_slot $slot]
    array set notshardnode [$cluster masternode_notfor_slot $slot]

    set publishshardclient [valkey_client_by_addr $publishnode(host) $publishnode(port)]
    set publishclient [valkey_deferring_client_by_addr $publishnode(host) $publishnode(port)]
    set subscribeshardclient [valkey_deferring_client_by_addr $publishnode(host) $publishnode(port)]
    set subscribeclient [valkey_deferring_client_by_addr $publishnode(host) $publishnode(port)]

    # One client subscribes to the SHARD channel ...
    $subscribeshardclient deferred 1
    $subscribeshardclient ssubscribe channel.0
    $subscribeshardclient read

    # ... and another to the GLOBAL channel of the same name.
    $subscribeclient deferred 1
    $subscribeclient subscribe channel.0
    $subscribeclient read

    set sharddata "testingpubsubdata"
    $publishshardclient spublish channel.0 $sharddata

    set data "somemoredata"
    $publishclient publish channel.0 $data

    # Each subscriber receives only the message from its own namespace.
    set msg [$subscribeshardclient read]
    assert_equal $sharddata [lindex $msg 2]

    set msg [$subscribeclient read]
    assert_equal $data [lindex $msg 2]

    $cluster close
    $publishclient close
    $subscribeclient close
    $subscribeshardclient close
}
test "PUBSUB channels/shardchannels" {
    # Uses publishnode(host)/publishnode(port) set by the preceding test.
    set subscribeclient [valkey_deferring_client_by_addr $publishnode(host) $publishnode(port)]
    set subscribeclient2 [valkey_deferring_client_by_addr $publishnode(host) $publishnode(port)]
    set subscribeclient3 [valkey_deferring_client_by_addr $publishnode(host) $publishnode(port)]
    set publishclient [valkey_client_by_addr $publishnode(host) $publishnode(port)]

    # Three shard subscriptions in the same slot ({channel.0} hash tag).
    ssubscribe $subscribeclient [list "\{channel.0\}1"]
    ssubscribe $subscribeclient2 [list "\{channel.0\}2"]
    ssubscribe $subscribeclient3 [list "\{channel.0\}3"]
    assert_equal {3} [llength [$publishclient pubsub shardchannels]]

    # A global SUBSCRIBE must not show up in PUBSUB SHARDCHANNELS.
    subscribe $subscribeclient [list "\{channel.0\}4"]
    assert_equal {3} [llength [$publishclient pubsub shardchannels]]

    # Dropping one shard subscription leaves the other two listed.
    sunsubscribe $subscribeclient

    $subscribeclient read

    set channel_list [$publishclient pubsub shardchannels]
    assert_equal {2} [llength $channel_list]
    assert {[lsearch -exact $channel_list "\{channel.0\}2"] >= 0}
    assert {[lsearch -exact $channel_list "\{channel.0\}3"] >= 0}
}
} ;# start_cluster