diff --git a/src/networking.c b/src/networking.c index 77b25d0b7..29e5605d9 100644 --- a/src/networking.c +++ b/src/networking.c @@ -559,13 +559,9 @@ void *addReplyDeferredLen(client *c) { return listLast(c->reply); } -/* Populate the length object and try gluing it to the next chunk. */ -void setDeferredAggregateLen(client *c, void *node, long length, char prefix) { - serverAssert(length >= 0); +void setDeferredReply(client *c, void *node, const char *s, size_t length) { listNode *ln = (listNode*)node; clientReplyBlock *next; - char lenstr[128]; - size_t lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length); /* Abort when *node is NULL: when the client should not accept writes * we return NULL in addReplyDeferredLen() */ @@ -583,25 +579,39 @@ void setDeferredAggregateLen(client *c, void *node, long length, char prefix) { * - It has enough room already allocated * - And not too large (avoid large memmove) */ if (ln->next != NULL && (next = listNodeValue(ln->next)) && - next->size - next->used >= lenstr_len && - next->used < PROTO_REPLY_CHUNK_BYTES * 4) { - memmove(next->buf + lenstr_len, next->buf, next->used); - memcpy(next->buf, lenstr, lenstr_len); - next->used += lenstr_len; + next->size - next->used >= length && + next->used < PROTO_REPLY_CHUNK_BYTES * 4) + { + memmove(next->buf + length, next->buf, next->used); + memcpy(next->buf, s, length); + next->used += length; listDelNode(c->reply,ln); } else { /* Create a new node */ - clientReplyBlock *buf = zmalloc(lenstr_len + sizeof(clientReplyBlock)); + clientReplyBlock *buf = zmalloc(length + sizeof(clientReplyBlock)); /* Take over the allocation's internal fragmentation */ buf->size = zmalloc_usable_size(buf) - sizeof(clientReplyBlock); - buf->used = lenstr_len; - memcpy(buf->buf, lenstr, lenstr_len); + buf->used = length; + memcpy(buf->buf, s, length); listNodeValue(ln) = buf; c->reply_bytes += buf->size; } asyncCloseClientOnOutputBufferLimitReached(c); } +/* Populate the length object and try gluing it to the next chunk. */ +void setDeferredAggregateLen(client *c, void *node, long length, char prefix) { + serverAssert(length >= 0); + + /* Abort when *node is NULL: when the client should not accept writes + * we return NULL in addReplyDeferredLen() */ + if (node == NULL) return; + + char lenstr[128]; + size_t lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length); + setDeferredReply(c, node, lenstr, lenstr_len); +} + void setDeferredArrayLen(client *c, void *node, long length) { setDeferredAggregateLen(c,node,length,'*'); } @@ -797,6 +807,14 @@ void addReplyBulkSds(client *c, sds s) { addReply(c,shared.crlf); } +/* Set sds to a deferred reply (for symmetry with addReplyBulkSds it also frees the sds) */ +void setDeferredReplyBulkSds(client *c, void *node, sds s) { + sds reply = sdscatprintf(sdsempty(), "$%d\r\n%s\r\n", (unsigned)sdslen(s), s); + setDeferredReply(c, node, reply, sdslen(reply)); + sdsfree(reply); + sdsfree(s); +} + /* Add a C null term string as bulk reply */ void addReplyBulkCString(client *c, const char *s) { if (s == NULL) { diff --git a/src/server.c b/src/server.c index 56f7484b1..dc5d52af8 100644 --- a/src/server.c +++ b/src/server.c @@ -1027,6 +1027,10 @@ struct redisCommand redisCommandTable[] = { "write random fast @stream", 0,NULL,1,1,1,0,0,0}, + {"xautoclaim",xautoclaimCommand,-6, + "write random fast @stream", + 0,NULL,1,1,1,0,0,0}, + {"xinfo",xinfoCommand,-2, "read-only random @stream", 0,NULL,2,2,1,0,0,0}, diff --git a/src/server.h b/src/server.h index d79bf5b23..8b98f803d 100644 --- a/src/server.h +++ b/src/server.h @@ -1759,6 +1759,7 @@ void addReplyBulkLongLong(client *c, long long ll); void addReply(client *c, robj *obj); void addReplySds(client *c, sds s); void addReplyBulkSds(client *c, sds s); +void setDeferredReplyBulkSds(client *c, void *node, sds s); void addReplyErrorObject(client *c, robj *err); void addReplyErrorSds(client *c, sds err); void addReplyError(client *c, const char *err); @@ -2584,6 +2585,7 @@ void xsetidCommand(client *c); void xackCommand(client *c); void xpendingCommand(client *c); void xclaimCommand(client *c); +void xautoclaimCommand(client *c); void xinfoCommand(client *c); void xdelCommand(client *c); void xtrimCommand(client *c); diff --git a/src/t_stream.c b/src/t_stream.c index 46918bae8..d085a8948 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -960,6 +960,11 @@ void addReplyStreamID(client *c, streamID *id) { addReplyBulkSds(c,replyid); } +void setDeferredReplyStreamID(client *c, void *dr, streamID *id) { + sds replyid = sdscatfmt(sdsempty(),"%U-%U",id->ms,id->seq); + setDeferredReplyBulkSds(c, dr, replyid); +} + /* Similar to the above function, but just creates an object, usually useful * for replication purposes to create arguments. */ robj *createObjectFromStreamID(streamID *id) { @@ -2666,6 +2671,160 @@ cleanup: if (ids != static_ids) zfree(ids); } +/* XAUTOCLAIM [COUNT ] [JUSTID] + * + * Gets ownership of one or multiple messages in the Pending Entries List + * of a given stream consumer group. + * + * For each PEL entry, if its idle time greater or equal to , + * then the message new owner becomes the specified . + * If the minimum idle time specified is zero, messages are claimed + * regardless of their idle time. + * + * This command creates the consumer as side effect if it does not yet + * exists. Moreover the command reset the idle time of the message to 0. + * + * The command returns an array of messages that the user + * successfully claimed, so that the caller is able to understand + * what messages it is now in charge of. */ +void xautoclaimCommand(client *c) { + streamCG *group = NULL; + robj *o = lookupKeyRead(c->db,c->argv[1]); + long long minidle; /* Minimum idle time argument, in milliseconds. */ + long count = 100; /* Maximum entries to claim. */ + streamID startid; + int startex; + int justid = 0; + + /* Parse idle/start/end/count arguments ASAP if needed, in order to report + * syntax errors before any other error. */ + if (getLongLongFromObjectOrReply(c,c->argv[4],&minidle,"Invalid min-idle-time argument for XAUTOCLAIM") != C_OK) + return; + if (minidle < 0) minidle = 0; + + if (streamParseIntervalIDOrReply(c,c->argv[5],&startid,&startex,0) != C_OK) + return; + if (startex && streamIncrID(&startid) != C_OK) { + addReplyError(c,"invalid start ID for the interval"); + return; + } + + int j = 6; /* options start at argv[6] */ + while(j < c->argc) { + int moreargs = (c->argc-1) - j; /* Number of additional arguments. */ + char *opt = c->argv[j]->ptr; + if (!strcasecmp(opt,"COUNT") && moreargs) { + if (getPositiveLongFromObjectOrReply(c,c->argv[j+1],&count,NULL) != C_OK) + return; + if (count == 0) { + addReplyError(c,"COUNT must be > 0"); + return; + } + j++; + } else if (!strcasecmp(opt,"JUSTID")) { + justid = 1; + } else { + addReplyErrorObject(c,shared.syntaxerr); + return; + } + j++; + } + + if (o) { + if (checkType(c,o,OBJ_STREAM)) + return; /* Type error. */ + group = streamLookupCG(o->ptr,c->argv[2]->ptr); + } + + /* No key or group? Send an error given that the group creation + * is mandatory. */ + if (o == NULL || group == NULL) { + addReplyErrorFormat(c,"-NOGROUP No such key '%s' or consumer group '%s'", + (char*)c->argv[1]->ptr, + (char*)c->argv[2]->ptr); + return; + } + + /* Do the actual claiming. */ + streamConsumer *consumer = NULL; + long long attempts = count*10; + + addReplyArrayLen(c, 2); + void *endidptr = addReplyDeferredLen(c); + void *arraylenptr = addReplyDeferredLen(c); + + unsigned char startkey[sizeof(streamID)]; + streamEncodeID(startkey,&startid); + raxIterator ri; + raxStart(&ri,group->pel); + raxSeek(&ri,">=",startkey,sizeof(startkey)); + size_t arraylen = 0; + mstime_t now = mstime(); + while (attempts-- && count && raxNext(&ri)) { + streamNACK *nack = ri.data; + + if (minidle) { + mstime_t this_idle = now - nack->delivery_time; + if (this_idle < minidle) + continue; + } + + streamID id; + streamDecodeID(ri.key, &id); + + if (consumer == NULL) + consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE,NULL); + if (nack->consumer != consumer) { + /* Remove the entry from the old consumer. + * Note that nack->consumer is NULL if we created the + * NACK above because of the FORCE option. */ + if (nack->consumer) + raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL); + } + + /* Update the consumer and idle time. */ + nack->delivery_time = now; + nack->delivery_count++; + + if (nack->consumer != consumer) { + /* Add the entry in the new consumer local PEL. */ + raxInsert(consumer->pel,ri.key,ri.key_len,nack,NULL); + nack->consumer = consumer; + } + + /* Send the reply for this entry. */ + if (justid) { + addReplyStreamID(c,&id); + } else { + size_t emitted = + streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL, + STREAM_RWR_RAWENTRIES,NULL); + if (!emitted) + addReplyNull(c); + } + arraylen++; + count--; + + /* Propagate this change. */ + robj *idstr = createObjectFromStreamID(&id); + streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack); + decrRefCount(idstr); + server.dirty++; + } + + streamID endid; + if (raxEOF(&ri)) { + endid.ms = endid.seq = 0; + } else { + streamDecodeID(ri.key, &endid); + } + raxStop(&ri); + + setDeferredArrayLen(c,arraylenptr,arraylen); + setDeferredReplyStreamID(c,endidptr,&endid); + + preventCommandPropagation(c); +} /* XDEL [ ... ] * diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 91dc2245e..f8de0741d 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -27,7 +27,7 @@ start_server { # and not the element "foo bar" which was pre existing in the # stream (see previous test) set reply [ - r XREADGROUP GROUP mygroup client-1 STREAMS mystream ">" + r XREADGROUP GROUP mygroup consumer-1 STREAMS mystream ">" ] assert {[llength [lindex $reply 0 1]] == 2} lindex $reply 0 1 0 1 @@ -39,13 +39,13 @@ start_server { r XADD mystream * d 4 # Read a few elements using a different consumer name set reply [ - r XREADGROUP GROUP mygroup client-2 STREAMS mystream ">" + r XREADGROUP GROUP mygroup consumer-2 STREAMS mystream ">" ] assert {[llength [lindex $reply 0 1]] == 2} assert {[lindex $reply 0 1 0 1] eq {c 3}} - set r1 [r XREADGROUP GROUP mygroup client-1 COUNT 10 STREAMS mystream 0] - set r2 [r XREADGROUP GROUP mygroup client-2 COUNT 10 STREAMS mystream 0] + set r1 [r XREADGROUP GROUP mygroup consumer-1 COUNT 10 STREAMS mystream 0] + set r2 [r XREADGROUP GROUP mygroup consumer-2 COUNT 10 STREAMS mystream 0] assert {[lindex $r1 0 1 0 1] eq {a 1}} assert {[lindex $r2 0 1 0 1] eq {c 3}} } @@ -56,9 +56,9 @@ start_server { for {set j 0} {$j < 4} {incr j} { set item [lindex $pending $j] if {$j < 2} { - set owner client-1 + set owner consumer-1 } else { - set owner client-2 + set owner consumer-2 } assert {[lindex $item 1] eq $owner} assert {[lindex $item 1] eq $owner} @@ -66,7 +66,7 @@ start_server { } test {XPENDING can return single consumer items} { - set pending [r XPENDING mystream mygroup - + 10 client-1] + set pending [r XPENDING mystream mygroup - + 10 consumer-1] assert {[llength $pending] == 2} } @@ -77,9 +77,9 @@ start_server { test {XPENDING with IDLE} { after 20 - set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10 client-1] + set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10 consumer-1] assert {[llength $pending] == 0} - set pending [r XPENDING mystream mygroup IDLE 1 - + 10 client-1] + set pending [r XPENDING mystream mygroup IDLE 1 - + 10 consumer-1] assert {[llength $pending] == 2} set pending [r XPENDING mystream mygroup IDLE 99999999 - + 10] assert {[llength $pending] == 0} @@ -101,12 +101,12 @@ start_server { } } - test {XACK is able to remove items from the client/group PEL} { - set pending [r XPENDING mystream mygroup - + 10 client-1] + test {XACK is able to remove items from the consumer/group PEL} { + set pending [r XPENDING mystream mygroup - + 10 consumer-1] set id1 [lindex $pending 0 0] set id2 [lindex $pending 1 0] assert {[r XACK mystream mygroup $id1] eq 1} - set pending [r XPENDING mystream mygroup - + 10 client-1] + set pending [r XPENDING mystream mygroup - + 10 consumer-1] assert {[llength $pending] == 1} set id [lindex $pending 0 0] assert {$id eq $id2} @@ -242,52 +242,52 @@ start_server { set id3 [r XADD mystream * c 3] r XGROUP CREATE mystream mygroup 0 - # Client 1 reads item 1 from the stream without acknowledgements. - # Client 2 then claims pending item 1 from the PEL of client 1 + # Consumer 1 reads item 1 from the stream without acknowledgements. + # Consumer 2 then claims pending item 1 from the PEL of consumer 1 set reply [ - r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream > + r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream > ] assert {[llength [lindex $reply 0 1 0 1]] == 2} assert {[lindex $reply 0 1 0 1] eq {a 1}} # make sure the entry is present in both the gorup, and the right consumer assert {[llength [r XPENDING mystream mygroup - + 10]] == 1} - assert {[llength [r XPENDING mystream mygroup - + 10 client1]] == 1} - assert {[llength [r XPENDING mystream mygroup - + 10 client2]] == 0} + assert {[llength [r XPENDING mystream mygroup - + 10 consumer1]] == 1} + assert {[llength [r XPENDING mystream mygroup - + 10 consumer2]] == 0} - r debug sleep 0.2 + after 200 set reply [ - r XCLAIM mystream mygroup client2 10 $id1 + r XCLAIM mystream mygroup consumer2 10 $id1 ] assert {[llength [lindex $reply 0 1]] == 2} assert {[lindex $reply 0 1] eq {a 1}} # make sure the entry is present in both the gorup, and the right consumer assert {[llength [r XPENDING mystream mygroup - + 10]] == 1} - assert {[llength [r XPENDING mystream mygroup - + 10 client1]] == 0} - assert {[llength [r XPENDING mystream mygroup - + 10 client2]] == 1} + assert {[llength [r XPENDING mystream mygroup - + 10 consumer1]] == 0} + assert {[llength [r XPENDING mystream mygroup - + 10 consumer2]] == 1} - # Client 1 reads another 2 items from stream - r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream > - r debug sleep 0.2 + # Consumer 1 reads another 2 items from stream + r XREADGROUP GROUP mygroup consumer1 count 2 STREAMS mystream > + after 200 - # Delete item 2 from the stream. Now client 1 has PEL that contains - # only item 3. Try to use client 2 to claim the deleted item 2 - # from the PEL of client 1, this should return nil + # Delete item 2 from the stream. Now consumer 1 has PEL that contains + # only item 3. Try to use consumer 2 to claim the deleted item 2 + # from the PEL of consumer 1, this should return nil r XDEL mystream $id2 set reply [ - r XCLAIM mystream mygroup client2 10 $id2 + r XCLAIM mystream mygroup consumer2 10 $id2 ] assert {[llength $reply] == 1} assert_equal "" [lindex $reply 0] - # Delete item 3 from the stream. Now client 1 has PEL that is empty. - # Try to use client 2 to claim the deleted item 3 from the PEL - # of client 1, this should return nil - r debug sleep 0.2 + # Delete item 3 from the stream. Now consumer 1 has PEL that is empty. + # Try to use consumer 2 to claim the deleted item 3 from the PEL + # of consumer 1, this should return nil + after 200 r XDEL mystream $id3 set reply [ - r XCLAIM mystream mygroup client2 10 $id3 + r XCLAIM mystream mygroup consumer2 10 $id3 ] assert {[llength $reply] == 1} assert_equal "" [lindex $reply 0] @@ -301,16 +301,16 @@ start_server { set id3 [r XADD mystream * c 3] r XGROUP CREATE mystream mygroup 0 - # Client 1 reads item 1 from the stream without acknowledgements. - # Client 2 then claims pending item 1 from the PEL of client 1 + # Consumer 1 reads item 1 from the stream without acknowledgements. + # Consumer 2 then claims pending item 1 from the PEL of consumer 1 set reply [ - r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream > + r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream > ] assert {[llength [lindex $reply 0 1 0 1]] == 2} assert {[lindex $reply 0 1 0 1] eq {a 1}} - r debug sleep 0.2 + after 200 set reply [ - r XCLAIM mystream mygroup client2 10 $id1 + r XCLAIM mystream mygroup consumer2 10 $id1 ] assert {[llength [lindex $reply 0 1]] == 2} assert {[lindex $reply 0 1] eq {a 1}} @@ -321,10 +321,10 @@ start_server { assert {[llength [lindex $reply 0]] == 4} assert {[lindex $reply 0 3] == 2} - # Client 3 then claims pending item 1 from the PEL of client 2 using JUSTID - r debug sleep 0.2 + # Consumer 3 then claims pending item 1 from the PEL of consumer 2 using JUSTID + after 200 set reply [ - r XCLAIM mystream mygroup client3 10 $id1 JUSTID + r XCLAIM mystream mygroup consumer3 10 $id1 JUSTID ] assert {[llength $reply] == 1} assert {[lindex $reply 0] eq $id1} @@ -344,17 +344,122 @@ start_server { set id3 [r XADD mystream * c 3] r XGROUP CREATE mystream mygroup 0 - set reply [r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >] + set reply [r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >] assert {[llength [lindex $reply 0 1 0 1]] == 2} assert {[lindex $reply 0 1 0 1] eq {a 1}} - r debug sleep 0.2 + after 200 # re-claim with the same consumer that already has it - assert {[llength [r XCLAIM mystream mygroup client1 10 $id1]] == 1} + assert {[llength [r XCLAIM mystream mygroup consumer1 10 $id1]] == 1} # make sure the entry is still in the PEL set reply [r XPENDING mystream mygroup - + 10] assert {[llength $reply] == 1} - assert {[lindex $reply 0 1] eq {client1}} + assert {[lindex $reply 0 1] eq {consumer1}} + } + + test {XAUTOCLAIM can claim PEL items from another consumer} { + # Add 3 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + r XGROUP CREATE mystream mygroup 0 + + # Consumer 1 reads item 1 from the stream without acknowledgements. + # Consumer 2 then claims pending item 1 from the PEL of consumer 1 + set reply [r XREADGROUP GROUP mygroup consumer1 count 1 STREAMS mystream >] + assert_equal [llength [lindex $reply 0 1 0 1]] 2 + assert_equal [lindex $reply 0 1 0 1] {a 1} + after 200 + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 1] + assert_equal [llength $reply] 2 + assert_equal [lindex $reply 0] $id1 + assert_equal [llength [lindex $reply 1]] 1 + assert_equal [llength [lindex $reply 1 0]] 2 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {a 1} + + # Consumer 1 reads another 2 items from stream + r XREADGROUP GROUP mygroup consumer1 count 2 STREAMS mystream > + + # For min-idle-time + after 200 + + # Delete item 2 from the stream. Now consumer 1 has PEL that contains + # only item 3. Try to use consumer 2 to claim the deleted item 2 + # from the PEL of consumer 1, this should return nil + r XDEL mystream $id2 + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 2] + # id1 is self-claimed here but not id2 ('count' was set to 2) + assert_equal [llength $reply] 2 + assert_equal [lindex $reply 0] $id2 + assert_equal [llength [lindex $reply 1]] 2 + assert_equal [llength [lindex $reply 1 0]] 2 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {a 1} + assert_equal [lindex $reply 1 1] "" + + # Delete item 3 from the stream. Now consumer 1 has PEL that is empty. + # Try to use consumer 2 to claim the deleted item 3 from the PEL + # of consumer 1, this should return nil + after 200 + r XDEL mystream $id3 + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - JUSTID] + # id1 is self-claimed here but not id2 and id3 ('count' is default 100) + + # we also test the JUSTID modifier here. note that, when using JUSTID, + # deleted entries are returned in reply (consistent with XCLAIM). + + assert_equal [llength $reply] 2 + assert_equal [lindex $reply 0] "0-0" + assert_equal [llength [lindex $reply 1]] 3 + assert_equal [lindex $reply 1 0] $id1 + assert_equal [lindex $reply 1 1] $id2 + assert_equal [lindex $reply 1 2] $id3 + } + + test {XAUTOCLAIM as an iterator} { + # Add 5 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + set id4 [r XADD mystream * d 4] + set id5 [r XADD mystream * e 5] + r XGROUP CREATE mystream mygroup 0 + + # Read 5 messages into consumer1 + r XREADGROUP GROUP mygroup consumer1 count 90 STREAMS mystream > + + # For min-idle-time + after 200 + + # Claim 2 entries + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 2] + assert_equal [llength $reply] 2 + set cursor [lindex $reply 0] + assert_equal $cursor $id2 + assert_equal [llength [lindex $reply 1]] 2 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {a 1} + + # Claim 2 more entries + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 ($cursor COUNT 2] + assert_equal [llength $reply] 2 + set cursor [lindex $reply 0] + assert_equal $cursor $id4 + assert_equal [llength [lindex $reply 1]] 2 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {c 3} + + # Claim last entry + set reply [r XAUTOCLAIM mystream mygroup consumer2 10 ($cursor COUNT 2] + assert_equal [llength $reply] 2 + set cursor [lindex $reply 0] + assert_equal $cursor {0-0} + assert_equal [llength [lindex $reply 1]] 1 + assert_equal [llength [lindex $reply 1 0 1]] 2 + assert_equal [lindex $reply 1 0 1] {e 5} } test {XINFO FULL output} { @@ -477,7 +582,7 @@ start_server { assert {$curr_grpinfo == $grpinfo} set n_consumers [lindex $grpinfo 3] - # Bob should be created only when there will be new data for this client + # Bob should be created only when there will be new data for this consumer assert_equal $n_consumers 2 set reply [r xinfo consumers mystream mygroup] set consumer_info [lindex $reply 0]