From a84a6160ecf3d6aaab11911b741d0ecebd370aca Mon Sep 17 00:00:00 2001 From: guybe7 Date: Sun, 6 Dec 2020 12:14:18 +0100 Subject: [PATCH] Make sure we do not propagate nested MULTI/EXEC (#8097) One way this was happening is when a module issued an RM_Call which would inject MULTI. If the module command that does that was itself issued by something else that already did added MULTI (e.g. another module, or a Lua script), it would have caused nested MULTI. In fact the MULTI state in the client or the MULTI_EMITTED flag in the context isn't the right indication that we need to propagate MULTI or not, because on a nested calls (possibly a module action called by a keyspace event of another module action), these flags aren't retained / reflected. instead there's now a global propagate_in_transaction flag for that. in addition to that, we now have a global in_eval and in_exec flags, to serve the flags of RM_GetContextFlags, since their dependence on the current client is wrong for the same reasons mentioned above. --- src/module.c | 47 +++++----- src/multi.c | 20 +++++ src/scripting.c | 3 + src/server.c | 3 + src/server.h | 4 + tests/modules/keyspace_events.c | 109 ++++++++++++++++++++++- tests/unit/moduleapi/keyspace_events.tcl | 51 ++++++++++- 7 files changed, 212 insertions(+), 25 deletions(-) diff --git a/src/module.c b/src/module.c index 515363583..b5921ed55 100644 --- a/src/module.c +++ b/src/module.c @@ -158,14 +158,13 @@ struct RedisModuleCtx { typedef struct RedisModuleCtx RedisModuleCtx; #define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, NULL, {0}} -#define REDISMODULE_CTX_MULTI_EMITTED (1<<0) -#define REDISMODULE_CTX_AUTO_MEMORY (1<<1) -#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2) -#define REDISMODULE_CTX_BLOCKED_REPLY (1<<3) -#define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4) -#define REDISMODULE_CTX_THREAD_SAFE (1<<5) -#define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<6) -#define REDISMODULE_CTX_MODULE_COMMAND_CALL (1<<7) +#define REDISMODULE_CTX_AUTO_MEMORY (1<<0) +#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<1) +#define REDISMODULE_CTX_BLOCKED_REPLY (1<<2) +#define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<3) +#define REDISMODULE_CTX_THREAD_SAFE (1<<4) +#define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<5) +#define REDISMODULE_CTX_MODULE_COMMAND_CALL (1<<6) /* This represents a Redis key opened with RM_OpenKey(). */ struct RedisModuleKey { @@ -578,12 +577,15 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) { /* We don't need to do anything here if the context was never used * in order to propagate commands. */ - if (!(ctx->flags & REDISMODULE_CTX_MULTI_EMITTED)) return; + if (!server.propagate_in_transaction) return; - if (c->flags & CLIENT_LUA) return; + /* If this command is executed from with Lua or MULTI/EXEC we do noy + * need to propagate EXEC */ + if (server.in_eval || server.in_exec) return; /* Handle the replication of the final EXEC, since whatever a command * emits is always wrapped around MULTI/EXEC. */ + beforePropagateMultiOrExec(0); alsoPropagate(server.execCommand,c->db->id,&shared.exec,1, PROPAGATE_AOF|PROPAGATE_REPL); @@ -1610,9 +1612,9 @@ int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) { void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) { /* Skip this if client explicitly wrap the command with MULTI, or if * the module command was called by a script. */ - if (ctx->client->flags & (CLIENT_MULTI|CLIENT_LUA)) return; + if (server.lua_caller || server.in_exec) return; /* If we already emitted MULTI return ASAP. */ - if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) return; + if (server.propagate_in_transaction) return; /* If this is a thread safe context, we do not want to wrap commands * executed into MULTI/EXEC, they are executed as single commands * from an external client in essence. */ @@ -1625,7 +1627,6 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) { redisOpArrayInit(&server.also_propagate); } execCommandPropagateMulti(ctx->client); - ctx->flags |= REDISMODULE_CTX_MULTI_EMITTED; } /* Replicate the specified command and arguments to slaves and AOF, as effect @@ -1938,20 +1939,16 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) { * background child process. */ int RM_GetContextFlags(RedisModuleCtx *ctx) { - int flags = 0; + /* Client specific flags */ if (ctx) { if (ctx->client) { - if (ctx->client->flags & CLIENT_LUA) - flags |= REDISMODULE_CTX_FLAGS_LUA; - if (ctx->client->flags & CLIENT_MULTI) - flags |= REDISMODULE_CTX_FLAGS_MULTI; if (ctx->client->flags & CLIENT_DENY_BLOCKING) - flags |= REDISMODULE_CTX_FLAGS_DENY_BLOCKING; + flags |= REDISMODULE_CTX_FLAGS_DENY_BLOCKING; /* Module command received from MASTER, is replicated. */ if (ctx->client->flags & CLIENT_MASTER) - flags |= REDISMODULE_CTX_FLAGS_REPLICATED; + flags |= REDISMODULE_CTX_FLAGS_REPLICATED; } /* For DIRTY flags, we need the blocked client if used */ @@ -1961,6 +1958,12 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) { } } + if (server.in_eval) + flags |= REDISMODULE_CTX_FLAGS_LUA; + + if (server.in_exec) + flags |= REDISMODULE_CTX_FLAGS_MULTI; + if (server.cluster_enabled) flags |= REDISMODULE_CTX_FLAGS_CLUSTER; @@ -4497,8 +4500,8 @@ void unblockClientFromModule(client *c) { */ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) { client *c = ctx->client; - int islua = c->flags & CLIENT_LUA; - int ismulti = c->flags & CLIENT_MULTI; + int islua = server.in_eval; + int ismulti = server.in_exec; c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient)); RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; diff --git a/src/multi.c b/src/multi.c index c519828c4..a2f9ecccf 100644 --- a/src/multi.c +++ b/src/multi.c @@ -113,14 +113,28 @@ void discardCommand(client *c) { addReply(c,shared.ok); } +void beforePropagateMultiOrExec(int multi) { + if (multi) { + /* Propagating MULTI */ + serverAssert(!server.propagate_in_transaction); + server.propagate_in_transaction = 1; + } else { + /* Propagating EXEC */ + serverAssert(server.propagate_in_transaction == 1); + server.propagate_in_transaction = 0; + } +} + /* Send a MULTI command to all the slaves and AOF file. Check the execCommand * implementation for more information. */ void execCommandPropagateMulti(client *c) { + beforePropagateMultiOrExec(1); propagate(server.multiCommand,c->db->id,&shared.multi,1, PROPAGATE_AOF|PROPAGATE_REPL); } void execCommandPropagateExec(client *c) { + beforePropagateMultiOrExec(0); propagate(server.execCommand,c->db->id,&shared.exec,1, PROPAGATE_AOF|PROPAGATE_REPL); } @@ -176,6 +190,9 @@ void execCommand(client *c) { /* Exec all the queued commands */ unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ + + server.in_exec = 1; + orig_argv = c->argv; orig_argc = c->argc; orig_cmd = c->cmd; @@ -251,6 +268,7 @@ void execCommand(client *c) { if (must_propagate) { int is_master = server.masterhost == NULL; server.dirty++; + beforePropagateMultiOrExec(0); /* If inside the MULTI/EXEC block this instance was suddenly * switched from master to slave (using the SLAVEOF command), the * initial MULTI was propagated into the replication backlog, but the @@ -262,6 +280,8 @@ void execCommand(client *c) { } } + server.in_exec = 0; + handle_monitor: /* Send EXEC to clients waiting data from MONITOR. We do it here * since the natural order of commands execution is actually: diff --git a/src/scripting.c b/src/scripting.c index 3ed3e2f64..8dca84478 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -1498,6 +1498,7 @@ void evalGenericCommand(client *c, int evalsha) { server.lua_replicate_commands = server.lua_always_replicate_commands; server.lua_multi_emitted = 0; server.lua_repl = PROPAGATE_AOF|PROPAGATE_REPL; + server.in_eval = 1; /* Get the number of arguments that are keys */ if (getLongLongFromObjectOrReply(c,c->argv[2],&numkeys,NULL) != C_OK) @@ -1678,6 +1679,8 @@ void evalGenericCommand(client *c, int evalsha) { forceCommandPropagation(c,PROPAGATE_REPL|PROPAGATE_AOF); } } + + server.in_eval = 0; } void evalCommand(client *c) { diff --git a/src/server.c b/src/server.c index bfd256310..c30577da4 100644 --- a/src/server.c +++ b/src/server.c @@ -3049,6 +3049,9 @@ void initServer(void) { listSetFreeMethod(server.pubsub_patterns,freePubsubPattern); listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern); server.cronloops = 0; + server.in_eval = 0; + server.in_exec = 0; + server.propagate_in_transaction = 0; server.rdb_child_pid = -1; server.aof_child_pid = -1; server.module_child_pid = -1; diff --git a/src/server.h b/src/server.h index 772775bea..5bbe7876c 100644 --- a/src/server.h +++ b/src/server.h @@ -1116,6 +1116,9 @@ struct redisServer { int sentinel_mode; /* True if this instance is a Sentinel. */ size_t initial_memory_usage; /* Bytes used after initialization. */ int always_show_logo; /* Show logo even for non-stdout logging. */ + int in_eval; /* Are we inside EVAL? */ + int in_exec; /* Are we inside EXEC? */ + int propagate_in_transaction; /* Make sure we don't propagate nested MULTI/EXEC */ /* Modules */ dict *moduleapi; /* Exported core APIs dictionary for modules. */ dict *sharedapi; /* Like moduleapi but containing the APIs that @@ -1834,6 +1837,7 @@ void flagTransaction(client *c); void execCommandAbort(client *c, sds error); void execCommandPropagateMulti(client *c); void execCommandPropagateExec(client *c); +void beforePropagateMultiOrExec(int multi); /* Redis object implementation */ void decrRefCount(robj *o); diff --git a/tests/modules/keyspace_events.c b/tests/modules/keyspace_events.c index fac7edd1f..be259d738 100644 --- a/tests/modules/keyspace_events.c +++ b/tests/modules/keyspace_events.c @@ -39,7 +39,7 @@ /** strores all the keys on which we got 'loaded' keyspace notification **/ RedisModuleDict *loaded_event_log = NULL; -static int KeySpace_Notification(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){ +static int KeySpace_NotificationLoaded(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){ REDISMODULE_NOT_USED(ctx); REDISMODULE_NOT_USED(type); @@ -55,6 +55,29 @@ static int KeySpace_Notification(RedisModuleCtx *ctx, int type, const char *even return REDISMODULE_OK; } +static int KeySpace_NotificationGeneric(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) { + REDISMODULE_NOT_USED(type); + + if (strcmp(event, "del") == 0) { + RedisModuleString *copykey = RedisModule_CreateStringPrintf(ctx, "%s_copy", RedisModule_StringPtrLen(key, NULL)); + RedisModuleCallReply* rep = RedisModule_Call(ctx, "DEL", "s!", copykey); + RedisModule_FreeString(ctx, copykey); + RedisModule_FreeCallReply(rep); + + int ctx_flags = RedisModule_GetContextFlags(ctx); + if (ctx_flags & REDISMODULE_CTX_FLAGS_LUA) { + RedisModuleCallReply* rep = RedisModule_Call(ctx, "INCR", "c", "lua"); + RedisModule_FreeCallReply(rep); + } + if (ctx_flags & REDISMODULE_CTX_FLAGS_MULTI) { + RedisModuleCallReply* rep = RedisModule_Call(ctx, "INCR", "c", "multi"); + RedisModule_FreeCallReply(rep); + } + } + + return REDISMODULE_OK; +} + static int cmdIsKeyLoaded(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){ if(argc != 2){ return RedisModule_WrongArity(ctx); @@ -75,6 +98,68 @@ static int cmdIsKeyLoaded(RedisModuleCtx *ctx, RedisModuleString **argv, int arg return REDISMODULE_OK; } +static int cmdDelKeyCopy(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) + return RedisModule_WrongArity(ctx); + + RedisModuleCallReply* rep = RedisModule_Call(ctx, "DEL", "s!", argv[1]); + if (!rep) { + RedisModule_ReplyWithError(ctx, "NULL reply returned"); + } else { + RedisModule_ReplyWithCallReply(ctx, rep); + RedisModule_FreeCallReply(rep); + } + return REDISMODULE_OK; +} + +/* Call INCR and propagate using RM_Call with `!`. */ +static int cmdIncrCase1(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) + return RedisModule_WrongArity(ctx); + + RedisModuleCallReply* rep = RedisModule_Call(ctx, "INCR", "s!", argv[1]); + if (!rep) { + RedisModule_ReplyWithError(ctx, "NULL reply returned"); + } else { + RedisModule_ReplyWithCallReply(ctx, rep); + RedisModule_FreeCallReply(rep); + } + return REDISMODULE_OK; +} + +/* Call INCR and propagate using RM_Replicate. */ +static int cmdIncrCase2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) + return RedisModule_WrongArity(ctx); + + RedisModuleCallReply* rep = RedisModule_Call(ctx, "INCR", "s", argv[1]); + if (!rep) { + RedisModule_ReplyWithError(ctx, "NULL reply returned"); + } else { + RedisModule_ReplyWithCallReply(ctx, rep); + RedisModule_FreeCallReply(rep); + } + RedisModule_Replicate(ctx, "INCR", "s", argv[1]); + return REDISMODULE_OK; +} + +/* Call INCR and propagate using RM_ReplicateVerbatim. */ +static int cmdIncrCase3(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 2) + return RedisModule_WrongArity(ctx); + + RedisModuleCallReply* rep = RedisModule_Call(ctx, "INCR", "s", argv[1]); + if (!rep) { + RedisModule_ReplyWithError(ctx, "NULL reply returned"); + } else { + RedisModule_ReplyWithCallReply(ctx, rep); + RedisModule_FreeCallReply(rep); + } + RedisModule_ReplicateVerbatim(ctx); + return REDISMODULE_OK; +} + + /* This function must be present on each Redis module. It is used in order to * register the commands into the Redis server. */ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { @@ -94,7 +179,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; } - if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_LOADED, KeySpace_Notification) != REDISMODULE_OK){ + if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_LOADED, KeySpace_NotificationLoaded) != REDISMODULE_OK){ + return REDISMODULE_ERR; + } + + if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_GENERIC, KeySpace_NotificationGeneric) != REDISMODULE_OK){ return REDISMODULE_ERR; } @@ -102,6 +191,22 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) return REDISMODULE_ERR; } + if (RedisModule_CreateCommand(ctx,"keyspace.del_key_copy", cmdDelKeyCopy,"",0,0,0) == REDISMODULE_ERR){ + return REDISMODULE_ERR; + } + + if (RedisModule_CreateCommand(ctx,"keyspace.incr_case1", cmdIncrCase1,"",0,0,0) == REDISMODULE_ERR){ + return REDISMODULE_ERR; + } + + if (RedisModule_CreateCommand(ctx,"keyspace.incr_case2", cmdIncrCase2,"",0,0,0) == REDISMODULE_ERR){ + return REDISMODULE_ERR; + } + + if (RedisModule_CreateCommand(ctx,"keyspace.incr_case3", cmdIncrCase3,"",0,0,0) == REDISMODULE_ERR){ + return REDISMODULE_ERR; + } + return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/keyspace_events.tcl b/tests/unit/moduleapi/keyspace_events.tcl index 5b3db0c0a..293a62e4e 100644 --- a/tests/unit/moduleapi/keyspace_events.tcl +++ b/tests/unit/moduleapi/keyspace_events.tcl @@ -18,5 +18,54 @@ tags "modules" { assert_equal {1 t} [r keyspace.is_key_loaded t] assert_equal {1 s} [r keyspace.is_key_loaded s] } + + test {Nested multi due to RM_Call} { + r del multi + r del lua + + r set x 1 + r set x_copy 1 + r keyspace.del_key_copy x + r keyspace.incr_case1 x + r keyspace.incr_case2 x + r keyspace.incr_case3 x + assert_equal {} [r get multi] + assert_equal {} [r get lua] + r get x + } {3} + + test {Nested multi due to RM_Call, with client MULTI} { + r del multi + r del lua + + r set x 1 + r set x_copy 1 + r multi + r keyspace.del_key_copy x + r keyspace.incr_case1 x + r keyspace.incr_case2 x + r keyspace.incr_case3 x + r exec + assert_equal {1} [r get multi] + assert_equal {} [r get lua] + r get x + } {3} + + test {Nested multi due to RM_Call, with EVAL} { + r del multi + r del lua + + r set x 1 + r set x_copy 1 + r eval { + redis.pcall('keyspace.del_key_copy', KEYS[1]) + redis.pcall('keyspace.incr_case1', KEYS[1]) + redis.pcall('keyspace.incr_case2', KEYS[1]) + redis.pcall('keyspace.incr_case3', KEYS[1]) + } 1 x + assert_equal {} [r get multi] + assert_equal {1} [r get lua] + r get x + } {3} } -} \ No newline at end of file +}