Merge pull request #7029 from valentinogeron/fix-xack
XACK should be executed in a "all or nothing" fashion.
This commit is contained in:
commit
dfef407499
@ -1922,11 +1922,21 @@ void xackCommand(client *c) {
|
||||
return;
|
||||
}
|
||||
|
||||
/* Start parsing the IDs, so that we abort ASAP if there is a syntax
|
||||
* error: the return value of this command cannot be an error in case
|
||||
* the client successfully acknowledged some messages, so it should be
|
||||
* executed in a "all or nothing" fashion. */
|
||||
for (int j = 3; j < c->argc; j++) {
|
||||
streamID id;
|
||||
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
|
||||
}
|
||||
|
||||
int acknowledged = 0;
|
||||
for (int j = 3; j < c->argc; j++) {
|
||||
streamID id;
|
||||
unsigned char buf[sizeof(streamID)];
|
||||
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return;
|
||||
if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK)
|
||||
serverPanic("StreamID invalid after check. Should not be possible.");
|
||||
streamEncodeID(buf,&id);
|
||||
|
||||
/* Lookup the ID in the group PEL: it will have a reference to the
|
||||
|
@ -93,6 +93,18 @@ start_server {
|
||||
assert {[r XACK mystream mygroup $id1 $id2] eq 1}
|
||||
}
|
||||
|
||||
test {XACK should fail if got at least one invalid ID} {
|
||||
r del mystream
|
||||
r xgroup create s g $ MKSTREAM
|
||||
r xadd s * f1 v1
|
||||
set c [llength [lindex [r xreadgroup group g c streams s >] 0 1]]
|
||||
assert {$c == 1}
|
||||
set pending [r xpending s g - + 10 c]
|
||||
set id1 [lindex $pending 0 0]
|
||||
assert_error "*Invalid stream ID specified*" {r xack s g $id1 invalid-id}
|
||||
assert {[r xack s g $id1] eq 1}
|
||||
}
|
||||
|
||||
test {PEL NACK reassignment after XGROUP SETID event} {
|
||||
r del events
|
||||
r xadd events * f1 v1
|
||||
|
Loading…
x
Reference in New Issue
Block a user