diff --git a/src/blocked.c b/src/blocked.c index 8fd355852..5909d2522 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -385,7 +385,11 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeo } } c->bstate.unblock_on_nokey = unblock_on_nokey; - c->flags |= CLIENT_PENDING_COMMAND; + /* Currently we assume key blocking will require reprocessing the command. + * However in case of modules, they have a different way to handle the reprocessing + * which does not require setting the pending command flag */ + if (btype != BLOCKED_MODULE) + c->flags |= CLIENT_PENDING_COMMAND; blockClient(c,btype); } @@ -630,8 +634,6 @@ static void moduleUnblockClientOnKey(client *c, robj *key) { if (moduleTryServeClientBlockedOnKey(c, key)) { updateStatsOnUnblock(c, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); - if (c->flags & CLIENT_PENDING_COMMAND) - c->flags &= ~CLIENT_PENDING_COMMAND; moduleUnblockClient(c); } /* We need to call afterCommand even if the client was not unblocked diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c index c24ebdc2a..3011e4170 100644 --- a/tests/modules/blockonkeys.c +++ b/tests/modules/blockonkeys.c @@ -435,6 +435,10 @@ int blockonkeys_blpopn_reply_callback(RedisModuleCtx *ctx, RedisModuleString **a result = REDISMODULE_OK; } else if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST || RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) { + const char *module_cmd = RedisModule_StringPtrLen(argv[0], NULL); + if (!strcasecmp(module_cmd, "blockonkeys.blpopn_or_unblock")) + RedisModule_UnblockClient(RedisModule_GetBlockedClientHandle(ctx), NULL); + /* continue blocking */ result = REDISMODULE_ERR; } else { @@ -450,6 +454,12 @@ int blockonkeys_blpopn_timeout_callback(RedisModuleCtx *ctx, RedisModuleString * return RedisModule_ReplyWithError(ctx, "ERR Timeout"); } +int blockonkeys_blpopn_abort_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithSimpleString(ctx, "Action aborted"); +} + /* BLOCKONKEYS.BLPOPN key N * * Blocks until key has N elements and then pops them or fails after 3 seconds. @@ -457,11 +467,16 @@ int blockonkeys_blpopn_timeout_callback(RedisModuleCtx *ctx, RedisModuleString * int blockonkeys_blpopn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc < 3) return RedisModule_WrongArity(ctx); - long long n; + long long n, timeout = 3000LL; if (RedisModule_StringToLongLong(argv[2], &n) != REDISMODULE_OK) { return RedisModule_ReplyWithError(ctx, "ERR Invalid N"); } + if (argc > 3 ) { + if (RedisModule_StringToLongLong(argv[3], &timeout) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx, "ERR Invalid timeout value"); + } + } RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); int keytype = RedisModule_KeyType(key); if (keytype != REDISMODULE_KEYTYPE_EMPTY && @@ -477,8 +492,8 @@ int blockonkeys_blpopn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) } } else { RedisModule_BlockClientOnKeys(ctx, blockonkeys_blpopn_reply_callback, - blockonkeys_blpopn_timeout_callback, - NULL, 3000, &argv[1], 1, NULL); + timeout ? blockonkeys_blpopn_timeout_callback : blockonkeys_blpopn_abort_callback, + NULL, timeout, &argv[1], 1, NULL); } RedisModule_CloseKey(key); return REDISMODULE_OK; @@ -536,5 +551,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) "write", 1, 1, 1) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "blockonkeys.blpopn_or_unblock", blockonkeys_blpopn, + "write", 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl index 85faa7525..8e095e351 100644 --- a/tests/unit/moduleapi/blockonkeys.tcl +++ b/tests/unit/moduleapi/blockonkeys.tcl @@ -286,4 +286,21 @@ start_server {tags {"modules"}} { assert_equal {gg ff ee dd cc} [$rd read] $rd close } + + test {Module explicit unblock when blocked on keys} { + r del k + r set somekey someval + # Module client blocks to pop 5 elements from list + set rd [redis_deferring_client] + $rd blockonkeys.blpopn_or_unblock k 5 0 + wait_for_blocked_clients_count 1 + # will now cause the module to trigger pop but instead will unblock the client from the reply_callback + r lpush k dd + # we should still get unblocked as the command should not reprocess + wait_for_blocked_clients_count 0 + assert_equal {Action aborted} [$rd read] + $rd get somekey + assert_equal {someval} [$rd read] + $rd close + } }