Adds auto-seq-only-generation via XADD ... <ms>-* (#9217)

Adds the ability to autogenerate the sequence part of the millisecond-only explicit ID specified for `XADD`. This is useful in case added entries have an externally-provided timestamp without sub-millisecond resolution.
This commit is contained in:
Itamar Haber 2021-11-30 19:56:39 +02:00 committed by GitHub
parent 2afa41f628
commit 21aa1d4b91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 107 additions and 28 deletions

View File

@ -4232,7 +4232,8 @@ int RM_StreamAdd(RedisModuleKey *key, int flags, RedisModuleStreamID *id, RedisM
use_id.seq = id->seq;
use_id_ptr = &use_id;
}
if (streamAppendItem(s, argv, numfields, &added_id, use_id_ptr) == C_ERR) {
if (streamAppendItem(s,argv,numfields,&added_id,use_id_ptr,1) == C_ERR) {
/* Either the ID not greater than all existing IDs in the stream, or
* the elements are too large to be stored. either way, errno is already
* set by streamAppendItem. */

View File

@ -129,7 +129,7 @@ robj *streamDup(robj *o);
int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep);
int streamParseID(const robj *o, streamID *id);
robj *createObjectFromStreamID(streamID *id);
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id);
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given);
int streamDeleteItem(stream *s, streamID *id);
int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
int64_t streamTrimByID(stream *s, streamID minid, int approx);

View File

@ -56,7 +56,7 @@
void streamFreeCG(streamCG *cg);
void streamFreeNACK(streamNACK *na);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given);
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
/* -----------------------------------------------------------------------
@ -418,6 +418,10 @@ void streamGetEdgeID(stream *s, int first, streamID *edge_id)
* If 'use_id' is not NULL, the ID is not auto-generated by the function,
* but instead the passed ID is used to add the new entry. In this case
* adding the entry may fail as specified later in this comment.
*
* When 'use_id' is used alongside with a zero 'seq-given', the sequence
* part of the passed ID is ignored and the function will attempt to use an
* auto-generated sequence.
*
* The function returns C_OK if the item was added, this is always true
* if the ID was generated by the function. However the function may return
@ -426,14 +430,31 @@ void streamGetEdgeID(stream *s, int first, streamID *edge_id)
* current top ID is greater or equal. errno will be set to EDOM.
* 2. If a size of a single element or the sum of the elements is too big to
* be stored into the stream. errno will be set to ERANGE. */
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id) {
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given) {
/* Generate the new entry ID. */
streamID id;
if (use_id)
id = *use_id;
else
if (use_id) {
if (seq_given) {
id = *use_id;
} else {
/* The automatically generated sequence can be either zero (new
* timestamps) or the incremented sequence of the last ID. In the
* latter case, we need to prevent an overflow/advancing forward
* in time. */
if (s->last_id.ms == use_id->ms) {
if (s->last_id.seq == UINT64_MAX) {
return C_ERR;
}
id = s->last_id;
id.seq++;
} else {
id = *use_id;
}
}
} else {
streamNextID(&s->last_id,&id);
}
/* Check that the new ID is greater than the last entry ID
* or return an error. Automatically generated IDs might
@ -652,6 +673,7 @@ typedef struct {
/* XADD options */
streamID id; /* User-provided ID, for XADD only. */
int id_given; /* Was an ID different than "*" specified? for XADD only. */
int seq_given; /* Was an ID different than "ms-*" specified? for XADD only. */
int no_mkstream; /* if set to 1 do not create new stream */
/* XADD + XTRIM common options */
@ -929,7 +951,7 @@ static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, i
i++;
}
if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0) != C_OK)
if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0,NULL) != C_OK)
return -1;
i++;
@ -955,7 +977,7 @@ static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, i
args->no_mkstream = 1;
} else if (xadd) {
/* If we are here is a syntax error or a valid ID. */
if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0) != C_OK)
if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0,&args->seq_given) != C_OK)
return -1;
args->id_given = 1;
break;
@ -1669,8 +1691,13 @@ robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) {
* that can be represented. If 'strict' is set to 1, "-" and "+" will be
* treated as an invalid ID.
*
* The ID form <ms>-* specifies a millisconds-only ID, leaving the sequence part
* to be autogenerated. When a non-NULL 'seq_given' argument is provided, this
* form is accepted and the argument is set to 0 unless the sequence part is
* specified.
*
* If 'c' is set to NULL, no reply is sent to the client. */
int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict) {
int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict, int *seq_given) {
char buf[128];
if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid;
memcpy(buf,o->ptr,sdslen(o->ptr)+1);
@ -1678,6 +1705,10 @@ int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t
if (strict && (buf[0] == '-' || buf[0] == '+') && buf[1] == '\0')
goto invalid;
if (seq_given != NULL) {
*seq_given = 1;
}
/* Handle the "-" and "+" special cases. */
if (buf[0] == '-' && buf[1] == '\0') {
id->ms = 0;
@ -1690,12 +1721,22 @@ int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t
}
/* Parse <ms>-<seq> form. */
unsigned long long ms, seq;
char *dot = strchr(buf,'-');
if (dot) *dot = '\0';
unsigned long long ms, seq;
if (string2ull(buf,&ms) == 0) goto invalid;
if (dot && string2ull(dot+1,&seq) == 0) goto invalid;
if (!dot) seq = missing_seq;
if (dot) {
size_t seqlen = strlen(dot+1);
if (seq_given != NULL && seqlen == 1 && *(dot + 1) == '*') {
/* Handle the <ms>-* form. */
seq = 0;
*seq_given = 0;
} else if (string2ull(dot+1,&seq) == 0) {
goto invalid;
}
} else {
seq = missing_seq;
}
id->ms = ms;
id->seq = seq;
return C_OK;
@ -1708,20 +1749,20 @@ invalid:
/* Wrapper for streamGenericParseIDOrReply() used by module API. */
int streamParseID(const robj *o, streamID *id) {
return streamGenericParseIDOrReply(NULL, o, id, 0, 0);
return streamGenericParseIDOrReply(NULL,o,id,0,0,NULL);
}
/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
* 0, to be used when - and + are acceptable IDs. */
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
return streamGenericParseIDOrReply(c,o,id,missing_seq,0);
return streamGenericParseIDOrReply(c,o,id,missing_seq,0,NULL);
}
/* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to
* 1, to be used when we want to return an error if the special IDs + or -
* are provided. */
int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) {
return streamGenericParseIDOrReply(c,o,id,missing_seq,1);
int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given) {
return streamGenericParseIDOrReply(c,o,id,missing_seq,1,seq_given);
}
/* Helper for parsing a stream ID that is a range query interval. When the
@ -1738,7 +1779,7 @@ int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude,
if (exclude != NULL) *exclude = (len > 1 && p[0] == '(');
if (exclude != NULL && *exclude) {
robj *t = createStringObject(p+1,len-1);
invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq) == C_ERR);
invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq,NULL) == C_ERR);
decrRefCount(t);
} else
invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR);
@ -1785,7 +1826,7 @@ void xaddCommand(client *c) {
/* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating
* a new stream and have streamAppendItem fail, leaving an empty key in the
* database. */
if (parsed_args.id_given &&
if (parsed_args.id_given && parsed_args.seq_given &&
parsed_args.id.ms == 0 && parsed_args.id.seq == 0)
{
addReplyError(c,"The ID specified in XADD must be greater than 0-0");
@ -1808,7 +1849,7 @@ void xaddCommand(client *c) {
/* Append using the low level function and return the ID. */
streamID id;
if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
&id, parsed_args.id_given ? &parsed_args.id : NULL) == C_ERR)
&id,parsed_args.id_given ? &parsed_args.id : NULL,parsed_args.seq_given) == C_ERR)
{
if (errno == EDOM)
addReplyError(c,"The ID specified in XADD is equal or smaller than "
@ -1841,7 +1882,7 @@ void xaddCommand(client *c) {
/* Let's rewrite the ID argument with the one actually generated for
* AOF/replication propagation. */
if (!parsed_args.id_given) {
if (!parsed_args.id_given || !parsed_args.seq_given) {
robj *idarg = createObjectFromStreamID(&id);
rewriteClientCommandArgument(c, idpos, idarg);
decrRefCount(idarg);
@ -2080,7 +2121,7 @@ void xreadCommand(client *c) {
ids[id_idx].seq = UINT64_MAX;
continue;
}
if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0) != C_OK)
if (streamParseStrictIDOrReply(c,c->argv[i],ids+id_idx,0,NULL) != C_OK)
goto cleanup;
}
@ -2428,7 +2469,7 @@ NULL
id.ms = 0;
id.seq = 0;
}
} else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0) != C_OK) {
} else if (streamParseStrictIDOrReply(c,c->argv[4],&id,0,NULL) != C_OK) {
return;
}
@ -2505,7 +2546,7 @@ void xsetidCommand(client *c) {
stream *s = o->ptr;
streamID id;
if (streamParseStrictIDOrReply(c,c->argv[2],&id,0) != C_OK) return;
if (streamParseStrictIDOrReply(c,c->argv[2],&id,0,NULL) != C_OK) return;
/* If the stream has at least one item, we want to check that the user
* is setting a last ID that is equal or greater than the current top
@ -2559,7 +2600,7 @@ void xackCommand(client *c) {
if (id_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*id_count);
for (int j = 3; j < c->argc; j++) {
if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0) != C_OK) goto cleanup;
if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-3],0,NULL) != C_OK) goto cleanup;
}
int acknowledged = 0;
@ -2873,7 +2914,7 @@ void xclaimCommand(client *c) {
if (id_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*id_count);
for (j = 5; j < c->argc; j++) {
if (streamParseStrictIDOrReply(NULL,c->argv[j],&ids[j-5],0) != C_OK) break;
if (streamParseStrictIDOrReply(NULL,c->argv[j],&ids[j-5],0,NULL) != C_OK) break;
}
int last_id_arg = j-1; /* Next time we iterate the IDs we now the range. */
@ -2907,7 +2948,7 @@ void xclaimCommand(client *c) {
!= C_OK) goto cleanup;
} else if (!strcasecmp(opt,"LASTID") && moreargs) {
j++;
if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0) != C_OK) goto cleanup;
if (streamParseStrictIDOrReply(c,c->argv[j],&last_id,0,NULL) != C_OK) goto cleanup;
} else {
addReplyErrorFormat(c,"Unrecognized XCLAIM option '%s'",opt);
goto cleanup;
@ -3213,7 +3254,7 @@ void xdelCommand(client *c) {
if (id_count > STREAMID_STATIC_VECTOR_LEN)
ids = zmalloc(sizeof(streamID)*id_count);
for (int j = 2; j < c->argc; j++) {
if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0) != C_OK) goto cleanup;
if (streamParseStrictIDOrReply(c,c->argv[j],&ids[j-2],0,NULL) != C_OK) goto cleanup;
}
/* Actually apply the command. */

View File

@ -85,6 +85,43 @@ start_server {
assert_error ERR* {r xadd mystream * c d}
}
test {XADD auto-generated sequence is incremented for last ID} {
r DEL mystream
set id1 [r XADD mystream 123-456 item 1 value a]
set id2 [r XADD mystream 123-* item 2 value b]
lassign [split $id2 -] _ seq
assert {$seq == 457}
assert {[streamCompareID $id1 $id2] == -1}
}
test {XADD auto-generated sequence is zero for future timestamp ID} {
r DEL mystream
set id1 [r XADD mystream 123-456 item 1 value a]
set id2 [r XADD mystream 789-* item 2 value b]
lassign [split $id2 -] _ seq
assert {$seq == 0}
assert {[streamCompareID $id1 $id2] == -1}
}
test {XADD auto-generated sequence can't be smaller than last ID} {
r DEL mystream
r XADD mystream 123-456 item 1 value a
assert_error ERR* {r XADD mystream 42-* item 2 value b}
}
test {XADD auto-generated sequence can't overflow} {
r DEL mystream
r xadd mystream 1-18446744073709551615 a b
assert_error ERR* {r xadd mystream 1-* c d}
}
test {XADD 0-* should succeed} {
r DEL mystream
set id [r xadd mystream 0-* a b]
lassign [split $id -] _ seq
assert {$seq == 1}
}
test {XADD with MAXLEN option} {
r DEL mystream
for {set j 0} {$j < 1000} {incr j} {