Fix CLIENT UNBLOCK crashing modules. (#9167)
Modules that use background threads with thread safe contexts are likely to use RM_BlockClient() without a timeout function, because they do not set up a timeout. Before this commit, `CLIENT UNBLOCK` would result with a crash as the `NULL` timeout callback is called. Beyond just crashing, this is also logically wrong as it may throw the module into an unexpected client state. This commits makes `CLIENT UNBLOCK` on such clients behave the same as any other client that is not in a blocked state and therefore cannot be unblocked. (cherry picked from commit aa139e2f02292d668370afde8c91575363c2d611)
This commit is contained in:
parent
4079f79974
commit
ffe1e9107c
21
src/module.c
21
src/module.c
@ -5367,8 +5367,8 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
|
|||||||
* reply_callback: called after a successful RedisModule_UnblockClient()
|
* reply_callback: called after a successful RedisModule_UnblockClient()
|
||||||
* call in order to reply to the client and unblock it.
|
* call in order to reply to the client and unblock it.
|
||||||
*
|
*
|
||||||
* timeout_callback: called when the timeout is reached in order to send an
|
* timeout_callback: called when the timeout is reached or if `CLIENT UNBLOCK`
|
||||||
* error to the client.
|
* is invoked, in order to send an error to the client.
|
||||||
*
|
*
|
||||||
* free_privdata: called in order to free the private data that is passed
|
* free_privdata: called in order to free the private data that is passed
|
||||||
* by RedisModule_UnblockClient() call.
|
* by RedisModule_UnblockClient() call.
|
||||||
@ -5385,6 +5385,12 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
|
|||||||
* In these cases, a call to RedisModule_BlockClient() will **not** block the
|
* In these cases, a call to RedisModule_BlockClient() will **not** block the
|
||||||
* client, but instead produce a specific error reply.
|
* client, but instead produce a specific error reply.
|
||||||
*
|
*
|
||||||
|
* A module that registers a timeout_callback function can also be unblocked
|
||||||
|
* using the `CLIENT UNBLOCK` command, which will trigger the timeout callback.
|
||||||
|
* If a callback function is not registered, then the blocked client will be
|
||||||
|
* treated as if it is not in a blocked state and `CLIENT UNBLOCK` will return
|
||||||
|
* a zero value.
|
||||||
|
*
|
||||||
* Measuring background time: By default the time spent in the blocked command
|
* Measuring background time: By default the time spent in the blocked command
|
||||||
* is not account for the total command duration. To include such time you should
|
* is not account for the total command duration. To include such time you should
|
||||||
* use RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd() one,
|
* use RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd() one,
|
||||||
@ -5654,6 +5660,17 @@ void moduleHandleBlockedClients(void) {
|
|||||||
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
|
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Check if the specified client can be safely timed out using
|
||||||
|
* moduleBlockedClientTimedOut().
|
||||||
|
*/
|
||||||
|
int moduleBlockedClientMayTimeout(client *c) {
|
||||||
|
if (c->btype != BLOCKED_MODULE)
|
||||||
|
return 1;
|
||||||
|
|
||||||
|
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
|
||||||
|
return (bc && bc->timeout_callback != NULL);
|
||||||
|
}
|
||||||
|
|
||||||
/* Called when our client timed out. After this function unblockClient()
|
/* Called when our client timed out. After this function unblockClient()
|
||||||
* is called, and it will invalidate the blocked client. So this function
|
* is called, and it will invalidate the blocked client. So this function
|
||||||
* does not need to do any cleanup. Eventually the module will call the
|
* does not need to do any cleanup. Eventually the module will call the
|
||||||
|
@ -2677,7 +2677,7 @@ NULL
|
|||||||
if (getLongLongFromObjectOrReply(c,c->argv[2],&id,NULL)
|
if (getLongLongFromObjectOrReply(c,c->argv[2],&id,NULL)
|
||||||
!= C_OK) return;
|
!= C_OK) return;
|
||||||
struct client *target = lookupClientByID(id);
|
struct client *target = lookupClientByID(id);
|
||||||
if (target && target->flags & CLIENT_BLOCKED) {
|
if (target && target->flags & CLIENT_BLOCKED && moduleBlockedClientMayTimeout(target)) {
|
||||||
if (unblock_error)
|
if (unblock_error)
|
||||||
addReplyError(target,
|
addReplyError(target,
|
||||||
"-UNBLOCKED client unblocked via CLIENT UNBLOCK");
|
"-UNBLOCKED client unblocked via CLIENT UNBLOCK");
|
||||||
|
@ -1779,6 +1779,7 @@ void moduleFireServerEvent(uint64_t eid, int subid, void *data);
|
|||||||
void processModuleLoadingProgressEvent(int is_aof);
|
void processModuleLoadingProgressEvent(int is_aof);
|
||||||
int moduleTryServeClientBlockedOnKey(client *c, robj *key);
|
int moduleTryServeClientBlockedOnKey(client *c, robj *key);
|
||||||
void moduleUnblockClient(client *c);
|
void moduleUnblockClient(client *c);
|
||||||
|
int moduleBlockedClientMayTimeout(client *c);
|
||||||
int moduleClientIsBlockedOnKeys(client *c);
|
int moduleClientIsBlockedOnKeys(client *c);
|
||||||
void moduleNotifyUserChanged(client *c);
|
void moduleNotifyUserChanged(client *c);
|
||||||
void moduleNotifyKeyUnlink(robj *key, robj *val);
|
void moduleNotifyKeyUnlink(robj *key, robj *val);
|
||||||
|
@ -195,6 +195,63 @@ int HelloDoubleBlock_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
|
|||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RedisModuleBlockedClient *blocked_client = NULL;
|
||||||
|
|
||||||
|
/* BLOCK.BLOCK [TIMEOUT] -- Blocks the current client until released
|
||||||
|
* or TIMEOUT seconds. If TIMEOUT is zero, no timeout function is
|
||||||
|
* registered.
|
||||||
|
*/
|
||||||
|
int Block_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
if (RedisModule_IsBlockedReplyRequest(ctx)) {
|
||||||
|
RedisModuleString *r = RedisModule_GetBlockedClientPrivateData(ctx);
|
||||||
|
return RedisModule_ReplyWithString(ctx, r);
|
||||||
|
} else if (RedisModule_IsBlockedTimeoutRequest(ctx)) {
|
||||||
|
blocked_client = NULL;
|
||||||
|
return RedisModule_ReplyWithSimpleString(ctx, "Timed out");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (argc != 2) return RedisModule_WrongArity(ctx);
|
||||||
|
long long timeout;
|
||||||
|
|
||||||
|
if (RedisModule_StringToLongLong(argv[1], &timeout) != REDISMODULE_OK) {
|
||||||
|
return RedisModule_ReplyWithError(ctx, "ERR invalid timeout");
|
||||||
|
}
|
||||||
|
if (blocked_client) {
|
||||||
|
return RedisModule_ReplyWithError(ctx, "ERR another client already blocked");
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Block client. We use this function as both a reply and optional timeout
|
||||||
|
* callback and differentiate the different code flows above.
|
||||||
|
*/
|
||||||
|
blocked_client = RedisModule_BlockClient(ctx, Block_RedisCommand,
|
||||||
|
timeout > 0 ? Block_RedisCommand : NULL, NULL, timeout);
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* BLOCK.IS_BLOCKED -- Returns 1 if we have a blocked client, or 0 otherwise.
|
||||||
|
*/
|
||||||
|
int IsBlocked_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
UNUSED(argv);
|
||||||
|
UNUSED(argc);
|
||||||
|
RedisModule_ReplyWithLongLong(ctx, blocked_client ? 1 : 0);
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* BLOCK.RELEASE [reply] -- Releases the blocked client and produce the specified reply.
|
||||||
|
*/
|
||||||
|
int Release_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
if (argc != 2) return RedisModule_WrongArity(ctx);
|
||||||
|
if (!blocked_client) {
|
||||||
|
return RedisModule_ReplyWithError(ctx, "ERR No blocked client");
|
||||||
|
}
|
||||||
|
|
||||||
|
RedisModule_UnblockClient(blocked_client, argv[1]);
|
||||||
|
blocked_client = NULL;
|
||||||
|
|
||||||
|
RedisModule_ReplyWithSimpleString(ctx, "OK");
|
||||||
|
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
UNUSED(argv);
|
UNUSED(argv);
|
||||||
@ -215,5 +272,17 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
|||||||
HelloBlockNoTracking_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
|
HelloBlockNoTracking_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
|
||||||
return REDISMODULE_ERR;
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (RedisModule_CreateCommand(ctx, "block.block",
|
||||||
|
Block_RedisCommand, "", 0, 0, 0) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (RedisModule_CreateCommand(ctx,"block.is_blocked",
|
||||||
|
IsBlocked_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (RedisModule_CreateCommand(ctx,"block.release",
|
||||||
|
Release_RedisCommand,"",0,0,0) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
@ -85,4 +85,33 @@ start_server {tags {"modules"}} {
|
|||||||
assert_equal [r slowlog len] 0
|
assert_equal [r slowlog len] 0
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test "client unblock works only for modules with timeout support" {
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
$rd client id
|
||||||
|
set id [$rd read]
|
||||||
|
|
||||||
|
# Block with a timeout function - may unblock
|
||||||
|
$rd block.block 20000
|
||||||
|
wait_for_condition 50 100 {
|
||||||
|
[r block.is_blocked] == 1
|
||||||
|
} else {
|
||||||
|
fail "Module did not block"
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_equal 1 [r client unblock $id]
|
||||||
|
assert_match {*Timed out*} [$rd read]
|
||||||
|
|
||||||
|
# Block without a timeout function - cannot unblock
|
||||||
|
$rd block.block 0
|
||||||
|
wait_for_condition 50 100 {
|
||||||
|
[r block.is_blocked] == 1
|
||||||
|
} else {
|
||||||
|
fail "Module did not block"
|
||||||
|
}
|
||||||
|
|
||||||
|
assert_equal 0 [r client unblock $id]
|
||||||
|
assert_equal "OK" [r block.release foobar]
|
||||||
|
assert_equal "foobar" [$rd read]
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user