From 7be529790b5f23c8abbca3a3e493903144fb2131 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Tue, 17 Jul 2018 23:57:42 +0800 Subject: [PATCH 1/5] Streams: propagate original MAXLEN argument in XADD context If we rewrite the MAXLEN argument as zero when no trimming was performed, date between master and slave and aof will be inconsistent, because `xtrim maxlen 0` means delete all entries in stream. --- src/t_stream.c | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 72d03b466..5814aa132 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1186,18 +1186,9 @@ void xaddCommand(client *c) { notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id); server.dirty++; - /* Remove older elements if MAXLEN was specified. */ - if (maxlen >= 0) { - if (!streamTrimByLength(s,maxlen,approx_maxlen)) { - /* If no trimming was performed, for instance because approximated - * trimming length was specified, rewrite the MAXLEN argument - * as zero, so that the command is propagated without trimming. */ - robj *zeroobj = createStringObjectFromLongLong(0); - rewriteClientCommandArgument(c,maxlen_arg_idx,zeroobj); - decrRefCount(zeroobj); - } else { - notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); - } + /* Notify xtrim event if needed. */ + if (maxlen >= 0 && streamTrimByLength(s,maxlen,approx_maxlen)) { + notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); } /* Let's rewrite the ID argument with the one actually generated for From c2552025c33a8d2c139ce4519b95b5fbe4797629 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Wed, 18 Jul 2018 00:12:24 +0800 Subject: [PATCH 2/5] Streams: XTRIM will return an error if MAXLEN with a count < 0 --- src/t_stream.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/t_stream.c b/src/t_stream.c index 5814aa132..bab5b74ff 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2192,7 +2192,7 @@ void xtrimCommand(client *c) { /* Argument parsing. */ int trim_strategy = TRIM_STRATEGY_NONE; - long long maxlen = 0; /* 0 means no maximum length. */ + long long maxlen = -1; /* If left to -1 no trimming is performed. */ int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so the maxium length is not applied verbatim. */ @@ -2211,6 +2211,11 @@ void xtrimCommand(client *c) { } if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL) != C_OK) return; + + if (maxlen < 0) { + addReplyError(c,"The MAXLEN argument must be >= 0."); + return; + } i++; } else { addReply(c,shared.syntaxerr); From 7a22f89e9d7f80ea5f1b429110e4449a375f2177 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Wed, 18 Jul 2018 00:24:50 +0800 Subject: [PATCH 3/5] Streams: reset approx_maxlen in every maxlen loop --- src/t_stream.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/t_stream.c b/src/t_stream.c index bab5b74ff..7533ba7b0 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1135,6 +1135,7 @@ void xaddCommand(client *c) { * creation. */ break; } else if (!strcasecmp(opt,"maxlen") && moreargs) { + approx_maxlen = 0; char *next = c->argv[i+1]->ptr; /* Check for the form MAXLEN ~ . */ if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { @@ -2202,6 +2203,7 @@ void xtrimCommand(client *c) { int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ char *opt = c->argv[i]->ptr; if (!strcasecmp(opt,"maxlen") && moreargs) { + approx_maxlen = 0; trim_strategy = TRIM_STRATEGY_MAXLEN; char *next = c->argv[i+1]->ptr; /* Check for the form MAXLEN ~ . */ From d25a295bc1b4fd7387298efc51b2c4f05f2bb726 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Wed, 18 Jul 2018 01:58:14 +0800 Subject: [PATCH 4/5] Streams: propagate specified MAXLEN instead of approximated Slaves and rebooting redis may have different radix tree struct, by different stream* config options. So propagating approximated MAXLEN to AOF/slaves may lead to date inconsistency. --- src/t_stream.c | 41 +++++++++++++++++++++++++++++++++++------ 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 7533ba7b0..7eaf0c547 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1114,8 +1114,7 @@ int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missin return streamGenericParseIDOrReply(c,o,id,missing_seq,1); } - -/* XADD key [MAXLEN ] [field value] [field value] ... */ +/* XADD key [MAXLEN [~|=] ] [field value] [field value] ... */ void xaddCommand(client *c) { streamID id; int id_given = 0; /* Was an ID different than "*" specified? */ @@ -1141,6 +1140,8 @@ void xaddCommand(client *c) { if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { approx_maxlen = 1; i++; + } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { + i++; } if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL) != C_OK) return; @@ -1187,9 +1188,22 @@ void xaddCommand(client *c) { notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id); server.dirty++; - /* Notify xtrim event if needed. */ - if (maxlen >= 0 && streamTrimByLength(s,maxlen,approx_maxlen)) { - notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); + if (maxlen >= 0) { + /* Notify xtrim event if needed. */ + if (streamTrimByLength(s,maxlen,approx_maxlen)) { + notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); + } + + /* Rewrite approximated MAXLEN as specified s->length. */ + if (approx_maxlen) { + robj *maxlen_obj = createStringObjectFromLongLong(s->length); + rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj); + decrRefCount(maxlen_obj); + + robj *equal_obj = createStringObject("=",1); + rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj); + decrRefCount(equal_obj); + } } /* Let's rewrite the ID argument with the one actually generated for @@ -2174,7 +2188,7 @@ void xdelCommand(client *c) { * * List of options: * - * MAXLEN [~] -- Trim so that the stream will be capped at + * MAXLEN [~|=] -- Trim so that the stream will be capped at * the specified length. Use ~ before the * count in order to demand approximated trimming * (like XADD MAXLEN option). @@ -2196,6 +2210,7 @@ void xtrimCommand(client *c) { long long maxlen = -1; /* If left to -1 no trimming is performed. */ int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so the maxium length is not applied verbatim. */ + int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */ /* Parse options. */ int i = 2; /* Start of options. */ @@ -2210,6 +2225,8 @@ void xtrimCommand(client *c) { if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { approx_maxlen = 1; i++; + } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { + i++; } if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL) != C_OK) return; @@ -2219,6 +2236,7 @@ void xtrimCommand(client *c) { return; } i++; + maxlen_arg_idx = i; } else { addReply(c,shared.syntaxerr); return; @@ -2239,6 +2257,17 @@ void xtrimCommand(client *c) { signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); server.dirty += deleted; + + /* Rewrite approximated MAXLEN as specified s->length. */ + if (approx_maxlen) { + robj *maxlen_obj = createStringObjectFromLongLong(s->length); + rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj); + decrRefCount(maxlen_obj); + + robj *equal_obj = createStringObject("=",1); + rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj); + decrRefCount(equal_obj); + } } addReplyLongLong(c,deleted); } From d7dc6851ff4e814bc288c2c54e7f2a102dfa3526 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Wed, 18 Jul 2018 16:55:25 +0800 Subject: [PATCH 5/5] Streams: add test cases for XADD/XTRIM maxlen --- tests/unit/type/stream.tcl | 46 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 5cf6805d7..2b69a2e9e 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -317,3 +317,49 @@ start_server { assert_equal [r xrevrange teststream2 1234567891245 -] {{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}} } } + +start_server {tags {"stream"} overrides {appendonly yes}} { + test {XADD with MAXLEN > xlen can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + r XADD mystream * xitem v + } + r XADD mystream MAXLEN 200 * xitem v + incr j + assert {[r xlen mystream] == $j} + r debug loadaof + r XADD mystream * xitem v + incr j + assert {[r xlen mystream] == $j} + } +} + +start_server {tags {"stream"} overrides {appendonly yes}} { + test {XADD with ~ MAXLEN can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + r XADD mystream * xitem v + } + r XADD mystream MAXLEN ~ $j * xitem v + incr j + assert {[r xlen mystream] == $j} + r config set stream-node-max-entries 1 + r debug loadaof + r XADD mystream * xitem v + incr j + assert {[r xlen mystream] == $j} + } +} + +start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} { + test {XTRIM with ~ MAXLEN can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + r XADD mystream * xitem v + } + r XTRIM mystream MAXLEN ~ 85 + assert {[r xlen mystream] == 89} + r config set stream-node-max-entries 1 + r debug loadaof + r XADD mystream * xitem v + incr j + assert {[r xlen mystream] == 90} + } +}