From f328194d12e06a748aac7a35b1dda179bfc6cb0f Mon Sep 17 00:00:00 2001 From: Wen Hui Date: Sun, 18 Oct 2020 03:15:43 -0400 Subject: [PATCH] support NOMKSTREAM option in xadd command (#7910) introduces a NOMKSTREAM option for xadd command, this would be useful for some use cases when we do not want to create new stream by default: XADD key [MAXLEN [~|=] ] [NOMKSTREAM] [field value] [field value] --- src/t_stream.c | 13 ++++++++++--- tests/unit/type/stream.tcl | 12 ++++++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 6eb6363fc..d6f6a3011 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1130,10 +1130,14 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start /* Look the stream at 'key' and return the corresponding stream object. * The function creates a key setting it to an empty stream if needed. */ -robj *streamTypeLookupWriteOrCreate(client *c, robj *key) { +robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) { robj *o = lookupKeyWrite(c->db,key); if (checkType(c,o,OBJ_STREAM)) return NULL; if (o == NULL) { + if (no_create) { + addReplyNull(c); + return NULL; + } o = createStreamObject(); dbAdd(c->db,key,o); } @@ -1214,7 +1218,7 @@ void streamRewriteApproxMaxlen(client *c, stream *s, int maxlen_arg_idx) { decrRefCount(maxlen_obj); } -/* XADD key [MAXLEN [~|=] ] [field value] [field value] ... */ +/* XADD key [MAXLEN [~|=] ] [NOMKSTREAM] [field value] [field value] ... */ void xaddCommand(client *c) { streamID id; int id_given = 0; /* Was an ID different than "*" specified? */ @@ -1222,6 +1226,7 @@ void xaddCommand(client *c) { int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so the maximum length is not applied verbatim. */ int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */ + int no_mkstream = 0; /* if set to 1 do not create new stream */ /* Parse options. */ int i = 2; /* This is the first argument position where we could @@ -1252,6 +1257,8 @@ void xaddCommand(client *c) { } i++; maxlen_arg_idx = i; + } else if (!strcasecmp(opt,"nomkstream")) { + no_mkstream = 1; } else { /* If we are here is a syntax error or a valid ID. */ if (streamParseStrictIDOrReply(c,c->argv[i],&id,0) != C_OK) return; @@ -1278,7 +1285,7 @@ void xaddCommand(client *c) { /* Lookup the stream at key. */ robj *o; stream *s; - if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return; + if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],no_mkstream)) == NULL) return; s = o->ptr; /* Return ASAP if the stream has reached the last possible ID */ diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 0ff570cab..b02d6a0fb 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -102,6 +102,18 @@ start_server { } } + test {XADD with NOMKSTREAM option} { + r DEL mystream + assert_equal "" [r XADD mystream NOMKSTREAM * item 1 value a] + assert_equal 0 [r EXISTS mystream] + r XADD mystream * item 1 value a + r XADD mystream NOMKSTREAM * item 2 value b + assert_equal 2 [r XLEN mystream] + set items [r XRANGE mystream - +] + assert_equal [lindex $items 0 1] {item 1 value a} + assert_equal [lindex $items 1 1] {item 2 value b} + } + test {XADD mass insertion and XLEN} { r DEL mystream r multi