From 8cdc153f58e535eff4be5f8f5483794c4cfeec3c Mon Sep 17 00:00:00 2001 From: Valentino Geron Date: Wed, 25 Mar 2020 21:54:14 +0200 Subject: [PATCH] XACK should be executed in a "all or nothing" fashion. First, we must parse the IDs, so that we abort ASAP. The return value of this command cannot be an error if the client successfully acknowledged some messages, so it should be executed in a "all or nothing" fashion. --- src/t_stream.c | 12 +++++++++++- tests/unit/type/stream-cgroups.tcl | 12 ++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/src/t_stream.c b/src/t_stream.c index 00d1cbf1c..3f8cbfcfa 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1922,11 +1922,21 @@ void xackCommand(client *c) { return; } + /* Start parsing the IDs, so that we abort ASAP if there is a syntax + * error: the return value of this command cannot be an error in case + * the client successfully acknowledged some messages, so it should be + * executed in a "all or nothing" fashion. */ + for (int j = 3; j < c->argc; j++) { + streamID id; + if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return; + } + int acknowledged = 0; for (int j = 3; j < c->argc; j++) { streamID id; unsigned char buf[sizeof(streamID)]; - if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) return; + if (streamParseStrictIDOrReply(c,c->argv[j],&id,0) != C_OK) + serverPanic("StreamID invalid after check. Should not be possible."); streamEncodeID(buf,&id); /* Lookup the ID in the group PEL: it will have a reference to the diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index a27e1f582..04661707b 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -93,6 +93,18 @@ start_server { assert {[r XACK mystream mygroup $id1 $id2] eq 1} } + test {XACK should fail if got at least one invalid ID} { + r del mystream + r xgroup create s g $ MKSTREAM + r xadd s * f1 v1 + set c [llength [lindex [r xreadgroup group g c streams s >] 0 1]] + assert {$c == 1} + set pending [r xpending s g - + 10 c] + set id1 [lindex $pending 0 0] + assert_error "*Invalid stream ID specified*" {r xack s g $id1 invalid-id} + assert {[r xack s g $id1] eq 1} + } + test {PEL NACK reassignment after XGROUP SETID event} { r del events r xadd events * f1 v1