From 814aad65f16ed27cb68c4177e4f8b61d7edb31e1 Mon Sep 17 00:00:00 2001 From: guybe7 Date: Fri, 8 Jan 2021 18:13:25 +0200 Subject: [PATCH] XADD and XTRIM, Trim by MINID, and new LIMIT argument (#8169) This PR adds another trimming strategy to XADD and XTRIM named MINID (complements the existing MAXLEN). It also adds a new LIMIT argument that allows incremental trimming by repeated calls (rather than all at once). This provides the ability to trim all records older than a certain ID (which makes it possible for the user to trim by age too). Example: XTRIM mystream MINID ~ 1608540753 will trim entries with id < 1608540753, but might not trim all of them (because of the ~ modifier). The purpose is to ease the use of streams. Many users use streams as logs, and the common case is wanting a log of the last X seconds rather than a log that contains at most X entries (new MINID vs existing MAXLEN). The new LIMIT modifier is only supported when the trim strategy uses ~. I.e. when the user asked for exact trimming, it all happens in one go (no possibility for incremental trimming). However, when ~ is provided, we trim full rax nodes, up to the limit number of records. The default limit is 100*stream_node_max_entries (used when LIMIT is not provided). I.e. this is a behavior change (even if the existing MAXLEN strategy is used). An explicit limit of 0 means unlimited (but note that it's not the default). Other changes: Refactor arg parsing code for XADD and XTRIM to use common code. 
--- src/t_stream.c | 583 +++++++++++++++++++++++++++---------- tests/unit/type/stream.tcl | 140 ++++++++- 2 files changed, 571 insertions(+), 152 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index d085a8948..f991765eb 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -46,6 +46,8 @@ void streamFreeCG(streamCG *cg); void streamFreeNACK(streamNACK *na); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); +int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); +int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); /* ----------------------------------------------------------------------- * Low level stream encoding: a radix tree of listpacks. @@ -282,6 +284,65 @@ static inline int64_t lpGetIntegerIfValid(unsigned char *ele, int *valid) { #define lpGetInteger(ele) lpGetIntegerIfValid(ele, NULL) +/* Get an edge streamID of a given listpack. + * 'master_id' is an input param, used to build the 'edge_id' output param */ +int lpGetEdgeStreamID(unsigned char *lp, int first, streamID *master_id, streamID *edge_id) +{ + if (lp == NULL) + return 0; + + unsigned char *lp_ele; + + /* We need to seek either the first or the last entry depending + * on the direction of the iteration. */ + if (first) { + /* Get the master fields count. */ + lp_ele = lpFirst(lp); /* Seek items count */ + lp_ele = lpNext(lp, lp_ele); /* Seek deleted count. */ + lp_ele = lpNext(lp, lp_ele); /* Seek num fields. */ + int64_t master_fields_count = lpGetInteger(lp_ele); + lp_ele = lpNext(lp, lp_ele); /* Seek first field. */ + + /* If we are iterating in normal order, skip the master fields + * to seek the first actual entry. 
*/ + for (int64_t i = 0; i < master_fields_count; i++) + lp_ele = lpNext(lp, lp_ele); + + /* If we are going forward, skip the previous entry's + * lp-count field (or in case of the master entry, the zero + * term field) */ + lp_ele = lpNext(lp, lp_ele); + if (lp_ele == NULL) + return 0; + } else { + /* If we are iterating in reverse direction, just seek the + * last part of the last entry in the listpack (that is, the + * fields count). */ + lp_ele = lpLast(lp); + + /* If we are going backward, read the number of elements this + * entry is composed of, and jump backward N times to seek + * its start. */ + int64_t lp_count = lpGetInteger(lp_ele); + if (lp_count == 0) /* We reached the master entry. */ + return 0; + + while (lp_count--) + lp_ele = lpPrev(lp, lp_ele); + } + + lp_ele = lpNext(lp, lp_ele); /* Seek ID (lp_ele currently points to 'flags'). */ + + /* Get the ID: it is encoded as difference between the master + * ID and this entry ID. */ + streamID id = *master_id; + id.ms += lpGetInteger(lp_ele); + lp_ele = lpNext(lp, lp_ele); + id.seq += lpGetInteger(lp_ele); + *edge_id = id; + return 1; +} + /* Debugging function to log the full content of a listpack. Useful * for development and debugging. */ void streamLogListpackContent(unsigned char *lp) { @@ -325,6 +386,39 @@ int streamCompareID(streamID *a, streamID *b) { return 0; } +void streamGetEdgeID(stream *s, int first, streamID *edge_id) +{ + raxIterator ri; + raxStart(&ri, s->rax); + int empty; + if (first) { + raxSeek(&ri, "^", NULL, 0); + empty = !raxNext(&ri); + } else { + raxSeek(&ri, "$", NULL, 0); + empty = !raxPrev(&ri); + } + + if (empty) { + /* Stream is empty, mark edge ID as lowest/highest possible. */ + edge_id->ms = first ? UINT64_MAX : 0; + edge_id->seq = first ? UINT64_MAX : 0; + raxStop(&ri); + return; + } + + unsigned char *lp = ri.data; + + /* Read the master ID from the radix tree key. */ + streamID master_id; + streamDecodeID(ri.key, &master_id); + + /* Construct edge ID. 
*/ + lpGetEdgeStreamID(lp, first, &master_id, edge_id); + + raxStop(&ri); +} + /* Adds a new item into the stream 's' having the specified number of * field-value pairs as specified in 'numfields' and stored into 'argv'. * Returns the new entry ID populating the 'added_id' structure. @@ -525,35 +619,96 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_ return C_OK; } -/* Trim the stream 's' to have no more than maxlen elements, and return the +typedef struct { + /* XADD options */ + streamID id; /* User-provided ID, for XADD only. */ + int id_given; /* Was an ID different than "*" specified? for XADD only. */ + int no_mkstream; /* if set to 1 do not create new stream */ + + /* XADD + XTRIM common options */ + int trim_strategy; /* TRIM_STRATEGY_* */ + int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */ + int approx_trim; /* If 1 only delete whole radix tree nodes, so + * the trim argument is not applied verbatim. */ + long long limit; /* Maximum amount of entries to trim. If 0, no limitation + * on the amount of trimming work is enforced. */ + /* TRIM_STRATEGY_MAXLEN options */ + long long maxlen; /* After trimming, leave stream at this length . */ + /* TRIM_STRATEGY_MINID options */ + streamID minid; /* Trim by ID (No stream entries with ID < 'minid' will remain) */ +} streamAddTrimArgs; + +#define TRIM_STRATEGY_NONE 0 +#define TRIM_STRATEGY_MAXLEN 1 +#define TRIM_STRATEGY_MINID 2 + +/* Trim the stream 's' according to args->trim_strategy, and return the * number of elements removed from the stream. The 'approx' option, if non-zero, * specifies that the trimming must be performed in a approximated way in * order to maximize performances. 
This means that the stream may contain - * more elements than 'maxlen', and elements are only removed if we can remove + * entries with IDs < 'id' in case of MINID (or more elements than 'maxlen' + * in case of MAXLEN), and elements are only removed if we can remove * a *whole* node of the radix tree. The elements are removed from the head * of the stream (older elements). * * The function may return zero if: * - * 1) The stream is already shorter or equal to the specified max length. - * 2) The 'approx' option is true and the head node had not enough elements - * to be deleted, leaving the stream with a number of elements >= maxlen. + * 1) The minimal entry ID of the stream is already >= 'id' (MINID); or + * 2) The stream is already shorter or equal to the specified max length (MAXLEN); or + * 3) The 'approx' option is true and the head node did not have enough elements + * to be deleted. + * + * args->limit is the maximum number of entries to delete. The purpose is to + * prevent this function from taking too long. + * If 'limit' is 0 then we do not limit the number of deleted entries. + * Much like the 'approx', if 'limit' is smaller than the number of entries + * that should be trimmed, there is a chance we will still have entries with + * IDs < 'id' (or number of elements >= maxlen in case of MAXLEN). 
*/ -int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) { - if (s->length <= maxlen) return 0; +int64_t streamTrim(stream *s, streamAddTrimArgs *args) { + size_t maxlen = args->maxlen; + streamID *id = &args->minid; + int approx = args->approx_trim; + int64_t limit = args->limit; + int trim_strategy = args->trim_strategy; + + if (trim_strategy == TRIM_STRATEGY_NONE) + return 0; raxIterator ri; raxStart(&ri,s->rax); raxSeek(&ri,"^",NULL,0); int64_t deleted = 0; - while(s->length > maxlen && raxNext(&ri)) { + while (raxNext(&ri)) { + /* Check if we exceeded the amount of work we could do */ + if (limit && deleted >= limit) + break; + + if (trim_strategy == TRIM_STRATEGY_MAXLEN && s->length <= maxlen) + break; + unsigned char *lp = ri.data, *p = lpFirst(lp); int64_t entries = lpGetInteger(p); - /* Check if we can remove the whole node, and still have at - * least maxlen elements. */ - if (s->length - entries >= maxlen) { + /* Check if we can remove the whole node. */ + int remove_node; + streamID master_id = {0}; /* For MINID */ + if (trim_strategy == TRIM_STRATEGY_MAXLEN) { + remove_node = s->length - entries >= maxlen; + } else { + /* Read the master ID from the radix tree key. */ + streamDecodeID(ri.key, &master_id); + + /* Read last ID. */ + streamID last_id; + lpGetEdgeStreamID(lp, 0, &master_id, &last_id); + + /* We can remove the entire node if its last ID < 'id' */ + remove_node = streamCompareID(&last_id, id) < 0; + } + + if (remove_node) { lpFree(lp); raxRemove(s->rax,ri.key,ri.key_len,NULL); raxSeek(&ri,">=",ri.key,ri.key_len); @@ -566,19 +721,15 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) { * stop here. */ if (approx) break; - /* Otherwise, we have to mark single entries inside the listpack - * as deleted. We start by updating the entries/deleted counters. 
*/ - int64_t to_delete = s->length - maxlen; - serverAssert(to_delete < entries); - lp = lpReplaceInteger(lp,&p,entries-to_delete); - p = lpNext(lp,p); /* Seek deleted field. */ - int64_t marked_deleted = lpGetInteger(p); - lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete); - p = lpNext(lp,p); /* Seek num-of-fields in the master entry. */ + /* Now we have to trim entries from within 'lp' */ + int64_t deleted_from_lp = 0; + + p = lpNext(lp, p); /* Seek deleted field. */ + p = lpNext(lp, p); /* Seek num-of-fields in the master entry. */ /* Skip all the master fields. */ int64_t master_fields_count = lpGetInteger(p); - p = lpNext(lp,p); /* Seek the first field. */ + p = lpNext(lp,p); /* Seek the first field. */ for (int64_t j = 0; j < master_fields_count; j++) p = lpNext(lp,p); /* Skip all master fields. */ p = lpNext(lp,p); /* Skip the zero master entry terminator. */ @@ -586,37 +737,72 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) { /* 'p' is now pointing to the first entry inside the listpack. * We have to run entry after entry, marking entries as deleted * if they are already not deleted. */ - while(p) { + while (p) { + /* We keep a copy of p (which points to the flags part) in order to + * update it after (and if) we actually remove the entry */ + unsigned char *pcopy = p; + int flags = lpGetInteger(p); + p = lpNext(lp, p); /* Skip flags. */ int to_skip; - /* Mark the entry as deleted. */ - if (!(flags & STREAM_ITEM_FLAG_DELETED)) { - flags |= STREAM_ITEM_FLAG_DELETED; - lp = lpReplaceInteger(lp,&p,flags); - deleted++; - s->length--; - if (s->length <= maxlen) break; /* Enough entries deleted. 
*/ + int64_t ms_delta = lpGetInteger(p); + p = lpNext(lp, p); /* Skip ID ms delta */ + int64_t seq_delta = lpGetInteger(p); + p = lpNext(lp, p); /* Skip ID seq delta */ + + streamID currid = {0}; /* For MINID */ + if (trim_strategy == TRIM_STRATEGY_MINID) { + currid.ms = master_id.ms + ms_delta; + currid.seq = master_id.seq + seq_delta; } - p = lpNext(lp,p); /* Skip ID ms delta. */ - p = lpNext(lp,p); /* Skip ID seq delta. */ - p = lpNext(lp,p); /* Seek num-fields or values (if compressed). */ + int stop; + if (trim_strategy == TRIM_STRATEGY_MAXLEN) { + stop = s->length <= maxlen; + } else { + /* Following IDs will definitely be greater because the rax + * tree is sorted, no point of continuing. */ + stop = streamCompareID(&currid, id) >= 0; + } + if (stop) + break; + if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) { to_skip = master_fields_count; } else { - to_skip = lpGetInteger(p); - to_skip = 1+(to_skip*2); + to_skip = lpGetInteger(p); /* Get num-fields. */ + p = lpNext(lp,p); /* Skip num-fields. */ + to_skip *= 2; /* Fields and values. */ } while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */ p = lpNext(lp,p); /* Skip the final lp-count field. */ + + /* Mark the entry as deleted. */ + if (!(flags & STREAM_ITEM_FLAG_DELETED)) { + intptr_t delta = p - lp; + flags |= STREAM_ITEM_FLAG_DELETED; + lp = lpReplaceInteger(lp, &pcopy, flags); + deleted_from_lp++; + s->length--; + p = lp + delta; + } } + deleted += deleted_from_lp; + + /* Now we update the entries/deleted counters. */ + p = lpFirst(lp); + lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp); + p = lpNext(lp,p); /* Seek deleted field. */ + int64_t marked_deleted = lpGetInteger(p); + lp = lpReplaceInteger(lp,&p,marked_deleted+deleted_from_lp); + p = lpNext(lp,p); /* Seek num-of-fields in the master entry. */ /* Here we should perform garbage collection in case at this point * there are too many entries deleted inside the listpack. 
*/ - entries -= to_delete; - marked_deleted += to_delete; + entries -= deleted_from_lp; + marked_deleted += deleted_from_lp; if (entries + marked_deleted > 10 && marked_deleted > entries/2) { /* TODO: perform a garbage collection. */ } @@ -632,6 +818,142 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) { return deleted; } +/* Parse the arguments of XADD/XTRIM. + * + * See streamAddTrimArgs for more details about the arguments handled. + * + * This function returns the position of the ID argument (relevant only to XADD). + * On error -1 is returned and a reply is sent. */ +static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, int xadd) { + /* Initialize arguments to defaults */ + memset(args, 0, sizeof(*args)); + + /* Parse options. */ + int i = 2; /* This is the first argument position where we could + find an option, or the ID. */ + int limit_given = 0; + for (; i < c->argc; i++) { + int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ + char *opt = c->argv[i]->ptr; + if (xadd && opt[0] == '*' && opt[1] == '\0') { + /* This is just a fast path for the common case of auto-ID + * creation. */ + break; + } else if (!strcasecmp(opt,"maxlen") && moreargs) { + if (args->trim_strategy != TRIM_STRATEGY_NONE) { + addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible"); + return -1; + } + args->approx_trim = 0; + char *next = c->argv[i+1]->ptr; + /* Check for the form MAXLEN ~ count. 
*/ + if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { + args->approx_trim = 1; + i++; + } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { + i++; + } + if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->maxlen,NULL) + != C_OK) return -1; + + if (args->maxlen < 0) { + addReplyError(c,"The MAXLEN argument must be >= 0."); + return -1; + } + i++; + args->trim_strategy = TRIM_STRATEGY_MAXLEN; + args->trim_strategy_arg_idx = i; + } else if (!strcasecmp(opt,"minid") && moreargs) { + if (args->trim_strategy != TRIM_STRATEGY_NONE) { + addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible"); + return -1; + } + args->approx_trim = 0; + char *next = c->argv[i+1]->ptr; + /* Check for the form MINID ~ id. */ + if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { + args->approx_trim = 1; + i++; + } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { + i++; + } + + if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0) != C_OK) + return -1; + + i++; + args->trim_strategy = TRIM_STRATEGY_MINID; + args->trim_strategy_arg_idx = i; + } else if (!strcasecmp(opt,"limit") && moreargs) { + /* Note about LIMIT: If it was not provided by the caller we set + * it to 100*server.stream_node_max_entries, and that's to prevent the + * trimming from taking too long, at the expense of not deleting entries + * that should be trimmed. + * If user wanted exact trimming (i.e. no '~') we never limit the number + * of trimmed entries */ + if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->limit,NULL) != C_OK) + return -1; + + if (args->limit < 0) { + addReplyError(c,"The LIMIT argument must be >= 0."); + return -1; + } + limit_given = 1; + i++; + } else if (xadd && !strcasecmp(opt,"nomkstream")) { + args->no_mkstream = 1; + } else if (xadd) { + /* If we are here is a syntax error or a valid ID. 
*/ + if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0) != C_OK) + return -1; + args->id_given = 1; + break; + } else { + addReplyErrorObject(c,shared.syntaxerr); + return -1; + } + } + + if (args->limit && args->trim_strategy == TRIM_STRATEGY_NONE) { + addReplyError(c,"syntax error, LIMIT cannot be used without specifying a trimming strategy"); + return -1; + } + + if (!xadd && args->trim_strategy == TRIM_STRATEGY_NONE) { + addReplyError(c,"syntax error, XTRIM must be called with a trimming strategy"); + return -1; + } + + if (c == server.master || c->id == CLIENT_ID_AOF) { + /* If command came from master or from AOF we must not enforce a limit + * (The maxlen/minid argument was re-written to make sure there's no + * inconsistency). */ + args->limit = 0; + } else { + /* We need to set the limit (only if we got '~') */ + if (limit_given) { + if (!args->approx_trim) { + /* LIMIT was provided without ~ */ + addReplyError(c,"syntax error, LIMIT cannot be used without the special ~ option"); + return -1; + } + } else { + /* User didn't provide LIMIT, we must set it. */ + + if (args->approx_trim) { + /* In order to prevent trimming from doing too much work and causing + * latency spikes we limit the amount of work it can do */ + args->limit = 100 * server.stream_node_max_entries; /* Maximum 100 rax nodes. */ + } else { + /* No LIMIT for exact trimming */ + args->limit = 0; + } + } + } + + return i; +} + /* Initialize the stream iterator, so that we can call iterating functions * to get the next items. This requires a corresponding streamIteratorStop() * at the end. The 'rev' parameter controls the direction. If it's zero the @@ -1375,68 +1697,36 @@ int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude, return C_OK; } -/* We propagate MAXLEN ~ as MAXLEN = - * otherwise trimming is no longer determinsitic on replicas / AOF. 
*/ -void streamRewriteApproxMaxlen(client *c, stream *s, int maxlen_arg_idx) { - robj *maxlen_obj = createStringObjectFromLongLong(s->length); +void streamRewriteApproxSpecifier(client *c, int idx) { robj *equal_obj = createStringObject("=",1); - - rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj); - rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj); - + rewriteClientCommandArgument(c,idx,equal_obj); decrRefCount(equal_obj); - decrRefCount(maxlen_obj); } -/* XADD key [MAXLEN [~|=] ] [NOMKSTREAM] [field value] [field value] ... */ -void xaddCommand(client *c) { - streamID id; - int id_given = 0; /* Was an ID different than "*" specified? */ - long long maxlen = -1; /* If left to -1 no trimming is performed. */ - int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so - the maximum length is not applied verbatim. */ - int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */ - int no_mkstream = 0; /* if set to 1 do not create new stream */ - - /* Parse options. */ - int i = 2; /* This is the first argument position where we could - find an option, or the ID. */ - for (; i < c->argc; i++) { - int moreargs = (c->argc-1) - i; /* Number of additional arguments. */ - char *opt = c->argv[i]->ptr; - if (opt[0] == '*' && opt[1] == '\0') { - /* This is just a fast path for the common case of auto-ID - * creation. */ - break; - } else if (!strcasecmp(opt,"maxlen") && moreargs) { - approx_maxlen = 0; - char *next = c->argv[i+1]->ptr; - /* Check for the form MAXLEN ~ . 
*/ - if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { - approx_maxlen = 1; - i++; - } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { - i++; - } - if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL) - != C_OK) return; - - if (maxlen < 0) { - addReplyError(c,"The MAXLEN argument must be >= 0."); - return; - } - i++; - maxlen_arg_idx = i; - } else if (!strcasecmp(opt,"nomkstream")) { - no_mkstream = 1; - } else { - /* If we are here is a syntax error or a valid ID. */ - if (streamParseStrictIDOrReply(c,c->argv[i],&id,0) != C_OK) return; - id_given = 1; - break; - } +/* We propagate MAXLEN/MINID ~ as MAXLEN/MINID = + * otherwise trimming is no longer deterministic on replicas / AOF. */ +void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx) { + robj *arg; + if (trim_strategy == TRIM_STRATEGY_MAXLEN) { + arg = createStringObjectFromLongLong(s->length); + } else { + streamID first_id; + streamGetEdgeID(s, 1, &first_id); + arg = createObjectFromStreamID(&first_id); } - int field_pos = i+1; + + rewriteClientCommandArgument(c,idx,arg); + decrRefCount(arg); +} + +/* XADD key [(MAXLEN [~|=] | MINID [~|=] ) [LIMIT ]] [NOMKSTREAM] [field value] [field value] ... */ +void xaddCommand(client *c) { + /* Parse options. */ + streamAddTrimArgs parsed_args; + int idpos = streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1); + if (idpos < 0) + return; /* streamParseAddOrTrimArgsOrReply already replied. */ + int field_pos = idpos+1; /* The ID is always one argument before the first field */ /* Check arity. */ if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) { @@ -1447,7 +1737,9 @@ void xaddCommand(client *c) { /* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating * a new stream and have streamAppendItem fail, leaving an empty key in the * database. 
*/ - if (id_given && id.ms == 0 && id.seq == 0) { + if (parsed_args.id_given && + parsed_args.id.ms == 0 && parsed_args.id.seq == 0) + { addReplyError(c,"The ID specified in XADD must be greater than 0-0"); return; } @@ -1455,7 +1747,7 @@ void xaddCommand(client *c) { /* Lookup the stream at key. */ robj *o; stream *s; - if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],no_mkstream)) == NULL) return; + if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],parsed_args.no_mkstream)) == NULL) return; s = o->ptr; /* Return ASAP if the stream has reached the last possible ID */ @@ -1466,8 +1758,9 @@ void xaddCommand(client *c) { } /* Append using the low level function and return the ID. */ + streamID id; if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2, - &id, id_given ? &id : NULL) + &id, parsed_args.id_given ? &parsed_args.id : NULL) == C_ERR) { addReplyError(c,"The ID specified in XADD is equal or smaller than the " @@ -1480,18 +1773,26 @@ void xaddCommand(client *c) { notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id); server.dirty++; - if (maxlen >= 0) { - /* Notify xtrim event if needed. */ - if (streamTrimByLength(s,maxlen,approx_maxlen)) { + /* Trim if needed. */ + if (parsed_args.trim_strategy != TRIM_STRATEGY_NONE) { + if (streamTrim(s, &parsed_args)) { notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); } - if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx); + if (parsed_args.approx_trim) { + /* In case our trimming was limited (by LIMIT or by ~) we must + * re-write the relevant trim argument to make sure there will be + * no inconsistencies in AOF loading or in the replica. + * It's enough to check only args->approx because there is no + * way LIMIT is given without the ~ option. 
*/ + streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1); + streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx); + } } /* Let's rewrite the ID argument with the one actually generated for * AOF/replication propagation. */ robj *idarg = createObjectFromStreamID(&id); - rewriteClientCommandArgument(c,i,idarg); + rewriteClientCommandArgument(c,idpos,idarg); decrRefCount(idarg); /* We need to signal to blocked clients that there is new data on this @@ -2871,14 +3172,25 @@ cleanup: * * List of options: * + * Trim strategies: + * * MAXLEN [~|=] -- Trim so that the stream will be capped at * the specified length. Use ~ before the * count in order to demand approximated trimming * (like XADD MAXLEN option). + * MINID [~|=] -- Trim so that the stream will not contain entries + * with IDs smaller than 'id'. Use ~ before the + * count in order to demand approximated trimming + * (like XADD MINID option). + * + * Other options: + * + * LIMIT -- The maximum number of entries to trim. + * 0 means unlimited. Unless specified, it is set + * to a default of 100*server.stream_node_max_entries, + * and that's in order to keep the trimming time sane. + * Has meaning only if `~` was provided. */ - -#define TRIM_STRATEGY_NONE 0 -#define TRIM_STRATEGY_MAXLEN 1 void xtrimCommand(client *c) { robj *o; @@ -2889,58 +3201,27 @@ void xtrimCommand(client *c) { stream *s = o->ptr; /* Argument parsing. */ - int trim_strategy = TRIM_STRATEGY_NONE; - long long maxlen = -1; /* If left to -1 no trimming is performed. */ - int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so - the maxium length is not applied verbatim. */ - int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */ - - /* Parse options. */ - int i = 2; /* Start of options. */ - for (; i < c->argc; i++) { - int moreargs = (c->argc-1) - i; /* Number of additional arguments. 
*/ - char *opt = c->argv[i]->ptr; - if (!strcasecmp(opt,"maxlen") && moreargs) { - approx_maxlen = 0; - trim_strategy = TRIM_STRATEGY_MAXLEN; - char *next = c->argv[i+1]->ptr; - /* Check for the form MAXLEN ~ . */ - if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') { - approx_maxlen = 1; - i++; - } else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') { - i++; - } - if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL) - != C_OK) return; - - if (maxlen < 0) { - addReplyError(c,"The MAXLEN argument must be >= 0."); - return; - } - i++; - maxlen_arg_idx = i; - } else { - addReplyErrorObject(c,shared.syntaxerr); - return; - } - } + streamAddTrimArgs parsed_args; + if (streamParseAddOrTrimArgsOrReply(c, &parsed_args, 0) < 0) + return; /* streamParseAddOrTrimArgsOrReply already replied. */ /* Perform the trimming. */ - int64_t deleted = 0; - if (trim_strategy == TRIM_STRATEGY_MAXLEN) { - deleted = streamTrimByLength(s,maxlen,approx_maxlen); - } else { - addReplyError(c,"XTRIM called without an option to trim the stream"); - return; - } - - /* Propagate the write if needed. */ + int64_t deleted = streamTrim(s, &parsed_args); if (deleted) { - signalModifiedKey(c,c->db,c->argv[1]); notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id); + if (parsed_args.approx_trim) { + /* In case our trimming was limited (by LIMIT or by ~) we must + * re-write the relevant trim argument to make sure there will be + * no inconsistencies in AOF loading or in the replica. + * It's enough to check only args->approx because there is no + * way LIMIT is given without the ~ option. */ + streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1); + streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx); + } + + /* Propagate the write. 
*/ + signalModifiedKey(c, c->db,c->argv[1]); server.dirty += deleted; - if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx); } addReplyLongLong(c,deleted); } diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 63cc697c2..a89a65299 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -94,6 +94,7 @@ start_server { r XADD mystream MAXLEN 5 * yitem $j } } + assert {[r xlen mystream] == 5} set res [r xrange mystream - +] set expected 995 foreach r $res { @@ -138,6 +139,39 @@ start_server { assert_equal [lindex $items 1 1] {item 2 value b} } + test {XADD with MINID option} { + r DEL mystream + for {set j 1} {$j < 1001} {incr j} { + set minid 1000 + if {$j >= 5} { + set minid [expr {$j-5}] + } + if {rand() < 0.9} { + r XADD mystream MINID $minid $j xitem $j + } else { + r XADD mystream MINID $minid $j yitem $j + } + } + assert {[r xlen mystream] == 6} + set res [r xrange mystream - +] + set expected 995 + foreach r $res { + assert {[lindex $r 1 1] == $expected} + incr expected + } + } + + test {XTRIM with MINID option} { + r DEL mystream + r XADD mystream 1-0 f v + r XADD mystream 2-0 f v + r XADD mystream 3-0 f v + r XADD mystream 4-0 f v + r XADD mystream 5-0 f v + r XTRIM mystream MINID = 3-0 + assert_equal [r XRANGE mystream - +] {{3-0 {f v}} {4-0 {f v}} {5-0 {f v}}} + } + test {XADD mass insertion and XLEN} { r DEL mystream r multi @@ -448,7 +482,49 @@ start_server { assert {[r XLEN mystream] == 400} } - + test {XADD with LIMIT consecutive calls} { + r del mystream + r config set stream-node-max-entries 10 + for {set j 0} {$j < 100} {incr j} { + r XADD mystream * xitem v + } + r XADD mystream MAXLEN ~ 55 LIMIT 30 * xitem v + assert {[r xlen mystream] == 71} + r XADD mystream MAXLEN ~ 55 LIMIT 30 * xitem v + assert {[r xlen mystream] == 62} + r config set stream-node-max-entries 100 + } + + test {XTRIM with ~ is limited} { + r del mystream + r config set stream-node-max-entries 1 + for {set j 0} {$j < 102} 
{incr j} { + r XADD mystream * xitem v + } + r XTRIM mystream MAXLEN ~ 1 + assert {[r xlen mystream] == 2} + r config set stream-node-max-entries 100 + } + + test {XTRIM without ~ is not limited} { + r del mystream + r config set stream-node-max-entries 1 + for {set j 0} {$j < 102} {incr j} { + r XADD mystream * xitem v + } + r XTRIM mystream MAXLEN 1 + assert {[r xlen mystream] == 1} + r config set stream-node-max-entries 100 + } + + test {XTRIM without ~ and with LIMIT} { + r del mystream + r config set stream-node-max-entries 1 + for {set j 0} {$j < 102} {incr j} { + r XADD mystream * xitem v + } + assert_error ERR* {r XTRIM mystream MAXLEN 1 LIMIT 30} + } } start_server {tags {"stream"} overrides {appendonly yes}} { @@ -466,6 +542,22 @@ start_server {tags {"stream"} overrides {appendonly yes}} { } } +start_server {tags {"stream"} overrides {appendonly yes}} { + test {XADD with MINID > lastid can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + set id [expr {$j+1}] + r XADD mystream $id xitem v + } + r XADD mystream MINID 1 * xitem v + incr j + assert {[r xlen mystream] == $j} + r debug loadaof + r XADD mystream * xitem v + incr j + assert {[r xlen mystream] == $j} + } +} + start_server {tags {"stream"} overrides {appendonly yes}} { test {XADD with ~ MAXLEN can propagate correctly} { for {set j 0} {$j < 100} {incr j} { @@ -482,6 +574,52 @@ start_server {tags {"stream"} overrides {appendonly yes}} { } } +start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} { + test {XADD with ~ MAXLEN and LIMIT can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + r XADD mystream * xitem v + } + r XADD mystream MAXLEN ~ 55 LIMIT 30 * xitem v + assert {[r xlen mystream] == 71} + r config set stream-node-max-entries 1 + r debug loadaof + r XADD mystream * xitem v + assert {[r xlen mystream] == 72} + } +} + +start_server {tags {"stream"} overrides {appendonly yes}} { + test {XADD with ~ MINID can propagate correctly} { + for 
{set j 0} {$j < 100} {incr j} { + set id [expr {$j+1}] + r XADD mystream $id xitem v + } + r XADD mystream MINID ~ $j * xitem v + incr j + assert {[r xlen mystream] == $j} + r config set stream-node-max-entries 1 + r debug loadaof + r XADD mystream * xitem v + incr j + assert {[r xlen mystream] == $j} + } +} + +start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} { + test {XADD with ~ MINID and LIMIT can propagate correctly} { + for {set j 0} {$j < 100} {incr j} { + set id [expr {$j+1}] + r XADD mystream $id xitem v + } + r XADD mystream MINID ~ 55 LIMIT 30 * xitem v + assert {[r xlen mystream] == 71} + r config set stream-node-max-entries 1 + r debug loadaof + r XADD mystream * xitem v + assert {[r xlen mystream] == 72} + } +} + start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} { test {XTRIM with ~ MAXLEN can propagate correctly} { for {set j 0} {$j < 100} {incr j} {