Unified MULTI, LUA, and RM_Call with respect to blocking commands (#8025)

Blocking command should not be used with MULTI, LUA, and RM_Call. This is because,
the caller, who executes the command in this context, expects a reply.

Today, LUA and MULTI have a special (and different) treatment to blocking commands:

LUA   - Most commands are marked with no-script flag which are checked when executing
and command from LUA, commands that are not marked (like XREAD) verify that their
blocking mode is not used inside LUA (by checking the CLIENT_LUA client flag).
MULTI - Command that is going to block, first verify that the client is not inside
multi (by checking the CLIENT_MULTI client flag). If the client is inside multi, they
return a result which is a match to the empty key with no timeout (for example blpop
inside MULTI will act as lpop)
For modules that perform RM_Call with blocking command, the returned results type is
REDISMODULE_REPLY_UNKNOWN and the caller can not really know what happened.

Disadvantages of the current state are:

No unified approach, LUA, MULTI, and RM_Call, each has a different treatment
Module can not safely execute blocking command (and get reply or error).
Though It is true that modules are not like LUA or MULTI and should be smarter not
to execute blocking commands on RM_Call, sometimes you want to execute a command base
on client input (for example if you create a module that provides a new scripting
language like javascript or python).
While modules (on modules command) can check for REDISMODULE_CTX_FLAGS_LUA or
REDISMODULE_CTX_FLAGS_MULTI to know not to block the client, there is no way to
check if the command came from another module using RM_Call. So there is no way
for a module to know not to block another module RM_Call execution.

This commit adds a way to unify the treatment for blocking clients by introducing
a new CLIENT_DENY_BLOCKING client flag. On LUA, MULTI, and RM_Call the new flag
turned on to signify that the client should not be blocked. A blocking command
verifies that the flag is turned off before blocking. If a blocking command sees
that the CLIENT_DENY_BLOCKING flag is on, it's not blocking and return results
which are matches to empty key with no timeout (as MULTI does today).

The new flag is checked on the following commands:

List blocking commands: BLPOP, BRPOP, BRPOPLPUSH, BLMOVE,
Zset blocking commands: BZPOPMIN, BZPOPMAX
Stream blocking commands: XREAD, XREADGROUP
SUBSCRIBE, PSUBSCRIBE, MONITOR
In addition, the new flag is turned on inside the AOF client, we do not want to
block the AOF client to prevent deadlocks and commands ordering issues (and there
is also an existing assert in the code that verifies it).

To keep backward compatibility on LUA, all the no-script flags on existing commands
were kept untouched. In addition, a LUA special treatment on XREAD and XREADGROUP was kept.

To keep backward compatibility on MULTI (which today allows SUBSCRIBE, and PSUBSCRIBE).
We added a special treatment on those commands to allow executing them on MULTI.

The only backward compatibility issue that this PR introduces is that now MONITOR
is not allowed inside MULTI.

Tests were added to verify blocking commands are not blocking the client on LUA, MULTI,
or RM_Call. Tests were added to verify the module can check for CLIENT_DENY_BLOCKING flag.

Co-authored-by: Oran Agra <oran@redislabs.com>
Co-authored-by: Itamar Haber <itamar@redislabs.com>
This commit is contained in:
Meir Shpilraien (Spielrein) 2020-11-17 18:58:55 +02:00 committed by GitHub
parent 39f716a121
commit d87a0d0286
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 236 additions and 13 deletions

View File

@ -673,7 +673,18 @@ struct client *createAOFClient(void) {
c->original_argv = NULL;
c->argv_len_sum = 0;
c->bufpos = 0;
c->flags = 0;
/*
* The AOF client should never be blocked (unlike master
* replication connection).
* This is because blocking the AOF client might cause
* deadlock (because potentially no one will unblock it).
* Also, if the AOF client will be blocked just for
* background processing there is a chance that the
* command execution order will be violated.
*/
c->flags = CLIENT_DENY_BLOCKING;
c->btype = BLOCKED_NONE;
/* We set the fake client as a slave waiting for the synchronization
* so that Redis will not try to send replies to this client. */

View File

@ -1947,6 +1947,8 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) {
flags |= REDISMODULE_CTX_FLAGS_LUA;
if (ctx->client->flags & CLIENT_MULTI)
flags |= REDISMODULE_CTX_FLAGS_MULTI;
if (ctx->client->flags & CLIENT_DENY_BLOCKING)
flags |= REDISMODULE_CTX_FLAGS_DENY_BLOCKING;
/* Module command received from MASTER, is replicated. */
if (ctx->client->flags & CLIENT_MASTER)
flags |= REDISMODULE_CTX_FLAGS_REPLICATED;
@ -3392,6 +3394,9 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
/* Setup our fake client for command execution. */
c->flags |= CLIENT_MODULE;
/* We do not want to allow block, the module do not expect it */
c->flags |= CLIENT_DENY_BLOCKING;
c->db = ctx->client->db;
c->argv = argv;
c->argc = argc;
@ -3460,6 +3465,8 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch
}
call(c,call_flags);
serverAssert((c->flags & CLIENT_BLOCKED) == 0);
/* Convert the result of the Redis command into a module reply. */
sds proto = sdsnewlen(c->buf,c->bufpos);
c->bufpos = 0;

View File

@ -100,6 +100,7 @@ void multiCommand(client *c) {
return;
}
c->flags |= CLIENT_MULTI;
addReply(c,shared.ok);
}
@ -168,6 +169,11 @@ void execCommand(client *c) {
goto handle_monitor;
}
uint64_t old_flags = c->flags;
/* we do not want to allow blocking commands inside multi */
c->flags |= CLIENT_DENY_BLOCKING;
/* Exec all the queued commands */
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
orig_argv = c->argv;
@ -206,6 +212,7 @@ void execCommand(client *c) {
"no permission to touch the specified keys");
} else {
call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL);
serverAssert((c->flags & CLIENT_BLOCKED) == 0);
}
/* Commands may alter argc/argv, restore mstate. */
@ -213,6 +220,11 @@ void execCommand(client *c) {
c->mstate.commands[j].argv = c->argv;
c->mstate.commands[j].cmd = c->cmd;
}
// restore old DENY_BLOCKING value
if (!(old_flags & CLIENT_DENY_BLOCKING))
c->flags &= ~CLIENT_DENY_BLOCKING;
c->argv = orig_argv;
c->argc = orig_argc;
c->cmd = orig_cmd;

View File

@ -360,6 +360,18 @@ int pubsubPublishMessage(robj *channel, robj *message) {
void subscribeCommand(client *c) {
int j;
if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
/**
* A client that has CLIENT_DENY_BLOCKING flag on
* expect a reply per command and so can not execute subscribe.
*
* Notice that we have a special treatment for multi because of
* backword compatibility
*/
addReplyError(c, "subscribe is not allow on DENY BLOCKING client");
return;
}
for (j = 1; j < c->argc; j++)
pubsubSubscribeChannel(c,c->argv[j]);
c->flags |= CLIENT_PUBSUB;
@ -380,6 +392,18 @@ void unsubscribeCommand(client *c) {
void psubscribeCommand(client *c) {
int j;
if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) {
/**
* A client that has CLIENT_DENY_BLOCKING flag on
* expect a reply per command and so can not execute subscribe.
*
* Notice that we have a special treatment for multi because of
* backword compatibility
*/
addReplyError(c, "PSUBSCRIBE is not allowed for DENY BLOCKING client");
return;
}
for (j = 1; j < c->argc; j++)
pubsubSubscribePattern(c,c->argv[j]);
c->flags |= CLIENT_PUBSUB;

View File

@ -120,11 +120,14 @@
#define REDISMODULE_CTX_FLAGS_MULTI_DIRTY (1<<19)
/* Redis is currently running inside background child process. */
#define REDISMODULE_CTX_FLAGS_IS_CHILD (1<<20)
/* The current client does not allow blocking, either called from
* within multi, lua, or from another module using RM_Call */
#define REDISMODULE_CTX_FLAGS_DENY_BLOCKING (1<<21)
/* Next context flag, must be updated when adding new flags above!
This flag should not be used directly by the module.
* Use RedisModule_GetContextFlagsAll instead. */
#define _REDISMODULE_CTX_FLAGS_NEXT (1<<21)
#define _REDISMODULE_CTX_FLAGS_NEXT (1<<22)
/* Keyspace changes notification classes. Every class is associated with a
* character for configuration purposes.

View File

@ -1372,7 +1372,20 @@ void replicationCreateMasterClient(connection *conn, int dbid) {
server.master = createClient(conn);
if (conn)
connSetReadHandler(server.master->conn, readQueryFromClient);
/**
     * Important note:
     * The CLIENT_DENY_BLOCKING flag is not, and should not, be set here.
     * For commands like BLPOP, it makes no sense to block the master
     * connection, and such blocking attempt will probably cause deadlock and
     * break the replication. We consider such a thing as a bug because
    * commands as BLPOP should never be sent on the replication link.
     * A possible use-case for blocking the replication link is if a module wants
     * to pass the execution to a background thread and unblock after the
     * execution is done. This is the reason why we allow blocking the replication
     * connection. */
server.master->flags |= CLIENT_MASTER;
server.master->authenticated = 1;
server.master->reploff = server.master_initial_offset;
server.master->read_reploff = server.master->reploff;

View File

@ -724,6 +724,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) {
call_flags |= CMD_CALL_PROPAGATE_REPL;
}
call(c,call_flags);
serverAssert((c->flags & CLIENT_BLOCKED) == 0);
/* Convert the result of the Redis command into a suitable Lua type.
* The first thing we need is to create a single string from the client
@ -1255,6 +1256,9 @@ void scriptingInit(int setup) {
if (server.lua_client == NULL) {
server.lua_client = createClient(NULL);
server.lua_client->flags |= CLIENT_LUA;
/* We do not want to allow blocking commands inside Lua */
server.lua_client->flags |= CLIENT_DENY_BLOCKING;
}
/* Lua beginners often don't use "local", this is likely to introduce
@ -2717,4 +2721,3 @@ void luaLdbLineHook(lua_State *lua, lua_Debug *ar) {
server.lua_time_start = mstime();
}
}

View File

@ -4887,6 +4887,14 @@ void infoCommand(client *c) {
}
void monitorCommand(client *c) {
if (c->flags & CLIENT_DENY_BLOCKING) {
/**
* A client that has CLIENT_DENY_BLOCKING flag on
* expects a reply per command and so can't execute MONITOR. */
addReplyError(c, "MONITOR is not allowed for DENY BLOCKING client");
return;
}
/* ignore MONITOR if already slave or in monitor mode */
if (c->flags & CLIENT_SLAVE) return;

View File

@ -267,6 +267,9 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT];
#define CLIENT_PROTOCOL_ERROR (1ULL<<39) /* Protocol error chatting with it. */
#define CLIENT_CLOSE_AFTER_COMMAND (1ULL<<40) /* Close after executing commands
* and writing entire reply. */
#define CLIENT_DENY_BLOCKING (1ULL<<41) /* Indicate that the client should not be blocked.
currently, turned on inside MULTI, Lua, RM_Call,
and AOF client */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */

View File

@ -905,9 +905,9 @@ void blockingPopGenericCommand(client *c, int where) {
}
}
/* If we are inside a MULTI/EXEC and the list is empty the only thing
/* If we are not allowed to block the client, the only thing
* we can do is treating it as a timeout (even with timeout 0). */
if (c->flags & CLIENT_MULTI) {
if (c->flags & CLIENT_DENY_BLOCKING) {
addReplyNullArray(c);
return;
}
@ -930,8 +930,8 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou
if (checkType(c,key,OBJ_LIST)) return;
if (key == NULL) {
if (c->flags & CLIENT_MULTI) {
/* Blocking against an empty list in a multi state
if (c->flags & CLIENT_DENY_BLOCKING) {
/* Blocking against an empty list when blocking is not allowed
* returns immediately. */
addReplyNull(c);
} else {

View File

@ -1521,7 +1521,10 @@ void xreadCommand(client *c) {
char *o = c->argv[i]->ptr;
if (!strcasecmp(o,"BLOCK") && moreargs) {
if (c->flags & CLIENT_LUA) {
/* There is no sense to use BLOCK option within LUA */
/*
* Although the CLIENT_DENY_BLOCKING flag should protect from blocking the client
* on Lua/MULTI/RM_Call we want special treatment for Lua to keep backword compatibility.
* There is no sense to use BLOCK option within Lua. */
addReplyErrorFormat(c, "%s command is not allowed with BLOCK option from scripts", (char *)c->argv[0]->ptr);
return;
}
@ -1732,9 +1735,9 @@ void xreadCommand(client *c) {
/* Block if needed. */
if (timeout != -1) {
/* If we are inside a MULTI/EXEC and the list is empty the only thing
/* If we are not allowed to block the client, the only thing
* we can do is treating it as a timeout (even with timeout 0). */
if (c->flags & CLIENT_MULTI) {
if (c->flags & CLIENT_DENY_BLOCKING) {
addReplyNullArray(c);
goto cleanup;
}

View File

@ -3632,9 +3632,9 @@ void blockingGenericZpopCommand(client *c, int where) {
}
}
/* If we are inside a MULTI/EXEC and the zset is empty the only thing
/* If we are not allowed to block the client and the zset is empty the only thing
* we can do is treating it as a timeout (even with timeout 0). */
if (c->flags & CLIENT_MULTI) {
if (c->flags & CLIENT_DENY_BLOCKING) {
addReplyNullArray(c);
return;
}

View File

@ -65,6 +65,12 @@ int acquire_gil(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_OK;
}
if ((allFlags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING) &&
(flags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING)) {
RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not allowed");
return REDISMODULE_OK;
}
/* This command handler tries to acquire the GIL twice
* once in the worker thread using "RedisModule_ThreadSafeContextLock"
* second in the sub-worker thread
@ -79,6 +85,28 @@ int acquire_gil(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
return REDISMODULE_OK;
}
int do_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
UNUSED(argv);
UNUSED(argc);
if(argc < 2){
return RedisModule_WrongArity(ctx);
}
const char* cmd = RedisModule_StringPtrLen(argv[1], NULL);
RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", argv + 2, argc - 2);
if(!rep){
RedisModule_ReplyWithError(ctx, "NULL reply returned");
}else{
RedisModule_ReplyWithCallReply(ctx, rep);
RedisModule_FreeCallReply(rep);
}
return REDISMODULE_OK;
}
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
@ -89,5 +117,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
if (RedisModule_CreateCommand(ctx, "acquire_gil", acquire_gil, "", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
if (RedisModule_CreateCommand(ctx, "do_rm_call", do_rm_call, "", 0, 0, 0) == REDISMODULE_ERR)
return REDISMODULE_ERR;
return REDISMODULE_OK;
}

View File

@ -14,4 +14,62 @@ start_server {tags {"modules"}} {
r acquire_gil
assert_equal {{Blocked client is not supported inside multi}} [r exec]
}
test {Locked GIL acquisition from RM_Call} {
assert_equal {Blocked client is not allowed} [r do_rm_call acquire_gil]
}
test {Blocking command are not block the client on RM_Call} {
r lpush l test
assert_equal [r do_rm_call blpop l 0] {l test}
r lpush l test
assert_equal [r do_rm_call brpop l 0] {l test}
r lpush l1 test
assert_equal [r do_rm_call brpoplpush l1 l2 0] {test}
assert_equal [r do_rm_call brpop l2 0] {l2 test}
r lpush l1 test
assert_equal [r do_rm_call blmove l1 l2 LEFT LEFT 0] {test}
assert_equal [r do_rm_call brpop l2 0] {l2 test}
r ZADD zset1 0 a 1 b 2 c
assert_equal [r do_rm_call bzpopmin zset1 0] {zset1 a 0}
assert_equal [r do_rm_call bzpopmax zset1 0] {zset1 c 2}
r xgroup create s g $ MKSTREAM
r xadd s * foo bar
assert {[r do_rm_call xread BLOCK 0 STREAMS s 0-0] ne {}}
assert {[r do_rm_call xreadgroup group g c BLOCK 0 STREAMS s >] ne {}}
assert {[r do_rm_call blpop empty_list 0] eq {}}
assert {[r do_rm_call brpop empty_list 0] eq {}}
assert {[r do_rm_call brpoplpush empty_list1 empty_list2 0] eq {}}
assert {[r do_rm_call blmove empty_list1 empty_list2 LEFT LEFT 0] eq {}}
assert {[r do_rm_call bzpopmin empty_zset 0] eq {}}
assert {[r do_rm_call bzpopmax empty_zset 0] eq {}}
r xgroup create empty_stream g $ MKSTREAM
assert {[r do_rm_call xread BLOCK 0 STREAMS empty_stream $] eq {}}
assert {[r do_rm_call xreadgroup group g c BLOCK 0 STREAMS empty_stream >] eq {}}
}
test {Monitor disallow inside RM_Call} {
set e {}
catch {
r do_rm_call monitor
} e
set e
} {*MONITOR is not allow*}
test {subscribe disallow inside RM_Call} {
set e {}
catch {
r do_rm_call subscribe x
} e
set e
} {*subscribe is not allow*}
}

View File

@ -504,4 +504,21 @@ start_server {tags {"multi"}} {
$r2 config set maxmemory 0
$r2 close
}
test {Blocking commands ignores the timeout} {
r xgroup create s g $ MKSTREAM
set m [r multi]
r blpop empty_list 0
r brpop empty_list 0
r brpoplpush empty_list1 empty_list2 0
r blmove empty_list1 empty_list2 LEFT LEFT 0
r bzpopmin empty_zset 0
r bzpopmax empty_zset 0
r xread BLOCK 0 STREAMS s $
r xreadgroup group g c BLOCK 0 STREAMS s >
set res [r exec]
list $m $res
} {OK {{} {} {} {} {} {} {} {}}}
}

View File

@ -140,12 +140,42 @@ start_server {tags {"scripting"}} {
} {*execution time*}
}
test {EVAL - Scripts can't run certain commands} {
test {EVAL - Scripts can't run blpop command} {
set e {}
catch {r eval {return redis.pcall('blpop','x',0)} 0} e
set e
} {*not allowed*}
test {EVAL - Scripts can't run brpop command} {
set e {}
catch {r eval {return redis.pcall('brpop','empty_list',0)} 0} e
set e
} {*not allowed*}
test {EVAL - Scripts can't run brpoplpush command} {
set e {}
catch {r eval {return redis.pcall('brpoplpush','empty_list1', 'empty_list2',0)} 0} e
set e
} {*not allowed*}
test {EVAL - Scripts can't run blmove command} {
set e {}
catch {r eval {return redis.pcall('blmove','empty_list1', 'empty_list2', 'LEFT', 'LEFT', 0)} 0} e
set e
} {*not allowed*}
test {EVAL - Scripts can't run bzpopmin command} {
set e {}
catch {r eval {return redis.pcall('bzpopmin','empty_zset', 0)} 0} e
set e
} {*not allowed*}
test {EVAL - Scripts can't run bzpopmax command} {
set e {}
catch {r eval {return redis.pcall('bzpopmax','empty_zset', 0)} 0} e
set e
} {*not allowed*}
test {EVAL - Scripts can't run XREAD and XREADGROUP with BLOCK option} {
r del s
r xgroup create s g $ MKSTREAM