From c0f85db2b5cd8b5231113d23281ee7e53bae67ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 19 Jan 2021 12:15:33 +0100 Subject: [PATCH] Bugfix: Make modules blocked on keys unblock on commands like LPUSH (#8356) This was a regression from #7625 (only in 6.2 RC2). This makes it possible again to implement blocking list and zset commands using the modules API. This commit also includes a test case for the reverse: A module unblocks a client blocked on BLPOP by inserting elements using RedisModule_ListPush(). This already works, but it was untested. --- src/blocked.c | 14 ++++- tests/modules/blockonkeys.c | 78 ++++++++++++++++++++++++++++ tests/unit/moduleapi/blockonkeys.tcl | 30 +++++++++++ 3 files changed, 120 insertions(+), 2 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 46935c79f..1d69ff902 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -684,10 +684,20 @@ static int getBlockedTypeByType(int type) { void signalKeyAsReady(redisDb *db, robj *key, int type) { readyList *rl; - /* If no clients are blocked on this type, just return */ + /* Quick returns. */ int btype = getBlockedTypeByType(type); - if (btype == BLOCKED_NONE || !server.blocked_clients_by_type[btype]) + if (btype == BLOCKED_NONE) { + /* The type can never block. */ return; + } + if (!server.blocked_clients_by_type[btype] && + !server.blocked_clients_by_type[BLOCKED_MODULE]) { + /* No clients block on this type. Note: Blocked modules are represented + * by BLOCKED_MODULE, even if the intention is to wake up by normal + * types (list, zset, stream), so we need to check that there are no + * blocked modules before we do a quick return here. */ + return; + } /* No clients blocking for this key? No need to queue it. */ if (dictFind(db->blocking_keys,key) == NULL) return; diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c index 94f31d455..6f157342c 100644 --- a/tests/modules/blockonkeys.c +++ b/tests/modules/blockonkeys.c @@ -298,6 +298,76 @@ int fsl_getall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return REDISMODULE_OK; } +/* Callback for blockonkeys_popall */ +int blockonkeys_popall_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argc); + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST) { + RedisModuleString *elem; + long len = 0; + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + while ((elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD)) != NULL) { + len++; + RedisModule_ReplyWithString(ctx, elem); + RedisModule_FreeString(ctx, elem); + } + RedisModule_ReplySetArrayLength(ctx, len); + } else { + RedisModule_ReplyWithError(ctx, "ERR Not a list"); + } + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +int blockonkeys_popall_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithError(ctx, "ERR Timeout"); +} + +/* BLOCKONKEYS.POPALL key + * + * Blocks on an empty key for up to 3 seconds. When unblocked by a list + * operation like LPUSH, all the elements are popped and returned. Fails with an + * error on timeout. */ +int blockonkeys_popall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) + return RedisModule_WrongArity(ctx); + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_READ); + if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) { + RedisModule_BlockClientOnKeys(ctx, blockonkeys_popall_reply_callback, + blockonkeys_popall_timeout_callback, + NULL, 3000, &argv[1], 1, NULL); + } else { + RedisModule_ReplyWithError(ctx, "ERR Key not empty"); + } + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +/* A module equivalent of LPUSH */ +int blockonkeys_lpush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc < 3) + return RedisModule_WrongArity(ctx); + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + if (RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_EMPTY && + RedisModule_KeyType(key) != REDISMODULE_KEYTYPE_LIST) { + RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); + } else { + for (int i = 2; i < argc; i++) { + if (RedisModule_ListPush(key, REDISMODULE_LIST_HEAD, + argv[i]) != REDISMODULE_OK) { + RedisModule_CloseKey(key); + return RedisModule_ReplyWithError(ctx, "ERR Push failed"); + } + } + } + RedisModule_CloseKey(key); + return RedisModule_ReplyWithSimpleString(ctx, "OK"); +} + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -334,5 +404,13 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_CreateCommand(ctx,"fsl.getall",fsl_getall,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "blockonkeys.popall", blockonkeys_popall, + "", 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "blockonkeys.lpush", blockonkeys_lpush, + "", 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl index 5e5d93da3..0f6612164 100644 --- a/tests/unit/moduleapi/blockonkeys.tcl +++ b/tests/unit/moduleapi/blockonkeys.tcl @@ -185,4 +185,34 @@ start_server {tags {"modules"}} { r fsl.push k 34 assert_equal {34} [$rd read] } + + test {Module client blocked on keys woken up by LPUSH} { + r del k + set rd [redis_deferring_client] + $rd blockonkeys.popall k + # wait until client is actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Client is not blocked" + } + r lpush k 42 squirrel banana + assert_equal {banana squirrel 42} [$rd read] + $rd close + } + + test {Module client unblocks BLPOP} { + r del k + set rd [redis_deferring_client] + $rd blpop k 3 + # wait until client is actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Client is not blocked" + } + r blockonkeys.lpush k 42 + assert_equal {k 42} [$rd read] + $rd close + } }