From 20c6a7fe2c134ad21bfc4ce50e548c3a055e93d0 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Tue, 17 Jul 2018 23:57:42 +0800 Subject: [PATCH 1/5] Streams: propagate original MAXLEN argument in XADD context If we rewrite the MAXLEN argument as zero when no trimming was performed, date between master and slave and aof will be inconsistent, because `xtrim maxlen 0` means delete all entries in stream. --- src/t_stream.c | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 72d03b466..5814aa132 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1186,18 +1186,9 @@ void xaddCommand(client *c) { notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id); server.dirty++; - /* Remove older elements if MAXLEN was specified. */ - if (maxlen >= 0) { - if (!streamTrimByLength(s,maxlen,approx_maxlen)) { - /* If no trimming was performed, for instance because approximated - * trimming length was specified, rewrite the MAXLEN argument - * as zero, so that the command is propagated without trimming. */ - robj *zeroobj = createStringObjectFromLongLong(0); - rewriteClientCommandArgument(c,maxlen_arg_idx,zeroobj); - decrRefCount(zeroobj); - } else { - notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); - } + /* Notify xtrim event if needed. */ + if (maxlen >= 0 && streamTrimByLength(s,maxlen,approx_maxlen)) { + notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); } /* Let's rewrite the ID argument with the one actually generated for From da6b7516f187b85e7f9a8a2390f02e9c1dc2c7aa Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Wed, 18 Jul 2018 00:12:24 +0800 Subject: [PATCH 2/5] Streams: XTRIM will return an error if MAXLEN with a count < 0 --- src/t_stream.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/t_stream.c b/src/t_stream.c index 5814aa132..bab5b74ff 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2192,7 +2192,7 @@ void xtrimCommand(client *c) { /* Argument parsing. */ int trim_strategy = TRIM_STRATEGY_NONE; - long long maxlen = 0; /* 0 means no maximum length. */ + long long maxlen = -1; /* If left to -1 no trimming is performed. */ int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so the maxium length is not applied verbatim. */ @@ -2211,6 +2211,11 @@ void xtrimCommand(client *c) { } if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL) != C_OK) return; + + if (maxlen < 0) { + addReplyError(c,"The MAXLEN argument must be >= 0."); + return; + } i++; } else { addReply(c,shared.syntaxerr); From 14d6318b3225c010f28d26c5563c8140c86c1292 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Wed, 18 Jul 2018 00:24:50 +0800 Subject: [PATCH 3/5] Streams: reset approx_maxlen in every maxlen loop --- src/t_stream.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/t_stream.c b/src/t_stream.c index bab5b74ff..7533ba7b0 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1135,6 +1135,7 @@ void xaddCommand(client *c) { * creation. */ break; } else if (!strcasecmp(opt,"maxlen") && moreargs) { + approx_maxlen = 0; char *next = c->argv[i+1]->ptr; /* Check for the form MAXLEN ~ . */ if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { @@ -2202,6 +2203,7 @@ void xtrimCommand(client *c) { int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ char *opt = c->argv[i]->ptr; if (!strcasecmp(opt,"maxlen") && moreargs) { + approx_maxlen = 0; trim_strategy = TRIM_STRATEGY_MAXLEN; char *next = c->argv[i+1]->ptr; /* Check for the form MAXLEN ~ . */ From 9042d1c24966bf229b3fa8d94ada42ebebed7adf Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Wed, 18 Jul 2018 01:58:14 +0800 Subject: [PATCH 4/5] Streams: propagate specified MAXLEN instead of approximated Slaves and rebooting redis may have different radix tree struct, by different stream* config options. So propagating approximated MAXLEN to AOF/slaves may lead to date inconsistency. --- src/t_stream.c | 41 +++++++++++++++++++++++++++++++++++------ 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 7533ba7b0..7eaf0c547 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1114,8 +1114,7 @@ int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missin return streamGenericParseIDOrReply(c,o,id,missing_seq,1); } - -/* XADD key [MAXLEN ] [field value] [field value] ... */ +/* XADD key [MAXLEN [~|=] ] [field value] [field value] ... */ void xaddCommand(client *c) { streamID id; int id_given = 0; /* Was an ID different than "*" specified? */ @@ -1141,6 +1140,8 @@ void xaddCommand(client *c) { if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { approx_maxlen = 1; i++; + } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { + i++; } if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL) != C_OK) return; @@ -1187,9 +1188,22 @@ void xaddCommand(client *c) { notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id); server.dirty++; - /* Notify xtrim event if needed. */ - if (maxlen >= 0 && streamTrimByLength(s,maxlen,approx_maxlen)) { - notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); + if (maxlen >= 0) { + /* Notify xtrim event if needed. */ + if (streamTrimByLength(s,maxlen,approx_maxlen)) { + notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); + } + + /* Rewrite approximated MAXLEN as specified s->length. */ + if (approx_maxlen) { + robj *maxlen_obj = createStringObjectFromLongLong(s->length); + rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj); + decrRefCount(maxlen_obj); + + robj *equal_obj = createStringObject("=",1); + rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj); + decrRefCount(equal_obj); + } } /* Let's rewrite the ID argument with the one actually generated for @@ -2174,7 +2188,7 @@ void xdelCommand(client *c) { * * List of options: * - * MAXLEN [~] -- Trim so that the stream will be capped at + * MAXLEN [~|=] -- Trim so that the stream will be capped at * the specified length. Use ~ before the * count in order to demand approximated trimming * (like XADD MAXLEN option). @@ -2196,6 +2210,7 @@ void xtrimCommand(client *c) { long long maxlen = -1; /* If left to -1 no trimming is performed. */ int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so the maxium length is not applied verbatim. */ + int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */ /* Parse options. */ int i = 2; /* Start of options. */ @@ -2210,6 +2225,8 @@ void xtrimCommand(client *c) { if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { approx_maxlen = 1; i++; + } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { + i++; } if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL) != C_OK) return; @@ -2219,6 +2236,7 @@ void xtrimCommand(client *c) { return; } i++; + maxlen_arg_idx = i; } else { addReply(c,shared.syntaxerr); return; @@ -2239,6 +2257,17 @@ void xtrimCommand(client *c) { signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); server.dirty += deleted; + + /* Rewrite approximated MAXLEN as specified s->length. */ + if (approx_maxlen) { + robj *maxlen_obj = createStringObjectFromLongLong(s->length); + rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj); + decrRefCount(maxlen_obj); + + robj *equal_obj = createStringObject("=",1); + rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj); + decrRefCount(equal_obj); + } } addReplyLongLong(c,deleted); } From 60acac4cd02913385c461465d4cca06d6c015ba7 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Wed, 18 Jul 2018 16:55:25 +0800 Subject: [PATCH 5/5] Streams: add test cases for XADD/XTRIM maxlen --- tests/unit/type/stream.tcl | 46 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 5cf6805d7..2b69a2e9e 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -317,3 +317,49 @@ start_server { assert_equal [r xrevrange teststream2 1234567891245 -] {{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}} } } + +start_server {tags {"stream"} overrides {appendonly yes}} { + test {XADD with MAXLEN > xlen can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + r XADD mystream * xitem v + } + r XADD mystream MAXLEN 200 * xitem v + incr j + assert {[r xlen mystream] == $j} + r debug loadaof + r XADD mystream * xitem v + incr j + assert {[r xlen mystream] == $j} + } +} + +start_server {tags {"stream"} overrides {appendonly yes}} { + test {XADD with ~ MAXLEN can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + r XADD mystream * xitem v + } + r XADD mystream MAXLEN ~ $j * xitem v + incr j + assert {[r xlen mystream] == $j} + r config set stream-node-max-entries 1 + r debug loadaof + r XADD mystream * xitem v + incr j + assert {[r xlen mystream] == $j} + } +} + +start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} { + test {XTRIM with ~ MAXLEN can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + r XADD mystream * xitem v + } + r XTRIM mystream MAXLEN ~ 85 + assert {[r xlen mystream] == 89} + r config set stream-node-max-entries 1 + r debug loadaof + r XADD mystream * xitem v + incr j + assert {[r xlen mystream] == 90} + } +}