diff --git a/src/t_stream.c b/src/t_stream.c index b25e54a02..e57916200 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2136,26 +2136,27 @@ cleanup: if (ids != static_ids) zfree(ids); } -/* XPENDING [ []] +/* XPENDING [[IDLE ] []] * * If start and stop are omitted, the command just outputs information about * the amount of pending messages for the key/group pair, together with * the minimum and maximum ID of pending messages. * * If start and stop are provided instead, the pending messages are returned - * with informations about the current owner, number of deliveries and last + * with information about the current owner, number of deliveries and last * delivery time and so forth. */ void xpendingCommand(client *c) { int justinfo = c->argc == 3; /* Without the range just outputs general informations about the PEL. */ robj *key = c->argv[1]; robj *groupname = c->argv[2]; - robj *consumername = (c->argc == 7) ? c->argv[6] : NULL; + robj *consumername = NULL; streamID startid, endid; long long count; + long long minidle = 0; - /* Start and stop, and the consumer, can be omitted. */ - if (c->argc != 3 && c->argc != 6 && c->argc != 7) { + /* Start and stop, and the consumer, can be omitted. Also the IDLE modifier. */ + if (c->argc != 3 && (c->argc < 6 || c->argc > 9)) { addReply(c,shared.syntaxerr); return; } @@ -2163,13 +2164,31 @@ void xpendingCommand(client *c) { /* Parse start/end/count arguments ASAP if needed, in order to report * syntax errors before any other error. */ if (c->argc >= 6) { - if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) == C_ERR) + int startidx = 3; /* Without IDLE */ + + if (!strcasecmp(c->argv[3]->ptr, "IDLE")) { + if (getLongLongFromObjectOrReply(c, c->argv[4], &minidle, NULL) == C_ERR) + return; + if (c->argc < 8) { + /* If IDLE was provided we must have at least 'start end count' */ + addReply(c,shared.syntaxerr); + return; + } + /* Search for rest of arguments after 'IDLE ' */ + startidx += 2; + } + if (getLongLongFromObjectOrReply(c,c->argv[startidx+2],&count,NULL) == C_ERR) return; if (count < 0) count = 0; - if (streamParseIDOrReply(c,c->argv[3],&startid,0) == C_ERR) + if (streamParseIDOrReply(c,c->argv[startidx],&startid,0) == C_ERR) return; - if (streamParseIDOrReply(c,c->argv[4],&endid,UINT64_MAX) == C_ERR) + if (streamParseIDOrReply(c,c->argv[startidx+1],&endid,UINT64_MAX) == C_ERR) return; + + if (startidx+3 < c->argc) { + /* 'consumer' was provided */ + consumername = c->argv[startidx+3]; + } } /* Lookup the key and the group inside the stream. */ @@ -2228,9 +2247,7 @@ void xpendingCommand(client *c) { setDeferredArrayLen(c,arraylen_ptr,arraylen); raxStop(&ri); } - } - /* XPENDING [] variant. */ - else { + } else { /* , and provided, return actual pending entries (not just info) */ streamConsumer *consumer = NULL; if (consumername) { consumer = streamLookupConsumer(group, @@ -2262,6 +2279,11 @@ void xpendingCommand(client *c) { while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) { streamNACK *nack = ri.data; + if (minidle) { + mstime_t this_idle = now - nack->delivery_time; + if (this_idle < minidle) continue; + } + arraylen++; count--; addReplyArrayLen(c,4); @@ -2495,7 +2517,7 @@ void xclaimCommand(client *c) { * * Note that the nack could be created by FORCE, in this * case there was no pre-existing entry and minidle should - * be ignored, but in that case nick->consumer is NULL. */ + * be ignored, but in that case nack->consumer is NULL. */ if (nack->consumer && minidle) { mstime_t this_idle = now - nack->delivery_time; if (this_idle < minidle) continue; diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 74ddc98ba..41e2ab979 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -70,6 +70,23 @@ start_server { assert {[llength $pending] == 2} } + test {XPENDING only group} { + set pending [r XPENDING mystream mygroup] + assert {[llength $pending] == 4} + } + + test {XPENDING with IDLE} { + after 20 + set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10 client-1] + assert {[llength $pending] == 0} + set pending [r XPENDING mystream mygroup IDLE 1 - + 10 client-1] + assert {[llength $pending] == 2} + set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10] + assert {[llength $pending] == 0} + set pending [r XPENDING mystream mygroup IDLE 1 - + 10] + assert {[llength $pending] == 4} + } + test {XACK is able to remove items from the client/group PEL} { set pending [r XPENDING mystream mygroup - + 10 client-1] set id1 [lindex $pending 0 0]