Adds exclusive range query intervals to XPENDING (#8130)
This commit is contained in:
parent
ec02c761aa
commit
37f45d9e56
@ -2231,8 +2231,9 @@ void xpendingCommand(client *c) {
|
|||||||
robj *groupname = c->argv[2];
|
robj *groupname = c->argv[2];
|
||||||
robj *consumername = NULL;
|
robj *consumername = NULL;
|
||||||
streamID startid, endid;
|
streamID startid, endid;
|
||||||
long long count;
|
long long count = 0;
|
||||||
long long minidle = 0;
|
long long minidle = 0;
|
||||||
|
int startex = 0, endex = 0;
|
||||||
|
|
||||||
/* Start and stop, and the consumer, can be omitted. Also the IDLE modifier. */
|
/* Start and stop, and the consumer, can be omitted. Also the IDLE modifier. */
|
||||||
if (c->argc != 3 && (c->argc < 6 || c->argc > 9)) {
|
if (c->argc != 3 && (c->argc < 6 || c->argc > 9)) {
|
||||||
@ -2256,13 +2257,25 @@ void xpendingCommand(client *c) {
|
|||||||
/* Search for rest of arguments after 'IDLE <idle>' */
|
/* Search for rest of arguments after 'IDLE <idle>' */
|
||||||
startidx += 2;
|
startidx += 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* count argument. */
|
||||||
if (getLongLongFromObjectOrReply(c,c->argv[startidx+2],&count,NULL) == C_ERR)
|
if (getLongLongFromObjectOrReply(c,c->argv[startidx+2],&count,NULL) == C_ERR)
|
||||||
return;
|
return;
|
||||||
if (count < 0) count = 0;
|
if (count < 0) count = 0;
|
||||||
if (streamParseIDOrReply(c,c->argv[startidx],&startid,0) == C_ERR)
|
|
||||||
|
/* start and end arguments. */
|
||||||
|
if (streamParseIntervalIDOrReply(c,c->argv[startidx],&startid,&startex,0) != C_OK)
|
||||||
return;
|
return;
|
||||||
if (streamParseIDOrReply(c,c->argv[startidx+1],&endid,UINT64_MAX) == C_ERR)
|
if (startex && streamIncrID(&startid) != C_OK) {
|
||||||
|
addReplyError(c,"invalid start ID for the interval");
|
||||||
return;
|
return;
|
||||||
|
}
|
||||||
|
if (streamParseIntervalIDOrReply(c,c->argv[startidx+1],&endid,&endex,UINT64_MAX) != C_OK)
|
||||||
|
return;
|
||||||
|
if (endex && streamDecrID(&endid) != C_OK) {
|
||||||
|
addReplyError(c,"invalid end ID for the interval");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (startidx+3 < c->argc) {
|
if (startidx+3 < c->argc) {
|
||||||
/* 'consumer' was provided */
|
/* 'consumer' was provided */
|
||||||
|
@ -87,6 +87,20 @@ start_server {
|
|||||||
assert {[llength $pending] == 4}
|
assert {[llength $pending] == 4}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test {XPENDING with exclusive range intervals works as expected} {
|
||||||
|
set pending [r XPENDING mystream mygroup - + 10]
|
||||||
|
assert {[llength $pending] == 4}
|
||||||
|
set startid [lindex [lindex $pending 0] 0]
|
||||||
|
set endid [lindex [lindex $pending 3] 0]
|
||||||
|
set expending [r XPENDING mystream mygroup ($startid ($endid 10]
|
||||||
|
assert {[llength $expending] == 2}
|
||||||
|
for {set j 0} {$j < 2} {incr j} {
|
||||||
|
set itemid [lindex [lindex $expending $j] 0]
|
||||||
|
assert {$itemid ne $startid}
|
||||||
|
assert {$itemid ne $endid}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
test {XACK is able to remove items from the client/group PEL} {
|
test {XACK is able to remove items from the client/group PEL} {
|
||||||
set pending [r XPENDING mystream mygroup - + 10 client-1]
|
set pending [r XPENDING mystream mygroup - + 10 client-1]
|
||||||
set id1 [lindex $pending 0 0]
|
set id1 [lindex $pending 0 0]
|
||||||
|
Loading…
x
Reference in New Issue
Block a user