Fix blocking command timeout being reset due to command re-processing (#13004)
Since #11012, we re-process the command when a client is unblocked on keys. In some blocking commands, for example in the XREADGROUP BLOCK scenario, re-processing the command recalculates the block timeout, causing the blocking time to be reset. This commit adds a new CLIENT_REPROCESSING_COMMAND client flag that explicitly lets the command know it is being re-processed, so that later, in blockForKeys, we do not reset the timeout. Affected BLOCK cases: - list / zset / stream, added test cases for each. Unaffected cases: - module (never re-process the commands). - WAIT / WAITAOF (never re-process the commands). Fixes #12998. Signed-off-by: Ping Xie <pingxie@google.com>
This commit is contained in:
parent
5cec6b6b93
commit
0b7f032673
@ -370,7 +370,12 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
|
||||
list *l;
|
||||
int j;
|
||||
|
||||
c->bstate.timeout = timeout;
|
||||
if (!(c->flags & CLIENT_REPROCESSING_COMMAND)) {
|
||||
/* If the client is re-processing the command, we do not set the timeout
|
||||
* because we need to retain the client's original timeout. */
|
||||
c->bstate.timeout = timeout;
|
||||
}
|
||||
|
||||
for (j = 0; j < numkeys; j++) {
|
||||
/* If the key already exists in the dictionary ignore it. */
|
||||
if (!(client_blocked_entry = dictAddRaw(c->bstate.keys,keys[j],NULL))) {
|
||||
@ -392,7 +397,6 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
|
||||
listAddNodeTail(l,c);
|
||||
dictSetVal(c->bstate.keys,client_blocked_entry,listLast(l));
|
||||
|
||||
|
||||
/* We need to add the key to blocking_keys_unblock_on_nokey, if the client
|
||||
* wants to be awakened if key is deleted (like XREADGROUP) */
|
||||
if (unblock_on_nokey) {
|
||||
|
@ -7707,15 +7707,15 @@ ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx, ValkeyModuleC
|
||||
bc->background_timer = 0;
|
||||
bc->background_duration = 0;
|
||||
|
||||
c->bstate.timeout = 0;
|
||||
mstime_t timeout = 0;
|
||||
if (timeout_ms) {
|
||||
mstime_t now = mstime();
|
||||
if (timeout_ms > LLONG_MAX - now) {
|
||||
if (timeout_ms > LLONG_MAX - now) {
|
||||
c->bstate.module_blocked_handle = NULL;
|
||||
addReplyError(c, "timeout is out of range"); /* 'timeout_ms+now' would overflow */
|
||||
return bc;
|
||||
}
|
||||
c->bstate.timeout = timeout_ms + now;
|
||||
timeout = timeout_ms + now;
|
||||
}
|
||||
|
||||
if (islua || ismulti) {
|
||||
@ -7731,7 +7731,7 @@ ValkeyModuleBlockedClient *moduleBlockClient(ValkeyModuleCtx *ctx, ValkeyModuleC
|
||||
addReplyError(c, "Clients undergoing module based authentication can only be blocked on auth");
|
||||
} else {
|
||||
if (keys) {
|
||||
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,c->bstate.timeout,flags&VALKEYMODULE_BLOCK_UNBLOCK_DELETED);
|
||||
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,flags&VALKEYMODULE_BLOCK_UNBLOCK_DELETED);
|
||||
} else {
|
||||
blockClient(c,BLOCKED_MODULE);
|
||||
}
|
||||
|
10
src/server.c
10
src/server.c
@ -3512,12 +3512,20 @@ void call(client *c, int flags) {
|
||||
* re-processing and unblock the client.*/
|
||||
c->flags |= CLIENT_EXECUTING_COMMAND;
|
||||
|
||||
/* Setting the CLIENT_REPROCESSING_COMMAND flag so that during the actual
|
||||
* processing of the command proc, the client is aware that it is being
|
||||
* re-processed. */
|
||||
if (reprocessing_command) c->flags |= CLIENT_REPROCESSING_COMMAND;
|
||||
|
||||
monotime monotonic_start = 0;
|
||||
if (monotonicGetType() == MONOTONIC_CLOCK_HW)
|
||||
monotonic_start = getMonotonicUs();
|
||||
|
||||
c->cmd->proc(c);
|
||||
|
||||
/* Clear the CLIENT_REPROCESSING_COMMAND flag after the proc is executed. */
|
||||
if (reprocessing_command) c->flags &= ~CLIENT_REPROCESSING_COMMAND;
|
||||
|
||||
exitExecutionUnit();
|
||||
|
||||
/* In case client is blocked after trying to execute the command,
|
||||
@ -3575,7 +3583,7 @@ void call(client *c, int flags) {
|
||||
|
||||
/* Send the command to clients in MONITOR mode if applicable,
|
||||
* since some administrative commands are considered too dangerous to be shown.
|
||||
* Other exceptions is a client which is unblocked and retring to process the command
|
||||
* Other exceptions is a client which is unblocked and retrying to process the command
|
||||
* or we are currently in the process of loading AOF. */
|
||||
if (update_command_stats && !reprocessing_command &&
|
||||
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN))) {
|
||||
|
@ -400,6 +400,7 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
|
||||
auth had been authenticated from the Module. */
|
||||
#define CLIENT_MODULE_PREVENT_AOF_PROP (1ULL<<48) /* Module client do not want to propagate to AOF */
|
||||
#define CLIENT_MODULE_PREVENT_REPL_PROP (1ULL<<49) /* Module client do not want to propagate to replica */
|
||||
#define CLIENT_REPROCESSING_COMMAND (1ULL<<50) /* The client is re-processing the command. */
|
||||
|
||||
/* Client block type (btype field in client structure)
|
||||
* if CLIENT_BLOCKED flag is set. */
|
||||
|
@ -1186,6 +1186,34 @@ foreach {pop} {BLPOP BLMPOP_LEFT} {
|
||||
r select 9
|
||||
} {OK} {singledb:skip needs:debug}
|
||||
|
||||
test {BLPOP unblock but the key is expired and then block again - reprocessing command} {
|
||||
r flushall
|
||||
r debug set-active-expire 0
|
||||
set rd [redis_deferring_client]
|
||||
|
||||
set start [clock milliseconds]
|
||||
$rd blpop mylist 1
|
||||
wait_for_blocked_clients_count 1
|
||||
|
||||
# The exec will try to awake the blocked client, but the key is expired,
|
||||
# so the client will be blocked again during the command reprocessing.
|
||||
r multi
|
||||
r rpush mylist a
|
||||
r pexpire mylist 100
|
||||
r debug sleep 0.2
|
||||
r exec
|
||||
|
||||
assert_equal {} [$rd read]
|
||||
set end [clock milliseconds]
|
||||
|
||||
# In the past, this time would have been 1000+200, in order to avoid
|
||||
# timing issues, we increase the range a bit.
|
||||
assert_range [expr $end-$start] 1000 1100
|
||||
|
||||
r debug set-active-expire 1
|
||||
$rd close
|
||||
} {0} {needs:debug}
|
||||
|
||||
foreach {pop} {BLPOP BLMPOP_LEFT} {
|
||||
test "$pop when new key is moved into place" {
|
||||
set rd [redis_deferring_client]
|
||||
|
@ -475,7 +475,7 @@ start_server {
|
||||
$rd close
|
||||
}
|
||||
|
||||
test {Blocking XREADGROUP for stream key that has clients blocked on list - avoid endless loop} {
|
||||
test {Blocking XREADGROUP for stream key that has clients blocked on stream - avoid endless loop} {
|
||||
r DEL mystream
|
||||
r XGROUP CREATE mystream mygroup $ MKSTREAM
|
||||
|
||||
@ -498,6 +498,34 @@ start_server {
|
||||
assert_equal [r ping] {PONG}
|
||||
}
|
||||
|
||||
test {Blocking XREADGROUP for stream key that has clients blocked on stream - reprocessing command} {
|
||||
r DEL mystream
|
||||
r XGROUP CREATE mystream mygroup $ MKSTREAM
|
||||
|
||||
set rd1 [redis_deferring_client]
|
||||
set rd2 [redis_deferring_client]
|
||||
|
||||
$rd1 xreadgroup GROUP mygroup myuser BLOCK 0 STREAMS mystream >
|
||||
wait_for_blocked_clients_count 1
|
||||
|
||||
set start [clock milliseconds]
|
||||
$rd2 xreadgroup GROUP mygroup myuser BLOCK 1000 STREAMS mystream >
|
||||
wait_for_blocked_clients_count 2
|
||||
|
||||
# After a while call xadd and let rd2 re-process the command.
|
||||
after 200
|
||||
r xadd mystream * field value
|
||||
assert_equal {} [$rd2 read]
|
||||
set end [clock milliseconds]
|
||||
|
||||
# In the past, this time would have been 1000+200, in order to avoid
|
||||
# timing issues, we increase the range a bit.
|
||||
assert_range [expr $end-$start] 1000 1100
|
||||
|
||||
$rd1 close
|
||||
$rd2 close
|
||||
}
|
||||
|
||||
test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} {
|
||||
r config resetstat
|
||||
r del mystream
|
||||
|
@ -1942,6 +1942,34 @@ start_server {tags {"zset"}} {
|
||||
}
|
||||
}
|
||||
|
||||
test {BZPOPMIN unblock but the key is expired and then block again - reprocessing command} {
|
||||
r flushall
|
||||
r debug set-active-expire 0
|
||||
set rd [redis_deferring_client]
|
||||
|
||||
set start [clock milliseconds]
|
||||
$rd bzpopmin zset{t} 1
|
||||
wait_for_blocked_clients_count 1
|
||||
|
||||
# The exec will try to awake the blocked client, but the key is expired,
|
||||
# so the client will be blocked again during the command reprocessing.
|
||||
r multi
|
||||
r zadd zset{t} 1 one
|
||||
r pexpire zset{t} 100
|
||||
r debug sleep 0.2
|
||||
r exec
|
||||
|
||||
assert_equal {} [$rd read]
|
||||
set end [clock milliseconds]
|
||||
|
||||
# In the past, this time would have been 1000+200, in order to avoid
|
||||
# timing issues, we increase the range a bit.
|
||||
assert_range [expr $end-$start] 1000 1100
|
||||
|
||||
r debug set-active-expire 1
|
||||
$rd close
|
||||
} {0} {needs:debug}
|
||||
|
||||
test "BZPOPMIN with same key multiple times should work" {
|
||||
set rd [redis_deferring_client]
|
||||
r del z1{t} z2{t}
|
||||
|
Loading…
x
Reference in New Issue
Block a user