Fix the bug that CLIENT REPLY OFF|SKIP cannot receive push notifications ()

This bug seems to be there forever, CLIENT REPLY OFF|SKIP will
mark the client with CLIENT_REPLY_OFF or CLIENT_REPLY_SKIP flags.
With these flags, prepareClientToWrite called by addReply* will
return C_ERR directly. So the client can't receive the Pub/Sub
messages and any other push notifications, e.g client side tracking.

In this PR, we adding a CLIENT_PUSHING flag, disables the reply
silencing flags. When adding push replies, set the flag, after the reply,
clear the flag. Then add the flag check in prepareClientToWrite.

Fixes 

Note, the SUBSCRIBE command response is a bit awkward,
see https://github.com/redis/redis-doc/pull/2327

Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
Binbin 2023-03-12 23:50:44 +08:00 committed by GitHub
parent 4e7eb16ae7
commit 416842e6c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 227 additions and 5 deletions

@ -807,9 +807,12 @@ NULL
addReplyError(c,"RESP2 is not supported by this command");
return;
}
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
addReplyPushLen(c,2);
addReplyBulkCString(c,"server-cpu-usage");
addReplyLongLong(c,42);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
/* Push replies are not synchronous replies, so we emit also a
* normal reply in order for blocking clients just discarding the
* push reply, to actually consume the reply and continue. */

@ -292,8 +292,10 @@ int prepareClientToWrite(client *c) {
/* If CLIENT_CLOSE_ASAP flag is set, we need not write anything. */
if (c->flags & CLIENT_CLOSE_ASAP) return C_ERR;
/* CLIENT REPLY OFF / SKIP handling: don't send replies. */
if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) return C_ERR;
/* CLIENT REPLY OFF / SKIP handling: don't send replies.
* CLIENT_PUSHING handling: disables the reply silencing flags. */
if ((c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) &&
!(c->flags & CLIENT_PUSHING)) return C_ERR;
/* Masters don't receive replies, unless CLIENT_MASTER_FORCE_REPLY flag
* is set. */
@ -976,6 +978,7 @@ void addReplyAttributeLen(client *c, long length) {
void addReplyPushLen(client *c, long length) {
serverAssert(c->resp >= 3);
serverAssertWithInfo(c, NULL, c->flags & CLIENT_PUSHING);
addReplyAggregateLen(c,length,'>');
}

@ -105,6 +105,8 @@ pubsubtype pubSubShardType = {
* to send a special message (for instance an Array type) by using the
* addReply*() API family. */
void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bulk) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
@ -112,12 +114,15 @@ void addReplyPubsubMessage(client *c, robj *channel, robj *msg, robj *message_bu
addReply(c,message_bulk);
addReplyBulk(c,channel);
if (msg) addReplyBulk(c,msg);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
/* Send a pubsub message of type "pmessage" to the client. The difference
* with the "message" type delivered by addReplyPubsubMessage() is that
* this message format also includes the pattern that matched the message. */
void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[4]);
else
@ -126,10 +131,13 @@ void addReplyPubsubPatMessage(client *c, robj *pat, robj *channel, robj *msg) {
addReplyBulk(c,pat);
addReplyBulk(c,channel);
addReplyBulk(c,msg);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
/* Send the pubsub subscription notification to the client. */
void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
@ -137,6 +145,7 @@ void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) {
addReply(c,*type.subscribeMsg);
addReplyBulk(c,channel);
addReplyLongLong(c,type.subscriptionCount(c));
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
/* Send the pubsub unsubscription notification to the client.
@ -144,6 +153,8 @@ void addReplyPubsubSubscribed(client *c, robj *channel, pubsubtype type) {
* unsubscribe command but there are no channels to unsubscribe from: we
* still send a notification. */
void addReplyPubsubUnsubscribed(client *c, robj *channel, pubsubtype type) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
@ -154,10 +165,13 @@ void addReplyPubsubUnsubscribed(client *c, robj *channel, pubsubtype type) {
else
addReplyNull(c);
addReplyLongLong(c,type.subscriptionCount(c));
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
/* Send the pubsub pattern subscription notification to the client. */
void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
@ -165,6 +179,7 @@ void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
addReply(c,shared.psubscribebulk);
addReplyBulk(c,pattern);
addReplyLongLong(c,clientSubscriptionsCount(c));
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
/* Send the pubsub pattern unsubscription notification to the client.
@ -172,6 +187,8 @@ void addReplyPubsubPatSubscribed(client *c, robj *pattern) {
* punsubscribe command but there are no pattern to unsubscribe from: we
* still send a notification. */
void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
if (c->resp == 2)
addReply(c,shared.mbulkhdr[3]);
else
@ -182,6 +199,7 @@ void addReplyPubsubPatUnsubscribed(client *c, robj *pattern) {
else
addReplyNull(c);
addReplyLongLong(c,clientSubscriptionsCount(c));
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
/*-----------------------------------------------------------------------------

@ -391,6 +391,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_ALLOW_OOM (1ULL<<44) /* Client used by RM_Call is allowed to fully execute
scripts even when in OOM */
#define CLIENT_NO_TOUCH (1ULL<<45) /* This client will not touch LFU/LRU stats. */
#define CLIENT_PUSHING (1ULL<<46) /* This client is pushing notifications. */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */

@ -266,6 +266,9 @@ void trackingRememberKeys(client *tracking, client *executing) {
* - Following a flush command, to send a single RESP NULL to indicate
* that all keys are now invalid. */
void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
uint64_t old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
int using_redirection = 0;
if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection);
@ -279,10 +282,14 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
addReplyBulkCBuffer(c,"tracking-redir-broken",21);
addReplyLongLong(c,c->client_tracking_redirection);
}
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
return;
}
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
c = redir;
using_redirection = 1;
old_flags = c->flags;
c->flags |= CLIENT_PUSHING;
}
/* Only send such info for clients in RESP version 3 or more. However
@ -301,6 +308,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
* redirecting to another client. We can't send anything to
* it since RESP2 does not support push messages in the same
* connection. */
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
return;
}
@ -312,6 +320,7 @@ void sendTrackingMessage(client *c, char *keyname, size_t keylen, int proto) {
addReplyBulkCBuffer(c,keyname,keylen);
}
updateClientMemUsageAndBucket(c);
if (!(old_flags & CLIENT_PUSHING)) c->flags &= ~CLIENT_PUSHING;
}
/* This function is called when a key is modified in Redis and in the case

@ -94,6 +94,48 @@ start_server {tags {"introspection"}} {
}
} {} {needs:save}
test "CLIENT REPLY OFF/ON: disable all commands reply" {
set rd [redis_deferring_client]
# These replies were silenced.
$rd client reply off
$rd ping pong
$rd ping pong2
$rd client reply on
assert_equal {OK} [$rd read]
$rd ping pong3
assert_equal {pong3} [$rd read]
$rd close
}
test "CLIENT REPLY SKIP: skip the next command reply" {
set rd [redis_deferring_client]
# The first pong reply was silenced.
$rd client reply skip
$rd ping pong
$rd ping pong2
assert_equal {pong2} [$rd read]
$rd close
}
test "CLIENT REPLY ON: unset SKIP flag" {
set rd [redis_deferring_client]
$rd client reply skip
$rd client reply on
assert_equal {OK} [$rd read] ;# OK from CLIENT REPLY ON command
$rd ping
assert_equal {PONG} [$rd read]
$rd close
}
test {MONITOR can log executed commands} {
set rd [redis_deferring_client]
$rd monitor

@ -166,6 +166,30 @@ start_server {tags {"pubsub network"}} {
$rd1 close
}
test "PubSub messages with CLIENT REPLY OFF" {
set rd [redis_deferring_client]
$rd hello 3
$rd read ;# Discard the hello reply
# Test that the subscribe/psubscribe notification is ok
$rd client reply off
assert_equal {1} [subscribe $rd channel]
assert_equal {2} [psubscribe $rd ch*]
# Test that the publish notification is ok
$rd client reply off
assert_equal 2 [r publish channel hello]
assert_equal {message channel hello} [$rd read]
assert_equal {pmessage ch* channel hello} [$rd read]
# Test that the unsubscribe/punsubscribe notification is ok
$rd client reply off
assert_equal {1} [unsubscribe $rd channel]
assert_equal {0} [punsubscribe $rd ch*]
$rd close
}
test "PUNSUBSCRIBE from non-subscribed channels" {
set rd1 [redis_deferring_client]
assert_equal {0 0 0} [punsubscribe $rd1 {foo.* bar.* quux.*}]
@ -226,6 +250,7 @@ start_server {tags {"pubsub network"}} {
test "Keyspace notifications: we receive keyspace notifications" {
r config set notify-keyspace-events KA
set rd1 [redis_deferring_client]
$rd1 CLIENT REPLY OFF ;# Make sure it works even if replies are silenced
assert_equal {1} [psubscribe $rd1 *]
r set foo bar
assert_equal "pmessage * __keyspace@${db}__:foo set" [$rd1 read]
@ -235,6 +260,7 @@ start_server {tags {"pubsub network"}} {
test "Keyspace notifications: we receive keyevent notifications" {
r config set notify-keyspace-events EA
set rd1 [redis_deferring_client]
$rd1 CLIENT REPLY SKIP ;# Make sure it works even if replies are silenced
assert_equal {1} [psubscribe $rd1 *]
r set foo bar
assert_equal "pmessage * __keyevent@${db}__:set foo" [$rd1 read]
@ -244,6 +270,8 @@ start_server {tags {"pubsub network"}} {
test "Keyspace notifications: we can receive both kind of events" {
r config set notify-keyspace-events KEA
set rd1 [redis_deferring_client]
$rd1 CLIENT REPLY ON ;# Just coverage
assert_equal {OK} [$rd1 read]
assert_equal {1} [psubscribe $rd1 *]
r set foo bar
assert_equal "pmessage * __keyspace@${db}__:foo set" [$rd1 read]

@ -40,7 +40,7 @@ start_server {tags {"pubsubshard external:skip"}} {
$rd2 close
}
test "PUBLISH/SUBSCRIBE after UNSUBSCRIBE without arguments" {
test "SPUBLISH/SSUBSCRIBE after UNSUBSCRIBE without arguments" {
set rd1 [redis_deferring_client]
assert_equal {1} [ssubscribe $rd1 {chan1}]
assert_equal {2} [ssubscribe $rd1 {chan2}]
@ -54,7 +54,7 @@ start_server {tags {"pubsubshard external:skip"}} {
$rd1 close
}
test "SUBSCRIBE to one channel more than once" {
test "SSUBSCRIBE to one channel more than once" {
set rd1 [redis_deferring_client]
assert_equal {1 1 1} [ssubscribe $rd1 {chan1 chan1 chan1}]
assert_equal 1 [r SPUBLISH chan1 hello]
@ -64,7 +64,7 @@ start_server {tags {"pubsubshard external:skip"}} {
$rd1 close
}
test "UNSUBSCRIBE from non-subscribed channels" {
test "SUNSUBSCRIBE from non-subscribed channels" {
set rd1 [redis_deferring_client]
assert_equal {0} [sunsubscribe $rd1 {foo}]
assert_equal {0} [sunsubscribe $rd1 {bar}]
@ -105,6 +105,33 @@ start_server {tags {"pubsubshard external:skip"}} {
assert_equal "chan1 1" [r pubsub numsub chan1]
assert_equal "chan1" [r pubsub shardchannels]
assert_equal "chan1" [r pubsub channels]
$rd1 close
$rd2 close
}
test "PubSubShard with CLIENT REPLY OFF" {
set rd [redis_deferring_client]
$rd hello 3
$rd read ;# Discard the hello reply
# Test that the ssubscribe notification is ok
$rd client reply off
$rd ping
assert_equal {1} [ssubscribe $rd channel]
# Test that the spublish notification is ok
$rd client reply off
$rd ping
assert_equal 1 [r spublish channel hello]
assert_equal {smessage channel hello} [$rd read]
# Test that sunsubscribe notification is ok
$rd client reply off
$rd ping
assert_equal {0} [sunsubscribe $rd channel]
$rd close
}
}

@ -781,7 +781,98 @@ start_server {tags {"tracking network logreqres:skip"}} {
r debug pause-cron 0
} {OK} {needs:debug}
foreach resp {3 2} {
test "RESP$resp based basic invalidation with client reply off" {
# This entire test is mostly irrelevant for RESP2, but we run it anyway just for some extra coverage.
clean_all
$rd hello $resp
$rd read
$rd client tracking on
$rd read
$rd_sg set foo bar
$rd get foo
$rd read
$rd client reply off
$rd_sg set foo bar2
if {$resp == 3} {
assert_equal {invalidate foo} [$rd read]
} elseif {$resp == 2} { } ;# Just coverage
# Verify things didn't get messed up and no unexpected reply was pushed to the client.
$rd client reply on
assert_equal {OK} [$rd read]
$rd ping
assert_equal {PONG} [$rd read]
}
}
test {RESP3 based basic redirect invalidation with client reply off} {
clean_all
set rd_redir [redis_deferring_client]
$rd_redir hello 3
$rd_redir read
$rd_redir client id
set rd_redir_id [$rd_redir read]
$rd client tracking on redirect $rd_redir_id
$rd read
$rd_sg set foo bar
$rd get foo
$rd read
$rd_redir client reply off
$rd_sg set foo bar2
assert_equal {invalidate foo} [$rd_redir read]
# Verify things didn't get messed up and no unexpected reply was pushed to the client.
$rd_redir client reply on
assert_equal {OK} [$rd_redir read]
$rd_redir ping
assert_equal {PONG} [$rd_redir read]
$rd_redir close
}
test {RESP3 based basic tracking-redir-broken with client reply off} {
clean_all
$rd hello 3
$rd read
$rd client tracking on redirect $redir_id
$rd read
$rd_sg set foo bar
$rd get foo
$rd read
$rd client reply off
$rd_redirection quit
$rd_redirection read
$rd_sg set foo bar2
set res [lsearch -exact [$rd read] "tracking-redir-broken"]
assert_morethan_equal $res 0
# Verify things didn't get messed up and no unexpected reply was pushed to the client.
$rd client reply on
assert_equal {OK} [$rd read]
$rd ping
assert_equal {PONG} [$rd read]
}
$rd_redirection close
$rd_sg close
$rd close
}