XPENDING with IDLE (#7972)

Used to filter stream pending entries by their idle-time,
useful for XCLAIMing entries that have not been processed
for some time
This commit is contained in:
guybe7 2020-11-29 11:08:47 +01:00 committed by GitHub
parent cb5eadb33b
commit ada2ac9ae2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 51 additions and 12 deletions

View File

@ -2136,26 +2136,27 @@ cleanup:
if (ids != static_ids) zfree(ids);
}
/* XPENDING <key> <group> [<start> <stop> <count> [<consumer>]]
/* XPENDING <key> <group> [[IDLE <idle>] <start> <stop> <count> [<consumer>]]
*
* If start and stop are omitted, the command just outputs information about
* the amount of pending messages for the key/group pair, together with
* the minimum and maximum ID of pending messages.
*
* If start and stop are provided instead, the pending messages are returned
* with informations about the current owner, number of deliveries and last
* with information about the current owner, number of deliveries and last
* delivery time and so forth. */
void xpendingCommand(client *c) {
int justinfo = c->argc == 3; /* Without the range just outputs general
informations about the PEL. */
robj *key = c->argv[1];
robj *groupname = c->argv[2];
robj *consumername = (c->argc == 7) ? c->argv[6] : NULL;
robj *consumername = NULL;
streamID startid, endid;
long long count;
long long minidle = 0;
/* Start and stop, and the consumer, can be omitted. */
if (c->argc != 3 && c->argc != 6 && c->argc != 7) {
/* Start and stop, and the consumer, can be omitted. Also the IDLE modifier. */
if (c->argc != 3 && (c->argc < 6 || c->argc > 9)) {
addReply(c,shared.syntaxerr);
return;
}
@ -2163,13 +2164,31 @@ void xpendingCommand(client *c) {
/* Parse start/end/count arguments ASAP if needed, in order to report
* syntax errors before any other error. */
if (c->argc >= 6) {
if (getLongLongFromObjectOrReply(c,c->argv[5],&count,NULL) == C_ERR)
int startidx = 3; /* Without IDLE */
if (!strcasecmp(c->argv[3]->ptr, "IDLE")) {
if (getLongLongFromObjectOrReply(c, c->argv[4], &minidle, NULL) == C_ERR)
return;
if (c->argc < 8) {
/* If IDLE was provided we must have at least 'start end count' */
addReply(c,shared.syntaxerr);
return;
}
/* Search for rest of arguments after 'IDLE <idle>' */
startidx += 2;
}
if (getLongLongFromObjectOrReply(c,c->argv[startidx+2],&count,NULL) == C_ERR)
return;
if (count < 0) count = 0;
if (streamParseIDOrReply(c,c->argv[3],&startid,0) == C_ERR)
if (streamParseIDOrReply(c,c->argv[startidx],&startid,0) == C_ERR)
return;
if (streamParseIDOrReply(c,c->argv[4],&endid,UINT64_MAX) == C_ERR)
if (streamParseIDOrReply(c,c->argv[startidx+1],&endid,UINT64_MAX) == C_ERR)
return;
if (startidx+3 < c->argc) {
/* 'consumer' was provided */
consumername = c->argv[startidx+3];
}
}
/* Lookup the key and the group inside the stream. */
@ -2228,9 +2247,7 @@ void xpendingCommand(client *c) {
setDeferredArrayLen(c,arraylen_ptr,arraylen);
raxStop(&ri);
}
}
/* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */
else {
} else { /* <start>, <stop> and <count> provided, return actual pending entries (not just info) */
streamConsumer *consumer = NULL;
if (consumername) {
consumer = streamLookupConsumer(group,
@ -2262,6 +2279,11 @@ void xpendingCommand(client *c) {
while(count && raxNext(&ri) && memcmp(ri.key,endkey,ri.key_len) <= 0) {
streamNACK *nack = ri.data;
if (minidle) {
mstime_t this_idle = now - nack->delivery_time;
if (this_idle < minidle) continue;
}
arraylen++;
count--;
addReplyArrayLen(c,4);
@ -2495,7 +2517,7 @@ void xclaimCommand(client *c) {
*
* Note that the nack could be created by FORCE, in this
* case there was no pre-existing entry and minidle should
* be ignored, but in that case nick->consumer is NULL. */
* be ignored, but in that case nack->consumer is NULL. */
if (nack->consumer && minidle) {
mstime_t this_idle = now - nack->delivery_time;
if (this_idle < minidle) continue;

View File

@ -70,6 +70,23 @@ start_server {
assert {[llength $pending] == 2}
}
test {XPENDING only group} {
set pending [r XPENDING mystream mygroup]
assert {[llength $pending] == 4}
}
test {XPENDING with IDLE} {
after 20
set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10 client-1]
assert {[llength $pending] == 0}
set pending [r XPENDING mystream mygroup IDLE 1 - + 10 client-1]
assert {[llength $pending] == 2}
set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10]
assert {[llength $pending] == 0}
set pending [r XPENDING mystream mygroup IDLE 1 - + 10]
assert {[llength $pending] == 4}
}
test {XACK is able to remove items from the client/group PEL} {
set pending [r XPENDING mystream mygroup - + 10 client-1]
set id1 [lindex $pending 0 0]