From 6bb5503524fbc15c4d6a5ac5eb08aedee18d3189 Mon Sep 17 00:00:00 2001 From: guybe7 Date: Mon, 7 Dec 2020 20:31:35 +0100 Subject: [PATCH] More efficient self-XCLAIM (#8098) when the same consumer re-claim an entry that it already has, there's no need to remove-and-insert if it's the same rax. we do need to update the idle time though. this commit only improves efficiency (doesn't change behavior). --- src/t_stream.c | 21 +++++++++++--------- tests/unit/type/stream-cgroups.tcl | 32 ++++++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 9 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 320c5a2fe..684328de2 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2601,15 +2601,15 @@ void xclaimCommand(client *c) { mstime_t this_idle = now - nack->delivery_time; if (this_idle < minidle) continue; } - /* Remove the entry from the old consumer. - * Note that nack->consumer is NULL if we created the - * NACK above because of the FORCE option. */ - if (nack->consumer) - raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); - /* Update the consumer and idle time. */ if (consumer == NULL) consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE,NULL); - nack->consumer = consumer; + if (nack->consumer != consumer) { + /* Remove the entry from the old consumer. + * Note that nack->consumer is NULL if we created the + * NACK above because of the FORCE option. */ + if (nack->consumer) + raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); + } nack->delivery_time = deliverytime; /* Set the delivery attempts counter if given, otherwise * autoincrement unless JUSTID option provided */ @@ -2618,8 +2618,11 @@ void xclaimCommand(client *c) { } else if (!justid) { nack->delivery_count++; } - /* Add the entry in the new consumer local PEL. */ - raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); + if (nack->consumer != consumer) { + /* Add the entry in the new consumer local PEL. */ + raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); + nack->consumer = consumer; + } /* Send the reply for this entry. */ if (justid) { addReplyStreamID(c,&id); diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 41e2ab979..1d1a68a35 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -235,6 +235,12 @@ start_server { ] assert {[llength [lindex $reply 0 1 0 1]] == 2} assert {[lindex $reply 0 1 0 1] eq {a 1}} + + # make sure the entry is present in both the gorup, and the right consumer + assert {[llength [r XPENDING mystream mygroup - + 10]] == 1} + assert {[llength [r XPENDING mystream mygroup - + 10 client1]] == 1} + assert {[llength [r XPENDING mystream mygroup - + 10 client2]] == 0} + r debug sleep 0.2 set reply [ r XCLAIM mystream mygroup client2 10 $id1 @@ -242,6 +248,11 @@ start_server { assert {[llength [lindex $reply 0 1]] == 2} assert {[lindex $reply 0 1] eq {a 1}} + # make sure the entry is present in both the gorup, and the right consumer + assert {[llength [r XPENDING mystream mygroup - + 10]] == 1} + assert {[llength [r XPENDING mystream mygroup - + 10 client1]] == 0} + assert {[llength [r XPENDING mystream mygroup - + 10 client2]] == 1} + # Client 1 reads another 2 items from stream r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream > r debug sleep 0.2 @@ -311,6 +322,27 @@ start_server { assert {[lindex $reply 0 3] == 2} } + test {XCLAIM same consumer} { + # Add 3 items into the stream, and create a consumer group + r del mystream + set id1 [r XADD mystream * a 1] + set id2 [r XADD mystream * b 2] + set id3 [r XADD mystream * c 3] + r XGROUP CREATE mystream mygroup 0 + + set reply [r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >] + assert {[llength [lindex $reply 0 1 0 1]] == 2} + assert {[lindex $reply 0 1 0 1] eq {a 1}} + r debug sleep 0.2 + # re-claim with the same consumer that already has it + assert {[llength [r XCLAIM mystream mygroup client1 10 $id1]] == 1} + + # make sure the entry is still in the PEL + set reply [r XPENDING mystream mygroup - + 10] + assert {[llength $reply] == 1} + assert {[lindex $reply 0 1] eq {client1}} + } + test {XINFO FULL output} { r del x r XADD x 100 a 1