Sharded pubsub publish messagebulk as smessage (#10792)

To easily distinguish between sharded channel message and a global
channel message, introducing `smessage` (instead of `message`) as
message bulk for sharded channel publish message.

This is gonna be a breaking change in 7.0.1!

Background:
Sharded pubsub introduced in redis 7.0, but after the release we quickly
realized that the fact that it's problematic that the client can't distinguish
between normal (global) pubsub messages and sharded ones.
This is important because the same connection can subscribe to both,
but messages sent to one pubsub system are not propagated to the
other (they're completely separate), so if one connection is used to
subscribe to both, we need to assist the client library to know which
message it got so it can forward it to the correct callback.
This commit is contained in:
Harkrishn Patro 2022-05-30 22:03:59 -07:00 committed by GitHub
parent d7ae858745
commit 4065b4f27e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 24 additions and 20 deletions

View File

@ -39,6 +39,7 @@ typedef struct pubsubtype {
dict **serverPubSubChannels;
robj **subscribeMsg;
robj **unsubscribeMsg;
robj **messageBulk;
}pubsubtype;
/*
@ -78,6 +79,7 @@ pubsubtype pubSubType = {
.serverPubSubChannels = &server.pubsub_channels,
.subscribeMsg = &shared.subscribebulk,
.unsubscribeMsg = &shared.unsubscribebulk,
.messageBulk = &shared.messagebulk,
};
/*
@ -89,7 +91,8 @@ pubsubtype pubSubShardType = {
.subscriptionCount = clientShardSubscriptionsCount,
.serverPubSubChannels = &server.pubsubshard_channels,
.subscribeMsg = &shared.ssubscribebulk,
.unsubscribeMsg = &shared.sunsubscribebulk
.unsubscribeMsg = &shared.sunsubscribebulk,
.messageBulk = &shared.smessagebulk,
};
/*-----------------------------------------------------------------------------
@ -101,12 +104,12 @@ pubsubtype pubSubShardType = {
* message. However if the caller sets 'msg' as NULL, it will be able
* to send a special message (for instance an Array type) by using the
* addReply*() API family. */
void addReplyPubsubMessage(client *c, robj *channel, robj *msg) {
void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk) {
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
addReplyPushLen(c,3);
addReply(c,shared.messagebulk);
addReply(c,message_bulk);
addReplyBulk(c,channel);
if (msg) addReplyBulk(c,msg);
}
@ -461,7 +464,7 @@ int pubsubPublishMessageInternal(robj *channel, robj *message, pubsubtype type)
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
client *c = ln->value;
addReplyPubsubMessage(c,channel,message);
addReplyPubsubMessage(c,channel,message,*type.messageBulk);
updateClientMemUsage(c);
receivers++;
}

View File

@ -1755,6 +1755,7 @@ void createSharedObjects(void) {
shared.unsubscribebulk = createStringObject("$11\r\nunsubscribe\r\n",18);
shared.ssubscribebulk = createStringObject("$10\r\nssubscribe\r\n", 17);
shared.sunsubscribebulk = createStringObject("$12\r\nsunsubscribe\r\n", 19);
shared.smessagebulk = createStringObject("$8\r\nsmessage\r\n", 14);
shared.psubscribebulk = createStringObject("$10\r\npsubscribe\r\n",17);
shared.punsubscribebulk = createStringObject("$12\r\npunsubscribe\r\n",19);

View File

@ -1228,7 +1228,7 @@ struct sharedObjectsStruct {
*time, *pxat, *absttl, *retrycount, *force, *justid, *entriesread,
*lastid, *ping, *setid, *keepttl, *load, *createconsumer,
*getack, *special_asterick, *special_equals, *default_username, *redacted,
*ssubscribebulk,*sunsubscribebulk,
*ssubscribebulk,*sunsubscribebulk, *smessagebulk,
*select[PROTO_SHARED_SELECT_CMDS],
*integers[OBJ_SHARED_INTEGERS],
*mbulkhdr[OBJ_SHARED_BULKHDR_LEN], /* "*<value>\r\n" */
@ -2991,7 +2991,7 @@ void pubsubUnsubscribeShardChannels(robj **channels, unsigned int count);
int pubsubUnsubscribeAllPatterns(client *c, int notify);
int pubsubPublishMessage(robj *channel, robj *message, int sharded);
int pubsubPublishMessageAndPropagateToCluster(robj *channel, robj *message, int sharded);
void addReplyPubsubMessage(client *c, robj *channel, robj *msg);
void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk);
int serverPubsubSubscriptionCount();
int serverPubsubShardSubscriptionCount();

View File

@ -295,7 +295,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
} else if (using_redirection && c->flags & CLIENT_PUBSUB) {
/* We use a static object to speedup things, however we assume
* that addReplyPubsubMessage() will not take a reference. */
addReplyPubsubMessage(c,TrackingChannelName,NULL);
addReplyPubsubMessage(c,TrackingChannelName,NULL,shared.messagebulk);
} else {
/* If are here, the client is not using RESP3, nor is
* redirecting to another client. We can't send anything to

View File

@ -29,7 +29,7 @@ test "Migrate a slot, verify client receives sunsubscribe on primary serving the
# Verify subscribe is still valid, able to receive messages.
$nodefrom(link) spublish $channelname hello
assert_equal {message mychannel hello} [$subscribeclient read]
assert_equal {smessage mychannel hello} [$subscribeclient read]
assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)]
@ -66,7 +66,7 @@ test "Client subscribes to multiple channels, migrate a slot, verify client rece
# Verify subscribe is still valid, able to receive messages.
$nodefrom(link) spublish $channelname hello
assert_equal {message ch3 hello} [$subscribeclient read]
assert_equal {smessage ch3 hello} [$subscribeclient read]
assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)]
@ -82,7 +82,7 @@ test "Client subscribes to multiple channels, migrate a slot, verify client rece
# Verify the client is still connected and receives message from the other channel.
set msg [$subscribeclient read]
assert {"message" eq [lindex $msg 0]}
assert {"smessage" eq [lindex $msg 0]}
assert {$anotherchannelname eq [lindex $msg 1]}
assert {"hello" eq [lindex $msg 2]}
@ -114,7 +114,7 @@ test "Migrate a slot, verify client receives sunsubscribe on replica serving the
# Verify subscribe is still valid, able to receive messages.
$nodefrom(link) spublish $channelname hello
assert_equal {message mychannel1 hello} [$subscribeclient read]
assert_equal {smessage mychannel1 hello} [$subscribeclient read]
assert_equal {OK} [$nodefrom(link) cluster setslot $slot node $nodeto(id)]
assert_equal {OK} [$nodeto(link) cluster setslot $slot node $nodeto(id)]

View File

@ -11,7 +11,7 @@ start_server {tags {"modules"}} {
assert_equal {1} [subscribe $rd2 {chan1}]
assert_equal 1 [r publish.shard chan1 hello]
assert_equal 1 [r publish.classic chan1 world]
assert_equal {message chan1 hello} [$rd1 read]
assert_equal {smessage chan1 hello} [$rd1 read]
assert_equal {message chan1 world} [$rd2 read]
}
}

View File

@ -7,14 +7,14 @@ start_server {tags {"pubsubshard external:skip"}} {
assert_equal {2} [ssubscribe $rd1 {chan2}]
assert_equal 1 [r SPUBLISH chan1 hello]
assert_equal 1 [r SPUBLISH chan2 world]
assert_equal {message chan1 hello} [$rd1 read]
assert_equal {message chan2 world} [$rd1 read]
assert_equal {smessage chan1 hello} [$rd1 read]
assert_equal {smessage chan2 world} [$rd1 read]
# unsubscribe from one of the channels
sunsubscribe $rd1 {chan1}
assert_equal 0 [r SPUBLISH chan1 hello]
assert_equal 1 [r SPUBLISH chan2 world]
assert_equal {message chan2 world} [$rd1 read]
assert_equal {smessage chan2 world} [$rd1 read]
# unsubscribe from the remaining channel
sunsubscribe $rd1 {chan2}
@ -32,8 +32,8 @@ start_server {tags {"pubsubshard external:skip"}} {
assert_equal {1} [ssubscribe $rd1 {chan1}]
assert_equal {1} [ssubscribe $rd2 {chan1}]
assert_equal 2 [r SPUBLISH chan1 hello]
assert_equal {message chan1 hello} [$rd1 read]
assert_equal {message chan1 hello} [$rd2 read]
assert_equal {smessage chan1 hello} [$rd1 read]
assert_equal {smessage chan1 hello} [$rd2 read]
# clean up clients
$rd1 close
@ -58,7 +58,7 @@ start_server {tags {"pubsubshard external:skip"}} {
set rd1 [redis_deferring_client]
assert_equal {1 1 1} [ssubscribe $rd1 {chan1 chan1 chan1}]
assert_equal 1 [r SPUBLISH chan1 hello]
assert_equal {message chan1 hello} [$rd1 read]
assert_equal {smessage chan1 hello} [$rd1 read]
# clean up clients
$rd1 close
@ -129,9 +129,9 @@ start_server {tags {"pubsubshard external:skip"}} {
assert_equal {1} [ssubscribe $rd1 {chan1}]
$rd0 SPUBLISH chan1 hello
assert_equal {message chan1 hello} [$rd1 read]
assert_equal {smessage chan1 hello} [$rd1 read]
$rd0 SPUBLISH chan1 world
assert_equal {message chan1 world} [$rd1 read]
assert_equal {smessage chan1 world} [$rd1 read]
}
}
}