support NOMKSTREAM option in xadd command (#7910)

introduces a NOMKSTREAM option for xadd command, this would be useful for some
use cases when we do not want to create new stream by default:

XADD key [MAXLEN [~|=] <count>] [NOMKSTREAM] <ID or *> [field value] [field value]
This commit is contained in:
Wen Hui 2020-10-18 03:15:43 -04:00 committed by GitHub
parent 1cfb63b699
commit 38f55cc3aa
2 changed files with 22 additions and 3 deletions

View File

@ -1130,10 +1130,14 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
/* Look the stream at 'key' and return the corresponding stream object.
* The function creates a key setting it to an empty stream if needed. */
robj *streamTypeLookupWriteOrCreate(client *c, robj *key) {
robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) {
robj *o = lookupKeyWrite(c->db,key);
if (checkType(c,o,OBJ_STREAM)) return NULL;
if (o == NULL) {
if (no_create) {
addReplyNull(c);
return NULL;
}
o = createStreamObject();
dbAdd(c->db,key,o);
}
@ -1214,7 +1218,7 @@ void streamRewriteApproxMaxlen(client *c, stream *s, int maxlen_arg_idx) {
decrRefCount(maxlen_obj);
}
/* XADD key [MAXLEN [~|=] <count>] <ID or *> [field value] [field value] ... */
/* XADD key [MAXLEN [~|=] <count>] [NOMKSTREAM] <ID or *> [field value] [field value] ... */
void xaddCommand(client *c) {
streamID id;
int id_given = 0; /* Was an ID different than "*" specified? */
@ -1222,6 +1226,7 @@ void xaddCommand(client *c) {
int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so
the maximum length is not applied verbatim. */
int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
int no_mkstream = 0; /* if set to 1 do not create new stream */
/* Parse options. */
int i = 2; /* This is the first argument position where we could
@ -1252,6 +1257,8 @@ void xaddCommand(client *c) {
}
i++;
maxlen_arg_idx = i;
} else if (!strcasecmp(opt,"nomkstream")) {
no_mkstream = 1;
} else {
/* If we are here is a syntax error or a valid ID. */
if (streamParseStrictIDOrReply(c,c->argv[i],&id,0) != C_OK) return;
@ -1278,7 +1285,7 @@ void xaddCommand(client *c) {
/* Lookup the stream at key. */
robj *o;
stream *s;
if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],no_mkstream)) == NULL) return;
s = o->ptr;
/* Return ASAP if the stream has reached the last possible ID */

View File

@ -102,6 +102,18 @@ start_server {
}
}
test {XADD with NOMKSTREAM option} {
r DEL mystream
assert_equal "" [r XADD mystream NOMKSTREAM * item 1 value a]
assert_equal 0 [r EXISTS mystream]
r XADD mystream * item 1 value a
r XADD mystream NOMKSTREAM * item 2 value b
assert_equal 2 [r XLEN mystream]
set items [r XRANGE mystream - +]
assert_equal [lindex $items 0 1] {item 1 value a}
assert_equal [lindex $items 1 1] {item 2 value b}
}
test {XADD mass insertion and XLEN} {
r DEL mystream
r multi