More efficient self-XCLAIM (#8098)

when the same consumer re-claim an entry that it already has, there's
no need to remove-and-insert if it's the same rax.
we do need to update the idle time though.
this commit only improves efficiency (doesn't change behavior).
This commit is contained in:
guybe7 2020-12-07 20:31:35 +01:00 committed by GitHub
parent b5e99bd064
commit 6bb5503524
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 44 additions and 9 deletions

View File

@ -2601,15 +2601,15 @@ void xclaimCommand(client *c) {
mstime_t this_idle = now - nack->delivery_time; mstime_t this_idle = now - nack->delivery_time;
if (this_idle < minidle) continue; if (this_idle < minidle) continue;
} }
if (consumer == NULL)
consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE,NULL);
if (nack->consumer != consumer) {
/* Remove the entry from the old consumer. /* Remove the entry from the old consumer.
* Note that nack->consumer is NULL if we created the * Note that nack->consumer is NULL if we created the
* NACK above because of the FORCE option. */ * NACK above because of the FORCE option. */
if (nack->consumer) if (nack->consumer)
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
/* Update the consumer and idle time. */ }
if (consumer == NULL)
consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE,NULL);
nack->consumer = consumer;
nack->delivery_time = deliverytime; nack->delivery_time = deliverytime;
/* Set the delivery attempts counter if given, otherwise /* Set the delivery attempts counter if given, otherwise
* autoincrement unless JUSTID option provided */ * autoincrement unless JUSTID option provided */
@ -2618,8 +2618,11 @@ void xclaimCommand(client *c) {
} else if (!justid) { } else if (!justid) {
nack->delivery_count++; nack->delivery_count++;
} }
if (nack->consumer != consumer) {
/* Add the entry in the new consumer local PEL. */ /* Add the entry in the new consumer local PEL. */
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
nack->consumer = consumer;
}
/* Send the reply for this entry. */ /* Send the reply for this entry. */
if (justid) { if (justid) {
addReplyStreamID(c,&id); addReplyStreamID(c,&id);

View File

@ -235,6 +235,12 @@ start_server {
] ]
assert {[llength [lindex $reply 0 1 0 1]] == 2} assert {[llength [lindex $reply 0 1 0 1]] == 2}
assert {[lindex $reply 0 1 0 1] eq {a 1}} assert {[lindex $reply 0 1 0 1] eq {a 1}}
# make sure the entry is present in both the gorup, and the right consumer
assert {[llength [r XPENDING mystream mygroup - + 10]] == 1}
assert {[llength [r XPENDING mystream mygroup - + 10 client1]] == 1}
assert {[llength [r XPENDING mystream mygroup - + 10 client2]] == 0}
r debug sleep 0.2 r debug sleep 0.2
set reply [ set reply [
r XCLAIM mystream mygroup client2 10 $id1 r XCLAIM mystream mygroup client2 10 $id1
@ -242,6 +248,11 @@ start_server {
assert {[llength [lindex $reply 0 1]] == 2} assert {[llength [lindex $reply 0 1]] == 2}
assert {[lindex $reply 0 1] eq {a 1}} assert {[lindex $reply 0 1] eq {a 1}}
# make sure the entry is present in both the gorup, and the right consumer
assert {[llength [r XPENDING mystream mygroup - + 10]] == 1}
assert {[llength [r XPENDING mystream mygroup - + 10 client1]] == 0}
assert {[llength [r XPENDING mystream mygroup - + 10 client2]] == 1}
# Client 1 reads another 2 items from stream # Client 1 reads another 2 items from stream
r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream > r XREADGROUP GROUP mygroup client1 count 2 STREAMS mystream >
r debug sleep 0.2 r debug sleep 0.2
@ -311,6 +322,27 @@ start_server {
assert {[lindex $reply 0 3] == 2} assert {[lindex $reply 0 3] == 2}
} }
test {XCLAIM same consumer} {
# Add 3 items into the stream, and create a consumer group
r del mystream
set id1 [r XADD mystream * a 1]
set id2 [r XADD mystream * b 2]
set id3 [r XADD mystream * c 3]
r XGROUP CREATE mystream mygroup 0
set reply [r XREADGROUP GROUP mygroup client1 count 1 STREAMS mystream >]
assert {[llength [lindex $reply 0 1 0 1]] == 2}
assert {[lindex $reply 0 1 0 1] eq {a 1}}
r debug sleep 0.2
# re-claim with the same consumer that already has it
assert {[llength [r XCLAIM mystream mygroup client1 10 $id1]] == 1}
# make sure the entry is still in the PEL
set reply [r XPENDING mystream mygroup - + 10]
assert {[llength $reply] == 1}
assert {[lindex $reply 0 1] eq {client1}}
}
test {XINFO FULL output} { test {XINFO FULL output} {
r del x r del x
r XADD x 100 a 1 r XADD x 100 a 1