Fix use after free on blocking RM_Call. (#12342)
blocking RM_Call was introduced on: #11568, It allows a module to perform blocking commands and get the reply asynchronously.If the command gets block, a special promise CallReply is returned that allow to set the unblock handler. The unblock handler will be called when the command invocation finish and it gets, as input, the command real reply. The issue was that the real CallReply was created using a stack allocated RedisModuleCtx which is no longer available after the unblock handler finishes. So if the module keeps the CallReply after the unblock handler finished, the CallReply holds a pointer to invalid memory and will try to access it when the CallReply will be released. The solution is to create the CallReply with a NULL context to make it totally detached and can be freed freely when the module wants. Test was added to cover this case, running the test with valgrind before the fix shows the use after free error. With the fix, there are no valgrind errors. unrelated: adding a missing `$rd close` in many tests in that file.
This commit is contained in:
parent
3230199920
commit
153f8f082e
@ -857,7 +857,7 @@ void moduleCallCommandUnblockedHandler(client *c) {
|
||||
moduleCreateContext(&ctx, module, REDISMODULE_CTX_TEMP_CLIENT);
|
||||
selectDb(ctx.client, c->db->id);
|
||||
|
||||
CallReply *reply = moduleParseReply(c, &ctx);
|
||||
CallReply *reply = moduleParseReply(c, NULL);
|
||||
module->in_call++;
|
||||
promise->on_unblocked(&ctx, reply, promise->private_data);
|
||||
module->in_call--;
|
||||
|
@ -321,6 +321,62 @@ int do_rm_call_async(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
typedef struct ThreadedAsyncRMCallCtx{
|
||||
RedisModuleBlockedClient *bc;
|
||||
RedisModuleCallReply *reply;
|
||||
} ThreadedAsyncRMCallCtx;
|
||||
|
||||
void *send_async_reply(void *arg) {
|
||||
ThreadedAsyncRMCallCtx *ta_rm_call_ctx = arg;
|
||||
rm_call_async_on_unblocked(NULL, ta_rm_call_ctx->reply, ta_rm_call_ctx->bc);
|
||||
RedisModule_Free(ta_rm_call_ctx);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Called when the command that was blocked on 'RM_Call' gets unblocked
|
||||
* and schedule a thread to send the reply to the blocked client. */
|
||||
static void rm_call_async_reply_on_thread(RedisModuleCtx *ctx, RedisModuleCallReply *reply, void *private_data) {
|
||||
UNUSED(ctx);
|
||||
ThreadedAsyncRMCallCtx *ta_rm_call_ctx = RedisModule_Alloc(sizeof(*ta_rm_call_ctx));
|
||||
ta_rm_call_ctx->bc = private_data;
|
||||
ta_rm_call_ctx->reply = reply;
|
||||
pthread_t tid;
|
||||
int res = pthread_create(&tid, NULL, send_async_reply, ta_rm_call_ctx);
|
||||
assert(res == 0);
|
||||
}
|
||||
|
||||
/*
|
||||
* Callback for do_rm_call_async_on_thread.
|
||||
* Gets the command to invoke as the first argument to the command and runs it,
|
||||
* passing the rest of the arguments to the command invocation.
|
||||
* If the command got blocked, blocks the client and unblock on a background thread.
|
||||
* this allows check the K (allow blocking) argument to RM_Call, and make sure that the reply
|
||||
* that passes to unblock handler is owned by the handler and are not attached to any
|
||||
* context that might be freed after the callback ends.
|
||||
*/
|
||||
int do_rm_call_async_on_thread(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
|
||||
UNUSED(argv);
|
||||
UNUSED(argc);
|
||||
|
||||
if(argc < 2){
|
||||
return RedisModule_WrongArity(ctx);
|
||||
}
|
||||
|
||||
const char* cmd = RedisModule_StringPtrLen(argv[1], NULL);
|
||||
|
||||
RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "KEv", argv + 2, argc - 2);
|
||||
|
||||
if(RedisModule_CallReplyType(rep) != REDISMODULE_REPLY_PROMISE) {
|
||||
rm_call_async_send_reply(ctx, rep);
|
||||
} else {
|
||||
RedisModuleBlockedClient *bc = RedisModule_BlockClient(ctx, NULL, NULL, NULL, 0);
|
||||
RedisModule_CallReplyPromiseSetUnblockHandler(rep, rm_call_async_reply_on_thread, bc);
|
||||
RedisModule_FreeCallReply(rep);
|
||||
}
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* Private data for wait_and_do_rm_call_async that holds information about:
|
||||
* 1. the block client, to unblock when done.
|
||||
* 2. the arguments, contains the command to run using RM_Call */
|
||||
@ -604,6 +660,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
"write", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "do_rm_call_async_on_thread", do_rm_call_async_on_thread,
|
||||
"write", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
||||
if (RedisModule_CreateCommand(ctx, "do_rm_call_async_script_mode", do_rm_call_async,
|
||||
"write", 0, 0, 0) == REDISMODULE_ERR)
|
||||
return REDISMODULE_ERR;
|
||||
|
@ -15,7 +15,18 @@ start_server {tags {"modules"}} {
|
||||
assert_equal {0} [r llen l]
|
||||
}
|
||||
|
||||
foreach cmd {do_rm_call_async do_rm_call_async_script_mode} {
|
||||
test "Blpop on threaded async RM_Call" {
|
||||
set rd [redis_deferring_client]
|
||||
|
||||
$rd do_rm_call_async_on_thread blpop l 0
|
||||
wait_for_blocked_clients_count 1
|
||||
r lpush l a
|
||||
assert_equal [$rd read] {l a}
|
||||
wait_for_blocked_clients_count 0
|
||||
$rd close
|
||||
}
|
||||
|
||||
foreach cmd {do_rm_call_async do_rm_call_async_script_mode } {
|
||||
|
||||
test "Blpop on async RM_Call using $cmd" {
|
||||
set rd [redis_deferring_client]
|
||||
@ -25,6 +36,7 @@ start_server {tags {"modules"}} {
|
||||
r lpush l a
|
||||
assert_equal [$rd read] {l a}
|
||||
wait_for_blocked_clients_count 0
|
||||
$rd close
|
||||
}
|
||||
|
||||
test "Brpop on async RM_Call using $cmd" {
|
||||
@ -35,6 +47,7 @@ start_server {tags {"modules"}} {
|
||||
r lpush l a
|
||||
assert_equal [$rd read] {l a}
|
||||
wait_for_blocked_clients_count 0
|
||||
$rd close
|
||||
}
|
||||
|
||||
test "Brpoplpush on async RM_Call using $cmd" {
|
||||
@ -45,6 +58,7 @@ start_server {tags {"modules"}} {
|
||||
r lpush l1 a
|
||||
assert_equal [$rd read] {a}
|
||||
wait_for_blocked_clients_count 0
|
||||
$rd close
|
||||
r lpop l2
|
||||
} {a}
|
||||
|
||||
@ -56,6 +70,7 @@ start_server {tags {"modules"}} {
|
||||
r lpush l1 a
|
||||
assert_equal [$rd read] {a}
|
||||
wait_for_blocked_clients_count 0
|
||||
$rd close
|
||||
r lpop l2
|
||||
} {a}
|
||||
|
||||
@ -67,6 +82,7 @@ start_server {tags {"modules"}} {
|
||||
r zadd s 10 foo
|
||||
assert_equal [$rd read] {s foo 10}
|
||||
wait_for_blocked_clients_count 0
|
||||
$rd close
|
||||
}
|
||||
|
||||
test "Bzpopmax on async RM_Call using $cmd" {
|
||||
@ -77,6 +93,7 @@ start_server {tags {"modules"}} {
|
||||
r zadd s 10 foo
|
||||
assert_equal [$rd read] {s foo 10}
|
||||
wait_for_blocked_clients_count 0
|
||||
$rd close
|
||||
}
|
||||
}
|
||||
|
||||
@ -88,6 +105,7 @@ start_server {tags {"modules"}} {
|
||||
r lpush l a
|
||||
assert_equal [$rd read] {l a}
|
||||
wait_for_blocked_clients_count 0
|
||||
$rd close
|
||||
}
|
||||
|
||||
test {Test multiple async RM_Call waiting on the same event} {
|
||||
@ -101,6 +119,8 @@ start_server {tags {"modules"}} {
|
||||
assert_equal [$rd1 read] {l element}
|
||||
assert_equal [$rd2 read] {l element}
|
||||
wait_for_blocked_clients_count 0
|
||||
$rd1 close
|
||||
$rd2 close
|
||||
}
|
||||
|
||||
test {async RM_Call calls RM_Call} {
|
||||
@ -136,6 +156,7 @@ start_server {tags {"modules"}} {
|
||||
}
|
||||
|
||||
wait_for_blocked_clients_count 0
|
||||
$rd close
|
||||
}
|
||||
|
||||
test {Become replica while having async RM_Call running} {
|
||||
@ -156,6 +177,7 @@ start_server {tags {"modules"}} {
|
||||
r lpush l 1
|
||||
# make sure the async rm_call was aborted
|
||||
assert_equal [r llen l] {1}
|
||||
$rd close
|
||||
}
|
||||
|
||||
test {Pipeline with blocking RM_Call} {
|
||||
@ -175,6 +197,7 @@ start_server {tags {"modules"}} {
|
||||
assert_equal [$rd read] {PONG}
|
||||
|
||||
wait_for_blocked_clients_count 0
|
||||
$rd close
|
||||
}
|
||||
|
||||
test {blocking RM_Call abort} {
|
||||
@ -195,6 +218,7 @@ start_server {tags {"modules"}} {
|
||||
r lpush l 1
|
||||
# make sure the async rm_call was aborted
|
||||
assert_equal [r llen l] {1}
|
||||
$rd close
|
||||
}
|
||||
}
|
||||
|
||||
@ -220,6 +244,7 @@ start_server {tags {"modules"}} {
|
||||
close_replication_stream $repl
|
||||
|
||||
wait_for_blocked_clients_count 0
|
||||
$rd close
|
||||
}
|
||||
|
||||
test {Test unblock handler are executed as a unit} {
|
||||
@ -245,6 +270,7 @@ start_server {tags {"modules"}} {
|
||||
close_replication_stream $repl
|
||||
|
||||
wait_for_blocked_clients_count 0
|
||||
$rd close
|
||||
}
|
||||
|
||||
test {Test no propagation of blocking command} {
|
||||
@ -269,6 +295,7 @@ start_server {tags {"modules"}} {
|
||||
close_replication_stream $repl
|
||||
|
||||
wait_for_blocked_clients_count 0
|
||||
$rd close
|
||||
}
|
||||
}
|
||||
|
||||
@ -303,6 +330,7 @@ start_server {tags {"modules"}} {
|
||||
close_replication_stream $repl
|
||||
|
||||
wait_for_blocked_clients_count 0
|
||||
$rd close
|
||||
}
|
||||
|
||||
test {Test unblock handler are executed as a unit with lazy expire} {
|
||||
@ -355,9 +383,10 @@ start_server {tags {"modules"}} {
|
||||
}
|
||||
close_replication_stream $repl
|
||||
r DEBUG SET-ACTIVE-EXPIRE 1
|
||||
|
||||
wait_for_blocked_clients_count 0
|
||||
$rd close
|
||||
}
|
||||
|
||||
wait_for_blocked_clients_count 0
|
||||
}
|
||||
|
||||
start_server {tags {"modules"}} {
|
||||
@ -376,5 +405,6 @@ start_server {tags {"modules"}} {
|
||||
assert_equal [$rd read] {4}
|
||||
|
||||
wait_for_blocked_clients_count 0
|
||||
$rd close
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user