Increment delivery counter on XCLAIM unless RETRYCOUNT specified

The XCLAIM docs state the XCLAIM increments the delivery counter for
messages. This PR makes the code match the documentation - which seems
like the desired behaviour - whilst still allowing RETRYCOUNT to be
specified manually.

My understanding of the way streamPropagateXCLAIM() works is that this
change will safely propagate to replicas since retry count is pulled
directly from the streamNACK struct.

Fixes #5194
This commit is contained in:
Steve Webster 2019-03-08 17:09:11 +00:00
parent 0f03312c23
commit f1e7df4b7c
2 changed files with 35 additions and 2 deletions

View File

@ -2279,8 +2279,12 @@ void xclaimCommand(client *c) {
/* Update the consumer and idle time. */ /* Update the consumer and idle time. */
nack->consumer = consumer; nack->consumer = consumer;
nack->delivery_time = deliverytime; nack->delivery_time = deliverytime;
/* Set the delivery attempts counter if given. */ /* Set the delivery attempts counter if given, otherwise autoincrement */
if (retrycount >= 0) nack->delivery_count = retrycount; if (retrycount >= 0) {
nack->delivery_count = retrycount;
} else {
nack->delivery_count++;
}
/* Add the entry in the new consumer local PEL. */ /* Add the entry in the new consumer local PEL. */
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
/* Send the reply for this entry. */ /* Send the reply for this entry. */

View File

@ -195,6 +195,35 @@ start_server {
assert_equal "" [lindex $reply 0] assert_equal "" [lindex $reply 0]
} }
test {XCLAIM increments delivery count} {
# Add 3 items into the stream, and create a consumer group
r del mystream
set id1 [r XADD mystream * a 1]
set id2 [r XADD mystream * b 2]
set id3 [r XADD mystream * c 3]
r XGROUP CREATE mystream mygroup 0
# Client 1 reads item 1 from the stream without acknowledgements.
# Client 2 then claims pending item 1 from the PEL of client 1
set reply [
r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >
]
assert {[llength [lindex $reply 0 1 0 1]] == 2}
assert {[lindex $reply 0 1 0 1] eq {a 1}}
r debug sleep 0.2
set reply [
r XCLAIM mystream mygroup client2 10 $id1
]
assert {[llength [lindex $reply 0 1]] == 2}
assert {[lindex $reply 0 1] eq {a 1}}
set reply [
r XPENDING mystream mygroup - + 10
]
assert {[llength [lindex $reply 0]] == 4}
assert {[lindex $reply 0 3] == 2}
}
start_server {} { start_server {} {
set master [srv -1 client] set master [srv -1 client]
set master_host [srv -1 host] set master_host [srv -1 host]