valkey-cli auto-exit from subscribed mode (#1432)
Resolves issue with valkey-cli not auto exiting from subscribed mode on reaching zero pub/sub subscription (previously filed on Redis) https://github.com/redis/redis/issues/12592 --------- Signed-off-by: Nikhil Manglore <nmanglor@amazon.com>
This commit is contained in:
parent
0a89571dcc
commit
9e0204941d
@ -218,6 +218,8 @@ static struct config {
|
||||
int shutdown;
|
||||
int monitor_mode;
|
||||
int pubsub_mode;
|
||||
int pubsub_unsharded_count; /* channels and patterns */
|
||||
int pubsub_sharded_count; /* shard channels */
|
||||
int blocking_state_aborted; /* used to abort monitor_mode and pubsub_mode. */
|
||||
int latency_mode;
|
||||
int latency_dist_mode;
|
||||
@ -2229,6 +2231,28 @@ static int cliReadReply(int output_raw_strings) {
|
||||
return REDIS_OK;
|
||||
}
|
||||
|
||||
/* Helper method to handle pubsub subscription/unsubscription. */
|
||||
static void handlePubSubMode(redisReply *reply) {
|
||||
char *cmd = reply->element[0]->str;
|
||||
int count = reply->element[2]->integer;
|
||||
|
||||
/* Update counts based on the command type */
|
||||
if (strcmp(cmd, "subscribe") == 0 || strcmp(cmd, "psubscribe") == 0 || strcmp(cmd, "unsubscribe") == 0 || strcmp(cmd, "punsubscribe") == 0) {
|
||||
config.pubsub_unsharded_count = count;
|
||||
} else if (strcmp(cmd, "ssubscribe") == 0 || strcmp(cmd, "sunsubscribe") == 0) {
|
||||
config.pubsub_sharded_count = count;
|
||||
}
|
||||
|
||||
/* Update pubsub mode based on the current counts */
|
||||
if (config.pubsub_unsharded_count + config.pubsub_sharded_count == 0 && config.pubsub_mode) {
|
||||
config.pubsub_mode = 0;
|
||||
cliRefreshPrompt();
|
||||
} else if (config.pubsub_unsharded_count + config.pubsub_sharded_count > 0 && !config.pubsub_mode) {
|
||||
config.pubsub_mode = 1;
|
||||
cliRefreshPrompt();
|
||||
}
|
||||
}
|
||||
|
||||
/* Simultaneously wait for pubsub messages from the server and input on stdin. */
|
||||
static void cliWaitForMessagesOrStdin(void) {
|
||||
int show_info = config.output != OUTPUT_RAW && (isatty(STDOUT_FILENO) || getenv("FAKETTY"));
|
||||
@ -2246,7 +2270,13 @@ static void cliWaitForMessagesOrStdin(void) {
|
||||
sds out = cliFormatReply(reply, config.output, 0);
|
||||
fwrite(out, sdslen(out), 1, stdout);
|
||||
fflush(stdout);
|
||||
|
||||
if (isPubsubPush(reply)) {
|
||||
handlePubSubMode(reply);
|
||||
}
|
||||
|
||||
sdsfree(out);
|
||||
freeReplyObject(reply);
|
||||
}
|
||||
} while (reply);
|
||||
|
||||
@ -2397,13 +2427,11 @@ static int cliSendCommand(int argc, char **argv, long repeat) {
|
||||
fflush(stdout);
|
||||
if (config.pubsub_mode || num_expected_pubsub_push > 0) {
|
||||
if (isPubsubPush(config.last_reply)) {
|
||||
handlePubSubMode(config.last_reply);
|
||||
|
||||
if (num_expected_pubsub_push > 0 && !strcasecmp(config.last_reply->element[0]->str, command)) {
|
||||
/* This pushed message confirms the
|
||||
* [p|s][un]subscribe command. */
|
||||
if (is_subscribe && !config.pubsub_mode) {
|
||||
config.pubsub_mode = 1;
|
||||
cliRefreshPrompt();
|
||||
}
|
||||
if (--num_expected_pubsub_push > 0) {
|
||||
continue; /* We need more of these. */
|
||||
}
|
||||
@ -3117,6 +3145,13 @@ void cliSetPreferences(char **argv, int argc, int interactive) {
|
||||
else {
|
||||
printf("%sunknown valkey-cli preference '%s'\n", interactive ? "" : ".valkeyclirc: ", argv[1]);
|
||||
}
|
||||
} else if (!strcasecmp(argv[0], ":get") && argc >= 2) {
|
||||
if (!strcasecmp(argv[1], "pubsub")) {
|
||||
printf("%d\n", config.pubsub_mode);
|
||||
} else {
|
||||
printf("%sunknown valkey-cli get option '%s'\n", interactive ? "" : ".valkeyclirc: ", argv[1]);
|
||||
}
|
||||
fflush(stdout);
|
||||
} else {
|
||||
printf("%sunknown valkey-cli internal command '%s'\n", interactive ? "" : ".valkeyclirc: ", argv[0]);
|
||||
}
|
||||
@ -9495,6 +9530,8 @@ int main(int argc, char **argv) {
|
||||
config.shutdown = 0;
|
||||
config.monitor_mode = 0;
|
||||
config.pubsub_mode = 0;
|
||||
config.pubsub_unsharded_count = 0;
|
||||
config.pubsub_sharded_count = 0;
|
||||
config.blocking_state_aborted = 0;
|
||||
config.latency_mode = 0;
|
||||
config.latency_dist_mode = 0;
|
||||
|
@ -608,6 +608,232 @@ if {!$::tls} { ;# fake_redis_node doesn't support TLS
|
||||
assert_equal "a\n1\nb\n2\nc\n3" [exec {*}$cmdline ZRANGE new_zset 0 -1 WITHSCORES]
|
||||
}
|
||||
|
||||
test "valkey-cli pubsub mode with single standard channel subscription" {
|
||||
set fd [open_cli]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "0" $pubsub_status
|
||||
|
||||
write_cli $fd "SUBSCRIBE ch1"
|
||||
set response [read_cli $fd]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "1" $pubsub_status
|
||||
|
||||
write_cli $fd "UNSUBSCRIBE ch1"
|
||||
set response [read_cli $fd]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "0" $pubsub_status
|
||||
|
||||
close_cli $fd
|
||||
}
|
||||
|
||||
test "valkey-cli pubsub mode with multiple standard channel subscriptions" {
|
||||
set fd [open_cli]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "0" $pubsub_status
|
||||
|
||||
write_cli $fd "SUBSCRIBE ch1 ch2 ch3"
|
||||
set response [read_cli $fd]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "1" $pubsub_status
|
||||
|
||||
write_cli $fd "UNSUBSCRIBE"
|
||||
set response [read_cli $fd]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "0" $pubsub_status
|
||||
|
||||
close_cli $fd
|
||||
}
|
||||
|
||||
test "valkey-cli pubsub mode with single shard channel subscription" {
|
||||
set fd [open_cli]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "0" $pubsub_status
|
||||
|
||||
write_cli $fd "SSUBSCRIBE schannel1"
|
||||
set response [read_cli $fd]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "1" $pubsub_status
|
||||
|
||||
write_cli $fd "SUNSUBSCRIBE schannel1"
|
||||
set response [read_cli $fd]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "0" $pubsub_status
|
||||
|
||||
close_cli $fd
|
||||
}
|
||||
|
||||
test "valkey-cli pubsub mode with multiple shard channel subscriptions" {
|
||||
|
||||
set fd [open_cli]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "0" $pubsub_status
|
||||
|
||||
write_cli $fd "SSUBSCRIBE schannel1 schannel2 schannel3"
|
||||
set response [read_cli $fd]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "1" $pubsub_status
|
||||
|
||||
write_cli $fd "SUNSUBSCRIBE"
|
||||
set response [read_cli $fd]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "0" $pubsub_status
|
||||
|
||||
close_cli $fd
|
||||
}
|
||||
|
||||
test "valkey-cli pubsub mode with single pattern channel subscription" {
|
||||
set fd [open_cli]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "0" $pubsub_status
|
||||
|
||||
write_cli $fd "PSUBSCRIBE pattern1*"
|
||||
set response [read_cli $fd]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "1" $pubsub_status
|
||||
|
||||
write_cli $fd "PUNSUBSCRIBE pattern1*"
|
||||
set response [read_cli $fd]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "0" $pubsub_status
|
||||
|
||||
close_cli $fd
|
||||
}
|
||||
|
||||
test "valkey-cli pubsub mode with multiple pattern channel subscriptions" {
|
||||
set fd [open_cli]
|
||||
|
||||
write_cli $fd "PSUBSCRIBE pattern1* pattern2* pattern3*"
|
||||
set response [read_cli $fd]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "1" $pubsub_status
|
||||
|
||||
write_cli $fd "PUNSUBSCRIBE"
|
||||
set response [read_cli $fd]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "0" $pubsub_status
|
||||
|
||||
close_cli $fd
|
||||
}
|
||||
|
||||
test "valkey-cli pubsub mode when subscribing to the same channel" {
|
||||
set fd [open_cli]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "0" $pubsub_status
|
||||
|
||||
write_cli $fd "SUBSCRIBE ch1 ch1"
|
||||
set response [read_cli $fd]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "1" $pubsub_status
|
||||
|
||||
write_cli $fd "UNSUBSCRIBE"
|
||||
set response [read_cli $fd]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "0" $pubsub_status
|
||||
|
||||
close_cli $fd
|
||||
}
|
||||
|
||||
test "valkey-cli pubsub mode with multiple subscription types" {
|
||||
set fd [open_cli]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "0" $pubsub_status
|
||||
|
||||
write_cli $fd "SUBSCRIBE ch1 ch2 ch3"
|
||||
set response [read_cli $fd]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "1" $pubsub_status
|
||||
|
||||
write_cli $fd "PSUBSCRIBE pattern*"
|
||||
set response [read_cli $fd]
|
||||
set lines [split $response "\n"]
|
||||
assert_equal "psubscribe" [lindex $lines 0]
|
||||
assert_equal "pattern*" [lindex $lines 1]
|
||||
assert_equal "4" [lindex $lines 2]
|
||||
|
||||
write_cli $fd "SSUBSCRIBE schannel"
|
||||
set response [read_cli $fd]
|
||||
set lines [split $response "\n"]
|
||||
assert_equal "ssubscribe" [lindex $lines 0]
|
||||
assert_equal "schannel" [lindex $lines 1]
|
||||
assert_equal "1" [lindex $lines 2]
|
||||
|
||||
write_cli $fd "PUNSUBSCRIBE pattern*"
|
||||
set response [read_cli $fd]
|
||||
set lines [split $response "\n"]
|
||||
assert_equal "punsubscribe" [lindex $lines 0]
|
||||
assert_equal "pattern*" [lindex $lines 1]
|
||||
assert_equal "3" [lindex $lines 2]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "1" $pubsub_status
|
||||
|
||||
write_cli $fd "SUNSUBSCRIBE schannel"
|
||||
set response [read_cli $fd]
|
||||
set lines [split $response "\n"]
|
||||
assert_equal "sunsubscribe" [lindex $lines 0]
|
||||
assert_equal "schannel" [lindex $lines 1]
|
||||
assert_equal "0" [lindex $lines 2]
|
||||
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "1" $pubsub_status
|
||||
|
||||
write_cli $fd "UNSUBSCRIBE"
|
||||
set response [read_cli $fd]
|
||||
|
||||
# Verify pubsub mode is no longer active
|
||||
write_cli $fd ":get pubsub"
|
||||
set pubsub_status [string trim [read_cli $fd]]
|
||||
assert_equal "0" $pubsub_status
|
||||
|
||||
close_cli $fd
|
||||
}
|
||||
|
||||
test "Valid Connection Scheme: redis://" {
|
||||
set cmdline [valkeycliuri "redis://" [srv host] [srv port]]
|
||||
assert_equal {PONG} [exec {*}$cmdline PING]
|
||||
|
Loading…
x
Reference in New Issue
Block a user