From 3d0b427c30610b45c00b2377ee28bb69974ccea2 Mon Sep 17 00:00:00 2001 From: guybe7 Date: Wed, 10 Mar 2021 17:02:17 +0100 Subject: [PATCH] Fix some issues with modules and MULTI/EXEC (#8617) Bug 1: When a module ctx is freed moduleHandlePropagationAfterCommandCallback is called and handles propagation. We want to prevent it from propagating commands that were not replicated by the same context. Example: 1. module1.foo does: RM_Replicate(cmd1); RM_Call(cmd2); RM_Replicate(cmd3) 2. RM_Replicate(cmd1) propagates MULTI and adds cmd1 to also_propagagte 3. RM_Call(cmd2) create a new ctx, calls call() and destroys the ctx. 4. moduleHandlePropagationAfterCommandCallback is called, calling alsoPropagates EXEC (Note: EXEC is still not written to socket), setting server.in_trnsaction = 0 5. RM_Replicate(cmd3) is called, propagagting yet another MULTI (now we have nested MULTI calls, which is no good) and then cmd3 We must prevent RM_Call(cmd2) from resetting server.in_transaction. REDISMODULE_CTX_MULTI_EMITTED was revived for that purpose. Bug 2: Fix issues with nested RM_Call where some have '!' and some don't. Example: 1. module1.foo does RM_Call of module2.bar without replication (i.e. no '!') 2. module2.bar internally calls RM_Call of INCR with '!' 3. at the end of module1.foo we call RM_ReplicateVerbatim We want the replica/AOF to see only module1.foo and not the INCR from module2.bar Introduced a global replication_allowed flag inside RM_Call to determine whether we need to replicate or not (even if '!' was specified) Other changes: Split beforePropagateMultiOrExec to beforePropagateMulti afterPropagateExec just for better readability --- src/module.c | 27 +++++++++-- src/multi.c | 26 +++++------ src/server.c | 5 ++ src/server.h | 4 +- tests/modules/propagate.c | 74 ++++++++++++++++++++++++++++++ tests/unit/moduleapi/propagate.tcl | 53 +++++++++++++++++++++ 6 files changed, 170 insertions(+), 19 deletions(-) diff --git a/src/module.c b/src/module.c index 8d04215d6..d42207bbd 100644 --- a/src/module.c +++ b/src/module.c @@ -169,6 +169,7 @@ typedef struct RedisModuleCtx RedisModuleCtx; #define REDISMODULE_CTX_THREAD_SAFE (1<<4) #define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<5) #define REDISMODULE_CTX_MODULE_COMMAND_CALL (1<<6) +#define REDISMODULE_CTX_MULTI_EMITTED (1<<7) /* This represents a Redis key opened with RM_OpenKey(). */ struct RedisModuleKey { @@ -599,6 +600,10 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) { /* We don't need to do anything here if the context was never used * in order to propagate commands. */ + if (!(ctx->flags & REDISMODULE_CTX_MULTI_EMITTED)) return; + + /* We don't need to do anything here if the server isn't inside + * a transaction. */ if (!server.propagate_in_transaction) return; /* If this command is executed from with Lua or MULTI/EXEC we do noy @@ -607,9 +612,9 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) { /* Handle the replication of the final EXEC, since whatever a command * emits is always wrapped around MULTI/EXEC. */ - beforePropagateMultiOrExec(0); alsoPropagate(server.execCommand,c->db->id,&shared.exec,1, PROPAGATE_AOF|PROPAGATE_REPL); + afterPropagateExec(); /* If this is not a module command context (but is instead a simple * callback context), we have to handle directly the "also propagate" @@ -1707,10 +1712,12 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) { * context, we have to setup the op array for the "also propagate" API * so that RM_Replicate() will work. */ if (!(ctx->flags & REDISMODULE_CTX_MODULE_COMMAND_CALL)) { + serverAssert(ctx->saved_oparray.ops == NULL); ctx->saved_oparray = server.also_propagate; redisOpArrayInit(&server.also_propagate); } execCommandPropagateMulti(ctx->client->db->id); + ctx->flags |= REDISMODULE_CTX_MULTI_EMITTED; } /* Replicate the specified command and arguments to slaves and AOF, as effect @@ -4062,20 +4069,30 @@ RedisModuleCallReply *RM_Call(RedisModuleCtx *ctx, const char *cmdname, const ch } } - /* If we are using single commands replication, we need to wrap what - * we propagate into a MULTI/EXEC block, so that it will be atomic like - * a Lua script in the context of AOF and slaves. */ - if (replicate) moduleReplicateMultiIfNeeded(ctx); + /* We need to use a global replication_allowed flag in order to prevent + * replication of nested RM_Calls. Example: + * 1. module1.foo does RM_Call of module2.bar without replication (i.e. no '!') + * 2. module2.bar internally calls RM_Call of INCR with '!' + * 3. at the end of module1.foo we call RM_ReplicateVerbatim + * We want the replica/AOF to see only module1.foo and not the INCR from module2.bar */ + int prev_replication_allowed = server.replication_allowed; + server.replication_allowed = replicate && server.replication_allowed; /* Run the command */ int call_flags = CMD_CALL_SLOWLOG | CMD_CALL_STATS | CMD_CALL_NOWRAP; if (replicate) { + /* If we are using single commands replication, we need to wrap what + * we propagate into a MULTI/EXEC block, so that it will be atomic like + * a Lua script in the context of AOF and slaves. */ + moduleReplicateMultiIfNeeded(ctx); + if (!(flags & REDISMODULE_ARGV_NO_AOF)) call_flags |= CMD_CALL_PROPAGATE_AOF; if (!(flags & REDISMODULE_ARGV_NO_REPLICAS)) call_flags |= CMD_CALL_PROPAGATE_REPL; } call(c,call_flags); + server.replication_allowed = prev_replication_allowed; serverAssert((c->flags & CLIENT_BLOCKED) == 0); diff --git a/src/multi.c b/src/multi.c index d88c5f1b8..5d690b71f 100644 --- a/src/multi.c +++ b/src/multi.c @@ -113,30 +113,30 @@ void discardCommand(client *c) { addReply(c,shared.ok); } -void beforePropagateMultiOrExec(int multi) { - if (multi) { - /* Propagating MULTI */ - serverAssert(!server.propagate_in_transaction); - server.propagate_in_transaction = 1; - } else { - /* Propagating EXEC */ - serverAssert(server.propagate_in_transaction == 1); - server.propagate_in_transaction = 0; - } +void beforePropagateMulti() { + /* Propagating MULTI */ + serverAssert(!server.propagate_in_transaction); + server.propagate_in_transaction = 1; +} + +void afterPropagateExec() { + /* Propagating EXEC */ + serverAssert(server.propagate_in_transaction == 1); + server.propagate_in_transaction = 0; } /* Send a MULTI command to all the slaves and AOF file. Check the execCommand * implementation for more information. */ void execCommandPropagateMulti(int dbid) { - beforePropagateMultiOrExec(1); + beforePropagateMulti(); propagate(server.multiCommand,dbid,&shared.multi,1, PROPAGATE_AOF|PROPAGATE_REPL); } void execCommandPropagateExec(int dbid) { - beforePropagateMultiOrExec(0); propagate(server.execCommand,dbid,&shared.exec,1, PROPAGATE_AOF|PROPAGATE_REPL); + afterPropagateExec(); } /* Aborts a transaction, with a specific error message. @@ -254,7 +254,6 @@ void execCommand(client *c) { if (server.propagate_in_transaction) { int is_master = server.masterhost == NULL; server.dirty++; - beforePropagateMultiOrExec(0); /* If inside the MULTI/EXEC block this instance was suddenly * switched from master to slave (using the SLAVEOF command), the * initial MULTI was propagated into the replication backlog, but the @@ -264,6 +263,7 @@ void execCommand(client *c) { char *execcmd = "*1\r\n$4\r\nEXEC\r\n"; feedReplicationBacklog(execcmd,strlen(execcmd)); } + afterPropagateExec(); } server.in_exec = 0; diff --git a/src/server.c b/src/server.c index 81e23d8ee..3ca87b684 100644 --- a/src/server.c +++ b/src/server.c @@ -3158,6 +3158,7 @@ void initServer(void) { server.clients_pending_write = listCreate(); server.clients_pending_read = listCreate(); server.clients_timeout_table = raxNew(); + server.replication_allowed = 1; server.slaveseldb = -1; /* Force to emit the first SELECT command. */ server.unblocked_clients = listCreate(); server.ready_keys = listCreate(); @@ -3502,6 +3503,7 @@ void redisOpArrayFree(redisOpArray *oa) { zfree(op->argv); } zfree(oa->ops); + oa->ops = NULL; } /* ====================== Commands lookup and execution ===================== */ @@ -3552,6 +3554,9 @@ struct redisCommand *lookupCommandOrOriginal(sds name) { void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) { + if (!server.replication_allowed) + return; + /* Propagate a MULTI request once we encounter the first command which * is a write command. * This way we'll deliver the MULTI/..../EXEC block as a whole and diff --git a/src/server.h b/src/server.h index 11fe55397..a18e09ef0 100644 --- a/src/server.h +++ b/src/server.h @@ -1400,6 +1400,7 @@ struct redisServer { int child_info_nread; /* Num of bytes of the last read from pipe */ /* Propagation of commands in AOF / replication */ redisOpArray also_propagate; /* Additional command to propagate. */ + int replication_allowed; /* Are we allowed to replicate? */ /* Logging */ char *logfile; /* Path of log file */ int syslog_enabled; /* Is syslog enabled? */ @@ -1932,7 +1933,8 @@ void flagTransaction(client *c); void execCommandAbort(client *c, sds error); void execCommandPropagateMulti(int dbid); void execCommandPropagateExec(int dbid); -void beforePropagateMultiOrExec(int multi); +void beforePropagateMulti(); +void afterPropagateExec(); /* Redis object implementation */ void decrRefCount(robj *o); diff --git a/tests/modules/propagate.c b/tests/modules/propagate.c index 70cddacbd..e07f8efe7 100644 --- a/tests/modules/propagate.c +++ b/tests/modules/propagate.c @@ -70,6 +70,43 @@ int propagateTestTimerCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int return REDISMODULE_OK; } +/* Timer callback. */ +void timerNestedHandler(RedisModuleCtx *ctx, void *data) { + int repl = (long long)data; + + /* The goal is the trigger a module command that calls RM_Replicate + * in order to test MULTI/EXEC structre */ + RedisModule_Replicate(ctx,"INCRBY","cc","timer-nested-start","1"); + RedisModule_Call(ctx,"propagate-test.nested", repl? "!" : ""); + RedisModule_Replicate(ctx,"INCRBY","cc","timer-nested-end","1"); +} + +int propagateTestTimerNestedCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + RedisModuleTimerID timer_id = + RedisModule_CreateTimer(ctx,100,timerNestedHandler,(void*)0); + REDISMODULE_NOT_USED(timer_id); + + RedisModule_ReplyWithSimpleString(ctx,"OK"); + return REDISMODULE_OK; +} + +int propagateTestTimerNestedReplCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + RedisModuleTimerID timer_id = + RedisModule_CreateTimer(ctx,100,timerNestedHandler,(void*)1); + REDISMODULE_NOT_USED(timer_id); + + RedisModule_ReplyWithSimpleString(ctx,"OK"); + return REDISMODULE_OK; +} + /* The thread entry point. */ void *threadMain(void *arg) { REDISMODULE_NOT_USED(arg); @@ -131,6 +168,28 @@ int propagateTestMixedCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int return REDISMODULE_OK; } +int propagateTestNestedCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + RedisModuleCallReply *reply; + + /* This test mixes multiple propagation systems. */ + reply = RedisModule_Call(ctx, "INCR", "c!", "using-call"); + RedisModule_FreeCallReply(reply); + + RedisModule_Call(ctx,"propagate-test.simple", "!"); + + RedisModule_Replicate(ctx,"INCR","c","counter-3"); + RedisModule_Replicate(ctx,"INCR","c","counter-4"); + + reply = RedisModule_Call(ctx, "INCR", "c!", "after-call"); + RedisModule_FreeCallReply(reply); + + RedisModule_ReplyWithSimpleString(ctx,"OK"); + return REDISMODULE_OK; +} + int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -143,6 +202,16 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) "",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"propagate-test.timer-nested", + propagateTestTimerNestedCommand, + "",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"propagate-test.timer-nested-repl", + propagateTestTimerNestedReplCommand, + "",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"propagate-test.thread", propagateTestThreadCommand, "",1,1,1) == REDISMODULE_ERR) @@ -158,5 +227,10 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) "",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx,"propagate-test.nested", + propagateTestNestedCommand, + "",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + return REDISMODULE_OK; } diff --git a/tests/unit/moduleapi/propagate.tcl b/tests/unit/moduleapi/propagate.tcl index adebd37a6..1277d3970 100644 --- a/tests/unit/moduleapi/propagate.tcl +++ b/tests/unit/moduleapi/propagate.tcl @@ -42,6 +42,59 @@ tags "modules" { close_replication_stream $repl } + test {module propagates nested ctx case1} { + set repl [attach_to_replication_stream] + + $master del timer-nested-start + $master del timer-nested-end + $master propagate-test.timer-nested + + wait_for_condition 5000 10 { + [$replica get timer-nested-end] eq "1" + } else { + fail "The two counters don't match the expected value." + } + + assert_replication_stream $repl { + {select *} + {multi} + {incrby timer-nested-start 1} + {incrby timer-nested-end 1} + {exec} + } + close_replication_stream $repl + } + + test {module propagates nested ctx case2} { + set repl [attach_to_replication_stream] + + $master del timer-nested-start + $master del timer-nested-end + $master propagate-test.timer-nested-repl + + wait_for_condition 5000 10 { + [$replica get timer-nested-end] eq "1" + } else { + fail "The two counters don't match the expected value." + } + + # Note the 'after-call' and 'timer-nested-start' propagation below is out of order (known limitation) + assert_replication_stream $repl { + {select *} + {multi} + {incr using-call} + {incr counter-1} + {incr counter-2} + {incr after-call} + {incr counter-3} + {incr counter-4} + {incrby timer-nested-start 1} + {incrby timer-nested-end 1} + {exec} + } + close_replication_stream $repl + } + test {module propagates from thread} { set repl [attach_to_replication_stream]