diff --git a/src/multi.c b/src/multi.c index 60a07dfc7..35ddf92af 100644 --- a/src/multi.c +++ b/src/multi.c @@ -36,6 +36,7 @@ void initClientMultiState(client *c) { c->mstate.commands = NULL; c->mstate.count = 0; c->mstate.cmd_flags = 0; + c->mstate.cmd_inv_flags = 0; } /* Release all the resources associated with MULTI/EXEC state */ @@ -76,6 +77,7 @@ void queueMultiCommand(client *c) { incrRefCount(mc->argv[j]); c->mstate.count++; c->mstate.cmd_flags |= c->cmd->flags; + c->mstate.cmd_inv_flags |= ~c->cmd->flags; } void discardTransaction(client *c) { @@ -122,6 +124,23 @@ void execCommandPropagateExec(client *c) { PROPAGATE_AOF|PROPAGATE_REPL); } +/* Aborts a transaction, with a specific error message. + * The transaction is always aboarted with -EXECABORT so that the client knows + * the server exited the multi state, but the actual reason for the abort is + * included too. */ +void execCommandAbort(client *c, sds error) { + discardTransaction(c); + + if (error[0] == '-') error++; + addReplyErrorFormat(c, "-EXECABORT Transaction discarded because of: %s", error); + + /* Send EXEC to clients waiting data from MONITOR. We did send a MULTI + * already, and didn't send any of the queued commands, now we'll just send + * EXEC so it is clear that the transaction is over. */ + if (listLength(server.monitors) && !server.loading) + replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc); +} + void execCommand(client *c) { int j; robj **orig_argv; @@ -135,15 +154,6 @@ void execCommand(client *c) { return; } - /* If we are in -BUSY state, flag the transaction and return the - * -BUSY error, like Redis <= 5. This is a temporary fix, may be changed - * ASAP, see issue #7353 on Github. */ - if (server.lua_timedout) { - flagTransaction(c); - addReply(c, shared.slowscripterr); - return; - } - /* Check if we need to abort the EXEC because: * 1) Some WATCHed key was touched. * 2) There was a previous error while queueing commands. @@ -157,21 +167,6 @@ void execCommand(client *c) { goto handle_monitor; } - /* If there are write commands inside the transaction, and this is a read - * only slave, we want to send an error. This happens when the transaction - * was initiated when the instance was a master or a writable replica and - * then the configuration changed (for example instance was turned into - * a replica). */ - if (!server.loading && server.masterhost && server.repl_slave_ro && - !(c->flags & CLIENT_MASTER) && c->mstate.cmd_flags & CMD_WRITE) - { - addReplyError(c, - "Transaction contains write commands but instance " - "is now a read-only replica. EXEC aborted."); - discardTransaction(c); - goto handle_monitor; - } - /* Exec all the queued commands */ unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */ orig_argv = c->argv; diff --git a/src/networking.c b/src/networking.c index 77b9a6fcf..539b1fb50 100644 --- a/src/networking.c +++ b/src/networking.c @@ -406,19 +406,23 @@ void addReplyError(client *c, const char *err) { addReplyErrorLength(c,err,strlen(err)); } +/* See addReplyErrorLength. + * Makes sure there are no newlines in the string, otherwise invalid protocol + * is emitted. */ +void addReplyErrorSafe(client *c, char *s, size_t len) { + size_t j; + for (j = 0; j < len; j++) { + if (s[j] == '\r' || s[j] == '\n') s[j] = ' '; + } + addReplyErrorLength(c,s,sdslen(s)); +} + void addReplyErrorFormat(client *c, const char *fmt, ...) { - size_t l, j; va_list ap; va_start(ap,fmt); sds s = sdscatvprintf(sdsempty(),fmt,ap); va_end(ap); - /* Make sure there are no newlines in the string, otherwise invalid protocol - * is emitted. */ - l = sdslen(s); - for (j = 0; j < l; j++) { - if (s[j] == '\r' || s[j] == '\n') s[j] = ' '; - } - addReplyErrorLength(c,s,sdslen(s)); + addReplyErrorSafe(c, s, sdslen(s)); sdsfree(s); } diff --git a/src/server.c b/src/server.c index e8e711240..678dc305a 100644 --- a/src/server.c +++ b/src/server.c @@ -3398,6 +3398,34 @@ void call(client *c, int flags) { server.stat_numcommands++; } +/* Used when a command that is ready for execution needs to be rejected, due to + * varios pre-execution checks. it returns the appropriate error to the client. + * If there's a transaction is flags it as dirty, and if the command is EXEC, + * it aborts the transaction. */ +void rejectCommand(client *c, robj *reply) { + flagTransaction(c); + if (c->cmd && c->cmd->proc == execCommand) { + execCommandAbort(c, reply->ptr); + } else { + /* using addReplyError* rather than addReply so that the error can be logged. */ + addReplyErrorSafe(c, reply->ptr, sdslen(reply->ptr)); + } +} + +void rejectCommandFormat(client *c, const char *fmt, ...) { + flagTransaction(c); + va_list ap; + va_start(ap,fmt); + sds s = sdscatvprintf(sdsempty(),fmt,ap); + va_end(ap); + if (c->cmd && c->cmd->proc == execCommand) { + execCommandAbort(c, s); + } else { + addReplyErrorSafe(c, s, sdslen(s)); + } + sdsfree(s); +} + /* If this function gets called we already read a whole * command, arguments are in the client argv/argc fields. * processCommand() execute the command or prepare the @@ -3423,23 +3451,30 @@ int processCommand(client *c) { * such as wrong arity, bad command name and so forth. */ c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); if (!c->cmd) { - flagTransaction(c); sds args = sdsempty(); int i; for (i=1; i < c->argc && sdslen(args) < 128; i++) args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr); - addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s", + rejectCommandFormat(c,"unknown command `%s`, with args beginning with: %s", (char*)c->argv[0]->ptr, args); sdsfree(args); return C_OK; } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) || (c->argc < -c->cmd->arity)) { - flagTransaction(c); - addReplyErrorFormat(c,"wrong number of arguments for '%s' command", + rejectCommandFormat(c,"wrong number of arguments for '%s' command", c->cmd->name); return C_OK; } + int is_write_command = (c->cmd->flags & CMD_WRITE) || + (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE)); + int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) || + (c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM)); + int is_denystale_command = !(c->cmd->flags & CMD_STALE) || + (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE)); + int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) || + (c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING)); + /* Check if the user is authenticated. This check is skipped in case * the default user is flagged as "nopass" and is active. */ int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) || @@ -3449,8 +3484,7 @@ int processCommand(client *c) { /* AUTH and HELLO and no auth modules are valid even in * non-authenticated state. */ if (!(c->cmd->flags & CMD_NO_AUTH)) { - flagTransaction(c); - addReply(c,shared.noautherr); + rejectCommand(c,shared.noautherr); return C_OK; } } @@ -3461,13 +3495,12 @@ int processCommand(client *c) { int acl_retval = ACLCheckCommandPerm(c,&acl_keypos); if (acl_retval != ACL_OK) { addACLLogEntry(c,acl_retval,acl_keypos,NULL); - flagTransaction(c); if (acl_retval == ACL_DENIED_CMD) - addReplyErrorFormat(c, + rejectCommandFormat(c, "-NOPERM this user has no permissions to run " "the '%s' command or its subcommand", c->cmd->name); else - addReplyErrorFormat(c, + rejectCommandFormat(c, "-NOPERM this user has no permissions to access " "one of the keys used as arguments"); return C_OK; @@ -3515,13 +3548,11 @@ int processCommand(client *c) { * is trying to execute is denied during OOM conditions or the client * is in MULTI/EXEC context? Error. */ if (out_of_memory && - (c->cmd->flags & CMD_DENYOOM || + (is_denyoom_command || (c->flags & CLIENT_MULTI && - c->cmd->proc != execCommand && c->cmd->proc != discardCommand))) { - flagTransaction(c); - addReply(c, shared.oomerr); + rejectCommand(c, shared.oomerr); return C_OK; } @@ -3542,17 +3573,14 @@ int processCommand(client *c) { int deny_write_type = writeCommandsDeniedByDiskError(); if (deny_write_type != DISK_ERROR_TYPE_NONE && server.masterhost == NULL && - (c->cmd->flags & CMD_WRITE || - c->cmd->proc == pingCommand)) + (is_write_command ||c->cmd->proc == pingCommand)) { - flagTransaction(c); if (deny_write_type == DISK_ERROR_TYPE_RDB) - addReply(c, shared.bgsaveerr); + rejectCommand(c, shared.bgsaveerr); else - addReplySds(c, - sdscatprintf(sdsempty(), + rejectCommandFormat(c, "-MISCONF Errors writing to the AOF file: %s\r\n", - strerror(server.aof_last_write_errno))); + strerror(server.aof_last_write_errno)); return C_OK; } @@ -3561,11 +3589,10 @@ int processCommand(client *c) { if (server.masterhost == NULL && server.repl_min_slaves_to_write && server.repl_min_slaves_max_lag && - c->cmd->flags & CMD_WRITE && + is_write_command && server.repl_good_slaves_count < server.repl_min_slaves_to_write) { - flagTransaction(c); - addReply(c, shared.noreplicaserr); + rejectCommand(c, shared.noreplicaserr); return C_OK; } @@ -3573,10 +3600,9 @@ int processCommand(client *c) { * accept write commands if this is our master. */ if (server.masterhost && server.repl_slave_ro && !(c->flags & CLIENT_MASTER) && - c->cmd->flags & CMD_WRITE) + is_write_command) { - flagTransaction(c); - addReply(c, shared.roslaveerr); + rejectCommand(c, shared.roslaveerr); return C_OK; } @@ -3588,7 +3614,7 @@ int processCommand(client *c) { c->cmd->proc != unsubscribeCommand && c->cmd->proc != psubscribeCommand && c->cmd->proc != punsubscribeCommand) { - addReplyErrorFormat(c, + rejectCommandFormat(c, "Can't execute '%s': only (P)SUBSCRIBE / " "(P)UNSUBSCRIBE / PING / QUIT are allowed in this context", c->cmd->name); @@ -3600,17 +3626,16 @@ int processCommand(client *c) { * link with master. */ if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED && server.repl_serve_stale_data == 0 && - !(c->cmd->flags & CMD_STALE)) + is_denystale_command) { - flagTransaction(c); - addReply(c, shared.masterdownerr); + rejectCommand(c, shared.masterdownerr); return C_OK; } /* Loading DB? Return an error if the command has not the * CMD_LOADING flag. */ - if (server.loading && !(c->cmd->flags & CMD_LOADING)) { - addReply(c, shared.loadingerr); + if (server.loading && is_denyloading_command) { + rejectCommand(c, shared.loadingerr); return C_OK; } @@ -3625,7 +3650,6 @@ int processCommand(client *c) { c->cmd->proc != helloCommand && c->cmd->proc != replconfCommand && c->cmd->proc != multiCommand && - c->cmd->proc != execCommand && c->cmd->proc != discardCommand && c->cmd->proc != watchCommand && c->cmd->proc != unwatchCommand && @@ -3636,8 +3660,7 @@ int processCommand(client *c) { c->argc == 2 && tolower(((char*)c->argv[1]->ptr)[0]) == 'k')) { - flagTransaction(c); - addReply(c, shared.slowscripterr); + rejectCommand(c, shared.slowscripterr); return C_OK; } diff --git a/src/server.h b/src/server.h index a08585292..b887afee1 100644 --- a/src/server.h +++ b/src/server.h @@ -666,6 +666,9 @@ typedef struct multiState { int cmd_flags; /* The accumulated command flags OR-ed together. So if at least a command has a given flag, it will be set in this field. */ + int cmd_inv_flags; /* Same as cmd_flags, OR-ing the ~flags. so that it + is possible to know if all the commands have a + certain flag. */ int minreplicas; /* MINREPLICAS for synchronous replication */ time_t minreplicas_timeout; /* MINREPLICAS timeout as unixtime. */ } multiState; @@ -1626,6 +1629,7 @@ void addReplyBulkLongLong(client *c, long long ll); void addReply(client *c, robj *obj); void addReplySds(client *c, sds s); void addReplyBulkSds(client *c, sds s); +void addReplyErrorSafe(client *c, char *s, size_t len); void addReplyError(client *c, const char *err); void addReplyStatus(client *c, const char *status); void addReplyDouble(client *c, double d); @@ -1724,6 +1728,7 @@ void touchWatchedKey(redisDb *db, robj *key); void touchWatchedKeysOnFlush(int dbid); void discardTransaction(client *c); void flagTransaction(client *c); +void execCommandAbort(client *c, sds error); void execCommandPropagateMulti(client *c); void execCommandPropagateExec(client *c); diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index fba54acb5..ef9bf7fdf 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -196,6 +196,21 @@ proc redis_deferring_client {args} { return $client } +proc redis_client {args} { + set level 0 + if {[llength $args] > 0 && [string is integer [lindex $args 0]]} { + set level [lindex $args 0] + set args [lrange $args 1 end] + } + + # create client that defers reading reply + set client [redis [srv $level "host"] [srv $level "port"] 0 $::tls] + + # select the right db and read the response (OK) + $client select 9 + return $client +} + # Provide easy access to INFO properties. Same semantic as "proc r". proc s {args} { set level 0 diff --git a/tests/unit/multi.tcl b/tests/unit/multi.tcl index 0c70fbde7..44a822ba6 100644 --- a/tests/unit/multi.tcl +++ b/tests/unit/multi.tcl @@ -325,74 +325,145 @@ start_server {tags {"multi"}} { # check that if MULTI arrives during timeout, it is either refused, or # allowed to pass, and we don't end up executing half of the transaction set rd1 [redis_deferring_client] - set rd2 [redis_deferring_client] + set r2 [redis_client] r config set lua-time-limit 10 r set xx 1 $rd1 eval {while true do end} 0 after 200 - catch { $rd2 multi; $rd2 read } e - catch { $rd2 incr xx; $rd2 read } e + catch { $r2 multi; } e + catch { $r2 incr xx; } e r script kill after 200 ; # Give some time to Lua to call the hook again... - catch { $rd2 incr xx; $rd2 read } e - catch { $rd2 exec; $rd2 read } e + catch { $r2 incr xx; } e + catch { $r2 exec; } e + assert_match {EXECABORT*previous errors*} $e set xx [r get xx] # make sure that either the whole transcation passed or none of it (we actually expect none) assert { $xx == 1 || $xx == 3} # check that the connection is no longer in multi state - $rd2 ping asdf - set pong [$rd2 read] + set pong [$r2 ping asdf] assert_equal $pong "asdf" + $rd1 close; $r2 close } test {EXEC and script timeout} { # check that if EXEC arrives during timeout, we don't end up executing # half of the transaction, and also that we exit the multi state set rd1 [redis_deferring_client] - set rd2 [redis_deferring_client] + set r2 [redis_client] r config set lua-time-limit 10 r set xx 1 - catch { $rd2 multi; $rd2 read } e - catch { $rd2 incr xx; $rd2 read } e + catch { $r2 multi; } e + catch { $r2 incr xx; } e $rd1 eval {while true do end} 0 after 200 - catch { $rd2 incr xx; $rd2 read } e - catch { $rd2 exec; $rd2 read } e + catch { $r2 incr xx; } e + catch { $r2 exec; } e + assert_match {EXECABORT*BUSY*} $e r script kill after 200 ; # Give some time to Lua to call the hook again... set xx [r get xx] # make sure that either the whole transcation passed or none of it (we actually expect none) assert { $xx == 1 || $xx == 3} - # Discard the transaction since EXEC likely got -BUSY error - # so the client is still in MULTI state. - catch { $rd2 discard ;$rd2 read } e # check that the connection is no longer in multi state - $rd2 ping asdf - set pong [$rd2 read] + set pong [$r2 ping asdf] assert_equal $pong "asdf" + $rd1 close; $r2 close } test {MULTI-EXEC body and script timeout} { # check that we don't run an imcomplete transaction due to some commands # arriving during busy script set rd1 [redis_deferring_client] - set rd2 [redis_deferring_client] + set r2 [redis_client] r config set lua-time-limit 10 r set xx 1 - catch { $rd2 multi; $rd2 read } e - catch { $rd2 incr xx; $rd2 read } e + catch { $r2 multi; } e + catch { $r2 incr xx; } e $rd1 eval {while true do end} 0 after 200 - catch { $rd2 incr xx; $rd2 read } e + catch { $r2 incr xx; } e r script kill after 200 ; # Give some time to Lua to call the hook again... - catch { $rd2 exec; $rd2 read } e + catch { $r2 exec; } e + assert_match {EXECABORT*previous errors*} $e set xx [r get xx] # make sure that either the whole transcation passed or none of it (we actually expect none) assert { $xx == 1 || $xx == 3} # check that the connection is no longer in multi state - $rd2 ping asdf - set pong [$rd2 read] + set pong [$r2 ping asdf] assert_equal $pong "asdf" + $rd1 close; $r2 close + } + + test {just EXEC and script timeout} { + # check that if EXEC arrives during timeout, we don't end up executing + # actual commands during busy script, and also that we exit the multi state + set rd1 [redis_deferring_client] + set r2 [redis_client] + r config set lua-time-limit 10 + r set xx 1 + catch { $r2 multi; } e + catch { $r2 incr xx; } e + $rd1 eval {while true do end} 0 + after 200 + catch { $r2 exec; } e + assert_match {EXECABORT*BUSY*} $e + r script kill + after 200 ; # Give some time to Lua to call the hook again... + set xx [r get xx] + # make we didn't execute the transaction + assert { $xx == 1} + # check that the connection is no longer in multi state + set pong [$r2 ping asdf] + assert_equal $pong "asdf" + $rd1 close; $r2 close + } + + test {exec with write commands and state change} { + # check that exec that contains write commands fails if server state changed since they were queued + set r1 [redis_client] + r set xx 1 + r multi + r incr xx + $r1 config set min-replicas-to-write 2 + catch {r exec} e + assert_match {*EXECABORT*NOREPLICAS*} $e + set xx [r get xx] + # make sure that the INCR wasn't executed + assert { $xx == 1} + $r1 config set min-replicas-to-write 0 + $r1 close; + } + + test {exec with read commands and stale replica state change} { + # check that exec that contains read commands fails if server state changed since they were queued + r config set replica-serve-stale-data no + set r1 [redis_client] + r set xx 1 + + # check that GET is disallowed on stale replica, even if the replica becomes stale only after queuing. + r multi + r get xx + $r1 replicaof localhsot 0 + catch {r exec} e + assert_match {*EXECABORT*MASTERDOWN*} $e + + # check that PING is allowed + r multi + r ping + $r1 replicaof localhsot 0 + set pong [r exec] + assert {$pong == "PONG"} + + # check that when replica is not stale, GET is allowed + # while we're at it, let's check that multi is allowed on stale replica too + r multi + $r1 replicaof no one + r get xx + set xx [r exec] + # make sure that the INCR was executed + assert { $xx == 1 } + $r1 close; } }