From 1f75ce30dfdb1c8c4df8474f9580f2aff032cfb4 Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Thu, 26 Dec 2019 15:31:37 +0530 Subject: [PATCH 1/2] Stream: Handle streamID-related edge cases This commit solves several edge cases that are related to exhausting the streamID limits: We should correctly calculate the succeeding streamID instead of blindly incrementing 'seq' This affects both XREAD and XADD. Other (unrelated) changes: Reply with a better error message when trying to add an entry to a stream that has exhausted last_id --- src/blocked.c | 2 +- src/stream.h | 1 + src/t_stream.c | 28 +++++++++++++++++++++++++--- tests/unit/type/stream.tcl | 27 +++++++++++++++++++++++++++ 4 files changed, 54 insertions(+), 4 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 20c0e760a..06aa5850e 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -388,7 +388,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { if (streamCompareID(&s->last_id, gt) > 0) { streamID start = *gt; - start.seq++; /* Can't overflow, it's an uint64_t */ + streamIncrID(&start); /* Lookup the consumer for the group, if any. */ streamConsumer *consumer = NULL; diff --git a/src/stream.h b/src/stream.h index 7de769ba1..b69073994 100644 --- a/src/stream.h +++ b/src/stream.h @@ -111,5 +111,6 @@ streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); int streamCompareID(streamID *a, streamID *b); void streamFreeNACK(streamNACK *na); +void streamIncrID(streamID *id); #endif diff --git a/src/t_stream.c b/src/t_stream.c index a499f7381..3d46ca0da 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -73,6 +73,21 @@ unsigned long streamLength(const robj *subject) { return s->length; } +/* Set 'id' to be its successor streamID */ +void streamIncrID(streamID *id) { + if (id->seq == UINT64_MAX) { + if (id->ms == UINT64_MAX) { + /* Special case where 'id' is the last possible streamID... */ + id->ms = id->seq = 0; + } else { + id->ms++; + id->seq = 0; + } + } else { + id->seq++; + } +} + /* Generate the next stream item ID given the previous one. If the current * milliseconds Unix time is greater than the previous one, just use this * as time part and start with sequence part of zero. Otherwise we use the @@ -83,8 +98,8 @@ void streamNextID(streamID *last_id, streamID *new_id) { new_id->ms = ms; new_id->seq = 0; } else { - new_id->ms = last_id->ms; - new_id->seq = last_id->seq+1; + *new_id = *last_id; + streamIncrID(new_id); } } @@ -1220,6 +1235,13 @@ void xaddCommand(client *c) { if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; s = o->ptr; + /* Return ASAP if the stream has reached the last possible ID */ + if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) { + addReplyError(c,"The stream has exhausted the last possible ID, " + "unable to add more items"); + return; + } + /* Append using the low level function and return the ID. */ if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2, &id, id_given ? &id : NULL) @@ -1509,7 +1531,7 @@ void xreadCommand(client *c) { * so start from the next ID, since we want only messages with * IDs greater than start. */ streamID start = *gt; - start.seq++; /* uint64_t can't overflow in this context. */ + streamIncrID(&start); /* Emit the two elements sub-array consisting of the name * of the stream and the data we extracted from it. */ diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 656bac5de..9840e3b74 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -328,6 +328,33 @@ start_server { assert_equal [r xrevrange teststream2 1234567891245 -] {{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}} } + + test {XREAD streamID edge (no-blocking)} { + r del x + r XADD x 1-1 f v + r XADD x 1-18446744073709551615 f v + r XADD x 2-1 f v + set res [r XREAD BLOCK 0 STREAMS x 1-18446744073709551615] + assert {[lindex $res 0 1 0] == {2-1 {f v}}} + } + + test {XREAD streamID edge (blocking)} { + r del x + set rd [redis_deferring_client] + $rd XREAD BLOCK 0 STREAMS x 1-18446744073709551615 + r XADD x 1-1 f v + r XADD x 1-18446744073709551615 f v + r XADD x 2-1 f v + set res [$rd read] + assert {[lindex $res 0 1 0] == {2-1 {f v}}} + } + + test {XADD streamID edge} { + r del x + r XADD x 2577343934890-18446744073709551615 f v ;# we need the timestamp to be in the future + r XADD x * f2 v2 + assert_equal [r XRANGE x - +] {{2577343934890-18446744073709551615 {f v}} {2577343934891-0 {f2 v2}}} + } } start_server {tags {"stream"} overrides {appendonly yes}} { From 0c3fe52ef7e3acf135428be81dd8e27b4ef191e5 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Thu, 26 Dec 2019 13:59:58 +0200 Subject: [PATCH 2/2] config.c adjust config limits and mutable - make lua-replicate-commands mutable (it never was, but i don't see why) - make tcp-backlog immutable (fix a recent refactory mistake) - increase the max limit of a few configs to match what they were before the recent refactory --- src/config.c | 12 ++++++------ tests/unit/introspection.tcl | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/config.c b/src/config.c index 3d53e656b..9aa847183 100644 --- a/src/config.c +++ b/src/config.c @@ -2137,7 +2137,7 @@ standardConfig configs[] = { createBoolConfig("rdbchecksum", NULL, IMMUTABLE_CONFIG, server.rdb_checksum, 1, NULL, NULL), createBoolConfig("daemonize", NULL, IMMUTABLE_CONFIG, server.daemonize, 0, NULL, NULL), createBoolConfig("io-threads-do-reads", NULL, IMMUTABLE_CONFIG, server.io_threads_do_reads, 0,NULL, NULL), /* Read + parse from threads? */ - createBoolConfig("lua-replicate-commands", NULL, IMMUTABLE_CONFIG, server.lua_always_replicate_commands, 1, NULL, NULL), + createBoolConfig("lua-replicate-commands", NULL, MODIFIABLE_CONFIG, server.lua_always_replicate_commands, 1, NULL, NULL), createBoolConfig("always-show-logo", NULL, IMMUTABLE_CONFIG, server.always_show_logo, 0, NULL, NULL), createBoolConfig("protected-mode", NULL, MODIFIABLE_CONFIG, server.protected_mode, 1, NULL, NULL), createBoolConfig("rdbcompression", NULL, MODIFIABLE_CONFIG, server.rdb_compression, 1, NULL, NULL), @@ -2209,7 +2209,7 @@ standardConfig configs[] = { createIntConfig("maxmemory-samples", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.maxmemory_samples, 5, INTEGER_CONFIG, NULL, NULL), createIntConfig("timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.maxidletime, 0, INTEGER_CONFIG, NULL, NULL), /* Default client timeout: infinite */ createIntConfig("replica-announce-port", "slave-announce-port", MODIFIABLE_CONFIG, 0, 65535, server.slave_announce_port, 0, INTEGER_CONFIG, NULL, NULL), - createIntConfig("tcp-backlog", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.tcp_backlog, 511, INTEGER_CONFIG, NULL, NULL), /* TCP listen backlog. */ + createIntConfig("tcp-backlog", NULL, IMMUTABLE_CONFIG, 0, INT_MAX, server.tcp_backlog, 511, INTEGER_CONFIG, NULL, NULL), /* TCP listen backlog. */ createIntConfig("cluster-announce-bus-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_bus_port, 0, INTEGER_CONFIG, NULL, NULL), /* Default: Use +10000 offset. */ createIntConfig("cluster-announce-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.cluster_announce_port, 0, INTEGER_CONFIG, NULL, NULL), /* Use server.port */ createIntConfig("repl-timeout", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_timeout, 60, INTEGER_CONFIG, NULL, NULL), @@ -2235,9 +2235,9 @@ standardConfig configs[] = { createLongLongConfig("cluster-node-timeout", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.cluster_node_timeout, 15000, INTEGER_CONFIG, NULL, NULL), createLongLongConfig("slowlog-log-slower-than", NULL, MODIFIABLE_CONFIG, -1, LLONG_MAX, server.slowlog_log_slower_than, 10000, INTEGER_CONFIG, NULL, NULL), createLongLongConfig("latency-monitor-threshold", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.latency_monitor_threshold, 0, INTEGER_CONFIG, NULL, NULL), - createLongLongConfig("proto-max-bulk-len", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.proto_max_bulk_len, 512ll*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Bulk request max size */ - createLongLongConfig("stream-node-max-entries", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.stream_node_max_entries, 100, INTEGER_CONFIG, NULL, NULL), - createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LONG_MAX, server.repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */ + createLongLongConfig("proto-max-bulk-len", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.proto_max_bulk_len, 512ll*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Bulk request max size */ + createLongLongConfig("stream-node-max-entries", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, server.stream_node_max_entries, 100, INTEGER_CONFIG, NULL, NULL), + createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, server.repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */ /* Unsigned Long Long configs */ createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory), @@ -2246,7 +2246,7 @@ standardConfig configs[] = { createSizeTConfig("hash-max-ziplist-entries", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.hash_max_ziplist_entries, 512, INTEGER_CONFIG, NULL, NULL), createSizeTConfig("set-max-intset-entries", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.set_max_intset_entries, 512, INTEGER_CONFIG, NULL, NULL), createSizeTConfig("zset-max-ziplist-entries", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.zset_max_ziplist_entries, 128, INTEGER_CONFIG, NULL, NULL), - createSizeTConfig("active-defrag-ignore-bytes", NULL, MODIFIABLE_CONFIG, 1, LONG_MAX, server.active_defrag_ignore_bytes, 100<<20, MEMORY_CONFIG, NULL, NULL), /* Default: don't defrag if frag overhead is below 100mb */ + createSizeTConfig("active-defrag-ignore-bytes", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, server.active_defrag_ignore_bytes, 100<<20, MEMORY_CONFIG, NULL, NULL), /* Default: don't defrag if frag overhead is below 100mb */ createSizeTConfig("hash-max-ziplist-value", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.hash_max_ziplist_value, 64, MEMORY_CONFIG, NULL, NULL), createSizeTConfig("stream-node-max-bytes", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.stream_node_max_bytes, 4096, MEMORY_CONFIG, NULL, NULL), createSizeTConfig("zset-max-ziplist-value", NULL, MODIFIABLE_CONFIG, 0, LONG_MAX, server.zset_max_ziplist_value, 64, MEMORY_CONFIG, NULL, NULL), diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index 3bf3d367b..cd905084a 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -65,7 +65,7 @@ start_server {tags {"introspection"}} { rdbchecksum daemonize io-threads-do-reads - lua-replicate-commands + tcp-backlog always-show-logo syslog-enabled cluster-enabled