X[AUTO]CLAIM should skip deleted entries (#10227)

Fix #7021 #8924 #10198

# Intro
Before this commit, X[AUTO]CLAIM used to transfer deleted entries from one
PEL to another, but replied with "nil" for every such entry (instead of the entry id).
The idea (for XCLAIM) was that the caller could see this "nil", realize the entry
no longer exists, and XACK it in order to remove it from PEL.
The main problem with that approach is that it assumes there's a correlation
between the index of the "id" arguments and the array indices, which there
isn't (in case some of the input IDs to XCLAIM never existed/read):

```
127.0.0.1:6379> XADD x 1 f1 v1
"1-0"
127.0.0.1:6379> XADD x 2 f1 v1
"2-0"
127.0.0.1:6379> XADD x 3 f1 v1
"3-0"
127.0.0.1:6379> XGROUP CREATE x grp 0
OK
127.0.0.1:6379> XREADGROUP GROUP grp Alice COUNT 2 STREAMS x >
1) 1) "x"
   2) 1) 1) "1-0"
         2) 1) "f1"
            2) "v1"
      2) 1) "2-0"
         2) 1) "f1"
            2) "v1"
127.0.0.1:6379> XDEL x 1 2
(integer) 2
127.0.0.1:6379> XCLAIM x grp Bob 0 0-99 1-0 1-99 2-0
1) (nil)
2) (nil)
```

# Changes
Now,  X[AUTO]CLAIM acts in the following way:
1. If one tries to claim a deleted entry, we delete it from the PEL we found it in
  (and the group PEL too). So de facto, such an entry is not claimed, just cleared
  from the PEL (since it no longer exists in the stream anyway).
2. since we never claim deleted entries, X[AUTO]CLAIM will never return "nil"
  instead of an entry.
3. add a new element to XAUTOCLAIM's response (see below)

# Knowing which entries were cleared from the PEL
The caller may want to log any entries that were found in a PEL but deleted from
the stream itself (this would suggest that there might be a bug in the application:
trimming the stream while some entries were still not processed by the consumers).

## XCLAIM
The set {XCLAIM input ids} - {XCLAIM returned ids} contains all the entry ids that were
not claimed, which means they were deleted (assuming the input contains only entries
from some PEL). The user doesn't need to XACK them because XCLAIM has already
deleted them from the source PEL.

## XAUTOCLAIM
XAUTOCLAIM has a new element added to its reply: it's an array of all the deleted
stream IDs it stumbled upon.

This is somewhat of a breaking change since X[AUTO]CLAIM used to be able to reply
with "nil" and now it can't... But since it was undocumented (and generally a bad idea
to rely on it, as explained above) the breakage is not that bad.
This commit is contained in:
guybe7 2022-02-08 09:20:09 +01:00 committed by GitHub
parent 66be30f7fc
commit 3c3e6cc1c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 189 additions and 48 deletions

View File

@ -5922,7 +5922,10 @@ struct redisCommandArg XADD_Args[] = {
/********** XAUTOCLAIM ********************/
/* XAUTOCLAIM history */
#define XAUTOCLAIM_History NULL
commandHistory XAUTOCLAIM_History[] = {
{"7.0.0","Added an element to the reply array, containing deleted entries the command cleared from the PEL"},
{0}
};
/* XAUTOCLAIM tips */
const char *XAUTOCLAIM_tips[] = {

View File

@ -6,6 +6,12 @@
"since": "6.2.0",
"arity": -6,
"function": "xautoclaimCommand",
"history": [
[
"7.0.0",
"Added an element to the reply array, containing deleted entries the command cleared from the PEL"
]
],
"command_flags": [
"WRITE",
"FAST"

View File

@ -1325,6 +1325,20 @@ void streamIteratorStop(streamIterator *si) {
raxStop(&si->ri);
}
/* Tell whether the entry identified by `id` is currently present in the
 * stream `s`. Entries that are marked as deleted do not count as present.
 *
 * The lookup is done by iterating over the single-ID range [id, id] in
 * forward direction and checking whether the iterator yields anything.
 *
 * Returns 1 when the entry exists, 0 otherwise. */
int streamEntryExists(stream *s, streamID *id) {
    streamIterator iter;
    streamID found_id;
    int64_t field_count;

    streamIteratorStart(&iter,s,id,id,0);
    int exists = streamIteratorGetID(&iter,&found_id,&field_count);
    streamIteratorStop(&iter);

    if (exists) {
        /* The range is a single ID, so whatever was yielded must be it. */
        serverAssert(streamCompareID(id,&found_id) == 0);
        return 1;
    }
    return 0;
}
/* Delete the specified item ID from the stream, returning 1 if the item
* was deleted 0 otherwise (if it does not exist). */
int streamDeleteItem(stream *s, streamID *id) {
@ -2980,23 +2994,28 @@ void xclaimCommand(client *c) {
/* Lookup the ID in the group PEL. */
streamNACK *nack = raxFind(group->pel,buf,sizeof(buf));
/* Item must exist for us to transfer it to another consumer. */
if (!streamEntryExists(o->ptr,&id)) {
/* Clear this entry from the PEL, it no longer exists */
if (nack != raxNotFound) {
/* Propagate this change (we are going to delete the NACK). */
streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */
server.dirty++;
/* Release the NACK */
raxRemove(group->pel,buf,sizeof(buf),NULL);
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
streamFreeNACK(nack);
}
continue;
}
/* If FORCE is passed, let's check if at least the entry
* exists in the Stream. In such case, we'll create a new
* entry in the PEL from scratch, so that XCLAIM can also
* be used to create entries in the PEL. Useful for AOF
* and replication of consumer groups. */
if (force && nack == raxNotFound) {
streamIterator myiterator;
streamIteratorStart(&myiterator,o->ptr,&id,&id,0);
int64_t numfields;
int found = 0;
streamID item_id;
if (streamIteratorGetID(&myiterator,&item_id,&numfields)) found = 1;
streamIteratorStop(&myiterator);
/* Item must exist for us to create a NACK for it. */
if (!found) continue;
/* Create the NACK. */
nack = streamCreateNACK(NULL);
raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
@ -3013,6 +3032,7 @@ void xclaimCommand(client *c) {
mstime_t this_idle = now - nack->delivery_time;
if (this_idle < minidle) continue;
}
if (consumer == NULL &&
(consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL)
{
@ -3042,9 +3062,7 @@ void xclaimCommand(client *c) {
if (justid) {
addReplyStreamID(c,&id);
} else {
size_t emitted = streamReplyWithRange(c,o->ptr,&id,&id,1,0,
NULL,NULL,STREAM_RWR_RAWENTRIES,NULL);
if (!emitted) addReplyNull(c);
serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1);
}
arraylen++;
@ -3138,9 +3156,9 @@ void xautoclaimCommand(client *c) {
streamConsumer *consumer = NULL;
long long attempts = count*10;
addReplyArrayLen(c, 2);
void *endidptr = addReplyDeferredLen(c);
void *arraylenptr = addReplyDeferredLen(c);
addReplyArrayLen(c, 3); /* We add another reply later */
void *endidptr = addReplyDeferredLen(c); /* reply[0] */
void *arraylenptr = addReplyDeferredLen(c); /* reply[1] */
unsigned char startkey[sizeof(streamID)];
streamEncodeID(startkey,&startid);
@ -3150,18 +3168,37 @@ void xautoclaimCommand(client *c) {
size_t arraylen = 0;
mstime_t now = mstime();
sds name = c->argv[3]->ptr;
streamID *deleted_ids = zmalloc(count * sizeof(streamID));
int deleted_id_num = 0;
while (attempts-- && count && raxNext(&ri)) {
streamNACK *nack = ri.data;
streamID id;
streamDecodeID(ri.key, &id);
/* Item must exist for us to transfer it to another consumer. */
if (!streamEntryExists(o->ptr,&id)) {
/* Propagate this change (we are going to delete the NACK). */
robj *idstr = createObjectFromStreamID(&id);
streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack);
decrRefCount(idstr);
server.dirty++;
/* Clear this entry from the PEL, it no longer exists */
raxRemove(group->pel,ri.key,ri.key_len,NULL);
raxRemove(nack->consumer->pel,ri.key,ri.key_len,NULL);
streamFreeNACK(nack);
/* Remember the ID for later */
deleted_ids[deleted_id_num++] = id;
raxSeek(&ri,">=",ri.key,ri.key_len);
continue;
}
if (minidle) {
mstime_t this_idle = now - nack->delivery_time;
if (this_idle < minidle)
continue;
}
streamID id;
streamDecodeID(ri.key, &id);
if (consumer == NULL &&
(consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL)
{
@ -3191,11 +3228,7 @@ void xautoclaimCommand(client *c) {
if (justid) {
addReplyStreamID(c,&id);
} else {
size_t emitted =
streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,
STREAM_RWR_RAWENTRIES,NULL);
if (!emitted)
addReplyNull(c);
serverAssert(streamReplyWithRange(c,o->ptr,&id,&id,1,0,NULL,NULL,STREAM_RWR_RAWENTRIES,NULL) == 1);
}
arraylen++;
count--;
@ -3221,6 +3254,12 @@ void xautoclaimCommand(client *c) {
setDeferredArrayLen(c,arraylenptr,arraylen);
setDeferredReplyStreamID(c,endidptr,&endid);
addReplyArrayLen(c, deleted_id_num); /* reply[2] */
for (int i = 0; i < deleted_id_num; i++) {
addReplyStreamID(c, &deleted_ids[i]);
}
zfree(deleted_ids);
preventCommandPropagation(c);
}

View File

@ -355,24 +355,22 @@ start_server {
# Delete item 2 from the stream. Now consumer 1 has PEL that contains
# only item 3. Try to use consumer 2 to claim the deleted item 2
# from the PEL of consumer 1, this should return nil
# from the PEL of consumer 1, this should be NOP
r XDEL mystream $id2
set reply [
r XCLAIM mystream mygroup consumer2 10 $id2
]
assert {[llength $reply] == 1}
assert_equal "" [lindex $reply 0]
assert {[llength $reply] == 0}
# Delete item 3 from the stream. Now consumer 1 has PEL that is empty.
# Try to use consumer 2 to claim the deleted item 3 from the PEL
# of consumer 1, this should return nil
# of consumer 1, this should be NOP
after 200
r XDEL mystream $id3
set reply [
r XCLAIM mystream mygroup consumer2 10 $id3
]
assert {[llength $reply] == 1}
assert_equal "" [lindex $reply 0]
assert {[llength $reply] == 0}
}
test {XCLAIM without JUSTID increments delivery count} {
@ -445,6 +443,7 @@ start_server {
set id1 [r XADD mystream * a 1]
set id2 [r XADD mystream * b 2]
set id3 [r XADD mystream * c 3]
set id4 [r XADD mystream * d 4]
r XGROUP CREATE mystream mygroup 0
# Consumer 1 reads item 1 from the stream without acknowledgements.
@ -454,7 +453,7 @@ start_server {
assert_equal [lindex $reply 0 1 0 1] {a 1}
after 200
set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 1]
assert_equal [llength $reply] 2
assert_equal [llength $reply] 3
assert_equal [lindex $reply 0] "0-0"
assert_equal [llength [lindex $reply 1]] 1
assert_equal [llength [lindex $reply 1 0]] 2
@ -462,7 +461,7 @@ start_server {
assert_equal [lindex $reply 1 0 1] {a 1}
# Consumer 1 reads another 2 items from stream
r XREADGROUP GROUP mygroup consumer1 count 2 STREAMS mystream >
r XREADGROUP GROUP mygroup consumer1 count 3 STREAMS mystream >
# For min-idle-time
after 200
@ -471,33 +470,37 @@ start_server {
# only item 3. Try to use consumer 2 to claim the deleted item 2
# from the PEL of consumer 1, this should return nil
r XDEL mystream $id2
# id1 and id3 are self-claimed here but not id2 ('count' was set to 2)
# we make sure id2 is indeed skipped (the cursor points to id4)
set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 2]
# id1 is self-claimed here but not id2 ('count' was set to 2)
assert_equal [llength $reply] 2
assert_equal [lindex $reply 0] $id3
assert_equal [llength $reply] 3
assert_equal [lindex $reply 0] $id4
assert_equal [llength [lindex $reply 1]] 2
assert_equal [llength [lindex $reply 1 0]] 2
assert_equal [llength [lindex $reply 1 0 1]] 2
assert_equal [lindex $reply 1 0 1] {a 1}
assert_equal [lindex $reply 1 1] ""
assert_equal [lindex $reply 1 1 1] {c 3}
# Delete item 3 from the stream. Now consumer 1 has PEL that is empty.
# Try to use consumer 2 to claim the deleted item 3 from the PEL
# of consumer 1, this should return nil
after 200
r XDEL mystream $id3
r XDEL mystream $id4
# id1 and id3 are self-claimed here but not id2 and id4 ('count' is default 100)
set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - JUSTID]
# id1 is self-claimed here but not id2 and id3 ('count' is default 100)
# we also test the JUSTID modifier here. note that, when using JUSTID,
# deleted entries are returned in reply (consistent with XCLAIM).
assert_equal [llength $reply] 2
assert_equal [lindex $reply 0] "0-0"
assert_equal [llength [lindex $reply 1]] 3
assert_equal [llength $reply] 3
assert_equal [lindex $reply 0] {0-0}
assert_equal [llength [lindex $reply 1]] 2
assert_equal [lindex $reply 1 0] $id1
assert_equal [lindex $reply 1 1] $id2
assert_equal [lindex $reply 1 2] $id3
assert_equal [lindex $reply 1 1] $id3
}
test {XAUTOCLAIM as an iterator} {
@ -518,7 +521,7 @@ start_server {
# Claim 2 entries
set reply [r XAUTOCLAIM mystream mygroup consumer2 10 - COUNT 2]
assert_equal [llength $reply] 2
assert_equal [llength $reply] 3
set cursor [lindex $reply 0]
assert_equal $cursor $id3
assert_equal [llength [lindex $reply 1]] 2
@ -527,7 +530,7 @@ start_server {
# Claim 2 more entries
set reply [r XAUTOCLAIM mystream mygroup consumer2 10 $cursor COUNT 2]
assert_equal [llength $reply] 2
assert_equal [llength $reply] 3
set cursor [lindex $reply 0]
assert_equal $cursor $id5
assert_equal [llength [lindex $reply 1]] 2
@ -536,7 +539,7 @@ start_server {
# Claim last entry
set reply [r XAUTOCLAIM mystream mygroup consumer2 10 $cursor COUNT 1]
assert_equal [llength $reply] 2
assert_equal [llength $reply] 3
set cursor [lindex $reply 0]
assert_equal $cursor {0-0}
assert_equal [llength [lindex $reply 1]] 1
@ -548,6 +551,56 @@ start_server {
assert_error "ERR COUNT must be > 0" {r XAUTOCLAIM key group consumer 1 1 COUNT 0}
}
test {XCLAIM with XDEL} {
    # Start from a clean key holding three entries with fixed IDs.
    r DEL x
    foreach entry_id {1-0 2-0 3-0} {
        r XADD x $entry_id f v
    }
    r XGROUP CREATE x grp 0

    # Alice reads all three entries through the group.
    set reply [r XREADGROUP GROUP grp Alice STREAMS x >]
    assert_equal $reply {{x {{1-0 {f v}} {2-0 {f v}} {3-0 {f v}}}}}

    # Delete the middle entry, then let Bob claim all three IDs: only the
    # two surviving entries are expected in the reply.
    r XDEL x 2-0
    set reply [r XCLAIM x grp Bob 0 1-0 2-0 3-0]
    assert_equal $reply {{1-0 {f v}} {3-0 {f v}}}

    # Nothing is expected to stay pending for Alice, deleted entry included.
    set reply [r XPENDING x grp - + 10 Alice]
    assert_equal $reply {}
}
test {XCLAIM with trimming} {
    # Start from a clean key. The node size is lowered before adding the
    # entries (presumably so that the trim below can drop whole nodes;
    # confirm against the stream trimming implementation).
    r DEL x
    r config set stream-node-max-entries 2
    foreach entry_id {1-0 2-0 3-0} {
        r XADD x $entry_id f v
    }
    r XGROUP CREATE x grp 0

    # Alice reads all three entries through the group.
    set reply [r XREADGROUP GROUP grp Alice STREAMS x >]
    assert_equal $reply {{x {{1-0 {f v}} {2-0 {f v}} {3-0 {f v}}}}}

    # Trim the stream down to one entry, then let Bob claim all three IDs:
    # only 3-0 is expected in the reply.
    r XTRIM x MAXLEN 1
    set reply [r XCLAIM x grp Bob 0 1-0 2-0 3-0]
    assert_equal $reply {{3-0 {f v}}}

    # Nothing is expected to stay pending for Alice, trimmed entries included.
    set reply [r XPENDING x grp - + 10 Alice]
    assert_equal $reply {}
}
test {XAUTOCLAIM with XDEL} {
    # Start from a clean key holding three entries with fixed IDs.
    r DEL x
    foreach entry_id {1-0 2-0 3-0} {
        r XADD x $entry_id f v
    }
    r XGROUP CREATE x grp 0

    # Alice reads all three entries through the group.
    set reply [r XREADGROUP GROUP grp Alice STREAMS x >]
    assert_equal $reply {{x {{1-0 {f v}} {2-0 {f v}} {3-0 {f v}}}}}

    # Delete the middle entry, then let Bob auto-claim from the beginning.
    # Expected reply: cursor 0-0, the two surviving entries, and the ID of
    # the deleted entry as the third element.
    r XDEL x 2-0
    set reply [r XAUTOCLAIM x grp Bob 0 0-0]
    assert_equal $reply {0-0 {{1-0 {f v}} {3-0 {f v}}} 2-0}

    # Nothing is expected to stay pending for Alice, deleted entry included.
    set reply [r XPENDING x grp - + 10 Alice]
    assert_equal $reply {}
}
# This test exercises XAUTOCLAIM, so it is named accordingly; a distinct name
# also keeps it distinguishable from the XCLAIM trimming test in test output.
test {XAUTOCLAIM with trimming} {
    # Start from a clean key. The node size is lowered before adding the
    # entries (presumably so that the trim below can drop whole nodes;
    # confirm against the stream trimming implementation).
    r DEL x
    r config set stream-node-max-entries 2
    r XADD x 1-0 f v
    r XADD x 2-0 f v
    r XADD x 3-0 f v
    r XGROUP CREATE x grp 0

    # Alice reads all three entries through the group.
    assert_equal [r XREADGROUP GROUP grp Alice STREAMS x >] {{x {{1-0 {f v}} {2-0 {f v}} {3-0 {f v}}}}}

    # Trim the stream down to one entry, then let Bob auto-claim from the
    # beginning. Expected reply: cursor 0-0, the surviving entry 3-0, and
    # the IDs 1-0 and 2-0 listed as deleted in the third element.
    r XTRIM x MAXLEN 1
    assert_equal [r XAUTOCLAIM x grp Bob 0 0-0] {0-0 {{3-0 {f v}}} {1-0 2-0}}

    # Nothing is expected to stay pending for Alice, trimmed entries included.
    assert_equal [r XPENDING x grp - + 10 Alice] {}
}
test {XINFO FULL output} {
r del x
r XADD x 100 a 1
@ -733,6 +786,46 @@ start_server {
}
}
# Checks that the PEL clean-up done by XCLAIM and XAUTOCLAIM for entries that
# were deleted from the stream is also observed on a replica.
# NOTE(review): srv -1 implies an enclosing start_server that is not visible
# here; confirm against the surrounding file.
start_server {tags {"external:skip"}} {
# The server at index -1 is used as master, the one at index 0 as replica.
set master [srv -1 client]
set master_host [srv -1 host]
set master_port [srv -1 port]
set replica [srv 0 client]
# Run the scenario twice: autoclaim=0 uses XCLAIM, autoclaim=1 uses XAUTOCLAIM.
foreach autoclaim {0 1} {
test "Replication tests of XCLAIM with deleted entries (autclaim=$autoclaim)" {
# Attach the replica and wait until its link to the master is reported up.
$replica replicaof $master_host $master_port
wait_for_condition 50 100 {
[s 0 master_link_status] eq {up}
} else {
fail "Replication not started."
}
# Build a fresh stream with five entries and deliver all of them to Alice.
$master DEL x
$master XADD x 1-0 f v
$master XADD x 2-0 f v
$master XADD x 3-0 f v
$master XADD x 4-0 f v
$master XADD x 5-0 f v
$master XGROUP CREATE x grp 0
assert_equal [$master XREADGROUP GROUP grp Alice STREAMS x >] {{x {{1-0 {f v}} {2-0 {f v}} {3-0 {f v}} {4-0 {f v}} {5-0 {f v}}}}}
# Presumably blocks until the replica has caught up with the master;
# verify against the wait_for_ofs_sync helper.
wait_for_ofs_sync $master $replica
# Sanity check: the replica sees all five entries pending for Alice.
assert_equal [llength [$replica XPENDING x grp - + 10 Alice]] 5
# Delete two of the pending entries from the stream on the master.
$master XDEL x 2-0
$master XDEL x 4-0
if {$autoclaim} {
# XAUTOCLAIM scans from 0-0: the three surviving entries go to Bob and
# 2-0 and 4-0 are reported as deleted, so the replica is expected to
# show nothing pending for Alice.
assert_equal [$master XAUTOCLAIM x grp Bob 0 0-0] {0-0 {{1-0 {f v}} {3-0 {f v}} {5-0 {f v}}} {2-0 4-0}}
wait_for_ofs_sync $master $replica
assert_equal [llength [$replica XPENDING x grp - + 10 Alice]] 0
} else {
# XCLAIM names only 1-0 through 4-0; 5-0 is not part of the request,
# so the replica is expected to show one entry still pending for Alice.
assert_equal [$master XCLAIM x grp Bob 0 1-0 2-0 3-0 4-0] {{1-0 {f v}} {3-0 {f v}}}
wait_for_ofs_sync $master $replica
assert_equal [llength [$replica XPENDING x grp - + 10 Alice]] 1
}
}
}
}
start_server {tags {"stream needs:debug"} overrides {appendonly yes aof-use-rdb-preamble no}} {
test {Empty stream with no lastid can be rewrite into AOF correctly} {
r XGROUP CREATE mystream group-name $ MKSTREAM