diff --git a/src/t_stream.c b/src/t_stream.c index 1a5acac42..f02b9e99b 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2279,8 +2279,12 @@ void xclaimCommand(client *c) { /* Update the consumer and idle time. */ nack->consumer = consumer; nack->delivery_time = deliverytime; - /* Set the delivery attempts counter if given. */ - if (retrycount >= 0) nack->delivery_count = retrycount; + /* Set the delivery attempts counter if given, otherwise autoincrement */ + if (retrycount >= 0) { + nack->delivery_count = retrycount; + } else { + nack->delivery_count++; + } /* Add the entry in the new consumer local PEL. */ raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); /* Send the reply for this entry. */ diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 13981cc22..3a056bfab 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -195,6 +195,35 @@ start_server { assert_equal "" [lindex $reply 0] } + test {XCLAIM increments delivery count} { + # Add 3 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + r XGROUP CREATE mystream mygroup 0 + + # Client 1 reads item 1 from the stream without acknowledgements. + # Client 2 then claims pending item 1 from the PEL of client 1 + set reply [ + r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream > + ] + assert {[llength [lindex $reply 0 1 0 1]] == 2} + assert {[lindex $reply 0 1 0 1] eq {a 1}} + r debug sleep 0.2 + set reply [ + r XCLAIM mystream mygroup client2 10 $id1 + ] + assert {[llength [lindex $reply 0 1]] == 2} + assert {[lindex $reply 0 1] eq {a 1}} + + set reply [ + r XPENDING mystream mygroup - + 10 + ] + assert {[llength [lindex $reply 0]] == 4} + assert {[lindex $reply 0 3] == 2} + } + start_server {} { set master [srv -1 client] set master_host [srv -1 host]