Add XAUTOCLAIM (#7973)

New command: XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID]

The purpose is to claim entries from a stale consumer without the usual
XPENDING+XCLAIM combo which takes two round trips.

The syntax for XAUTOCLAIM is similar to scan: A cursor is returned (streamID)
by each call and should be used as start for the next call. 0-0 means the scan is complete.

This PR extends the deferred reply mechanism for any bulk string (not just counts)

This PR carries some unrelated test code changes:
- Renames the term "client" into "consumer" in the stream-cgroups test
- And also changes DEBUG SLEEP into "after"

Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
guybe7 2021-01-06 10:34:27 +02:00 committed by GitHub
parent f2bde2268a
commit 68463baf82
5 changed files with 347 additions and 59 deletions

View File

@ -559,13 +559,9 @@ void *addReplyDeferredLen(client *c) {
return listLast(c->reply); return listLast(c->reply);
} }
/* Populate the length object and try gluing it to the next chunk. */ void setDeferredReply(client *c, void *node, const char *s, size_t length) {
void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
serverAssert(length >= 0);
listNode *ln = (listNode*)node; listNode *ln = (listNode*)node;
clientReplyBlock *next; clientReplyBlock *next;
char lenstr[128];
size_t lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length);
/* Abort when *node is NULL: when the client should not accept writes /* Abort when *node is NULL: when the client should not accept writes
* we return NULL in addReplyDeferredLen() */ * we return NULL in addReplyDeferredLen() */
@ -583,25 +579,39 @@ void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
* - It has enough room already allocated * - It has enough room already allocated
* - And not too large (avoid large memmove) */ * - And not too large (avoid large memmove) */
if (ln->next != NULL && (next = listNodeValue(ln->next)) && if (ln->next != NULL && (next = listNodeValue(ln->next)) &&
next->size - next->used >= lenstr_len && next->size - next->used >= length &&
next->used < PROTO_REPLY_CHUNK_BYTES * 4) { next->used < PROTO_REPLY_CHUNK_BYTES * 4)
memmove(next->buf + lenstr_len, next->buf, next->used); {
memcpy(next->buf, lenstr, lenstr_len); memmove(next->buf + length, next->buf, next->used);
next->used += lenstr_len; memcpy(next->buf, s, length);
next->used += length;
listDelNode(c->reply,ln); listDelNode(c->reply,ln);
} else { } else {
/* Create a new node */ /* Create a new node */
clientReplyBlock *buf = zmalloc(lenstr_len + sizeof(clientReplyBlock)); clientReplyBlock *buf = zmalloc(length + sizeof(clientReplyBlock));
/* Take over the allocation's internal fragmentation */ /* Take over the allocation's internal fragmentation */
buf->size = zmalloc_usable_size(buf) - sizeof(clientReplyBlock); buf->size = zmalloc_usable_size(buf) - sizeof(clientReplyBlock);
buf->used = lenstr_len; buf->used = length;
memcpy(buf->buf, lenstr, lenstr_len); memcpy(buf->buf, s, length);
listNodeValue(ln) = buf; listNodeValue(ln) = buf;
c->reply_bytes += buf->size; c->reply_bytes += buf->size;
} }
asyncCloseClientOnOutputBufferLimitReached(c); asyncCloseClientOnOutputBufferLimitReached(c);
} }
/* Populate the length object and try gluing it to the next chunk. */
void setDeferredAggregateLen(client *c, void *node, long length, char prefix) {
serverAssert(length >= 0);
/* Abort when *node is NULL: when the client should not accept writes
* we return NULL in addReplyDeferredLen() */
if (node == NULL) return;
char lenstr[128];
size_t lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length);
setDeferredReply(c, node, lenstr, lenstr_len);
}
void setDeferredArrayLen(client *c, void *node, long length) { void setDeferredArrayLen(client *c, void *node, long length) {
setDeferredAggregateLen(c,node,length,'*'); setDeferredAggregateLen(c,node,length,'*');
} }
@ -797,6 +807,14 @@ void addReplyBulkSds(client *c, sds s) {
addReply(c,shared.crlf); addReply(c,shared.crlf);
} }
/* Set sds to a deferred reply (for symmetry with addReplyBulkSds it also frees the sds) */
void setDeferredReplyBulkSds(client *c, void *node, sds s) {
sds reply = sdscatprintf(sdsempty(), "$%d\r\n%s\r\n", (unsigned)sdslen(s), s);
setDeferredReply(c, node, reply, sdslen(reply));
sdsfree(reply);
sdsfree(s);
}
/* Add a C null term string as bulk reply */ /* Add a C null term string as bulk reply */
void addReplyBulkCString(client *c, const char *s) { void addReplyBulkCString(client *c, const char *s) {
if (s == NULL) { if (s == NULL) {

View File

@ -1027,6 +1027,10 @@ struct redisCommand redisCommandTable[] = {
"write random fast @stream", "write random fast @stream",
0,NULL,1,1,1,0,0,0}, 0,NULL,1,1,1,0,0,0},
{"xautoclaim",xautoclaimCommand,-6,
"write random fast @stream",
0,NULL,1,1,1,0,0,0},
{"xinfo",xinfoCommand,-2, {"xinfo",xinfoCommand,-2,
"read-only random @stream", "read-only random @stream",
0,NULL,2,2,1,0,0,0}, 0,NULL,2,2,1,0,0,0},

View File

@ -1759,6 +1759,7 @@ void addReplyBulkLongLong(client *c, long long ll);
void addReply(client *c, robj *obj); void addReply(client *c, robj *obj);
void addReplySds(client *c, sds s); void addReplySds(client *c, sds s);
void addReplyBulkSds(client *c, sds s); void addReplyBulkSds(client *c, sds s);
void setDeferredReplyBulkSds(client *c, void *node, sds s);
void addReplyErrorObject(client *c, robj *err); void addReplyErrorObject(client *c, robj *err);
void addReplyErrorSds(client *c, sds err); void addReplyErrorSds(client *c, sds err);
void addReplyError(client *c, const char *err); void addReplyError(client *c, const char *err);
@ -2584,6 +2585,7 @@ void xsetidCommand(client *c);
void xackCommand(client *c); void xackCommand(client *c);
void xpendingCommand(client *c); void xpendingCommand(client *c);
void xclaimCommand(client *c); void xclaimCommand(client *c);
void xautoclaimCommand(client *c);
void xinfoCommand(client *c); void xinfoCommand(client *c);
void xdelCommand(client *c); void xdelCommand(client *c);
void xtrimCommand(client *c); void xtrimCommand(client *c);

View File

@ -960,6 +960,11 @@ void addReplyStreamID(client *c, streamID *id) {
addReplyBulkSds(c,replyid); addReplyBulkSds(c,replyid);
} }
void setDeferredReplyStreamID(client *c, void *dr, streamID *id) {
sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq);
setDeferredReplyBulkSds(c, dr, replyid);
}
/* Similar to the above function, but just creates an object, usually useful /* Similar to the above function, but just creates an object, usually useful
* for replication purposes to create arguments. */ * for replication purposes to create arguments. */
robj *createObjectFromStreamID(streamID *id) { robj *createObjectFromStreamID(streamID *id) {
@ -2666,6 +2671,160 @@ cleanup:
if (ids != static_ids) zfree(ids); if (ids != static_ids) zfree(ids);
} }
/* XAUTOCLAIM <key> <group> <consumer> <min-idle-time> <start> [COUNT <count>] [JUSTID]
*
* Gets ownership of one or multiple messages in the Pending Entries List
* of a given stream consumer group.
*
* For each PEL entry, if its idle time greater or equal to <min-idle-time>,
* then the message new owner becomes the specified <consumer>.
* If the minimum idle time specified is zero, messages are claimed
* regardless of their idle time.
*
* This command creates the consumer as side effect if it does not yet
* exists. Moreover the command reset the idle time of the message to 0.
*
* The command returns an array of messages that the user
* successfully claimed, so that the caller is able to understand
* what messages it is now in charge of. */
void xautoclaimCommand(client *c) {
streamCG *group = NULL;
robj *o = lookupKeyRead(c->db,c->argv[1]);
long long minidle; /* Minimum idle time argument, in milliseconds. */
long count = 100; /* Maximum entries to claim. */
streamID startid;
int startex;
int justid = 0;
/* Parse idle/start/end/count arguments ASAP if needed, in order to report
* syntax errors before any other error. */
if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,"Invalid min-idle-time argument for XAUTOCLAIM") != C_OK)
return;
if (minidle < 0) minidle = 0;
if (streamParseIntervalIDOrReply(c,c->argv[5],&startid,&startex,0) != C_OK)
return;
if (startex && streamIncrID(&startid) != C_OK) {
addReplyError(c,"invalid start ID for the interval");
return;
}
int j = 6; /* options start at argv[6] */
while(j < c->argc) {
int moreargs = (c->argc-1) - j; /* Number of additional arguments. */
char *opt = c->argv[j]->ptr;
if (!strcasecmp(opt,"COUNT") && moreargs) {
if (getPositiveLongFromObjectOrReply(c,c->argv[j+1],&count,NULL) != C_OK)
return;
if (count == 0) {
addReplyError(c,"COUNT must be > 0");
return;
}
j++;
} else if (!strcasecmp(opt,"JUSTID")) {
justid = 1;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
j++;
}
if (o) {
if (checkType(c,o,OBJ_STREAM))
return; /* Type error. */
group = streamLookupCG(o->ptr,c->argv[2]->ptr);
}
/* No key or group? Send an error given that the group creation
* is mandatory. */
if (o == NULL || group == NULL) {
addReplyErrorFormat(c,"-NOGROUP No such key '%s' or consumer group '%s'",
(char*)c->argv[1]->ptr,
(char*)c->argv[2]->ptr);
return;
}
/* Do the actual claiming. */
streamConsumer *consumer = NULL;
long long attempts = count*10;
addReplyArrayLen(c, 2);
void *endidptr = addReplyDeferredLen(c);
void *arraylenptr = addReplyDeferredLen(c);
unsigned char startkey[sizeof(streamID)];
streamEncodeID(startkey,&startid);
raxIterator ri;
raxStart(&ri,group->pel);
raxSeek(&ri,">=",startkey,sizeof(startkey));
size_t arraylen = 0;
mstime_t now = mstime();
while (attempts-- && count && raxNext(&ri)) {
streamNACK *nack = ri.data;
if (minidle) {
mstime_t this_idle = now - nack->delivery_time;
if (this_idle < minidle)
continue;
}
streamID id;
streamDecodeID(ri.key, &id);
if (consumer == NULL)
consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE,NULL);
if (nack->consumer != consumer) {
/* Remove the entry from the old consumer.
* Note that nack->consumer is NULL if we created the
* NACK above because of the FORCE option. */
if (nack->consumer)
raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL);
}
/* Update the consumer and idle time. */
nack->delivery_time = now;
nack->delivery_count++;
if (nack->consumer != consumer) {
/* Add the entry in the new consumer local PEL. */
raxInsert(consumer->pel,ri.key,ri.key_len,nack,NULL);
nack->consumer = consumer;
}
/* Send the reply for this entry. */
if (justid) {
addReplyStreamID(c,&id);
} else {
size_t emitted =
streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL);
if (!emitted)
addReplyNull(c);
}
arraylen++;
count--;
/* Propagate this change. */
robj *idstr = createObjectFromStreamID(&id);
streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack);
decrRefCount(idstr);
server.dirty++;
}
streamID endid;
if (raxEOF(&ri)) {
endid.ms = endid.seq = 0;
} else {
streamDecodeID(ri.key, &endid);
}
raxStop(&ri);
setDeferredArrayLen(c,arraylenptr,arraylen);
setDeferredReplyStreamID(c,endidptr,&endid);
preventCommandPropagation(c);
}
/* XDEL <key> [<ID1> <ID2> ... <IDN>] /* XDEL <key> [<ID1> <ID2> ... <IDN>]
* *

View File

@ -27,7 +27,7 @@ start_server {
# and not the element "foo bar" which was pre existing in the # and not the element "foo bar" which was pre existing in the
# stream (see previous test) # stream (see previous test)
set reply [ set reply [
r XREADGROUP GROUP mygroup client-1 STREAMS mystream ">" r XREADGROUP GROUP mygroup consumer-1 STREAMS mystream ">"
] ]
assert {[llength [lindex $reply 0 1]] == 2} assert {[llength [lindex $reply 0 1]] == 2}
lindex $reply 0 1 0 1 lindex $reply 0 1 0 1
@ -39,13 +39,13 @@ start_server {
r XADD mystream * d 4 r XADD mystream * d 4
# Read a few elements using a different consumer name # Read a few elements using a different consumer name
set reply [ set reply [
r XREADGROUP GROUP mygroup client-2 STREAMS mystream ">" r XREADGROUP GROUP mygroup consumer-2 STREAMS mystream ">"
] ]
assert {[llength [lindex $reply 0 1]] == 2} assert {[llength [lindex $reply 0 1]] == 2}
assert {[lindex $reply 0 1 0 1] eq {c 3}} assert {[lindex $reply 0 1 0 1] eq {c 3}}
set r1 [r XREADGROUP GROUP mygroup client-1 COUNT 10 STREAMS mystream 0] set r1 [r XREADGROUP GROUP mygroup consumer-1 COUNT 10 STREAMS mystream 0]
set r2 [r XREADGROUP GROUP mygroup client-2 COUNT 10 STREAMS mystream 0] set r2 [r XREADGROUP GROUP mygroup consumer-2 COUNT 10 STREAMS mystream 0]
assert {[lindex $r1 0 1 0 1] eq {a 1}} assert {[lindex $r1 0 1 0 1] eq {a 1}}
assert {[lindex $r2 0 1 0 1] eq {c 3}} assert {[lindex $r2 0 1 0 1] eq {c 3}}
} }
@ -56,9 +56,9 @@ start_server {
for {set j 0} {$j < 4} {incr j} { for {set j 0} {$j < 4} {incr j} {
set item [lindex $pending $j] set item [lindex $pending $j]
if {$j < 2} { if {$j < 2} {
set owner client-1 set owner consumer-1
} else { } else {
set owner client-2 set owner consumer-2
} }
assert {[lindex $item 1] eq $owner} assert {[lindex $item 1] eq $owner}
assert {[lindex $item 1] eq $owner} assert {[lindex $item 1] eq $owner}
@ -66,7 +66,7 @@ start_server {
} }
test {XPENDING can return single consumer items} { test {XPENDING can return single consumer items} {
set pending [r XPENDING mystream mygroup - + 10 client-1] set pending [r XPENDING mystream mygroup - + 10 consumer-1]
assert {[llength $pending] == 2} assert {[llength $pending] == 2}
} }
@ -77,9 +77,9 @@ start_server {
test {XPENDING with IDLE} { test {XPENDING with IDLE} {
after 20 after 20
set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10 client-1] set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10 consumer-1]
assert {[llength $pending] == 0} assert {[llength $pending] == 0}
set pending [r XPENDING mystream mygroup IDLE 1 - + 10 client-1] set pending [r XPENDING mystream mygroup IDLE 1 - + 10 consumer-1]
assert {[llength $pending] == 2} assert {[llength $pending] == 2}
set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10] set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10]
assert {[llength $pending] == 0} assert {[llength $pending] == 0}
@ -101,12 +101,12 @@ start_server {
} }
} }
test {XACK is able to remove items from the client/group PEL} { test {XACK is able to remove items from the consumer/group PEL} {
set pending [r XPENDING mystream mygroup - + 10 client-1] set pending [r XPENDING mystream mygroup - + 10 consumer-1]
set id1 [lindex $pending 0 0] set id1 [lindex $pending 0 0]
set id2 [lindex $pending 1 0] set id2 [lindex $pending 1 0]
assert {[r XACK mystream mygroup $id1] eq 1} assert {[r XACK mystream mygroup $id1] eq 1}
set pending [r XPENDING mystream mygroup - + 10 client-1] set pending [r XPENDING mystream mygroup - + 10 consumer-1]
assert {[llength $pending] == 1} assert {[llength $pending] == 1}
set id [lindex $pending 0 0] set id [lindex $pending 0 0]
assert {$id eq $id2} assert {$id eq $id2}
@ -242,52 +242,52 @@ start_server {
set id3 [r XADD mystream * c 3] set id3 [r XADD mystream * c 3]
r XGROUP CREATE mystream mygroup 0 r XGROUP CREATE mystream mygroup 0
# Client 1 reads item 1 from the stream without acknowledgements. # Consumer 1 reads item 1 from the stream without acknowledgements.
# Client 2 then claims pending item 1 from the PEL of client 1 # Consumer 2 then claims pending item 1 from the PEL of consumer 1
set reply [ set reply [
r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream > r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >
] ]
assert {[llength [lindex $reply 0 1 0 1]] == 2} assert {[llength [lindex $reply 0 1 0 1]] == 2}
assert {[lindex $reply 0 1 0 1] eq {a 1}} assert {[lindex $reply 0 1 0 1] eq {a 1}}
# make sure the entry is present in both the gorup, and the right consumer # make sure the entry is present in both the gorup, and the right consumer
assert {[llength [r XPENDING mystream mygroup - + 10]] == 1} assert {[llength [r XPENDING mystream mygroup - + 10]] == 1}
assert {[llength [r XPENDING mystream mygroup - + 10 client1]] == 1} assert {[llength [r XPENDING mystream mygroup - + 10 consumer1]] == 1}
assert {[llength [r XPENDING mystream mygroup - + 10 client2]] == 0} assert {[llength [r XPENDING mystream mygroup - + 10 consumer2]] == 0}
r debug sleep 0.2 after 200
set reply [ set reply [
r XCLAIM mystream mygroup client2 10 $id1 r XCLAIM mystream mygroup consumer2 10 $id1
] ]
assert {[llength [lindex $reply 0 1]] == 2} assert {[llength [lindex $reply 0 1]] == 2}
assert {[lindex $reply 0 1] eq {a 1}} assert {[lindex $reply 0 1] eq {a 1}}
# make sure the entry is present in both the gorup, and the right consumer # make sure the entry is present in both the gorup, and the right consumer
assert {[llength [r XPENDING mystream mygroup - + 10]] == 1} assert {[llength [r XPENDING mystream mygroup - + 10]] == 1}
assert {[llength [r XPENDING mystream mygroup - + 10 client1]] == 0} assert {[llength [r XPENDING mystream mygroup - + 10 consumer1]] == 0}
assert {[llength [r XPENDING mystream mygroup - + 10 client2]] == 1} assert {[llength [r XPENDING mystream mygroup - + 10 consumer2]] == 1}
# Client 1 reads another 2 items from stream # Consumer 1 reads another 2 items from stream
r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream > r XREADGROUP GROUP mygroup consumer1 count 2 STREAMS mystream >
r debug sleep 0.2 after 200
# Delete item 2 from the stream. Now client 1 has PEL that contains # Delete item 2 from the stream. Now consumer 1 has PEL that contains
# only item 3. Try to use client 2 to claim the deleted item 2 # only item 3. Try to use consumer 2 to claim the deleted item 2
# from the PEL of client 1, this should return nil # from the PEL of consumer 1, this should return nil
r XDEL mystream $id2 r XDEL mystream $id2
set reply [ set reply [
r XCLAIM mystream mygroup client2 10 $id2 r XCLAIM mystream mygroup consumer2 10 $id2
] ]
assert {[llength $reply] == 1} assert {[llength $reply] == 1}
assert_equal "" [lindex $reply 0] assert_equal "" [lindex $reply 0]
# Delete item 3 from the stream. Now client 1 has PEL that is empty. # Delete item 3 from the stream. Now consumer 1 has PEL that is empty.
# Try to use client 2 to claim the deleted item 3 from the PEL # Try to use consumer 2 to claim the deleted item 3 from the PEL
# of client 1, this should return nil # of consumer 1, this should return nil
r debug sleep 0.2 after 200
r XDEL mystream $id3 r XDEL mystream $id3
set reply [ set reply [
r XCLAIM mystream mygroup client2 10 $id3 r XCLAIM mystream mygroup consumer2 10 $id3
] ]
assert {[llength $reply] == 1} assert {[llength $reply] == 1}
assert_equal "" [lindex $reply 0] assert_equal "" [lindex $reply 0]
@ -301,16 +301,16 @@ start_server {
set id3 [r XADD mystream * c 3] set id3 [r XADD mystream * c 3]
r XGROUP CREATE mystream mygroup 0 r XGROUP CREATE mystream mygroup 0
# Client 1 reads item 1 from the stream without acknowledgements. # Consumer 1 reads item 1 from the stream without acknowledgements.
# Client 2 then claims pending item 1 from the PEL of client 1 # Consumer 2 then claims pending item 1 from the PEL of consumer 1
set reply [ set reply [
r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream > r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >
] ]
assert {[llength [lindex $reply 0 1 0 1]] == 2} assert {[llength [lindex $reply 0 1 0 1]] == 2}
assert {[lindex $reply 0 1 0 1] eq {a 1}} assert {[lindex $reply 0 1 0 1] eq {a 1}}
r debug sleep 0.2 after 200
set reply [ set reply [
r XCLAIM mystream mygroup client2 10 $id1 r XCLAIM mystream mygroup consumer2 10 $id1
] ]
assert {[llength [lindex $reply 0 1]] == 2} assert {[llength [lindex $reply 0 1]] == 2}
assert {[lindex $reply 0 1] eq {a 1}} assert {[lindex $reply 0 1] eq {a 1}}
@ -321,10 +321,10 @@ start_server {
assert {[llength [lindex $reply 0]] == 4} assert {[llength [lindex $reply 0]] == 4}
assert {[lindex $reply 0 3] == 2} assert {[lindex $reply 0 3] == 2}
# Client 3 then claims pending item 1 from the PEL of client 2 using JUSTID # Consumer 3 then claims pending item 1 from the PEL of consumer 2 using JUSTID
r debug sleep 0.2 after 200
set reply [ set reply [
r XCLAIM mystream mygroup client3 10 $id1 JUSTID r XCLAIM mystream mygroup consumer3 10 $id1 JUSTID
] ]
assert {[llength $reply] == 1} assert {[llength $reply] == 1}
assert {[lindex $reply 0] eq $id1} assert {[lindex $reply 0] eq $id1}
@ -344,17 +344,122 @@ start_server {
set id3 [r XADD mystream * c 3] set id3 [r XADD mystream * c 3]
r XGROUP CREATE mystream mygroup 0 r XGROUP CREATE mystream mygroup 0
set reply [r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >] set reply [r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >]
assert {[llength [lindex $reply 0 1 0 1]] == 2} assert {[llength [lindex $reply 0 1 0 1]] == 2}
assert {[lindex $reply 0 1 0 1] eq {a 1}} assert {[lindex $reply 0 1 0 1] eq {a 1}}
r debug sleep 0.2 after 200
# re-claim with the same consumer that already has it # re-claim with the same consumer that already has it
assert {[llength [r XCLAIM mystream mygroup client1 10 $id1]] == 1} assert {[llength [r XCLAIM mystream mygroup consumer1 10 $id1]] == 1}
# make sure the entry is still in the PEL # make sure the entry is still in the PEL
set reply [r XPENDING mystream mygroup - + 10] set reply [r XPENDING mystream mygroup - + 10]
assert {[llength $reply] == 1} assert {[llength $reply] == 1}
assert {[lindex $reply 0 1] eq {client1}} assert {[lindex $reply 0 1] eq {consumer1}}
}
test {XAUTOCLAIM can claim PEL items from another consumer} {
# Add 3 items into the stream, and create a consumer group
r del mystream
set id1 [r XADD mystream * a 1]
set id2 [r XADD mystream * b 2]
set id3 [r XADD mystream * c 3]
r XGROUP CREATE mystream mygroup 0
# Consumer 1 reads item 1 from the stream without acknowledgements.
# Consumer 2 then claims pending item 1 from the PEL of consumer 1
set reply [r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >]
assert_equal [llength [lindex $reply 0 1 0 1]] 2
assert_equal [lindex $reply 0 1 0 1] {a 1}
after 200
set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 1]
assert_equal [llength $reply] 2
assert_equal [lindex $reply 0] $id1
assert_equal [llength [lindex $reply 1]] 1
assert_equal [llength [lindex $reply 1 0]] 2
assert_equal [llength [lindex $reply 1 0 1]] 2
assert_equal [lindex $reply 1 0 1] {a 1}
# Consumer 1 reads another 2 items from stream
r XREADGROUP GROUP mygroup consumer1 count 2 STREAMS mystream >
# For min-idle-time
after 200
# Delete item 2 from the stream. Now consumer 1 has PEL that contains
# only item 3. Try to use consumer 2 to claim the deleted item 2
# from the PEL of consumer 1, this should return nil
r XDEL mystream $id2
set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 2]
# id1 is self-claimed here but not id2 ('count' was set to 2)
assert_equal [llength $reply] 2
assert_equal [lindex $reply 0] $id2
assert_equal [llength [lindex $reply 1]] 2
assert_equal [llength [lindex $reply 1 0]] 2
assert_equal [llength [lindex $reply 1 0 1]] 2
assert_equal [lindex $reply 1 0 1] {a 1}
assert_equal [lindex $reply 1 1] ""
# Delete item 3 from the stream. Now consumer 1 has PEL that is empty.
# Try to use consumer 2 to claim the deleted item 3 from the PEL
# of consumer 1, this should return nil
after 200
r XDEL mystream $id3
set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - JUSTID]
# id1 is self-claimed here but not id2 and id3 ('count' is default 100)
# we also test the JUSTID modifier here. note that, when using JUSTID,
# deleted entries are returned in reply (consistent with XCLAIM).
assert_equal [llength $reply] 2
assert_equal [lindex $reply 0] "0-0"
assert_equal [llength [lindex $reply 1]] 3
assert_equal [lindex $reply 1 0] $id1
assert_equal [lindex $reply 1 1] $id2
assert_equal [lindex $reply 1 2] $id3
}
test {XAUTOCLAIM as an iterator} {
# Add 5 items into the stream, and create a consumer group
r del mystream
set id1 [r XADD mystream * a 1]
set id2 [r XADD mystream * b 2]
set id3 [r XADD mystream * c 3]
set id4 [r XADD mystream * d 4]
set id5 [r XADD mystream * e 5]
r XGROUP CREATE mystream mygroup 0
# Read 5 messages into consumer1
r XREADGROUP GROUP mygroup consumer1 count 90 STREAMS mystream >
# For min-idle-time
after 200
# Claim 2 entries
set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 2]
assert_equal [llength $reply] 2
set cursor [lindex $reply 0]
assert_equal $cursor $id2
assert_equal [llength [lindex $reply 1]] 2
assert_equal [llength [lindex $reply 1 0 1]] 2
assert_equal [lindex $reply 1 0 1] {a 1}
# Claim 2 more entries
set reply [r XAUTOCLAIM mystream mygroup consumer2 10 ($cursor COUNT 2]
assert_equal [llength $reply] 2
set cursor [lindex $reply 0]
assert_equal $cursor $id4
assert_equal [llength [lindex $reply 1]] 2
assert_equal [llength [lindex $reply 1 0 1]] 2
assert_equal [lindex $reply 1 0 1] {c 3}
# Claim last entry
set reply [r XAUTOCLAIM mystream mygroup consumer2 10 ($cursor COUNT 2]
assert_equal [llength $reply] 2
set cursor [lindex $reply 0]
assert_equal $cursor {0-0}
assert_equal [llength [lindex $reply 1]] 1
assert_equal [llength [lindex $reply 1 0 1]] 2
assert_equal [lindex $reply 1 0 1] {e 5}
} }
test {XINFO FULL output} { test {XINFO FULL output} {
@ -477,7 +582,7 @@ start_server {
assert {$curr_grpinfo == $grpinfo} assert {$curr_grpinfo == $grpinfo}
set n_consumers [lindex $grpinfo 3] set n_consumers [lindex $grpinfo 3]
# Bob should be created only when there will be new data for this client # Bob should be created only when there will be new data for this consumer
assert_equal $n_consumers 2 assert_equal $n_consumers 2
set reply [r xinfo consumers mystream mygroup] set reply [r xinfo consumers mystream mygroup]
set consumer_info [lindex $reply 0] set consumer_info [lindex $reply 0]