From 193fc241ca125df703d62ae87da7303f67213e38 Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Tue, 21 Jan 2020 15:09:42 +0530 Subject: [PATCH 1/3] Fix memory corruption in moduleHandleBlockedClients By using a "circular BRPOPLPUSH"-like scenario it was possible the get the same client on db->blocking_keys twice (See comment in moduleTryServeClientBlockedOnKey) The fix was actually already implememnted in moduleTryServeClientBlockedOnKey but it had a bug: the funxction should return 0 or 1 (not OK or ERR) Other changes: 1. Added two commands to blockonkeys.c test module (To reproduce the case described above) 2. Simplify blockonkeys.c in order to make testing easier 3. cast raxSize() to avoid warning with format spec --- src/module.c | 18 +++- tests/modules/blockonkeys.c | 122 ++++++++++++++++++++++----- tests/unit/moduleapi/blockonkeys.tcl | 55 +++++++----- 3 files changed, 149 insertions(+), 46 deletions(-) diff --git a/src/module.c b/src/module.c index 61dc25169..85e1497fd 100644 --- a/src/module.c +++ b/src/module.c @@ -4393,14 +4393,26 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF * can really be unblocked, since the module was able to serve the client. * If the callback returns REDISMODULE_OK, then the client can be unblocked, * otherwise the client remains blocked and we'll retry again when one of - * the keys it blocked for becomes "ready" again. */ + * the keys it blocked for becomes "ready" again. + * This function returns 1 if client was served (and should be unblocked) */ int moduleTryServeClientBlockedOnKey(client *c, robj *key) { int served = 0; RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; /* Protect against re-processing: don't serve clients that are already * in the unblocking list for any reason (including RM_UnblockClient() - * explicit call). */ - if (bc->unblocked) return REDISMODULE_ERR; + * explicit call). + * For example, the following pathological case: + * Assume a module called LIST implements the same command as + * the Redis list data type. + * LIST.BRPOPLPUSH src dst 0 ('src' goes into db->blocking_keys) + * LIST.BRPOPLPUSH dst src 0 ('dst' goes into db->blocking_keys) + * LIST.LPUSH src foo + * 'src' is in db->blocking_keys after the first BRPOPLPUSH is served + * (and stays there until the next beforeSleep). + * The second BRPOPLPUSH will signal 'src' as ready, leading to the + * unblocking of the already unblocked (and worst, freed) reply_client + * of the first BRPOPLPUSH. */ + if (bc->unblocked) return 0; RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; ctx.blocked_ready_key = key; diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c index 10dc65b1a..94f31d455 100644 --- a/tests/modules/blockonkeys.c +++ b/tests/modules/blockonkeys.c @@ -109,41 +109,33 @@ int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return RedisModule_ReplyWithError(ctx,"ERR new element has to be greater than the head element"); fsl->list[fsl->length++] = ele; - - if (fsl->length >= 2) - RedisModule_SignalKeyAsReady(ctx, argv[1]); + RedisModule_SignalKeyAsReady(ctx, argv[1]); return RedisModule_ReplyWithSimpleString(ctx, "OK"); } -int bpop2_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { +int bpop_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx); fsl_t *fsl; - if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0)) + if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl) return REDISMODULE_ERR; - if (!fsl || fsl->length < 2) - return REDISMODULE_ERR; - - RedisModule_ReplyWithArray(ctx, 2); - RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); return REDISMODULE_OK; } -int bpop2_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { +int bpop_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); return RedisModule_ReplyWithSimpleString(ctx, "Request timedout"); } - -/* FSL.BPOP2 - Block clients until list has two or more elements. +/* FSL.BPOP - Block clients until list has two or more elements. * When that happens, unblock client and pop the last two elements (from the right). */ -int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { +int fsl_bpop(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (argc != 3) return RedisModule_WrongArity(ctx); @@ -155,13 +147,10 @@ int fsl_bpop2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) return REDISMODULE_OK; - if (!fsl || fsl->length < 2) { - /* Key is empty or has <2 elements, we must block */ - RedisModule_BlockClientOnKeys(ctx, bpop2_reply_callback, bpop2_timeout_callback, + if (!fsl) { + RedisModule_BlockClientOnKeys(ctx, bpop_reply_callback, bpop_timeout_callback, NULL, timeout, &argv[1], 1, NULL); } else { - RedisModule_ReplyWithArray(ctx, 2); - RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); } @@ -175,10 +164,10 @@ int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int arg long long *pgt = RedisModule_GetBlockedClientPrivateData(ctx); fsl_t *fsl; - if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0)) + if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl) return REDISMODULE_ERR; - if (!fsl || fsl->list[fsl->length-1] <= *pgt) + if (fsl->list[fsl->length-1] <= *pgt) return REDISMODULE_ERR; RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); @@ -218,7 +207,6 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { /* We use malloc so the tests in blockedonkeys.tcl can check for memory leaks */ long long *pgt = RedisModule_Alloc(sizeof(long long)); *pgt = gt; - /* Key is empty or has <2 elements, we must block */ RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback, bpopgt_free_privdata, timeout, &argv[1], 1, pgt); } else { @@ -228,6 +216,88 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { return REDISMODULE_OK; } +int bpoppush_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModuleString *src_keyname = RedisModule_GetBlockedClientReadyKey(ctx); + RedisModuleString *dst_keyname = RedisModule_GetBlockedClientPrivateData(ctx); + + fsl_t *src; + if (!get_fsl(ctx, src_keyname, REDISMODULE_READ, 0, &src, 0) || !src) + return REDISMODULE_ERR; + + fsl_t *dst; + if (!get_fsl(ctx, dst_keyname, REDISMODULE_WRITE, 1, &dst, 0) || !dst) + return REDISMODULE_ERR; + + long long ele = src->list[--src->length]; + dst->list[dst->length++] = ele; + RedisModule_SignalKeyAsReady(ctx, dst_keyname); + return RedisModule_ReplyWithLongLong(ctx, ele); +} + +int bpoppush_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + return RedisModule_ReplyWithSimpleString(ctx, "Request timedout"); +} + +void bpoppush_free_privdata(RedisModuleCtx *ctx, void *privdata) { + RedisModule_FreeString(ctx, privdata); +} + +/* FSL.BPOPPUSH - Block clients until has an element. + * When that happens, unblock client, pop the last element from and push it to + * (from the right). */ +int fsl_bpoppush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 4) + return RedisModule_WrongArity(ctx); + + long long timeout; + if (RedisModule_StringToLongLong(argv[3],&timeout) != REDISMODULE_OK || timeout < 0) + return RedisModule_ReplyWithError(ctx,"ERR invalid timeout"); + + fsl_t *src; + if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &src, 1)) + return REDISMODULE_OK; + + if (!src) { + /* Retain string for reply callback */ + RedisModule_RetainString(ctx, argv[2]); + /* Key is empty, we must block */ + RedisModule_BlockClientOnKeys(ctx, bpoppush_reply_callback, bpoppush_timeout_callback, + bpoppush_free_privdata, timeout, &argv[1], 1, argv[2]); + } else { + fsl_t *dst; + if (!get_fsl(ctx, argv[2], REDISMODULE_WRITE, 1, &dst, 1)) + return REDISMODULE_OK; + long long ele = src->list[--src->length]; + dst->list[dst->length++] = ele; + RedisModule_SignalKeyAsReady(ctx, argv[2]); + RedisModule_ReplyWithLongLong(ctx, ele); + } + + return REDISMODULE_OK; +} + +/* FSL.GETALL - Reply with an array containing all elements. */ +int fsl_getall(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) + return RedisModule_WrongArity(ctx); + + fsl_t *fsl; + if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) + return REDISMODULE_OK; + + if (!fsl) + return RedisModule_ReplyWithArray(ctx, 0); + + RedisModule_ReplyWithArray(ctx, fsl->length); + for (int i = 0; i < fsl->length; i++) + RedisModule_ReplyWithLongLong(ctx, fsl->list[i]); + return REDISMODULE_OK; +} + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -252,11 +322,17 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_CreateCommand(ctx,"fsl.push",fsl_push,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; - if (RedisModule_CreateCommand(ctx,"fsl.bpop2",fsl_bpop2,"",0,0,0) == REDISMODULE_ERR) + if (RedisModule_CreateCommand(ctx,"fsl.bpop",fsl_bpop,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,"fsl.bpopgt",fsl_bpopgt,"",0,0,0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"fsl.bpoppush",fsl_bpoppush,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"fsl.getall",fsl_getall,"",0,0,0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl index b380227e0..c8b8f23ed 100644 --- a/tests/unit/moduleapi/blockonkeys.tcl +++ b/tests/unit/moduleapi/blockonkeys.tcl @@ -3,37 +3,53 @@ set testmodule [file normalize tests/modules/blockonkeys.so] start_server {tags {"modules"}} { r module load $testmodule + test "Module client blocked on keys: Circular BPOPPUSH" { + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + + r del src dst + + $rd1 fsl.bpoppush src dst 0 + $rd2 fsl.bpoppush dst src 0 + + r fsl.push src 42 + + assert_equal {42} [r fsl.getall src] + assert_equal {} [r fsl.getall dst] + } + + test "Module client blocked on keys: Self-referential BPOPPUSH" { + set rd1 [redis_deferring_client] + + r del src + + $rd1 fsl.bpoppush src src 0 + + r fsl.push src 42 + + assert_equal {42} [r fsl.getall src] + } + test {Module client blocked on keys (no metadata): No block} { r del k r fsl.push k 33 r fsl.push k 34 - r fsl.bpop2 k 0 - } {34 33} + r fsl.bpop k 0 + } {34} test {Module client blocked on keys (no metadata): Timeout} { r del k set rd [redis_deferring_client] - r fsl.push k 33 - $rd fsl.bpop2 k 1 + $rd fsl.bpop k 1 assert_equal {Request timedout} [$rd read] } - test {Module client blocked on keys (no metadata): Blocked, case 1} { + test {Module client blocked on keys (no metadata): Blocked} { r del k set rd [redis_deferring_client] - r fsl.push k 33 - $rd fsl.bpop2 k 0 + $rd fsl.bpop k 0 r fsl.push k 34 - assert_equal {34 33} [$rd read] - } - - test {Module client blocked on keys (no metadata): Blocked, case 2} { - r del k - set rd [redis_deferring_client] - r fsl.push k 33 - r fsl.push k 34 - $rd fsl.bpop2 k 0 - assert_equal {34 33} [$rd read] + assert_equal {34} [$rd read] } test {Module client blocked on keys (with metadata): No block} { @@ -108,13 +124,12 @@ start_server {tags {"modules"}} { test {Module client blocked on keys does not wake up on wrong type} { r del k set rd [redis_deferring_client] - $rd fsl.bpop2 k 0 + $rd fsl.bpop k 0 r lpush k 12 r lpush k 13 r lpush k 14 r del k - r fsl.push k 33 r fsl.push k 34 - assert_equal {34 33} [$rd read] + assert_equal {34} [$rd read] } } From 6fe66e096957f4ceb511629bf6452dcb27442567 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 2 Apr 2020 11:20:09 +0200 Subject: [PATCH 2/3] Simplify comment in moduleTryServeClientBlockedOnKey(). --- src/module.c | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/src/module.c b/src/module.c index 85e1497fd..16d24152e 100644 --- a/src/module.c +++ b/src/module.c @@ -4398,21 +4398,12 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF int moduleTryServeClientBlockedOnKey(client *c, robj *key) { int served = 0; RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + /* Protect against re-processing: don't serve clients that are already * in the unblocking list for any reason (including RM_UnblockClient() - * explicit call). - * For example, the following pathological case: - * Assume a module called LIST implements the same command as - * the Redis list data type. - * LIST.BRPOPLPUSH src dst 0 ('src' goes into db->blocking_keys) - * LIST.BRPOPLPUSH dst src 0 ('dst' goes into db->blocking_keys) - * LIST.LPUSH src foo - * 'src' is in db->blocking_keys after the first BRPOPLPUSH is served - * (and stays there until the next beforeSleep). - * The second BRPOPLPUSH will signal 'src' as ready, leading to the - * unblocking of the already unblocked (and worst, freed) reply_client - * of the first BRPOPLPUSH. */ + * explicit call). See #6798. */ if (bc->unblocked) return 0; + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; ctx.blocked_ready_key = key; From c35a53169ffdbdf73f91224b6e62f6129fb9a838 Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Mon, 30 Mar 2020 10:52:59 +0300 Subject: [PATCH 3/3] streamReplyWithRange: Redundant XSETIDs to replica propagate_last_id is declared outside of the loop but used only from within the loop. Once it's '1' it will never go back to '0' and will replicate XSETID even for IDs that don't actually change the last_id. While not a serious bug (XSETID always used group->last_id so there's no risk), it does causes redundant traffic between master and its replicas --- src/t_stream.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/t_stream.c b/src/t_stream.c index 557d1d642..00d1cbf1c 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -935,7 +935,6 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end streamIterator si; int64_t numfields; streamID id; - int propagate_last_id = 0; /* If the client is asking for some history, we serve it using a * different function, so that we return entries *solely* from its @@ -951,6 +950,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end arraylen_ptr = addReplyDeferredLen(c); streamIteratorStart(&si,s,start,end,rev); while(streamIteratorGetID(&si,&id,&numfields)) { + int propagate_last_id = 0; + /* Update the group last_id if needed. */ if (group && streamCompareID(&id,&group->last_id) > 0) { group->last_id = id;