diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c index 6e4b5b79c..2b7b614a9 100644 --- a/tests/modules/blockonkeys.c +++ b/tests/modules/blockonkeys.c @@ -2,6 +2,7 @@ #include "redismodule.h" #include +#include #include #include @@ -347,7 +348,11 @@ int blockonkeys_popall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_OK; } -/* A module equivalent of LPUSH */ +/* BLOCKONKEYS.LPUSH key val [val ..] + * BLOCKONKEYS.LPUSH_UNBLOCK key val [val ..] + * + * A module equivalent of LPUSH. If the name LPUSH_UNBLOCK is used, + * RM_SignalKeyAsReady() is also called. */ int blockonkeys_lpush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc < 3) return RedisModule_WrongArity(ctx); @@ -366,9 +371,83 @@ int blockonkeys_lpush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { } } RedisModule_CloseKey(key); + + /* signal key as ready if the command is lpush_unblock */ + size_t len; + const char *str = RedisModule_StringPtrLen(argv[0], &len); + if (!strncasecmp(str, "blockonkeys.lpush_unblock", len)) { + RedisModule_SignalKeyAsReady(ctx, argv[1]); + } return RedisModule_ReplyWithSimpleString(ctx, "OK"); } +/* Callback for the BLOCKONKEYS.BLPOPN command */ +int blockonkeys_blpopn_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argc); + long long n; + RedisModule_StringToLongLong(argv[2], &n); + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + int result; + if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST && + RedisModule_ValueLength(key) >= (size_t)n) { + RedisModule_ReplyWithArray(ctx, n); + for (long i = 0; i < n; i++) { + RedisModuleString *elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD); + RedisModule_ReplyWithString(ctx, elem); + RedisModule_FreeString(ctx, elem); + } + result = REDISMODULE_OK; + } else if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST || + RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) { + /* continue blocking */ + result = REDISMODULE_ERR; + } else { + result = RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); + } + RedisModule_CloseKey(key); + return result; +} + +int blockonkeys_blpopn_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithError(ctx, "ERR Timeout"); +} + +/* BLOCKONKEYS.BLPOPN key N + * + * Blocks until key has N elements and then pops them or fails after 3 seconds. + */ +int blockonkeys_blpopn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc < 3) return RedisModule_WrongArity(ctx); + + long long n; + if (RedisModule_StringToLongLong(argv[2], &n) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx, "ERR Invalid N"); + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + int keytype = RedisModule_KeyType(key); + if (keytype != REDISMODULE_KEYTYPE_EMPTY && + keytype != REDISMODULE_KEYTYPE_LIST) { + RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE); + } else if (keytype == REDISMODULE_KEYTYPE_LIST && + RedisModule_ValueLength(key) >= (size_t)n) { + RedisModule_ReplyWithArray(ctx, n); + for (long i = 0; i < n; i++) { + RedisModuleString *elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD); + RedisModule_ReplyWithString(ctx, elem); + RedisModule_FreeString(ctx, elem); + } + } else { + RedisModule_BlockClientOnKeys(ctx, blockonkeys_blpopn_reply_callback, + blockonkeys_blpopn_timeout_callback, + NULL, 3000, &argv[1], 1, NULL); + } + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -413,5 +492,13 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) "", 1, 1, 1) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "blockonkeys.lpush_unblock", blockonkeys_lpush, + "", 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "blockonkeys.blpopn", blockonkeys_blpopn, + "", 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl index 00041674f..75191b3c7 100644 --- a/tests/unit/moduleapi/blockonkeys.tcl +++ b/tests/unit/moduleapi/blockonkeys.tcl @@ -216,4 +216,24 @@ start_server {tags {"modules"}} { assert_equal {k 42} [$rd read] $rd close } + + test {Module unblocks module blocked on non-empty list} { + r del k + r lpush k aa + # Module client blocks to pop 5 elements from list + set rd [redis_deferring_client] + $rd blockonkeys.blpopn k 5 + # Wait until client is actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Client is not blocked" + } + # Check that RM_SignalKeyAsReady() can wake up BLPOPN + r blockonkeys.lpush_unblock k bb cc ;# Not enough elements for BLPOPN + r lpush k dd ee ff ;# Doesn't unblock module + r blockonkeys.lpush_unblock k gg ;# Unblocks module + assert_equal {gg ff ee dd cc} [$rd read] + $rd close + } }