Test that module can wake up module blocked on non-empty list key (#8382)

BLPOP and other blocking list commands can only block on empty keys
and LPUSH only wakes up clients when the list is created.

Using the module API, it's possible to block on a non-empty key.
Unblocking a client blocked on a non-empty list (or zset) can only
be done using RedisModule_SignalKeyAsReady(). This commit tests it.
This commit is contained in:
Viktor Söderqvist 2021-01-22 15:19:37 +01:00 committed by GitHub
parent e2dd057de5
commit 89955982c3
2 changed files with 108 additions and 1 deletions

View File

@ -2,6 +2,7 @@
#include "redismodule.h"
#include <string.h>
#include <strings.h>
#include <assert.h>
#include <unistd.h>
@ -347,7 +348,11 @@ int blockonkeys_popall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_OK;
}
/* A module equivalent of LPUSH */
/* BLOCKONKEYS.LPUSH key val [val ..]
* BLOCKONKEYS.LPUSH_UNBLOCK key val [val ..]
*
* A module equivalent of LPUSH. If the name LPUSH_UNBLOCK is used,
* RM_SignalKeyAsReady() is also called. */
int blockonkeys_lpush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc < 3)
return RedisModule_WrongArity(ctx);
@ -366,9 +371,83 @@ int blockonkeys_lpush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
}
}
RedisModule_CloseKey(key);
/* signal key as ready if the command is lpush_unblock */
size_t len;
const char *str = RedisModule_StringPtrLen(argv[0], &len);
if (!strncasecmp(str, "blockonkeys.lpush_unblock", len)) {
RedisModule_SignalKeyAsReady(ctx, argv[1]);
}
return RedisModule_ReplyWithSimpleString(ctx, "OK");
}
/* Callback for the BLOCKONKEYS.BLPOPN command */
int blockonkeys_blpopn_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argc);
long long n;
RedisModule_StringToLongLong(argv[2], &n);
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
int result;
if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST &&
RedisModule_ValueLength(key) >= (size_t)n) {
RedisModule_ReplyWithArray(ctx, n);
for (long i = 0; i < n; i++) {
RedisModuleString *elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD);
RedisModule_ReplyWithString(ctx, elem);
RedisModule_FreeString(ctx, elem);
}
result = REDISMODULE_OK;
} else if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST ||
RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) {
/* continue blocking */
result = REDISMODULE_ERR;
} else {
result = RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
}
RedisModule_CloseKey(key);
return result;
}
int blockonkeys_blpopn_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
return RedisModule_ReplyWithError(ctx, "ERR Timeout");
}
/* BLOCKONKEYS.BLPOPN key N
*
* Blocks until key has N elements and then pops them or fails after 3 seconds.
*/
int blockonkeys_blpopn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc < 3) return RedisModule_WrongArity(ctx);
long long n;
if (RedisModule_StringToLongLong(argv[2], &n) != REDISMODULE_OK) {
return RedisModule_ReplyWithError(ctx, "ERR Invalid N");
}
RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE);
int keytype = RedisModule_KeyType(key);
if (keytype != REDISMODULE_KEYTYPE_EMPTY &&
keytype != REDISMODULE_KEYTYPE_LIST) {
RedisModule_ReplyWithError(ctx, REDISMODULE_ERRORMSG_WRONGTYPE);
} else if (keytype == REDISMODULE_KEYTYPE_LIST &&
RedisModule_ValueLength(key) >= (size_t)n) {
RedisModule_ReplyWithArray(ctx, n);
for (long i = 0; i < n; i++) {
RedisModuleString *elem = RedisModule_ListPop(key, REDISMODULE_LIST_HEAD);
RedisModule_ReplyWithString(ctx, elem);
RedisModule_FreeString(ctx, elem);
}
} else {
RedisModule_BlockClientOnKeys(ctx, blockonkeys_blpopn_reply_callback,
blockonkeys_blpopn_timeout_callback,
NULL, 3000, &argv[1], 1, NULL);
}
RedisModule_CloseKey(key);
return REDISMODULE_OK;
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
@ -413,5 +492,13 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
"", 1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "blockonkeys.lpush_unblock", blockonkeys_lpush,
"", 1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "blockonkeys.blpopn", blockonkeys_blpopn,
"", 1, 1, 1) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK;
}

View File

@ -216,4 +216,24 @@ start_server {tags {"modules"}} {
assert_equal {k 42} [$rd read]
$rd close
}
test {Module unblocks module blocked on non-empty list} {
r del k
r lpush k aa
# Module client blocks to pop 5 elements from list
set rd [redis_deferring_client]
$rd blockonkeys.blpopn k 5
# Wait until client is actually blocked
wait_for_condition 50 100 {
[s 0 blocked_clients] eq {1}
} else {
fail "Client is not blocked"
}
# Check that RM_SignalKeyAsReady() can wake up BLPOPN
r blockonkeys.lpush_unblock k bb cc ;# Not enough elements for BLPOPN
r lpush k dd ee ff ;# Doesn't unblock module
r blockonkeys.lpush_unblock k gg ;# Unblocks module
assert_equal {gg ff ee dd cc} [$rd read]
$rd close
}
}