Fix an issue when module decides to unblock a client which is blocked on keys (#11832)
Currently (starting at #11012) When a module is blocked on keys it sets the CLIENT_PENDING_COMMAND flag. However in case the module decides to unblock the client not via the regular flow (eg timeout, key signal or CLIENT UNBLOCK command) it will attempt to reprocess the module command and potentially blocked again. This fix remove the CLIENT_PENDING_COMMAND flag in case blockedForKeys is issued from module context.
This commit is contained in:
parent
2bb29e4aa3
commit
4988b92850
@ -385,7 +385,11 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo
|
||||
}
|
||||
}
|
||||
c->bstate.unblock_on_nokey = unblock_on_nokey;
|
||||
c->flags |= CLIENT_PENDING_COMMAND;
|
||||
/* Currently we assume key blocking will require reprocessing the command.
|
||||
* However in case of modules, they have a different way to handle the reprocessing
|
||||
* which does not require setting the pending command flag */
|
||||
if (btype != BLOCKED_MODULE)
|
||||
c->flags |= CLIENT_PENDING_COMMAND;
|
||||
blockClient(c,btype);
|
||||
}
|
||||
|
||||
@ -630,8 +634,6 @@ static void moduleUnblockClientOnKey(client *c, robj *key) {
|
||||
|
||||
if (moduleTryServeClientBlockedOnKey(c, key)) {
|
||||
updateStatsOnUnblock(c, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
|
||||
if (c->flags & CLIENT_PENDING_COMMAND)
|
||||
c->flags &= ~CLIENT_PENDING_COMMAND;
|
||||
moduleUnblockClient(c);
|
||||
}
|
||||
/* We need to call afterCommand even if the client was not unblocked
|
||||
|
@ -435,6 +435,10 @@ int blockonkeys_blpopn_reply_callback(RedisModuleCtx *ctx, RedisModuleString **a
|
||||
result = REDISMODULE_OK;
|
||||
} else if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST ||
|
||||
RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) {
|
||||
const char *module_cmd = RedisModule_StringPtrLen(argv[0], NULL);
|
||||
if (!strcasecmp(module_cmd, "blockonkeys.blpopn_or_unblock"))
|
||||
RedisModule_UnblockClient(RedisModule_GetBlockedClientHandle(ctx), NULL);
|
||||
|
||||
/* continue blocking */
|
||||
result = REDISMODULE_ERR;
|
||||
} else {
|
||||
@ -450,6 +454,12 @@ int blockonkeys_blpopn_timeout_callback(RedisModuleCtx *ctx, RedisModuleString *
|
||||
return RedisModule_ReplyWithError(ctx, "ERR Timeout");
|
||||
}
|
||||
|
||||
int blockonkeys_blpopn_abort_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
REDISMODULE_NOT_USED(argv);
|
||||
REDISMODULE_NOT_USED(argc);
|
||||
return RedisModule_ReplyWithSimpleString(ctx, "Action aborted");
|
||||
}
|
||||
|
||||
/* BLOCKONKEYS.BLPOPN key N
|
||||
*
|
||||
* Blocks until key has N elements and then pops them or fails after 3 seconds.
|
||||
@ -457,11 +467,16 @@ int blockonkeys_blpopn_timeout_callback(RedisModuleCtx *ctx, RedisModuleString *
|
||||
int blockonkeys_blpopn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc < 3) return RedisModule_WrongArity(ctx);
|
||||
|
||||
long long n;
|
||||
long long n, timeout = 3000LL;
|
||||
if (RedisModule_StringToLongLong(argv[2], &n) != REDISMODULE_OK) {
|
||||
return RedisModule_ReplyWithError(ctx, "ERR Invalid N");
|
||||
}
|
||||
|
||||
if (argc > 3 ) {
|
||||
if (RedisModule_StringToLongLong(argv[3], &timeout) != REDISMODULE_OK) {
|
||||
return RedisModule_ReplyWithError(ctx, "ERR Invalid timeout value");
|
||||
}
|
||||
}
|
||||
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
|
||||
int keytype = RedisModule_KeyType(key);
|
||||
if (keytype != REDISMODULE_KEYTYPE_EMPTY &&
|
||||
@ -477,8 +492,8 @@ int blockonkeys_blpopn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
}
|
||||
} else {
|
||||
RedisModule_BlockClientOnKeys(ctx, blockonkeys_blpopn_reply_callback,
|
||||
blockonkeys_blpopn_timeout_callback,
|
||||
NULL, 3000, &argv[1], 1, NULL);
|
||||
timeout ? blockonkeys_blpopn_timeout_callback : blockonkeys_blpopn_abort_callback,
|
||||
NULL, timeout, &argv[1], 1, NULL);
|
||||
}
|
||||
RedisModule_CloseKey(key);
|
||||
return REDISMODULE_OK;
|
||||
@ -536,5 +551,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
"write", 1, 1, 1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "blockonkeys.blpopn_or_unblock", blockonkeys_blpopn,
|
||||
"write", 1, 1, 1) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
@ -286,4 +286,21 @@ start_server {tags {"modules"}} {
|
||||
assert_equal {gg ff ee dd cc} [$rd read]
|
||||
$rd close
|
||||
}
|
||||
|
||||
test {Module explicit unblock when blocked on keys} {
|
||||
r del k
|
||||
r set somekey someval
|
||||
# Module client blocks to pop 5 elements from list
|
||||
set rd [redis_deferring_client]
|
||||
$rd blockonkeys.blpopn_or_unblock k 5 0
|
||||
wait_for_blocked_clients_count 1
|
||||
# will now cause the module to trigger pop but instead will unblock the client from the reply_callback
|
||||
r lpush k dd
|
||||
# we should still get unblocked as the command should not reprocess
|
||||
wait_for_blocked_clients_count 0
|
||||
assert_equal {Action aborted} [$rd read]
|
||||
$rd get somekey
|
||||
assert_equal {someval} [$rd read]
|
||||
$rd close
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user