diff --git a/src/blocked.c b/src/blocked.c index 6ad4667db..7b48fcab6 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -370,7 +370,12 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo list *l; int j; - c->bstate.timeout = timeout; + if (!(c->flags & CLIENT_REPROCESSING_COMMAND)) { + /* If the client is re-processing the command, we do not set the timeout + * because we need to retain the client's original timeout. */ + c->bstate.timeout = timeout; + } + for (j = 0; j < numkeys; j++) { /* If the key already exists in the dictionary ignore it. */ if (!(client_blocked_entry = dictAddRaw(c->bstate.keys,keys[j],NULL))) { @@ -392,7 +397,6 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo listAddNodeTail(l,c); dictSetVal(c->bstate.keys,client_blocked_entry,listLast(l)); - /* We need to add the key to blocking_keys_unblock_on_nokey, if the client * wants to be awakened if key is deleted (like XREADGROUP) */ if (unblock_on_nokey) { diff --git a/src/module.c b/src/module.c index 1e1d0d2ae..7ee32286d 100644 --- a/src/module.c +++ b/src/module.c @@ -7707,15 +7707,15 @@ ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx, ValkeyModuleC bc->background_timer = 0; bc->background_duration = 0; - c->bstate.timeout = 0; + mstime_t timeout = 0; if (timeout_ms) { mstime_t now = mstime(); - if (timeout_ms > LLONG_MAX - now) { + if (timeout_ms > LLONG_MAX - now) { c->bstate.module_blocked_handle = NULL; addReplyError(c, "timeout is out of range"); /* 'timeout_ms+now' would overflow */ return bc; } - c->bstate.timeout = timeout_ms + now; + timeout = timeout_ms + now; } if (islua || ismulti) { @@ -7731,7 +7731,7 @@ ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx, ValkeyModuleC addReplyError(c, "Clients undergoing module based authentication can only be blocked on auth"); } else { if (keys) { - blockForKeys(c,BLOCKED_MODULE,keys,numkeys,c->bstate.timeout,flags&VALKEYMODULE_BLOCK_UNBLOCK_DELETED); + blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,flags&VALKEYMODULE_BLOCK_UNBLOCK_DELETED); } else { blockClient(c,BLOCKED_MODULE); } diff --git a/src/server.c b/src/server.c index 53be33a09..3b02d609d 100644 --- a/src/server.c +++ b/src/server.c @@ -3512,12 +3512,20 @@ void call(client *c, int flags) { * re-processing and unblock the client.*/ c->flags |= CLIENT_EXECUTING_COMMAND; + /* Setting the CLIENT_REPROCESSING_COMMAND flag so that during the actual + * processing of the command proc, the client is aware that it is being + * re-processed. */ + if (reprocessing_command) c->flags |= CLIENT_REPROCESSING_COMMAND; + monotime monotonic_start = 0; if (monotonicGetType() == MONOTONIC_CLOCK_HW) monotonic_start = getMonotonicUs(); c->cmd->proc(c); + /* Clear the CLIENT_REPROCESSING_COMMAND flag after the proc is executed. */ + if (reprocessing_command) c->flags &= ~CLIENT_REPROCESSING_COMMAND; + exitExecutionUnit(); /* In case client is blocked after trying to execute the command, @@ -3575,7 +3583,7 @@ void call(client *c, int flags) { /* Send the command to clients in MONITOR mode if applicable, * since some administrative commands are considered too dangerous to be shown. - * Other exceptions is a client which is unblocked and retring to process the command + * Other exceptions is a client which is unblocked and retrying to process the command * or we are currently in the process of loading AOF. */ if (update_command_stats && !reprocessing_command && !(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) { diff --git a/src/server.h b/src/server.h index 5eb0b5470..f4c890401 100644 --- a/src/server.h +++ b/src/server.h @@ -400,6 +400,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; auth had been authenticated from the Module. */ #define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */ #define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */ +#define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 993b6d135..0dece6cb7 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -1186,6 +1186,34 @@ foreach {pop} {BLPOP BLMPOP_LEFT} { r select 9 } {OK} {singledb:skip needs:debug} + test {BLPOP unblock but the key is expired and then block again - reprocessing command} { + r flushall + r debug set-active-expire 0 + set rd [redis_deferring_client] + + set start [clock milliseconds] + $rd blpop mylist 1 + wait_for_blocked_clients_count 1 + + # The exec will try to awake the blocked client, but the key is expired, + # so the client will be blocked again during the command reprocessing. + r multi + r rpush mylist a + r pexpire mylist 100 + r debug sleep 0.2 + r exec + + assert_equal {} [$rd read] + set end [clock milliseconds] + + # In the past, this time would have been 1000+200, in order to avoid + # timing issues, we increase the range a bit. + assert_range [expr $end-$start] 1000 1100 + + r debug set-active-expire 1 + $rd close + } {0} {needs:debug} + foreach {pop} {BLPOP BLMPOP_LEFT} { test "$pop when new key is moved into place" { set rd [redis_deferring_client] diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index a6cc5da7d..242059ef7 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -475,7 +475,7 @@ start_server { $rd close } - test {Blocking XREADGROUP for stream key that has clients blocked on list - avoid endless loop} { + test {Blocking XREADGROUP for stream key that has clients blocked on stream - avoid endless loop} { r DEL mystream r XGROUP CREATE mystream mygroup $ MKSTREAM @@ -498,6 +498,34 @@ start_server { assert_equal [r ping] {PONG} } + test {Blocking XREADGROUP for stream key that has clients blocked on stream - reprocessing command} { + r DEL mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + + $rd1 xreadgroup GROUP mygroup myuser BLOCK 0 STREAMS mystream > + wait_for_blocked_clients_count 1 + + set start [clock milliseconds] + $rd2 xreadgroup GROUP mygroup myuser BLOCK 1000 STREAMS mystream > + wait_for_blocked_clients_count 2 + + # After a while call xadd and let rd2 re-process the command. + after 200 + r xadd mystream * field value + assert_equal {} [$rd2 read] + set end [clock milliseconds] + + # In the past, this time would have been 1000+200, in order to avoid + # timing issues, we increase the range a bit. + assert_range [expr $end-$start] 1000 1100 + + $rd1 close + $rd2 close + } + test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} { r config resetstat r del mystream diff --git a/tests/unit/type/zset.tcl b/tests/unit/type/zset.tcl index 33427d89c..7836577a2 100644 --- a/tests/unit/type/zset.tcl +++ b/tests/unit/type/zset.tcl @@ -1942,6 +1942,34 @@ start_server {tags {"zset"}} { } } + test {BZPOPMIN unblock but the key is expired and then block again - reprocessing command} { + r flushall + r debug set-active-expire 0 + set rd [redis_deferring_client] + + set start [clock milliseconds] + $rd bzpopmin zset{t} 1 + wait_for_blocked_clients_count 1 + + # The exec will try to awake the blocked client, but the key is expired, + # so the client will be blocked again during the command reprocessing. + r multi + r zadd zset{t} 1 one + r pexpire zset{t} 100 + r debug sleep 0.2 + r exec + + assert_equal {} [$rd read] + set end [clock milliseconds] + + # In the past, this time would have been 1000+200, in order to avoid + # timing issues, we increase the range a bit. + assert_range [expr $end-$start] 1000 1100 + + r debug set-active-expire 1 + $rd close + } {0} {needs:debug} + test "BZPOPMIN with same key multiple times should work" { set rd [redis_deferring_client] r del z1{t} z2{t}