Fix WAIT for clients being blocked in a module command (#12220)
So far clients being blocked and unblocked by a module command would update the c->woff variable and so WAIT was ineffective and got released without waiting for the command actions to propagate. This seems to have existed since forever, but not for RM_BlockClientOnKeys. It is problematic though to know if the module did or didn't propagate anything in that command, so for now, instead of adding an API, we'll just update the woff to the latest offset when unblocking, this will cause the client to possibly wait excessively, but that's not that bad.
This commit is contained in:
parent
da46e926ac
commit
6117f28822
@ -8241,6 +8241,11 @@ void moduleHandleBlockedClients(void) {
|
||||
* properly unblocked by the module. */
|
||||
bc->disconnect_callback = NULL;
|
||||
unblockClient(c, 1);
|
||||
|
||||
/* Update the wait offset, we don't know if this blocked client propagated anything,
|
||||
* currently we rather not add any API for that, so we just assume it did. */
|
||||
c->woff = server.master_repl_offset;
|
||||
|
||||
/* Put the client in the list of clients that need to write
|
||||
* if there are pending replies here. This is needed since
|
||||
* during a non blocking command the client may receive output. */
|
||||
|
@ -131,6 +131,8 @@ int fsl_push(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
fsl->list[fsl->length++] = ele;
|
||||
RedisModule_SignalKeyAsReady(ctx, argv[1]);
|
||||
|
||||
RedisModule_ReplicateVerbatim(ctx);
|
||||
|
||||
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
}
|
||||
|
||||
@ -145,6 +147,9 @@ int bpop_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
|
||||
RedisModule_Assert(fsl->length);
|
||||
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
|
||||
|
||||
/* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */
|
||||
RedisModule_ReplicateVerbatim(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
@ -181,6 +186,8 @@ int fsl_bpop(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
} else {
|
||||
RedisModule_Assert(fsl->length);
|
||||
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
|
||||
/* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */
|
||||
RedisModule_ReplicateVerbatim(ctx);
|
||||
}
|
||||
|
||||
return REDISMODULE_OK;
|
||||
@ -201,6 +208,8 @@ int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int arg
|
||||
|
||||
RedisModule_Assert(fsl->length);
|
||||
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
|
||||
/* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */
|
||||
RedisModule_ReplicateVerbatim(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
@ -247,6 +256,8 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
} else {
|
||||
RedisModule_Assert(fsl->length);
|
||||
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
|
||||
/* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */
|
||||
RedisModule_ReplicateVerbatim(ctx);
|
||||
}
|
||||
|
||||
return REDISMODULE_OK;
|
||||
@ -270,6 +281,8 @@ int bpoppush_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int a
|
||||
long long ele = src->list[--src->length];
|
||||
dst->list[dst->length++] = ele;
|
||||
RedisModule_SignalKeyAsReady(ctx, dst_keyname);
|
||||
/* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */
|
||||
RedisModule_ReplicateVerbatim(ctx);
|
||||
return RedisModule_ReplyWithLongLong(ctx, ele);
|
||||
}
|
||||
|
||||
@ -314,6 +327,8 @@ int fsl_bpoppush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
dst->list[dst->length++] = ele;
|
||||
RedisModule_SignalKeyAsReady(ctx, argv[2]);
|
||||
RedisModule_ReplyWithLongLong(ctx, ele);
|
||||
/* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */
|
||||
RedisModule_ReplicateVerbatim(ctx);
|
||||
}
|
||||
|
||||
return REDISMODULE_OK;
|
||||
@ -350,6 +365,8 @@ int blockonkeys_popall_reply_callback(RedisModuleCtx *ctx, RedisModuleString **a
|
||||
RedisModule_ReplyWithString(ctx, elem);
|
||||
RedisModule_FreeString(ctx, elem);
|
||||
}
|
||||
/* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */
|
||||
RedisModule_ReplicateVerbatim(ctx);
|
||||
RedisModule_ReplySetArrayLength(ctx, len);
|
||||
} else {
|
||||
RedisModule_ReplyWithError(ctx, "ERR Not a list");
|
||||
@ -415,6 +432,7 @@ int blockonkeys_lpush(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (!strncasecmp(str, "blockonkeys.lpush_unblock", len)) {
|
||||
RedisModule_SignalKeyAsReady(ctx, argv[1]);
|
||||
}
|
||||
RedisModule_ReplicateVerbatim(ctx);
|
||||
return RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||
}
|
||||
|
||||
@ -433,6 +451,8 @@ int blockonkeys_blpopn_reply_callback(RedisModuleCtx *ctx, RedisModuleString **a
|
||||
RedisModule_ReplyWithString(ctx, elem);
|
||||
RedisModule_FreeString(ctx, elem);
|
||||
}
|
||||
/* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */
|
||||
RedisModule_ReplicateVerbatim(ctx);
|
||||
result = REDISMODULE_OK;
|
||||
} else if (RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_LIST ||
|
||||
RedisModule_KeyType(key) == REDISMODULE_KEYTYPE_EMPTY) {
|
||||
@ -491,6 +511,8 @@ int blockonkeys_blpopn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
RedisModule_ReplyWithString(ctx, elem);
|
||||
RedisModule_FreeString(ctx, elem);
|
||||
}
|
||||
/* I'm lazy so i'll replicate a potentially blocking command, it shouldn't block in this flow. */
|
||||
RedisModule_ReplicateVerbatim(ctx);
|
||||
} else {
|
||||
RedisModule_BlockClientOnKeys(ctx, blockonkeys_blpopn_reply_callback,
|
||||
timeout ? blockonkeys_blpopn_timeout_callback : blockonkeys_blpopn_abort_callback,
|
||||
|
@ -253,6 +253,30 @@ foreach call_type {nested normal} {
|
||||
assert_match {*calls=2,*,rejected_calls=0,failed_calls=2} [cmdrstat do_bg_rm_call r]
|
||||
}
|
||||
|
||||
set master [srv 0 client]
|
||||
set master_host [srv 0 host]
|
||||
set master_port [srv 0 port]
|
||||
start_server [list overrides [list loadmodule "$testmodule"]] {
|
||||
set replica [srv 0 client]
|
||||
set replica_host [srv 0 host]
|
||||
set replica_port [srv 0 port]
|
||||
|
||||
# Start the replication process...
|
||||
$replica replicaof $master_host $master_port
|
||||
wait_for_sync $replica
|
||||
|
||||
test {WAIT command on module blocked client} {
|
||||
pause_process [srv 0 pid]
|
||||
|
||||
$master do_bg_rm_call_format ! hset bk1 foo bar
|
||||
|
||||
assert_equal [$master wait 1 1000] 0
|
||||
resume_process [srv 0 pid]
|
||||
assert_equal [$master wait 1 1000] 1
|
||||
assert_equal [$replica hget bk1 foo] bar
|
||||
}
|
||||
}
|
||||
|
||||
test "Unload the module - blockedclient" {
|
||||
assert_equal {OK} [r module unload blockedclient]
|
||||
}
|
||||
|
@ -303,4 +303,39 @@ start_server {tags {"modules"}} {
|
||||
assert_equal {someval} [$rd read]
|
||||
$rd close
|
||||
}
|
||||
|
||||
set master [srv 0 client]
|
||||
set master_host [srv 0 host]
|
||||
set master_port [srv 0 port]
|
||||
start_server [list overrides [list loadmodule "$testmodule"]] {
|
||||
set replica [srv 0 client]
|
||||
set replica_host [srv 0 host]
|
||||
set replica_port [srv 0 port]
|
||||
|
||||
# Start the replication process...
|
||||
$replica replicaof $master_host $master_port
|
||||
wait_for_sync $replica
|
||||
|
||||
test {WAIT command on module blocked client on keys} {
|
||||
set rd [redis_deferring_client -1]
|
||||
$rd set x y
|
||||
$rd read
|
||||
|
||||
pause_process [srv 0 pid]
|
||||
|
||||
$master del k
|
||||
$rd fsl.bpop k 0
|
||||
wait_for_blocked_client -1
|
||||
$master fsl.push k 34
|
||||
$master fsl.push k 35
|
||||
assert_equal {34} [$rd read]
|
||||
|
||||
assert_equal [$master wait 1 1000] 0
|
||||
resume_process [srv 0 pid]
|
||||
assert_equal [$master wait 1 1000] 1
|
||||
$rd close
|
||||
assert_equal {35} [$replica fsl.getall k]
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user