diff --git a/src/stream.h b/src/stream.h index 7e1a24417..1a1e271a8 100644 --- a/src/stream.h +++ b/src/stream.h @@ -116,7 +116,8 @@ streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); int streamCompareID(streamID *a, streamID *b); void streamFreeNACK(streamNACK *na); -void streamIncrID(streamID *id); +int streamIncrID(streamID *id); +int streamDecrID(streamID *id); void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername); robj *streamDup(robj *o); diff --git a/src/t_stream.c b/src/t_stream.c index e57916200..bb38b0fb8 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -76,12 +76,16 @@ unsigned long streamLength(const robj *subject) { return s->length; } -/* Set 'id' to be its successor streamID */ -void streamIncrID(streamID *id) { +/* Set 'id' to be its successor stream ID. + * If 'id' is the maximal possible id, it is wrapped around to 0-0 and a + * C_ERR is returned. */ +int streamIncrID(streamID *id) { + int ret = C_OK; if (id->seq == UINT64_MAX) { if (id->ms == UINT64_MAX) { /* Special case where 'id' is the last possible streamID... */ id->ms = id->seq = 0; + ret = C_ERR; } else { id->ms++; id->seq = 0; @@ -89,6 +93,27 @@ void streamIncrID(streamID *id) { } else { id->seq++; } + return ret; +} + +/* Set 'id' to be its predecessor stream ID. + * If 'id' is the minimal possible id, it remains 0-0 and a C_ERR is + * returned. */ +int streamDecrID(streamID *id) { + int ret = C_OK; + if (id->seq == 0) { + if (id->ms == 0) { + /* Special case where 'id' is the first possible streamID... */ + id->ms = id->seq = UINT64_MAX; + ret = C_ERR; + } else { + id->ms--; + id->seq = UINT64_MAX; + } + } else { + id->seq--; + } + return ret; } /* Generate the next stream item ID given the previous one. If the current @@ -1309,6 +1334,29 @@ int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missin return streamGenericParseIDOrReply(c,o,id,missing_seq,1); } +/* Helper for parsing a stream ID that is a range query interval. When the + * exclude argument is NULL, streamParseIDOrReply() is called and the interval + * is treated as close (inclusive). Otherwise, the exclude argument is set if + * the interval is open (the "(" prefix) and streamParseStrictIDOrReply() is + * called in that case. + */ +int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude, uint64_t missing_seq) { + char *p = o->ptr; + size_t len = sdslen(p); + int invalid = 0; + + if (exclude != NULL) *exclude = (len > 1 && p[0] == '('); + if (exclude != NULL && *exclude) { + robj *t = createStringObject(p+1,len-1); + invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq) == C_ERR); + decrRefCount(t); + } else + invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR); + if (invalid) + return C_ERR; + return C_OK; +} + /* We propagate MAXLEN ~ as MAXLEN = * otherwise trimming is no longer determinsitic on replicas / AOF. */ void streamRewriteApproxMaxlen(client *c, stream *s, int maxlen_arg_idx) { @@ -1433,7 +1481,13 @@ void xaddCommand(client *c) { signalKeyAsReady(c->db, c->argv[1], OBJ_STREAM); } -/* XRANGE/XREVRANGE actual implementation. */ +/* XRANGE/XREVRANGE actual implementation. + * The 'start' and 'end' IDs are parsed as follows: + * Incomplete 'start' has its sequence set to 0, and 'end' to UINT64_MAX. + * "-" and "+"" mean the minimal and maximal ID values, respectively. + * The "(" prefix means an open (exclusive) range, so XRANGE stream (1-0 (2-0 + * will match anything from 1-1 and 1-UINT64_MAX. + */ void xrangeGenericCommand(client *c, int rev) { robj *o; stream *s; @@ -1441,9 +1495,21 @@ void xrangeGenericCommand(client *c, int rev) { long long count = -1; robj *startarg = rev ? c->argv[3] : c->argv[2]; robj *endarg = rev ? c->argv[2] : c->argv[3]; - - if (streamParseIDOrReply(c,startarg,&startid,0) == C_ERR) return; - if (streamParseIDOrReply(c,endarg,&endid,UINT64_MAX) == C_ERR) return; + int startex = 0, endex = 0; + + /* Parse start and end IDs. */ + if (streamParseIntervalIDOrReply(c,startarg,&startid,&startex,0) != C_OK) + return; + if (startex && streamIncrID(&startid) != C_OK) { + addReplyError(c,"invalid start ID for the interval"); + return; + } + if (streamParseIntervalIDOrReply(c,endarg,&endid,&endex,UINT64_MAX) != C_OK) + return; + if (endex && streamDecrID(&endid) != C_OK) { + addReplyError(c,"invalid end ID for the interval"); + return; + } /* Parse the COUNT option if any. */ if (c->argc > 4) { diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 4b3672d64..63cc697c2 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -192,6 +192,32 @@ start_server { assert {[r xrange mystream - +] == [lreverse [r xrevrange mystream + -]]} } + test {XRANGE exclusive ranges} { + set ids {0-1 0-18446744073709551615 1-0 42-0 42-42 + 18446744073709551615-18446744073709551614 + 18446744073709551615-18446744073709551615} + set total [llength $ids] + r multi + r DEL vipstream + foreach id $ids { + r XADD vipstream $id foo bar + } + r exec + assert {[llength [r xrange vipstream - +]] == $total} + assert {[llength [r xrange vipstream ([lindex $ids 0] +]] == $total-1} + assert {[llength [r xrange vipstream - ([lindex $ids $total-1]]] == $total-1} + assert {[llength [r xrange vipstream (0-1 (1-0]] == 1} + assert {[llength [r xrange vipstream (1-0 (42-42]] == 1} + catch {r xrange vipstream (- +} e + assert_match {ERR*} $e + catch {r xrange vipstream - (+} e + assert_match {ERR*} $e + catch {r xrange vipstream (18446744073709551615-18446744073709551615 +} e + assert_match {ERR*} $e + catch {r xrange vipstream - (0-0} e + assert_match {ERR*} $e + } + test {XREAD with non empty stream} { set res [r XREAD COUNT 1 STREAMS mystream 0-0] assert {[lrange [lindex $res 0 1 0 1] 0 1] eq {item 0}}