From d9f970d8d3f0b694f1e8915cab6d4eab9cfb2ef1 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Fri, 10 Jul 2020 10:25:55 +0300 Subject: [PATCH] TLS: Add missing redis-cli options. (#7456) * Tests: fix and reintroduce redis-cli tests. These tests have been broken and disabled for 10 years now! * TLS: add remaining redis-cli support. This adds support for the redis-cli --pipe, --rdb and --replica options previously unsupported in --tls mode. * Fix writeConn(). --- src/redis-cli.c | 102 +++++++++++++++++++--------- tests/integration/redis-cli.tcl | 115 ++++++++++++++++++++++++++------ tests/test_helper.tcl | 1 + 3 files changed, 166 insertions(+), 52 deletions(-) diff --git a/src/redis-cli.c b/src/redis-cli.c index c5ba48447..0148964bf 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -1989,6 +1989,7 @@ static void repl(void) { if (argv == NULL) { printf("Invalid argument(s)\n"); + fflush(stdout); linenoiseFree(line); continue; } else if (argc > 0) { @@ -6784,10 +6785,53 @@ void sendCapa() { sendReplconf("capa", "eof"); } +/* Wrapper around hiredis to allow arbitrary reads and writes. + * + * We piggybacks on top of hiredis to achieve transparent TLS support, + * and use its internal buffers so it can co-exist with commands + * previously/later issued on the connection. + * + * Interface is close to enough to read()/write() so things should mostly + * work transparently. + */ + +/* Write a raw buffer through a redisContext. If we already have something + * in the buffer (leftovers from hiredis operations) it will be written + * as well. + */ +static ssize_t writeConn(redisContext *c, const char *buf, size_t buf_len) +{ + int done = 0; + + c->obuf = sdscatlen(c->obuf, buf, buf_len); + if (redisBufferWrite(c, &done) == REDIS_ERR) { + sdsrange(c->obuf, 0, -(buf_len+1)); + if (!(c->flags & REDIS_BLOCK)) + errno = EAGAIN; + return -1; + } + + size_t left = sdslen(c->obuf); + sdsclear(c->obuf); + if (!done) { + return buf_len - left; + } + + return buf_len; +} + +/* Read raw bytes through a redisContext. The read operation is not greedy + * and may not fill the buffer entirely. + */ +static ssize_t readConn(redisContext *c, char *buf, size_t len) +{ + return c->funcs->read(c, buf, len); +} + /* Sends SYNC and reads the number of bytes in the payload. Used both by * slaveMode() and getRDB(). * returns 0 in case an EOF marker is used. */ -unsigned long long sendSync(int fd, char *out_eof) { +unsigned long long sendSync(redisContext *c, char *out_eof) { /* To start we need to send the SYNC command and return the payload. * The hiredis client lib does not understand this part of the protocol * and we don't want to mess with its buffers, so everything is performed @@ -6796,7 +6840,7 @@ unsigned long long sendSync(int fd, char *out_eof) { ssize_t nread; /* Send the SYNC command. */ - if (write(fd,"SYNC\r\n",6) != 6) { + if (writeConn(c, "SYNC\r\n", 6) != 6) { fprintf(stderr,"Error writing to master\n"); exit(1); } @@ -6804,7 +6848,7 @@ unsigned long long sendSync(int fd, char *out_eof) { /* Read $\r\n, making sure to read just up to "\n" */ p = buf; while(1) { - nread = read(fd,p,1); + nread = readConn(c,p,1); if (nread <= 0) { fprintf(stderr,"Error reading bulk length while SYNCing\n"); exit(1); @@ -6825,11 +6869,10 @@ unsigned long long sendSync(int fd, char *out_eof) { } static void slaveMode(void) { - int fd = context->fd; static char eofmark[RDB_EOF_MARK_SIZE]; static char lastbytes[RDB_EOF_MARK_SIZE]; static int usemark = 0; - unsigned long long payload = sendSync(fd, eofmark); + unsigned long long payload = sendSync(context,eofmark); char buf[1024]; int original_output = config.output; @@ -6849,7 +6892,7 @@ static void slaveMode(void) { while(payload) { ssize_t nread; - nread = read(fd,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload); + nread = readConn(context,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload); if (nread <= 0) { fprintf(stderr,"Error reading RDB payload while SYNCing\n"); exit(1); @@ -6892,14 +6935,15 @@ static void slaveMode(void) { /* This function implements --rdb, so it uses the replication protocol in order * to fetch the RDB file from a remote server. */ static void getRDB(clusterManagerNode *node) { - int s, fd; + int fd; + redisContext *s; char *filename; if (node != NULL) { assert(node->context); - s = node->context->fd; + s = node->context; filename = clusterManagerGetNodeRDBFilename(node); } else { - s = context->fd; + s = context; filename = config.rdb_filename; } static char eofmark[RDB_EOF_MARK_SIZE]; @@ -6934,7 +6978,7 @@ static void getRDB(clusterManagerNode *node) { while(payload) { ssize_t nread, nwritten; - nread = read(s,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload); + nread = readConn(s,buf,(payload > sizeof(buf)) ? sizeof(buf) : payload); if (nread <= 0) { fprintf(stderr,"I/O Error reading RDB payload from socket\n"); exit(1); @@ -6968,7 +7012,7 @@ static void getRDB(clusterManagerNode *node) { } else { fprintf(stderr,"Transfer finished with success.\n"); } - close(s); /* Close the file descriptor ASAP as fsync() may take time. */ + redisFree(s); /* Close the file descriptor ASAP as fsync() may take time. */ fsync(fd); close(fd); fprintf(stderr,"Transfer finished with success.\n"); @@ -6985,11 +7029,9 @@ static void getRDB(clusterManagerNode *node) { #define PIPEMODE_WRITE_LOOP_MAX_BYTES (128*1024) static void pipeMode(void) { - int fd = context->fd; long long errors = 0, replies = 0, obuf_len = 0, obuf_pos = 0; - char ibuf[1024*16], obuf[1024*16]; /* Input and output buffers */ + char obuf[1024*16]; /* Output buffer */ char aneterr[ANET_ERR_LEN]; - redisReader *reader = redisReaderCreate(); redisReply *reply; int eof = 0; /* True once we consumed all the standard input. */ int done = 0; @@ -6999,47 +7041,38 @@ static void pipeMode(void) { srand(time(NULL)); /* Use non blocking I/O. */ - if (anetNonBlock(aneterr,fd) == ANET_ERR) { + if (anetNonBlock(aneterr,context->fd) == ANET_ERR) { fprintf(stderr, "Can't set the socket in non blocking mode: %s\n", aneterr); exit(1); } + context->flags &= ~REDIS_BLOCK; + /* Transfer raw protocol and read replies from the server at the same * time. */ while(!done) { int mask = AE_READABLE; if (!eof || obuf_len != 0) mask |= AE_WRITABLE; - mask = aeWait(fd,mask,1000); + mask = aeWait(context->fd,mask,1000); /* Handle the readable state: we can read replies from the server. */ if (mask & AE_READABLE) { - ssize_t nread; int read_error = 0; - /* Read from socket and feed the hiredis reader. */ do { - nread = read(fd,ibuf,sizeof(ibuf)); - if (nread == -1 && errno != EAGAIN && errno != EINTR) { - fprintf(stderr, "Error reading from the server: %s\n", - strerror(errno)); + if (!read_error && redisBufferRead(context) == REDIS_ERR) { read_error = 1; - break; } - if (nread > 0) { - redisReaderFeed(reader,ibuf,nread); - last_read_time = time(NULL); - } - } while(nread > 0); - /* Consume replies. */ - do { - if (redisReaderGetReply(reader,(void**)&reply) == REDIS_ERR) { + reply = NULL; + if (redisGetReply(context, (void **) &reply) == REDIS_ERR) { fprintf(stderr, "Error reading replies from server\n"); exit(1); } if (reply) { + last_read_time = time(NULL); if (reply->type == REDIS_REPLY_ERROR) { fprintf(stderr,"%s\n", reply->str); errors++; @@ -7072,7 +7105,7 @@ static void pipeMode(void) { while(1) { /* Transfer current buffer to server. */ if (obuf_len != 0) { - ssize_t nwritten = write(fd,obuf+obuf_pos,obuf_len); + ssize_t nwritten = writeConn(context,obuf+obuf_pos,obuf_len); if (nwritten == -1) { if (errno != EAGAIN && errno != EINTR) { @@ -7088,6 +7121,10 @@ static void pipeMode(void) { loop_nwritten += nwritten; if (obuf_len != 0) break; /* Can't accept more data. */ } + if (context->err) { + fprintf(stderr, "Server I/O Error: %s\n", context->errstr); + exit(1); + } /* If buffer is empty, load from stdin. */ if (obuf_len == 0 && !eof) { ssize_t nread = read(STDIN_FILENO,obuf,sizeof(obuf)); @@ -7138,7 +7175,6 @@ static void pipeMode(void) { break; } } - redisReaderFree(reader); printf("errors: %lld, replies: %lld\n", errors, replies); if (errors) exit(1); diff --git a/tests/integration/redis-cli.tcl b/tests/integration/redis-cli.tcl index 5d1635950..016e4915c 100644 --- a/tests/integration/redis-cli.tcl +++ b/tests/integration/redis-cli.tcl @@ -1,14 +1,13 @@ source tests/support/cli.tcl start_server {tags {"cli"}} { - proc open_cli {} { + proc open_cli {{opts "-n 9"}} { set ::env(TERM) dumb - set cmdline [rediscli [srv port] "-n 9"] + set cmdline [rediscli [srv port] $opts] set fd [open "|$cmdline" "r+"] fconfigure $fd -buffering none fconfigure $fd -blocking false fconfigure $fd -translation binary - assert_equal "redis> " [read_cli $fd] set _ $fd } @@ -32,11 +31,14 @@ start_server {tags {"cli"}} { } # Helpers to run tests in interactive mode + + proc format_output {output} { + set _ [string trimright [regsub -all "\r" $output ""] "\n"] + } + proc run_command {fd cmd} { write_cli $fd $cmd - set lines [split [read_cli $fd] "\n"] - assert_equal "redis> " [lindex $lines end] - join [lrange $lines 0 end-1] "\n" + set _ [format_output [read_cli $fd]] } proc test_interactive_cli {name code} { @@ -58,7 +60,7 @@ start_server {tags {"cli"}} { proc _run_cli {opts args} { set cmd [rediscli [srv port] [list -n 9 {*}$args]] - foreach {key value} $args { + foreach {key value} $opts { if {$key eq "pipe"} { set cmd "sh -c \"$value | $cmd\"" } @@ -72,7 +74,7 @@ start_server {tags {"cli"}} { fconfigure $fd -translation binary set resp [read $fd 1048576] close $fd - set _ $resp + set _ [format_output $resp] } proc run_cli {args} { @@ -80,11 +82,11 @@ start_server {tags {"cli"}} { } proc run_cli_with_input_pipe {cmd args} { - _run_cli [list pipe $cmd] {*}$args + _run_cli [list pipe $cmd] -x {*}$args } proc run_cli_with_input_file {path args} { - _run_cli [list path $path] {*}$args + _run_cli [list path $path] -x {*}$args } proc test_nontty_cli {name code} { @@ -101,7 +103,7 @@ start_server {tags {"cli"}} { test_interactive_cli "INFO response should be printed raw" { set lines [split [run_command $fd info] "\n"] foreach line $lines { - assert [regexp {^[a-z0-9_]+:[a-z0-9_]+} $line] + assert [regexp {^$|^#|^[a-z0-9_]+:.+} $line] } } @@ -121,7 +123,7 @@ start_server {tags {"cli"}} { test_interactive_cli "Multi-bulk reply" { r rpush list foo r rpush list bar - assert_equal "1. \"foo\"\n2. \"bar\"" [run_command $fd "lrange list 0 -1"] + assert_equal "1) \"foo\"\n2) \"bar\"" [run_command $fd "lrange list 0 -1"] } test_interactive_cli "Parsing quotes" { @@ -144,35 +146,35 @@ start_server {tags {"cli"}} { } test_tty_cli "Status reply" { - assert_equal "OK\n" [run_cli set key bar] + assert_equal "OK" [run_cli set key bar] assert_equal "bar" [r get key] } test_tty_cli "Integer reply" { r del counter - assert_equal "(integer) 1\n" [run_cli incr counter] + assert_equal "(integer) 1" [run_cli incr counter] } test_tty_cli "Bulk reply" { r set key "tab\tnewline\n" - assert_equal "\"tab\\tnewline\\n\"\n" [run_cli get key] + assert_equal "\"tab\\tnewline\\n\"" [run_cli get key] } test_tty_cli "Multi-bulk reply" { r del list r rpush list foo r rpush list bar - assert_equal "1. \"foo\"\n2. \"bar\"\n" [run_cli lrange list 0 -1] + assert_equal "1) \"foo\"\n2) \"bar\"" [run_cli lrange list 0 -1] } test_tty_cli "Read last argument from pipe" { - assert_equal "OK\n" [run_cli_with_input_pipe "echo foo" set key] + assert_equal "OK" [run_cli_with_input_pipe "echo foo" set key] assert_equal "foo\n" [r get key] } test_tty_cli "Read last argument from file" { set tmpfile [write_tmpfile "from file"] - assert_equal "OK\n" [run_cli_with_input_file $tmpfile set key] + assert_equal "OK" [run_cli_with_input_file $tmpfile set key] assert_equal "from file" [r get key] } @@ -188,7 +190,7 @@ start_server {tags {"cli"}} { test_nontty_cli "Bulk reply" { r set key "tab\tnewline\n" - assert_equal "tab\tnewline\n" [run_cli get key] + assert_equal "tab\tnewline" [run_cli get key] } test_nontty_cli "Multi-bulk reply" { @@ -208,4 +210,79 @@ start_server {tags {"cli"}} { assert_equal "OK" [run_cli_with_input_file $tmpfile set key] assert_equal "from file" [r get key] } + + proc test_redis_cli_rdb_dump {} { + r flushdb + + set dir [lindex [r config get dir] 1] + + assert_equal "OK" [r debug populate 100000 key 1000] + catch {run_cli --rdb "$dir/cli.rdb"} output + assert_match {*Transfer finished with success*} $output + + file delete "$dir/dump.rdb" + file rename "$dir/cli.rdb" "$dir/dump.rdb" + + assert_equal "OK" [r set should-not-exist 1] + assert_equal "OK" [r debug reload nosave] + assert_equal {} [r get should-not-exist] + } + + test_nontty_cli "Dumping an RDB" { + # Disk-based master + assert_match "OK" [r config set repl-diskless-sync no] + test_redis_cli_rdb_dump + + # Disk-less master + assert_match "OK" [r config set repl-diskless-sync yes] + assert_match "OK" [r config set repl-diskless-sync-delay 0] + test_redis_cli_rdb_dump + } + + test_nontty_cli "Connecting as a replica" { + set fd [open_cli "--replica"] + wait_for_condition 50 500 { + [string match {*slave0:*state=online*} [r info]] + } else { + fail "redis-cli --replica did not connect" + } + + for {set i 0} {$i < 100} {incr i} { + r set test-key test-value-$i + } + r client kill type slave + catch { + assert_match {*SET*key-a*} [read_cli $fd] + } + + close_cli $fd + } + + test_nontty_cli "Piping raw protocol" { + set fd [open_cli "--pipe"] + fconfigure $fd -blocking true + + # Create a new deferring client and overwrite its fd + set client [redis [srv 0 "host"] [srv 0 "port"] 1 0] + set ::redis::fd($::redis::id) $fd + $client select 9 + + r del test-counter + for {set i 0} {$i < 10000} {incr i} { + $client incr test-counter + $client set large-key [string repeat "x" 20000] + } + + for {set i 0} {$i < 1000} {incr i} { + $client set very-large-key [string repeat "x" 512000] + } + + close $fd write + set output [read_cli $fd] + + assert_equal {10000} [r get test-counter] + assert_match {*All data transferred*errors: 0*replies: 21001*} $output + + close_cli $fd + } } diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index ef9bf7fdf..1527aa1b5 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -48,6 +48,7 @@ set ::all_tests { integration/psync2 integration/psync2-reg integration/psync2-pingoff + integration/redis-cli unit/pubsub unit/slowlog unit/scripting