diff --git a/src/aof.c b/src/aof.c index 5cb78d077..94dc1bdfa 100644 --- a/src/aof.c +++ b/src/aof.c @@ -673,7 +673,18 @@ struct client *createAOFClient(void) { c->original_argv = NULL; c->argv_len_sum = 0; c->bufpos = 0; - c->flags = 0; + + /* + * The AOF client should never be blocked (unlike master + * replication connection). + * This is because blocking the AOF client might cause + * deadlock (because potentially no one will unblock it). + * Also, if the AOF client will be blocked just for + * background processing there is a chance that the + * command execution order will be violated. + */ + c->flags = CLIENT_DENY_BLOCKING; + c->btype = BLOCKED_NONE; /* We set the fake client as a slave waiting for the synchronization * so that Redis will not try to send replies to this client. */ diff --git a/src/module.c b/src/module.c index 03fde12a0..15ab927cb 100644 --- a/src/module.c +++ b/src/module.c @@ -1947,6 +1947,8 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) { flags |= REDISMODULE_CTX_FLAGS_LUA; if (ctx->client->flags & CLIENT_MULTI) flags |= REDISMODULE_CTX_FLAGS_MULTI; + if (ctx->client->flags & CLIENT_DENY_BLOCKING) + flags |= REDISMODULE_CTX_FLAGS_DENY_BLOCKING; /* Module command received from MASTER, is replicated. */ if (ctx->client->flags & CLIENT_MASTER) flags |= REDISMODULE_CTX_FLAGS_REPLICATED; @@ -3392,6 +3394,9 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch /* Setup our fake client for command execution. */ c->flags |= CLIENT_MODULE; + + /* We do not want to allow block, the module do not expect it */ + c->flags |= CLIENT_DENY_BLOCKING; c->db = ctx->client->db; c->argv = argv; c->argc = argc; @@ -3460,6 +3465,8 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch } call(c,call_flags); + serverAssert((c->flags & CLIENT_BLOCKED) == 0); + /* Convert the result of the Redis command into a module reply. */ sds proto = sdsnewlen(c->buf,c->bufpos); c->bufpos = 0; diff --git a/src/multi.c b/src/multi.c index 3ce6d60ec..7790b0223 100644 --- a/src/multi.c +++ b/src/multi.c @@ -100,6 +100,7 @@ void multiCommand(client *c) { return; } c->flags |= CLIENT_MULTI; + addReply(c,shared.ok); } @@ -168,6 +169,11 @@ void execCommand(client *c) { goto handle_monitor; } + uint64_t old_flags = c->flags; + + /* we do not want to allow blocking commands inside multi */ + c->flags |= CLIENT_DENY_BLOCKING; + /* Exec all the queued commands */ unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ orig_argv = c->argv; @@ -206,6 +212,7 @@ void execCommand(client *c) { "no permission to touch the specified keys"); } else { call(c,server.loading ? CMD_CALL_NONE : CMD_CALL_FULL); + serverAssert((c->flags & CLIENT_BLOCKED) == 0); } /* Commands may alter argc/argv, restore mstate. */ @@ -213,6 +220,11 @@ void execCommand(client *c) { c->mstate.commands[j].argv = c->argv; c->mstate.commands[j].cmd = c->cmd; } + + // restore old DENY_BLOCKING value + if (!(old_flags & CLIENT_DENY_BLOCKING)) + c->flags &= ~CLIENT_DENY_BLOCKING; + c->argv = orig_argv; c->argc = orig_argc; c->cmd = orig_cmd; diff --git a/src/pubsub.c b/src/pubsub.c index 6fa397704..b7f7d9673 100644 --- a/src/pubsub.c +++ b/src/pubsub.c @@ -360,6 +360,18 @@ int pubsubPublishMessage(robj *channel, robj *message) { void subscribeCommand(client *c) { int j; + if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) { + /** + * A client that has CLIENT_DENY_BLOCKING flag on + * expect a reply per command and so can not execute subscribe. + * + * Notice that we have a special treatment for multi because of + * backword compatibility + */ + addReplyError(c, "subscribe is not allow on DENY BLOCKING client"); + return; + } + for (j = 1; j < c->argc; j++) pubsubSubscribeChannel(c,c->argv[j]); c->flags |= CLIENT_PUBSUB; @@ -380,6 +392,18 @@ void unsubscribeCommand(client *c) { void psubscribeCommand(client *c) { int j; + if ((c->flags & CLIENT_DENY_BLOCKING) && !(c->flags & CLIENT_MULTI)) { + /** + * A client that has CLIENT_DENY_BLOCKING flag on + * expect a reply per command and so can not execute subscribe. + * + * Notice that we have a special treatment for multi because of + * backword compatibility + */ + addReplyError(c, "PSUBSCRIBE is not allowed for DENY BLOCKING client"); + return; + } + for (j = 1; j < c->argc; j++) pubsubSubscribePattern(c,c->argv[j]); c->flags |= CLIENT_PUBSUB; diff --git a/src/redismodule.h b/src/redismodule.h index 0f7134549..59f568c6f 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -120,11 +120,14 @@ #define REDISMODULE_CTX_FLAGS_MULTI_DIRTY (1<<19) /* Redis is currently running inside background child process. */ #define REDISMODULE_CTX_FLAGS_IS_CHILD (1<<20) +/* The current client does not allow blocking, either called from + * within multi, lua, or from another module using RM_Call */ +#define REDISMODULE_CTX_FLAGS_DENY_BLOCKING (1<<21) /* Next context flag, must be updated when adding new flags above! This flag should not be used directly by the module. * Use RedisModule_GetContextFlagsAll instead. */ -#define _REDISMODULE_CTX_FLAGS_NEXT (1<<21) +#define _REDISMODULE_CTX_FLAGS_NEXT (1<<22) /* Keyspace changes notification classes. Every class is associated with a * character for configuration purposes. diff --git a/src/replication.c b/src/replication.c index d1d26a970..745d317ca 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1372,7 +1372,20 @@ void replicationCreateMasterClient(connection *conn, int dbid) { server.master = createClient(conn); if (conn) connSetReadHandler(server.master->conn, readQueryFromClient); + + /** +     * Important note: +     * The CLIENT_DENY_BLOCKING flag is not, and should not, be set here. +     * For commands like BLPOP, it makes no sense to block the master +     * connection, and such blocking attempt will probably cause deadlock and +     * break the replication. We consider such a thing as a bug because +    * commands as BLPOP should never be sent on the replication link. +     * A possible use-case for blocking the replication link is if a module wants +     * to pass the execution to a background thread and unblock after the +     * execution is done. This is the reason why we allow blocking the replication +     * connection. */ server.master->flags |= CLIENT_MASTER; + server.master->authenticated = 1; server.master->reploff = server.master_initial_offset; server.master->read_reploff = server.master->reploff; diff --git a/src/scripting.c b/src/scripting.c index c8ff51b91..732cbfc62 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -724,6 +724,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { call_flags |= CMD_CALL_PROPAGATE_REPL; } call(c,call_flags); + serverAssert((c->flags & CLIENT_BLOCKED) == 0); /* Convert the result of the Redis command into a suitable Lua type. * The first thing we need is to create a single string from the client @@ -1255,6 +1256,9 @@ void scriptingInit(int setup) { if (server.lua_client == NULL) { server.lua_client = createClient(NULL); server.lua_client->flags |= CLIENT_LUA; + + /* We do not want to allow blocking commands inside Lua */ + server.lua_client->flags |= CLIENT_DENY_BLOCKING; } /* Lua beginners often don't use "local", this is likely to introduce @@ -2717,4 +2721,3 @@ void luaLdbLineHook(lua_State *lua, lua_Debug *ar) { server.lua_time_start = mstime(); } } - diff --git a/src/server.c b/src/server.c index 1ce6b68bd..ac5b30e90 100644 --- a/src/server.c +++ b/src/server.c @@ -4887,6 +4887,14 @@ void infoCommand(client *c) { } void monitorCommand(client *c) { + if (c->flags & CLIENT_DENY_BLOCKING) { + /** + * A client that has CLIENT_DENY_BLOCKING flag on + * expects a reply per command and so can't execute MONITOR. */ + addReplyError(c, "MONITOR is not allowed for DENY BLOCKING client"); + return; + } + /* ignore MONITOR if already slave or in monitor mode */ if (c->flags & CLIENT_SLAVE) return; diff --git a/src/server.h b/src/server.h index 66ee066a1..34c4e8360 100644 --- a/src/server.h +++ b/src/server.h @@ -267,6 +267,9 @@ extern int configOOMScoreAdjValuesDefaults[CONFIG_OOM_COUNT]; #define CLIENT_PROTOCOL_ERROR (1ULL<<39) /* Protocol error chatting with it. */ #define CLIENT_CLOSE_AFTER_COMMAND (1ULL<<40) /* Close after executing commands * and writing entire reply. */ +#define CLIENT_DENY_BLOCKING (1ULL<<41) /* Indicate that the client should not be blocked. + currently, turned on inside MULTI, Lua, RM_Call, + and AOF client */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ diff --git a/src/t_list.c b/src/t_list.c index cdfc0ff51..ce1472d93 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -905,9 +905,9 @@ void blockingPopGenericCommand(client *c, int where) { } } - /* If we are inside a MULTI/EXEC and the list is empty the only thing + /* If we are not allowed to block the client, the only thing * we can do is treating it as a timeout (even with timeout 0). */ - if (c->flags & CLIENT_MULTI) { + if (c->flags & CLIENT_DENY_BLOCKING) { addReplyNullArray(c); return; } @@ -930,8 +930,8 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou if (checkType(c,key,OBJ_LIST)) return; if (key == NULL) { - if (c->flags & CLIENT_MULTI) { - /* Blocking against an empty list in a multi state + if (c->flags & CLIENT_DENY_BLOCKING) { + /* Blocking against an empty list when blocking is not allowed * returns immediately. */ addReplyNull(c); } else { diff --git a/src/t_stream.c b/src/t_stream.c index 02e4c3242..b25e54a02 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1521,7 +1521,10 @@ void xreadCommand(client *c) { char *o = c->argv[i]->ptr; if (!strcasecmp(o,"BLOCK") && moreargs) { if (c->flags & CLIENT_LUA) { - /* There is no sense to use BLOCK option within LUA */ + /* + * Although the CLIENT_DENY_BLOCKING flag should protect from blocking the client + * on Lua/MULTI/RM_Call we want special treatment for Lua to keep backword compatibility. + * There is no sense to use BLOCK option within Lua. */ addReplyErrorFormat(c, "%s command is not allowed with BLOCK option from scripts", (char *)c->argv[0]->ptr); return; } @@ -1732,9 +1735,9 @@ void xreadCommand(client *c) { /* Block if needed. */ if (timeout != -1) { - /* If we are inside a MULTI/EXEC and the list is empty the only thing + /* If we are not allowed to block the client, the only thing * we can do is treating it as a timeout (even with timeout 0). */ - if (c->flags & CLIENT_MULTI) { + if (c->flags & CLIENT_DENY_BLOCKING) { addReplyNullArray(c); goto cleanup; } diff --git a/src/t_zset.c b/src/t_zset.c index 6147674ff..f228e76dd 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -3632,9 +3632,9 @@ void blockingGenericZpopCommand(client *c, int where) { } } - /* If we are inside a MULTI/EXEC and the zset is empty the only thing + /* If we are not allowed to block the client and the zset is empty the only thing * we can do is treating it as a timeout (even with timeout 0). */ - if (c->flags & CLIENT_MULTI) { + if (c->flags & CLIENT_DENY_BLOCKING) { addReplyNullArray(c); return; } diff --git a/tests/modules/blockedclient.c b/tests/modules/blockedclient.c index 558e06502..cfa8319d8 100644 --- a/tests/modules/blockedclient.c +++ b/tests/modules/blockedclient.c @@ -65,6 +65,12 @@ int acquire_gil(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_OK; } + if ((allFlags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING) && + (flags & REDISMODULE_CTX_FLAGS_DENY_BLOCKING)) { + RedisModule_ReplyWithSimpleString(ctx, "Blocked client is not allowed"); + return REDISMODULE_OK; + } + /* This command handler tries to acquire the GIL twice * once in the worker thread using "RedisModule_ThreadSafeContextLock" * second in the sub-worker thread @@ -79,6 +85,28 @@ int acquire_gil(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_OK; } +int do_rm_call(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ + UNUSED(argv); + UNUSED(argc); + + if(argc < 2){ + return RedisModule_WrongArity(ctx); + } + + const char* cmd = RedisModule_StringPtrLen(argv[1], NULL); + + RedisModuleCallReply* rep = RedisModule_Call(ctx, cmd, "v", argv + 2, argc - 2); + if(!rep){ + RedisModule_ReplyWithError(ctx, "NULL reply returned"); + }else{ + RedisModule_ReplyWithCallReply(ctx, rep); + RedisModule_FreeCallReply(rep); + } + + return REDISMODULE_OK; +} + + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -89,5 +117,8 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_CreateCommand(ctx, "acquire_gil", acquire_gil, "", 0, 0, 0) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "do_rm_call", do_rm_call, "", 0, 0, 0) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/blockedclient.tcl b/tests/unit/moduleapi/blockedclient.tcl index 5a541d138..33ecfff22 100644 --- a/tests/unit/moduleapi/blockedclient.tcl +++ b/tests/unit/moduleapi/blockedclient.tcl @@ -14,4 +14,62 @@ start_server {tags {"modules"}} { r acquire_gil assert_equal {{Blocked client is not supported inside multi}} [r exec] } + + test {Locked GIL acquisition from RM_Call} { + assert_equal {Blocked client is not allowed} [r do_rm_call acquire_gil] + } + + test {Blocking command are not block the client on RM_Call} { + r lpush l test + assert_equal [r do_rm_call blpop l 0] {l test} + + r lpush l test + assert_equal [r do_rm_call brpop l 0] {l test} + + r lpush l1 test + assert_equal [r do_rm_call brpoplpush l1 l2 0] {test} + assert_equal [r do_rm_call brpop l2 0] {l2 test} + + r lpush l1 test + assert_equal [r do_rm_call blmove l1 l2 LEFT LEFT 0] {test} + assert_equal [r do_rm_call brpop l2 0] {l2 test} + + r ZADD zset1 0 a 1 b 2 c + assert_equal [r do_rm_call bzpopmin zset1 0] {zset1 a 0} + assert_equal [r do_rm_call bzpopmax zset1 0] {zset1 c 2} + + r xgroup create s g $ MKSTREAM + r xadd s * foo bar + assert {[r do_rm_call xread BLOCK 0 STREAMS s 0-0] ne {}} + assert {[r do_rm_call xreadgroup group g c BLOCK 0 STREAMS s >] ne {}} + + assert {[r do_rm_call blpop empty_list 0] eq {}} + assert {[r do_rm_call brpop empty_list 0] eq {}} + assert {[r do_rm_call brpoplpush empty_list1 empty_list2 0] eq {}} + assert {[r do_rm_call blmove empty_list1 empty_list2 LEFT LEFT 0] eq {}} + + assert {[r do_rm_call bzpopmin empty_zset 0] eq {}} + assert {[r do_rm_call bzpopmax empty_zset 0] eq {}} + + r xgroup create empty_stream g $ MKSTREAM + assert {[r do_rm_call xread BLOCK 0 STREAMS empty_stream $] eq {}} + assert {[r do_rm_call xreadgroup group g c BLOCK 0 STREAMS empty_stream >] eq {}} + + } + + test {Monitor disallow inside RM_Call} { + set e {} + catch { + r do_rm_call monitor + } e + set e + } {*MONITOR is not allow*} + + test {subscribe disallow inside RM_Call} { + set e {} + catch { + r do_rm_call subscribe x + } e + set e + } {*subscribe is not allow*} } diff --git a/tests/unit/multi.tcl b/tests/unit/multi.tcl index e45a2d8f4..43259b1c0 100644 --- a/tests/unit/multi.tcl +++ b/tests/unit/multi.tcl @@ -504,4 +504,21 @@ start_server {tags {"multi"}} { $r2 config set maxmemory 0 $r2 close } + + test {Blocking commands ignores the timeout} { + r xgroup create s g $ MKSTREAM + + set m [r multi] + r blpop empty_list 0 + r brpop empty_list 0 + r brpoplpush empty_list1 empty_list2 0 + r blmove empty_list1 empty_list2 LEFT LEFT 0 + r bzpopmin empty_zset 0 + r bzpopmax empty_zset 0 + r xread BLOCK 0 STREAMS s $ + r xreadgroup group g c BLOCK 0 STREAMS s > + set res [r exec] + + list $m $res + } {OK {{} {} {} {} {} {} {} {}}} } diff --git a/tests/unit/scripting.tcl b/tests/unit/scripting.tcl index 6bcba4c3f..0da1eda9f 100644 --- a/tests/unit/scripting.tcl +++ b/tests/unit/scripting.tcl @@ -140,12 +140,42 @@ start_server {tags {"scripting"}} { } {*execution time*} } - test {EVAL - Scripts can't run certain commands} { + test {EVAL - Scripts can't run blpop command} { set e {} catch {r eval {return redis.pcall('blpop','x',0)} 0} e set e } {*not allowed*} + test {EVAL - Scripts can't run brpop command} { + set e {} + catch {r eval {return redis.pcall('brpop','empty_list',0)} 0} e + set e + } {*not allowed*} + + test {EVAL - Scripts can't run brpoplpush command} { + set e {} + catch {r eval {return redis.pcall('brpoplpush','empty_list1', 'empty_list2',0)} 0} e + set e + } {*not allowed*} + + test {EVAL - Scripts can't run blmove command} { + set e {} + catch {r eval {return redis.pcall('blmove','empty_list1', 'empty_list2', 'LEFT', 'LEFT', 0)} 0} e + set e + } {*not allowed*} + + test {EVAL - Scripts can't run bzpopmin command} { + set e {} + catch {r eval {return redis.pcall('bzpopmin','empty_zset', 0)} 0} e + set e + } {*not allowed*} + + test {EVAL - Scripts can't run bzpopmax command} { + set e {} + catch {r eval {return redis.pcall('bzpopmax','empty_zset', 0)} 0} e + set e + } {*not allowed*} + test {EVAL - Scripts can't run XREAD and XREADGROUP with BLOCK option} { r del s r xgroup create s g $ MKSTREAM