diff --git a/src/module.c b/src/module.c index 1be1b7781..5c7a0ce0b 100644 --- a/src/module.c +++ b/src/module.c @@ -8241,6 +8241,11 @@ void moduleHandleBlockedClients(void) { * properly unblocked by the module. */ bc->disconnect_callback = NULL; unblockClient(c, 1); + + /* Update the wait offset, we don't know if this blocked client propagated anything, + * currently we rather not add any API for that, so we just assume it did. */ + c->woff = server.master_repl_offset; + /* Put the client in the list of clients that need to write * if there are pending replies here. This is needed since * during a non blocking command the client may receive output. */ diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c index bc3b6b1a4..49c23f8ab 100644 --- a/tests/modules/blockonkeys.c +++ b/tests/modules/blockonkeys.c @@ -131,6 +131,8 @@ int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { fsl->list[fsl->length++] = ele; RedisModule_SignalKeyAsReady(ctx, argv[1]); + RedisModule_ReplicateVerbatim(ctx); + return RedisModule_ReplyWithSimpleString(ctx, "OK"); } @@ -145,6 +147,9 @@ int bpop_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) RedisModule_Assert(fsl->length); RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); return REDISMODULE_OK; } @@ -181,6 +186,8 @@ int fsl_bpop(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { } else { RedisModule_Assert(fsl->length); RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); } return REDISMODULE_OK; @@ -201,6 +208,8 @@ int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int arg RedisModule_Assert(fsl->length); RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); return REDISMODULE_OK; } @@ -247,6 +256,8 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { } else { RedisModule_Assert(fsl->length); RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); } return REDISMODULE_OK; @@ -270,6 +281,8 @@ int bpoppush_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int a long long ele = src->list[--src->length]; dst->list[dst->length++] = ele; RedisModule_SignalKeyAsReady(ctx, dst_keyname); + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); return RedisModule_ReplyWithLongLong(ctx, ele); } @@ -314,6 +327,8 @@ int fsl_bpoppush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { dst->list[dst->length++] = ele; RedisModule_SignalKeyAsReady(ctx, argv[2]); RedisModule_ReplyWithLongLong(ctx, ele); + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); } return REDISMODULE_OK; @@ -350,6 +365,8 @@ int blockonkeys_popall_reply_callback(RedisModuleCtx *ctx, RedisModuleString **a RedisModule_ReplyWithString(ctx, elem); RedisModule_FreeString(ctx, elem); } + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); RedisModule_ReplySetArrayLength(ctx, len); } else { RedisModule_ReplyWithError(ctx, "ERR Not a list"); @@ -415,6 +432,7 @@ int blockonkeys_lpush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (!strncasecmp(str, "blockonkeys.lpush_unblock", len)) { RedisModule_SignalKeyAsReady(ctx, argv[1]); } + RedisModule_ReplicateVerbatim(ctx); return RedisModule_ReplyWithSimpleString(ctx, "OK"); } @@ -433,6 +451,8 @@ int blockonkeys_blpopn_reply_callback(RedisModuleCtx *ctx, RedisModuleString **a RedisModule_ReplyWithString(ctx, elem); RedisModule_FreeString(ctx, elem); } + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); result = REDISMODULE_OK; } else if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST || RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) { @@ -491,6 +511,8 @@ int blockonkeys_blpopn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) RedisModule_ReplyWithString(ctx, elem); RedisModule_FreeString(ctx, elem); } + /* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */ + RedisModule_ReplicateVerbatim(ctx); } else { RedisModule_BlockClientOnKeys(ctx, blockonkeys_blpopn_reply_callback, timeout ? blockonkeys_blpopn_timeout_callback : blockonkeys_blpopn_abort_callback, diff --git a/tests/unit/moduleapi/blockedclient.tcl b/tests/unit/moduleapi/blockedclient.tcl index f0faea5c3..6e3c62470 100644 --- a/tests/unit/moduleapi/blockedclient.tcl +++ b/tests/unit/moduleapi/blockedclient.tcl @@ -253,6 +253,30 @@ foreach call_type {nested normal} { assert_match {*calls=2,*,rejected_calls=0,failed_calls=2} [cmdrstat do_bg_rm_call r] } + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + start_server [list overrides [list loadmodule "$testmodule"]] { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + + # Start the replication process... + $replica replicaof $master_host $master_port + wait_for_sync $replica + + test {WAIT command on module blocked client} { + pause_process [srv 0 pid] + + $master do_bg_rm_call_format ! hset bk1 foo bar + + assert_equal [$master wait 1 1000] 0 + resume_process [srv 0 pid] + assert_equal [$master wait 1 1000] 1 + assert_equal [$replica hget bk1 foo] bar + } + } + test "Unload the module - blockedclient" { assert_equal {OK} [r module unload blockedclient] } diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl index 8e095e351..ff448e189 100644 --- a/tests/unit/moduleapi/blockonkeys.tcl +++ b/tests/unit/moduleapi/blockonkeys.tcl @@ -303,4 +303,39 @@ start_server {tags {"modules"}} { assert_equal {someval} [$rd read] $rd close } + + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + start_server [list overrides [list loadmodule "$testmodule"]] { + set replica [srv 0 client] + set replica_host [srv 0 host] + set replica_port [srv 0 port] + + # Start the replication process... + $replica replicaof $master_host $master_port + wait_for_sync $replica + + test {WAIT command on module blocked client on keys} { + set rd [redis_deferring_client -1] + $rd set x y + $rd read + + pause_process [srv 0 pid] + + $master del k + $rd fsl.bpop k 0 + wait_for_blocked_client -1 + $master fsl.push k 34 + $master fsl.push k 35 + assert_equal {34} [$rd read] + + assert_equal [$master wait 1 1000] 0 + resume_process [srv 0 pid] + assert_equal [$master wait 1 1000] 1 + $rd close + assert_equal {35} [$replica fsl.getall k] + } + } + }