diff --git a/src/cluster.c b/src/cluster.c index 8651a81d3..2cd6b2521 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -5763,7 +5763,7 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in * cluster is down. */ if (error_code) *error_code = CLUSTER_REDIR_DOWN_STATE; return NULL; - } else if (!(cmd->flags & CMD_READONLY) && !(cmd->proc == evalCommand) + } else if ((cmd->flags & CMD_WRITE) && !(cmd->proc == evalCommand) && !(cmd->proc == evalShaCommand)) { /* The cluster is configured to allow read only commands @@ -5812,11 +5812,10 @@ clusterNode *getNodeByQuery(client *c, struct redisCommand *cmd, robj **argv, in /* Handle the read-only client case reading from a slave: if this * node is a slave and the request is about a hash slot our master * is serving, we can reply without redirection. */ - int is_readonly_command = (c->cmd->flags & CMD_READONLY) || - (c->cmd->proc == execCommand && !(c->mstate.cmd_inv_flags & CMD_READONLY)); + int is_write_command = (c->cmd->flags & CMD_WRITE) || + (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE)); if (c->flags & CLIENT_READONLY && - (is_readonly_command || cmd->proc == evalCommand || - cmd->proc == evalShaCommand) && + (!is_write_command || cmd->proc == evalCommand || cmd->proc == evalShaCommand) && nodeIsSlave(myself) && myself->slaveof == n) { @@ -5901,7 +5900,7 @@ int clusterRedirectBlockedClientIfNeeded(client *c) { /* if the client is read-only and attempting to access key that our * replica can handle, allow it. */ if ((c->flags & CLIENT_READONLY) && - (c->lastcmd->flags & CMD_READONLY) && + !(c->lastcmd->flags & CMD_WRITE) && nodeIsSlave(myself) && myself->slaveof == node) { node = myself; diff --git a/src/module.c b/src/module.c index da9ac29e8..11f5f4489 100644 --- a/src/module.c +++ b/src/module.c @@ -1629,7 +1629,7 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) { ctx->saved_oparray = server.also_propagate; redisOpArrayInit(&server.also_propagate); } - execCommandPropagateMulti(ctx->client); + execCommandPropagateMulti(ctx->client->db->id); } /* Replicate the specified command and arguments to slaves and AOF, as effect diff --git a/src/multi.c b/src/multi.c index a2f9ecccf..af0b0c612 100644 --- a/src/multi.c +++ b/src/multi.c @@ -127,15 +127,15 @@ void beforePropagateMultiOrExec(int multi) { /* Send a MULTI command to all the slaves and AOF file. Check the execCommand * implementation for more information. */ -void execCommandPropagateMulti(client *c) { +void execCommandPropagateMulti(int dbid) { beforePropagateMultiOrExec(1); - propagate(server.multiCommand,c->db->id,&shared.multi,1, + propagate(server.multiCommand,dbid,&shared.multi,1, PROPAGATE_AOF|PROPAGATE_REPL); } -void execCommandPropagateExec(client *c) { +void execCommandPropagateExec(int dbid) { beforePropagateMultiOrExec(0); - propagate(server.execCommand,c->db->id,&shared.exec,1, + propagate(server.execCommand,dbid,&shared.exec,1, PROPAGATE_AOF|PROPAGATE_REPL); } @@ -162,7 +162,6 @@ void execCommand(client *c) { robj **orig_argv; int orig_argc; struct redisCommand *orig_cmd; - int must_propagate = 0; /* Need to propagate MULTI/EXEC to AOF / slaves? */ int was_master = server.masterhost == NULL; if (!(c->flags & CLIENT_MULTI)) { @@ -202,19 +201,6 @@ void execCommand(client *c) { c->argv = c->mstate.commands[j].argv; c->cmd = c->mstate.commands[j].cmd; - /* Propagate a MULTI request once we encounter the first command which - * is not readonly nor an administrative one. - * This way we'll deliver the MULTI/..../EXEC block as a whole and - * both the AOF and the replication link will have the same consistency - * and atomicity guarantees. */ - if (!must_propagate && - !server.loading && - !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) - { - execCommandPropagateMulti(c); - must_propagate = 1; - } - /* ACL permissions are also checked at the time of execution in case * they were changed after the commands were ququed. */ int acl_errpos; @@ -265,7 +251,7 @@ void execCommand(client *c) { /* Make sure the EXEC command will be propagated as well if MULTI * was already propagated. */ - if (must_propagate) { + if (server.propagate_in_transaction) { int is_master = server.masterhost == NULL; server.dirty++; beforePropagateMultiOrExec(0); diff --git a/src/scripting.c b/src/scripting.c index 8dca84478..bc137de25 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -721,7 +721,7 @@ int luaRedisGenericCommand(lua_State *lua, int raise_error) { server.lua_write_dirty && server.lua_repl != PROPAGATE_NONE) { - execCommandPropagateMulti(server.lua_caller); + execCommandPropagateMulti(server.lua_caller->db->id); server.lua_multi_emitted = 1; /* Now we are in the MULTI context, the lua_client should be * flag as CLIENT_MULTI. */ @@ -1638,7 +1638,7 @@ void evalGenericCommand(client *c, int evalsha) { if (server.lua_replicate_commands) { preventCommandPropagation(c); if (server.lua_multi_emitted) { - execCommandPropagateExec(c); + execCommandPropagateExec(c->db->id); } } diff --git a/src/server.c b/src/server.c index 279e75f2a..29c810928 100644 --- a/src/server.c +++ b/src/server.c @@ -115,9 +115,9 @@ struct redisServer server; /* Server global state */ * * write: Write command (may modify the key space). * - * read-only: All the non special commands just reading from keys without - * changing the content, or returning other information like - * the TIME command. Special commands such administrative commands + * read-only: Commands just reading from keys without changing the content. + * Note that commands that don't read from the keyspace such as + * TIME, SELECT, INFO, administrative commands, and connection * or transaction related commands (multi, exec, discard, ...) * are not flagged as read-only commands, since they affect the * server or the connection in other ways. @@ -685,7 +685,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"echo",echoCommand,2, - "read-only fast @connection", + "fast @connection", 0,NULL,0,0,0,0,0,0}, {"save",saveCommand,1, @@ -705,7 +705,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"lastsave",lastsaveCommand,1, - "read-only random fast ok-loading ok-stale @admin @dangerous", + "random fast ok-loading ok-stale @admin @dangerous", 0,NULL,0,0,0,0,0,0}, {"type",typeCommand,2, @@ -781,7 +781,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"role",roleCommand,1, - "ok-loading ok-stale no-script fast read-only @dangerous", + "ok-loading ok-stale no-script fast @dangerous", 0,NULL,0,0,0,0,0,0}, {"debug",debugCommand,-2, @@ -891,7 +891,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"time",timeCommand,1, - "read-only random fast ok-loading ok-stale", + "random fast ok-loading ok-stale", 0,NULL,0,0,0,0,0,0}, {"bitop",bitopCommand,-4, @@ -3377,6 +3377,14 @@ struct redisCommand *lookupCommandOrOriginal(sds name) { void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, int flags) { + /* Propagate a MULTI request once we encounter the first command which + * is a write command. + * This way we'll deliver the MULTI/..../EXEC block as a whole and + * both the AOF and the replication link will have the same consistency + * and atomicity guarantees. */ + if (server.in_exec && !server.propagate_in_transaction) + execCommandPropagateMulti(dbid); + if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); if (flags & PROPAGATE_REPL) @@ -3607,7 +3615,7 @@ void call(client *c, int flags) { !(c->flags & CLIENT_MULTI) && !(flags & CMD_CALL_NOWRAP)) { - execCommandPropagateMulti(c); + execCommandPropagateMulti(c->db->id); multi_emitted = 1; } @@ -3622,7 +3630,7 @@ void call(client *c, int flags) { } if (multi_emitted) { - execCommandPropagateExec(c); + execCommandPropagateExec(c->db->id); } } redisOpArrayFree(&server.also_propagate); diff --git a/src/server.h b/src/server.h index 8a1f191f3..04fd709a8 100644 --- a/src/server.h +++ b/src/server.h @@ -1861,8 +1861,8 @@ void touchWatchedKeysOnFlush(int dbid); void discardTransaction(client *c); void flagTransaction(client *c); void execCommandAbort(client *c, sds error); -void execCommandPropagateMulti(client *c); -void execCommandPropagateExec(client *c); +void execCommandPropagateMulti(int dbid); +void execCommandPropagateExec(int dbid); void beforePropagateMultiOrExec(int multi); /* Redis object implementation */ diff --git a/tests/modules/propagate.c b/tests/modules/propagate.c index 13277b19d..70cddacbd 100644 --- a/tests/modules/propagate.c +++ b/tests/modules/propagate.c @@ -51,18 +51,31 @@ void timerHandler(RedisModuleCtx *ctx, void *data) { RedisModule_Replicate(ctx,"INCR","c","timer"); times++; - if (times < 10) + if (times < 3) RedisModule_CreateTimer(ctx,100,timerHandler,NULL); else times = 0; } +int propagateTestTimerCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +{ + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + + RedisModuleTimerID timer_id = + RedisModule_CreateTimer(ctx,100,timerHandler,NULL); + REDISMODULE_NOT_USED(timer_id); + + RedisModule_ReplyWithSimpleString(ctx,"OK"); + return REDISMODULE_OK; +} + /* The thread entry point. */ void *threadMain(void *arg) { REDISMODULE_NOT_USED(arg); RedisModuleCtx *ctx = RedisModule_GetThreadSafeContext(NULL); RedisModule_SelectDb(ctx,9); /* Tests ran in database number 9. */ - for (int i = 0; i < 10; i++) { + for (int i = 0; i < 3; i++) { RedisModule_ThreadSafeContextLock(ctx); RedisModule_Replicate(ctx,"INCR","c","a-from-thread"); RedisModule_Replicate(ctx,"INCR","c","b-from-thread"); @@ -72,15 +85,11 @@ void *threadMain(void *arg) { return NULL; } -int propagateTestCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +int propagateTestThreadCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); - RedisModuleTimerID timer_id = - RedisModule_CreateTimer(ctx,100,timerHandler,NULL); - REDISMODULE_NOT_USED(timer_id); - pthread_t tid; if (pthread_create(&tid,NULL,threadMain,NULL) != 0) return RedisModule_ReplyWithError(ctx,"-ERR Can't start thread"); @@ -90,7 +99,7 @@ int propagateTestCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc return REDISMODULE_OK; } -int propagateTest2Command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +int propagateTestSimpleCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -102,7 +111,7 @@ int propagateTest2Command(RedisModuleCtx *ctx, RedisModuleString **argv, int arg return REDISMODULE_OK; } -int propagateTest3Command(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +int propagateTestMixedCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { REDISMODULE_NOT_USED(argv); REDISMODULE_NOT_USED(argc); @@ -129,18 +138,23 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) if (RedisModule_Init(ctx,"propagate-test",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) return REDISMODULE_ERR; - if (RedisModule_CreateCommand(ctx,"propagate-test", - propagateTestCommand, + if (RedisModule_CreateCommand(ctx,"propagate-test.timer", + propagateTestTimerCommand, "",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; - if (RedisModule_CreateCommand(ctx,"propagate-test-2", - propagateTest2Command, + if (RedisModule_CreateCommand(ctx,"propagate-test.thread", + propagateTestThreadCommand, "",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; - if (RedisModule_CreateCommand(ctx,"propagate-test-3", - propagateTest3Command, + if (RedisModule_CreateCommand(ctx,"propagate-test.simple", + propagateTestSimpleCommand, + "",1,1,1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx,"propagate-test.mixed", + propagateTestMixedCommand, "",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; diff --git a/tests/unit/moduleapi/propagate.tcl b/tests/unit/moduleapi/propagate.tcl index aa0f55e5e..adebd37a6 100644 --- a/tests/unit/moduleapi/propagate.tcl +++ b/tests/unit/moduleapi/propagate.tcl @@ -14,25 +14,103 @@ tags "modules" { # Start the replication process... $replica replicaof $master_host $master_port wait_for_sync $replica - after 1000 - $master propagate-test - wait_for_condition 5000 10 { - ([$replica get timer] eq "10") && \ - ([$replica get a-from-thread] eq "10") - } else { - fail "The two counters don't match the expected value." + test {module propagates from timer} { + set repl [attach_to_replication_stream] + + $master propagate-test.timer + + wait_for_condition 5000 10 { + [$replica get timer] eq "3" + } else { + fail "The two counters don't match the expected value." + } + + assert_replication_stream $repl { + {select *} + {multi} + {incr timer} + {exec} + {multi} + {incr timer} + {exec} + {multi} + {incr timer} + {exec} + } + close_replication_stream $repl } - $master propagate-test-2 - $master propagate-test-3 - $master multi - $master propagate-test-2 - $master propagate-test-3 - $master exec - wait_for_ofs_sync $master $replica + test {module propagates from thread} { + set repl [attach_to_replication_stream] + $master propagate-test.thread + + wait_for_condition 5000 10 { + [$replica get a-from-thread] eq "3" + } else { + fail "The two counters don't match the expected value." + } + + assert_replication_stream $repl { + {select *} + {incr a-from-thread} + {incr b-from-thread} + {incr a-from-thread} + {incr b-from-thread} + {incr a-from-thread} + {incr b-from-thread} + } + close_replication_stream $repl + } + + test {module propagates from from command} { + set repl [attach_to_replication_stream] + + $master propagate-test.simple + $master propagate-test.mixed + + # Note the 'after-call' propagation below is out of order (known limitation) + assert_replication_stream $repl { + {select *} + {multi} + {incr counter-1} + {incr counter-2} + {exec} + {multi} + {incr using-call} + {incr after-call} + {incr counter-1} + {incr counter-2} + {exec} + } + close_replication_stream $repl + } + + test {module propagates from from multi-exec} { + set repl [attach_to_replication_stream] + + $master multi + $master propagate-test.simple + $master propagate-test.mixed + $master exec + wait_for_ofs_sync $master $replica + + # Note the 'after-call' propagation below is out of order (known limitation) + assert_replication_stream $repl { + {select *} + {multi} + {incr counter-1} + {incr counter-2} + {incr using-call} + {incr after-call} + {incr counter-1} + {incr counter-2} + {exec} + } + close_replication_stream $repl + } assert_equal [s -1 unexpected_error_replies] 0 } } @@ -47,11 +125,11 @@ tags "modules aof" { r config set auto-aof-rewrite-percentage 0 ; # Disable auto-rewrite. waitForBgrewriteaof r - r propagate-test-2 - r propagate-test-3 + r propagate-test.simple + r propagate-test.mixed r multi - r propagate-test-2 - r propagate-test-3 + r propagate-test.simple + r propagate-test.mixed r exec # Load the AOF diff --git a/tests/unit/multi.tcl b/tests/unit/multi.tcl index 43259b1c0..25e417055 100644 --- a/tests/unit/multi.tcl +++ b/tests/unit/multi.tcl @@ -299,10 +299,14 @@ start_server {tags {"multi"}} { r multi r del foo r exec + + # add another command so that when we see it we know multi-exec wasn't + # propagated + r incr foo + assert_replication_stream $repl { {select *} - {multi} - {exec} + {incr foo} } close_replication_stream $repl } @@ -521,4 +525,83 @@ start_server {tags {"multi"}} { list $m $res } {OK {{} {} {} {} {} {} {} {}}} + + test {MULTI propagation of PUBLISH} { + set repl [attach_to_replication_stream] + + # make sure that PUBLISH inside MULTI is propagated in a transaction + r multi + r publish bla bla + r exec + + assert_replication_stream $repl { + {select *} + {multi} + {publish bla bla} + {exec} + } + close_replication_stream $repl + } + + test {MULTI propagation of SCRIPT LOAD} { + set repl [attach_to_replication_stream] + + # make sure that SCRIPT LOAD inside MULTI is propagated in a transaction + r multi + r script load {redis.call('set', KEYS[1], 'foo')} + set res [r exec] + set sha [lindex $res 0] + + assert_replication_stream $repl { + {select *} + {multi} + {script load *} + {exec} + } + close_replication_stream $repl + } + + test {MULTI propagation of SCRIPT LOAD} { + set repl [attach_to_replication_stream] + + # make sure that EVAL inside MULTI is propagated in a transaction + r config set lua-replicate-commands no + r multi + r eval {redis.call('set', KEYS[1], 'bar')} 1 bar + r exec + + assert_replication_stream $repl { + {select *} + {multi} + {eval *} + {exec} + } + close_replication_stream $repl + } + + tags {"stream"} { + test {MULTI propagation of XREADGROUP} { + # stream is a special case because it calls propagate() directly for XREADGROUP + set repl [attach_to_replication_stream] + + r XADD mystream * foo bar + r XGROUP CREATE mystream mygroup 0 + + # make sure the XCALIM (propagated by XREADGROUP) is indeed inside MULTI/EXEC + r multi + r XREADGROUP GROUP mygroup consumer1 STREAMS mystream ">" + r exec + + assert_replication_stream $repl { + {select *} + {xadd *} + {xgroup CREATE *} + {multi} + {xclaim *} + {exec} + } + close_replication_stream $repl + } + } + }