From 37f45d9e5677e75bc396af5e3d8864125f026969 Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Tue, 8 Dec 2020 11:43:00 +0200 Subject: [PATCH] Adds exclusive range query intervals to XPENDING (#8130) --- src/t_stream.c | 19 ++++++++++++++++--- tests/unit/type/stream-cgroups.tcl | 14 ++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 684328de2..d61fb3eab 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2231,8 +2231,9 @@ void xpendingCommand(client *c) { robj *groupname = c->argv[2]; robj *consumername = NULL; streamID startid, endid; - long long count; + long long count = 0; long long minidle = 0; + int startex = 0, endex = 0; /* Start and stop, and the consumer, can be omitted. Also the IDLE modifier. */ if (c->argc != 3 && (c->argc < 6 || c->argc > 9)) { @@ -2256,13 +2257,25 @@ void xpendingCommand(client *c) { /* Search for rest of arguments after 'IDLE ' */ startidx += 2; } + + /* count argument. */ if (getLongLongFromObjectOrReply(c,c->argv[startidx+2],&count,NULL) == C_ERR) return; if (count < 0) count = 0; - if (streamParseIDOrReply(c,c->argv[startidx],&startid,0) == C_ERR) + + /* start and end arguments. */ + if (streamParseIntervalIDOrReply(c,c->argv[startidx],&startid,&startex,0) != C_OK) return; - if (streamParseIDOrReply(c,c->argv[startidx+1],&endid,UINT64_MAX) == C_ERR) + if (startex && streamIncrID(&startid) != C_OK) { + addReplyError(c,"invalid start ID for the interval"); return; + } + if (streamParseIntervalIDOrReply(c,c->argv[startidx+1],&endid,&endex,UINT64_MAX) != C_OK) + return; + if (endex && streamDecrID(&endid) != C_OK) { + addReplyError(c,"invalid end ID for the interval"); + return; + } if (startidx+3 < c->argc) { /* 'consumer' was provided */ diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 1d1a68a35..91dc2245e 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -87,6 +87,20 @@ start_server { assert {[llength $pending] == 4} } + test {XPENDING with exclusive range intervals works as expected} { + set pending [r XPENDING mystream mygroup - + 10] + assert {[llength $pending] == 4} + set startid [lindex [lindex $pending 0] 0] + set endid [lindex [lindex $pending 3] 0] + set expending [r XPENDING mystream mygroup ($startid ($endid 10] + assert {[llength $expending] == 2} + for {set j 0} {$j < 2} {incr j} { + set itemid [lindex [lindex $expending $j] 0] + assert {$itemid ne $startid} + assert {$itemid ne $endid} + } + } + test {XACK is able to remove items from the client/group PEL} { set pending [r XPENDING mystream mygroup - + 10 client-1] set id1 [lindex $pending 0 0]