From 8fd63c220ac171e4fc0f1339a7cde61c499d1a30 Mon Sep 17 00:00:00 2001 From: Steve Webster Date: Fri, 8 Mar 2019 17:09:11 +0000 Subject: [PATCH 1/2] Increment delivery counter on XCLAIM unless RETRYCOUNT specified The XCLAIM docs state the XCLAIM increments the delivery counter for messages. This PR makes the code match the documentation - which seems like the desired behaviour - whilst still allowing RETRYCOUNT to be specified manually. My understanding of the way streamPropagateXCLAIM() works is that this change will safely propagate to replicas since retry count is pulled directly from the streamNACK struct. Fixes #5194 --- src/t_stream.c | 8 ++++++-- tests/unit/type/stream-cgroups.tcl | 29 +++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 2 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 1a5acac42..f02b9e99b 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2279,8 +2279,12 @@ void xclaimCommand(client *c) { /* Update the consumer and idle time. */ nack->consumer = consumer; nack->delivery_time = deliverytime; - /* Set the delivery attempts counter if given. */ - if (retrycount >= 0) nack->delivery_count = retrycount; + /* Set the delivery attempts counter if given, otherwise autoincrement */ + if (retrycount >= 0) { + nack->delivery_count = retrycount; + } else { + nack->delivery_count++; + } /* Add the entry in the new consumer local PEL. */ raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); /* Send the reply for this entry. */ diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 13981cc22..3a056bfab 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -195,6 +195,35 @@ start_server { assert_equal "" [lindex $reply 0] } + test {XCLAIM increments delivery count} { + # Add 3 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + r XGROUP CREATE mystream mygroup 0 + + # Client 1 reads item 1 from the stream without acknowledgements. + # Client 2 then claims pending item 1 from the PEL of client 1 + set reply [ + r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream > + ] + assert {[llength [lindex $reply 0 1 0 1]] == 2} + assert {[lindex $reply 0 1 0 1] eq {a 1}} + r debug sleep 0.2 + set reply [ + r XCLAIM mystream mygroup client2 10 $id1 + ] + assert {[llength [lindex $reply 0 1]] == 2} + assert {[lindex $reply 0 1] eq {a 1}} + + set reply [ + r XPENDING mystream mygroup - + 10 + ] + assert {[llength [lindex $reply 0]] == 4} + assert {[lindex $reply 0 3] == 2} + } + start_server {} { set master [srv -1 client] set master_host [srv -1 host] From c652f706cb394fd5bd25077fde7d17a8cc7f2abf Mon Sep 17 00:00:00 2001 From: Steve Webster Date: Tue, 12 Mar 2019 20:27:53 +0000 Subject: [PATCH 2/2] Only increment delivery count if JUSTID option is omitted --- src/t_stream.c | 5 +++-- tests/unit/type/stream-cgroups.tcl | 16 +++++++++++++++- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index f02b9e99b..7816c775c 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2279,10 +2279,11 @@ void xclaimCommand(client *c) { /* Update the consumer and idle time. */ nack->consumer = consumer; nack->delivery_time = deliverytime; - /* Set the delivery attempts counter if given, otherwise autoincrement */ + /* Set the delivery attempts counter if given, otherwise + * autoincrement unless JUSTID option provided */ if (retrycount >= 0) { nack->delivery_count = retrycount; - } else { + } else if (!justid) { nack->delivery_count++; } /* Add the entry in the new consumer local PEL. */ diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 3a056bfab..34d4061c2 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -195,7 +195,7 @@ start_server { assert_equal "" [lindex $reply 0] } - test {XCLAIM increments delivery count} { + test {XCLAIM without JUSTID increments delivery count} { # Add 3 items into the stream, and create a consumer group r del mystream set id1 [r XADD mystream * a 1] @@ -222,6 +222,20 @@ start_server { ] assert {[llength [lindex $reply 0]] == 4} assert {[lindex $reply 0 3] == 2} + + # Client 3 then claims pending item 1 from the PEL of client 2 using JUSTID + r debug sleep 0.2 + set reply [ + r XCLAIM mystream mygroup client3 10 $id1 JUSTID + ] + assert {[llength $reply] == 1} + assert {[lindex $reply 0] eq $id1} + + set reply [ + r XPENDING mystream mygroup - + 10 + ] + assert {[llength [lindex $reply 0]] == 4} + assert {[lindex $reply 0 3] == 2} } start_server {} {