From 97716ce5435465691eb02951d883974212f99638 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Tue, 17 Jul 2018 23:57:42 +0800 Subject: [PATCH 1/5] Streams: propagate original MAXLEN argument in XADD context If we rewrite the MAXLEN argument as zero when no trimming was performed, date between master and slave and aof will be inconsistent, because `xtrim maxlen 0` means delete all entries in stream. --- src/t_stream.c | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 72d03b466..5814aa132 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1186,18 +1186,9 @@ void xaddCommand(client *c) { notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id); server.dirty++; - /* Remove older elements if MAXLEN was specified. */ - if (maxlen >= 0) { - if (!streamTrimByLength(s,maxlen,approx_maxlen)) { - /* If no trimming was performed, for instance because approximated - * trimming length was specified, rewrite the MAXLEN argument - * as zero, so that the command is propagated without trimming. */ - robj *zeroobj = createStringObjectFromLongLong(0); - rewriteClientCommandArgument(c,maxlen_arg_idx,zeroobj); - decrRefCount(zeroobj); - } else { - notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); - } + /* Notify xtrim event if needed. */ + if (maxlen >= 0 && streamTrimByLength(s,maxlen,approx_maxlen)) { + notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); } /* Let's rewrite the ID argument with the one actually generated for From aa428375b83b2cf9f48ba9ba497380817dc522b9 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Wed, 18 Jul 2018 00:12:24 +0800 Subject: [PATCH 2/5] Streams: XTRIM will return an error if MAXLEN with a count < 0 --- src/t_stream.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/t_stream.c b/src/t_stream.c index 5814aa132..bab5b74ff 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2192,7 +2192,7 @@ void xtrimCommand(client *c) { /* Argument parsing. */ int trim_strategy = TRIM_STRATEGY_NONE; - long long maxlen = 0; /* 0 means no maximum length. */ + long long maxlen = -1; /* If left to -1 no trimming is performed. */ int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so the maxium length is not applied verbatim. */ @@ -2211,6 +2211,11 @@ void xtrimCommand(client *c) { } if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL) != C_OK) return; + + if (maxlen < 0) { + addReplyError(c,"The MAXLEN argument must be >= 0."); + return; + } i++; } else { addReply(c,shared.syntaxerr); From faaf2dd35660dd4fecb997143a8da404af56bf87 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Wed, 18 Jul 2018 00:24:50 +0800 Subject: [PATCH 3/5] Streams: reset approx_maxlen in every maxlen loop --- src/t_stream.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/t_stream.c b/src/t_stream.c index bab5b74ff..7533ba7b0 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1135,6 +1135,7 @@ void xaddCommand(client *c) { * creation. */ break; } else if (!strcasecmp(opt,"maxlen") && moreargs) { + approx_maxlen = 0; char *next = c->argv[i+1]->ptr; /* Check for the form MAXLEN ~ . */ if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { @@ -2202,6 +2203,7 @@ void xtrimCommand(client *c) { int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ char *opt = c->argv[i]->ptr; if (!strcasecmp(opt,"maxlen") && moreargs) { + approx_maxlen = 0; trim_strategy = TRIM_STRATEGY_MAXLEN; char *next = c->argv[i+1]->ptr; /* Check for the form MAXLEN ~ . */ From 7a6234405c3218a4322c844e54ad403eb82e3961 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Wed, 18 Jul 2018 01:58:14 +0800 Subject: [PATCH 4/5] Streams: propagate specified MAXLEN instead of approximated Slaves and rebooting redis may have different radix tree struct, by different stream* config options. So propagating approximated MAXLEN to AOF/slaves may lead to date inconsistency. --- src/t_stream.c | 41 +++++++++++++++++++++++++++++++++++------ 1 file changed, 35 insertions(+), 6 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 7533ba7b0..7eaf0c547 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1114,8 +1114,7 @@ int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missin return streamGenericParseIDOrReply(c,o,id,missing_seq,1); } - -/* XADD key [MAXLEN ] [field value] [field value] ... */ +/* XADD key [MAXLEN [~|=] ] [field value] [field value] ... */ void xaddCommand(client *c) { streamID id; int id_given = 0; /* Was an ID different than "*" specified? */ @@ -1141,6 +1140,8 @@ void xaddCommand(client *c) { if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { approx_maxlen = 1; i++; + } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { + i++; } if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL) != C_OK) return; @@ -1187,9 +1188,22 @@ void xaddCommand(client *c) { notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id); server.dirty++; - /* Notify xtrim event if needed. */ - if (maxlen >= 0 && streamTrimByLength(s,maxlen,approx_maxlen)) { - notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); + if (maxlen >= 0) { + /* Notify xtrim event if needed. */ + if (streamTrimByLength(s,maxlen,approx_maxlen)) { + notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); + } + + /* Rewrite approximated MAXLEN as specified s->length. */ + if (approx_maxlen) { + robj *maxlen_obj = createStringObjectFromLongLong(s->length); + rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj); + decrRefCount(maxlen_obj); + + robj *equal_obj = createStringObject("=",1); + rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj); + decrRefCount(equal_obj); + } } /* Let's rewrite the ID argument with the one actually generated for @@ -2174,7 +2188,7 @@ void xdelCommand(client *c) { * * List of options: * - * MAXLEN [~] -- Trim so that the stream will be capped at + * MAXLEN [~|=] -- Trim so that the stream will be capped at * the specified length. Use ~ before the * count in order to demand approximated trimming * (like XADD MAXLEN option). @@ -2196,6 +2210,7 @@ void xtrimCommand(client *c) { long long maxlen = -1; /* If left to -1 no trimming is performed. */ int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so the maxium length is not applied verbatim. */ + int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */ /* Parse options. */ int i = 2; /* Start of options. */ @@ -2210,6 +2225,8 @@ void xtrimCommand(client *c) { if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { approx_maxlen = 1; i++; + } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { + i++; } if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL) != C_OK) return; @@ -2219,6 +2236,7 @@ void xtrimCommand(client *c) { return; } i++; + maxlen_arg_idx = i; } else { addReply(c,shared.syntaxerr); return; @@ -2239,6 +2257,17 @@ void xtrimCommand(client *c) { signalModifiedKey(c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); server.dirty += deleted; + + /* Rewrite approximated MAXLEN as specified s->length. */ + if (approx_maxlen) { + robj *maxlen_obj = createStringObjectFromLongLong(s->length); + rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj); + decrRefCount(maxlen_obj); + + robj *equal_obj = createStringObject("=",1); + rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj); + decrRefCount(equal_obj); + } } addReplyLongLong(c,deleted); } From fedb5c635309e77e51b9ec3e1679975c554bed93 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Wed, 18 Jul 2018 16:55:25 +0800 Subject: [PATCH 5/5] Streams: add test cases for XADD/XTRIM maxlen --- tests/unit/type/stream.tcl | 46 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 46 insertions(+) diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 5cf6805d7..2b69a2e9e 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -317,3 +317,49 @@ start_server { assert_equal [r xrevrange teststream2 1234567891245 -] {{1234567891240-0 {key1 value2}} {1234567891230-0 {key1 value1}}} } } + +start_server {tags {"stream"} overrides {appendonly yes}} { + test {XADD with MAXLEN > xlen can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + r XADD mystream * xitem v + } + r XADD mystream MAXLEN 200 * xitem v + incr j + assert {[r xlen mystream] == $j} + r debug loadaof + r XADD mystream * xitem v + incr j + assert {[r xlen mystream] == $j} + } +} + +start_server {tags {"stream"} overrides {appendonly yes}} { + test {XADD with ~ MAXLEN can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + r XADD mystream * xitem v + } + r XADD mystream MAXLEN ~ $j * xitem v + incr j + assert {[r xlen mystream] == $j} + r config set stream-node-max-entries 1 + r debug loadaof + r XADD mystream * xitem v + incr j + assert {[r xlen mystream] == $j} + } +} + +start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} { + test {XTRIM with ~ MAXLEN can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + r XADD mystream * xitem v + } + r XTRIM mystream MAXLEN ~ 85 + assert {[r xlen mystream] == 89} + r config set stream-node-max-entries 1 + r debug loadaof + r XADD mystream * xitem v + incr j + assert {[r xlen mystream] == 90} + } +}