diff --git a/src/module.c b/src/module.c index 61dc25169..85e1497fd 100644 --- a/src/module.c +++ b/src/module.c @@ -4393,14 +4393,26 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF * can really be unblocked, since the module was able to serve the client. * If the callback returns REDISMODULE_OK, then the client can be unblocked, * otherwise the client remains blocked and we'll retry again when one of - * the keys it blocked for becomes "ready" again. */ + * the keys it blocked for becomes "ready" again. + * This function returns 1 if client was served (and should be unblocked) */ int moduleTryServeClientBlockedOnKey(client *c, robj *key) { int served = 0; RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; /* Protect against re-processing: don't serve clients that are already * in the unblocking list for any reason (including RM_UnblockClient() - * explicit call). */ - if (bc->unblocked) return REDISMODULE_ERR; + * explicit call). + * For example, the following pathological case: + * Assume a module called LIST implements the same command as + * the Redis list data type. + * LIST.BRPOPLPUSH src dst 0 ('src' goes into db->blocking_keys) + * LIST.BRPOPLPUSH dst src 0 ('dst' goes into db->blocking_keys) + * LIST.LPUSH src foo + * 'src' is in db->blocking_keys after the first BRPOPLPUSH is served + * (and stays there until the next beforeSleep). + * The second BRPOPLPUSH will signal 'src' as ready, leading to the + * unblocking of the already unblocked (and worst, freed) reply_client + * of the first BRPOPLPUSH. */ + if (bc->unblocked) return 0; RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; ctx.blocked_ready_key = key; diff --git a/src/server.c b/src/server.c index 4fdbf30ec..5d8b4fde9 100644 --- a/src/server.c +++ b/src/server.c @@ -3981,7 +3981,7 @@ sds genRedisInfoString(const char *section) { maxin, maxout, server.blocked_clients, server.tracking_clients, - raxSize(server.clients_timeout_table)); + (unsigned long long)raxSize(server.clients_timeout_table)); } /* Memory */ diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c index 10dc65b1a..94f31d455 100644 --- a/tests/modules/blockonkeys.c +++ b/tests/modules/blockonkeys.c @@ -109,41 +109,33 @@ int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element"); fsl->list[fsl->length++] = ele; - - if (fsl->length >= 2) - RedisModule_SignalKeyAsReady(ctx, argv[1]); + RedisModule_SignalKeyAsReady(ctx, argv[1]); return RedisModule_ReplyWithSimpleString(ctx, "OK"); } -int bpop2_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { +int bpop_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx); fsl_t *fsl; - if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0)) + if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl) return REDISMODULE_ERR; - if (!fsl || fsl->length < 2) - return REDISMODULE_ERR; - - RedisModule_ReplyWithArray(ctx, 2); - RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); return REDISMODULE_OK; } -int bpop2_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { +int bpop_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); return RedisModule_ReplyWithSimpleString(ctx, "Request timedout"); } - -/* FSL.BPOP2 - Block clients until list has two or more elements. +/* FSL.BPOP - Block clients until list has two or more elements. * When that happens, unblock client and pop the last two elements (from the right). */ -int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { +int fsl_bpop(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc != 3) return RedisModule_WrongArity(ctx); @@ -155,13 +147,10 @@ int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) return REDISMODULE_OK; - if (!fsl || fsl->length < 2) { - /* Key is empty or has <2 elements, we must block */ - RedisModule_BlockClientOnKeys(ctx, bpop2_reply_callback, bpop2_timeout_callback, + if (!fsl) { + RedisModule_BlockClientOnKeys(ctx, bpop_reply_callback, bpop_timeout_callback, NULL, timeout, &argv[1], 1, NULL); } else { - RedisModule_ReplyWithArray(ctx, 2); - RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); } @@ -175,10 +164,10 @@ int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int arg long long *pgt = RedisModule_GetBlockedClientPrivateData(ctx); fsl_t *fsl; - if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0)) + if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl) return REDISMODULE_ERR; - if (!fsl || fsl->list[fsl->length-1] <= *pgt) + if (fsl->list[fsl->length-1] <= *pgt) return REDISMODULE_ERR; RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); @@ -218,7 +207,6 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { /* We use malloc so the tests in blockedonkeys.tcl can check for memory leaks */ long long *pgt = RedisModule_Alloc(sizeof(long long)); *pgt = gt; - /* Key is empty or has <2 elements, we must block */ RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback, bpopgt_free_privdata, timeout, &argv[1], 1, pgt); } else { @@ -228,6 +216,88 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return REDISMODULE_OK; } +int bpoppush_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModuleString *src_keyname = RedisModule_GetBlockedClientReadyKey(ctx); + RedisModuleString *dst_keyname = RedisModule_GetBlockedClientPrivateData(ctx); + + fsl_t *src; + if (!get_fsl(ctx, src_keyname, REDISMODULE_READ, 0, &src, 0) || !src) + return REDISMODULE_ERR; + + fsl_t *dst; + if (!get_fsl(ctx, dst_keyname, REDISMODULE_WRITE, 1, &dst, 0) || !dst) + return REDISMODULE_ERR; + + long long ele = src->list[--src->length]; + dst->list[dst->length++] = ele; + RedisModule_SignalKeyAsReady(ctx, dst_keyname); + return RedisModule_ReplyWithLongLong(ctx, ele); +} + +int bpoppush_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithSimpleString(ctx, "Request timedout"); +} + +void bpoppush_free_privdata(RedisModuleCtx *ctx, void *privdata) { + RedisModule_FreeString(ctx, privdata); +} + +/* FSL.BPOPPUSH - Block clients until has an element. + * When that happens, unblock client, pop the last element from and push it to + * (from the right). */ +int fsl_bpoppush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 4) + return RedisModule_WrongArity(ctx); + + long long timeout; + if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0) + return RedisModule_ReplyWithError(ctx,"ERR invalid timeout"); + + fsl_t *src; + if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &src, 1)) + return REDISMODULE_OK; + + if (!src) { + /* Retain string for reply callback */ + RedisModule_RetainString(ctx, argv[2]); + /* Key is empty, we must block */ + RedisModule_BlockClientOnKeys(ctx, bpoppush_reply_callback, bpoppush_timeout_callback, + bpoppush_free_privdata, timeout, &argv[1], 1, argv[2]); + } else { + fsl_t *dst; + if (!get_fsl(ctx, argv[2], REDISMODULE_WRITE, 1, &dst, 1)) + return REDISMODULE_OK; + long long ele = src->list[--src->length]; + dst->list[dst->length++] = ele; + RedisModule_SignalKeyAsReady(ctx, argv[2]); + RedisModule_ReplyWithLongLong(ctx, ele); + } + + return REDISMODULE_OK; +} + +/* FSL.GETALL - Reply with an array containing all elements. */ +int fsl_getall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) + return RedisModule_WrongArity(ctx); + + fsl_t *fsl; + if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) + return REDISMODULE_OK; + + if (!fsl) + return RedisModule_ReplyWithArray(ctx, 0); + + RedisModule_ReplyWithArray(ctx, fsl->length); + for (int i = 0; i < fsl->length; i++) + RedisModule_ReplyWithLongLong(ctx, fsl->list[i]); + return REDISMODULE_OK; +} + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -252,11 +322,17 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; - if (RedisModule_CreateCommand(ctx,"fsl.bpop2",fsl_bpop2,"",0,0,0) == REDISMODULE_ERR) + if (RedisModule_CreateCommand(ctx,"fsl.bpop",fsl_bpop,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"fsl.bpoppush",fsl_bpoppush,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fsl.getall",fsl_getall,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl index b380227e0..c8b8f23ed 100644 --- a/tests/unit/moduleapi/blockonkeys.tcl +++ b/tests/unit/moduleapi/blockonkeys.tcl @@ -3,37 +3,53 @@ set testmodule [file normalize tests/modules/blockonkeys.so] start_server {tags {"modules"}} { r module load $testmodule + test "Module client blocked on keys: Circular BPOPPUSH" { + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + + r del src dst + + $rd1 fsl.bpoppush src dst 0 + $rd2 fsl.bpoppush dst src 0 + + r fsl.push src 42 + + assert_equal {42} [r fsl.getall src] + assert_equal {} [r fsl.getall dst] + } + + test "Module client blocked on keys: Self-referential BPOPPUSH" { + set rd1 [redis_deferring_client] + + r del src + + $rd1 fsl.bpoppush src src 0 + + r fsl.push src 42 + + assert_equal {42} [r fsl.getall src] + } + test {Module client blocked on keys (no metadata): No block} { r del k r fsl.push k 33 r fsl.push k 34 - r fsl.bpop2 k 0 - } {34 33} + r fsl.bpop k 0 + } {34} test {Module client blocked on keys (no metadata): Timeout} { r del k set rd [redis_deferring_client] - r fsl.push k 33 - $rd fsl.bpop2 k 1 + $rd fsl.bpop k 1 assert_equal {Request timedout} [$rd read] } - test {Module client blocked on keys (no metadata): Blocked, case 1} { + test {Module client blocked on keys (no metadata): Blocked} { r del k set rd [redis_deferring_client] - r fsl.push k 33 - $rd fsl.bpop2 k 0 + $rd fsl.bpop k 0 r fsl.push k 34 - assert_equal {34 33} [$rd read] - } - - test {Module client blocked on keys (no metadata): Blocked, case 2} { - r del k - set rd [redis_deferring_client] - r fsl.push k 33 - r fsl.push k 34 - $rd fsl.bpop2 k 0 - assert_equal {34 33} [$rd read] + assert_equal {34} [$rd read] } test {Module client blocked on keys (with metadata): No block} { @@ -108,13 +124,12 @@ start_server {tags {"modules"}} { test {Module client blocked on keys does not wake up on wrong type} { r del k set rd [redis_deferring_client] - $rd fsl.bpop2 k 0 + $rd fsl.bpop k 0 r lpush k 12 r lpush k 13 r lpush k 14 r del k - r fsl.push k 33 r fsl.push k 34 - assert_equal {34 33} [$rd read] + assert_equal {34} [$rd read] } }