XADD and XTRIM, Trim by MINID, and new LIMIT argument (#8169)

This PR adds another trimming strategy to XADD and XTRIM named MINID
(complements the existing MAXLEN).
It also adds a new LIMIT argument that allows incremental trimming by repeated
calls (rather than all at once).

This provides the ability to trim all records older than a certain ID (which makes it
possible for the user to trim by age too).
Example:
XTRIM mystream MINID ~ 1608540753 will trim entries with id < 1608540753,
but might not trim all (because of the ~ modifier)

The purpose is to ease the use of streams. many users use streams as logs and
the common case is wanting a log
of the last X seconds rather than a log that contains maximum X entries (new
MINID vs existing MAXLEN)

The new LIMIT modifier is only supported when the trim strategy uses ~.
i.e. when the user asked for exact trimming, it all happens in one go (no
possibility for incremental trimming).
However, when ~ is provided, we trim full rax nodes, up to the limit number
of records.
The default limit is 100*stream_node_max_entries (used when LIMIT is not
provided).
I.e. this is a behavior change (even if the existing MAXLEN strategy is used).
An explicit limit of 0 means unlimited (but note that it's not the default).

Other changes:

Refactor arg parsing code for XADD and XTRIM to use common code.
This commit is contained in:
guybe7 2021-01-08 18:13:25 +02:00 committed by GitHub
parent 5843a45d01
commit 814aad65f1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 571 additions and 152 deletions

View File

@ -46,6 +46,8 @@
void streamFreeCG(streamCG *cg);
void streamFreeNACK(streamNACK *na);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
/* -----------------------------------------------------------------------
* Low level stream encoding: a radix tree of listpacks.
@ -282,6 +284,65 @@ static inline int64_t lpGetIntegerIfValid(unsigned char *ele, int *valid) {
#define lpGetInteger(ele) lpGetIntegerIfValid(ele, NULL)
/* Get an edge streamID of a given listpack.
* 'master_id' is an input param, used to build the 'edge_id' output param */
int lpGetEdgeStreamID(unsigned char *lp, int first, streamID *master_id, streamID *edge_id)
{
if (lp == NULL)
return 0;
unsigned char *lp_ele;
/* We need to seek either the first or the last entry depending
* on the direction of the iteration. */
if (first) {
/* Get the master fields count. */
lp_ele = lpFirst(lp); /* Seek items count */
lp_ele = lpNext(lp, lp_ele); /* Seek deleted count. */
lp_ele = lpNext(lp, lp_ele); /* Seek num fields. */
int64_t master_fields_count = lpGetInteger(lp_ele);
lp_ele = lpNext(lp, lp_ele); /* Seek first field. */
/* If we are iterating in normal order, skip the master fields
* to seek the first actual entry. */
for (int64_t i = 0; i < master_fields_count; i++)
lp_ele = lpNext(lp, lp_ele);
/* If we are going forward, skip the previous entry's
* lp-count field (or in case of the master entry, the zero
* term field) */
lp_ele = lpNext(lp, lp_ele);
if (lp_ele == NULL)
return 0;
} else {
/* If we are iterating in reverse direction, just seek the
* last part of the last entry in the listpack (that is, the
* fields count). */
lp_ele = lpLast(lp);
/* If we are going backward, read the number of elements this
* entry is composed of, and jump backward N times to seek
* its start. */
int64_t lp_count = lpGetInteger(lp_ele);
if (lp_count == 0) /* We reached the master entry. */
return 0;
while (lp_count--)
lp_ele = lpPrev(lp, lp_ele);
}
lp_ele = lpNext(lp, lp_ele); /* Seek ID (lp_ele currently points to 'flags'). */
/* Get the ID: it is encoded as difference between the master
* ID and this entry ID. */
streamID id = *master_id;
id.ms += lpGetInteger(lp_ele);
lp_ele = lpNext(lp, lp_ele);
id.seq += lpGetInteger(lp_ele);
*edge_id = id;
return 1;
}
/* Debugging function to log the full content of a listpack. Useful
* for development and debugging. */
void streamLogListpackContent(unsigned char *lp) {
@ -325,6 +386,39 @@ int streamCompareID(streamID *a, streamID *b) {
return 0;
}
void streamGetEdgeID(stream *s, int first, streamID *edge_id)
{
raxIterator ri;
raxStart(&ri, s->rax);
int empty;
if (first) {
raxSeek(&ri, "^", NULL, 0);
empty = !raxNext(&ri);
} else {
raxSeek(&ri, "$", NULL, 0);
empty = !raxPrev(&ri);
}
if (empty) {
/* Stream is empty, mark edge ID as lowest/highest possible. */
edge_id->ms = first ? UINT64_MAX : 0;
edge_id->seq = first ? UINT64_MAX : 0;
raxStop(&ri);
return;
}
unsigned char *lp = ri.data;
/* Read the master ID from the radix tree key. */
streamID master_id;
streamDecodeID(ri.key, &master_id);
/* Construct edge ID. */
lpGetEdgeStreamID(lp, first, &master_id, edge_id);
raxStop(&ri);
}
/* Adds a new item into the stream 's' having the specified number of
* field-value pairs as specified in 'numfields' and stored into 'argv'.
* Returns the new entry ID populating the 'added_id' structure.
@ -525,35 +619,96 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
return C_OK;
}
/* Trim the stream 's' to have no more than maxlen elements, and return the
typedef struct {
/* XADD options */
streamID id; /* User-provided ID, for XADD only. */
int id_given; /* Was an ID different than "*" specified? for XADD only. */
int no_mkstream; /* if set to 1 do not create new stream */
/* XADD + XTRIM common options */
int trim_strategy; /* TRIM_STRATEGY_* */
int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */
int approx_trim; /* If 1 only delete whole radix tree nodes, so
* the trim argument is not applied verbatim. */
long long limit; /* Maximum amount of entries to trim. If 0, no limitation
* on the amount of trimming work is enforced. */
/* TRIM_STRATEGY_MAXLEN options */
long long maxlen; /* After trimming, leave stream at this length . */
/* TRIM_STRATEGY_MINID options */
streamID minid; /* Trim by ID (No stream entries with ID < 'minid' will remain) */
} streamAddTrimArgs;
#define TRIM_STRATEGY_NONE 0
#define TRIM_STRATEGY_MAXLEN 1
#define TRIM_STRATEGY_MINID 2
/* Trim the stream 's' according to args->trim_strategy, and return the
* number of elements removed from the stream. The 'approx' option, if non-zero,
* specifies that the trimming must be performed in a approximated way in
* order to maximize performances. This means that the stream may contain
* more elements than 'maxlen', and elements are only removed if we can remove
* entries with IDs < 'id' in case of MINID (or more elements than 'maxlen'
* in case of MAXLEN), and elements are only removed if we can remove
* a *whole* node of the radix tree. The elements are removed from the head
* of the stream (older elements).
*
* The function may return zero if:
*
* 1) The stream is already shorter or equal to the specified max length.
* 2) The 'approx' option is true and the head node had not enough elements
* to be deleted, leaving the stream with a number of elements >= maxlen.
* 1) The minimal entry ID of the stream is already < 'id' (MINID); or
* 2) The stream is already shorter or equal to the specified max length (MAXLEN); or
* 3) The 'approx' option is true and the head node did not have enough elements
* to be deleted.
*
* args->limit is the maximum number of entries to delete. The purpose is to
* prevent this function from taking to long.
* If 'limit' is 0 then we do not limit the number of deleted entries.
* Much like the 'approx', if 'limit' is smaller than the number of entries
* that should be trimmed, there is a chance we will still have entries with
* IDs < 'id' (or number of elements >= maxlen in case of MAXLEN).
*/
int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
if (s->length <= maxlen) return 0;
int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
size_t maxlen = args->maxlen;
streamID *id = &args->minid;
int approx = args->approx_trim;
int64_t limit = args->limit;
int trim_strategy = args->trim_strategy;
if (trim_strategy == TRIM_STRATEGY_NONE)
return 0;
raxIterator ri;
raxStart(&ri,s->rax);
raxSeek(&ri,"^",NULL,0);
int64_t deleted = 0;
while(s->length > maxlen && raxNext(&ri)) {
while (raxNext(&ri)) {
/* Check if we exceeded the amount of work we could do */
if (limit && deleted >= limit)
break;
if (trim_strategy == TRIM_STRATEGY_MAXLEN && s->length <= maxlen)
break;
unsigned char *lp = ri.data, *p = lpFirst(lp);
int64_t entries = lpGetInteger(p);
/* Check if we can remove the whole node, and still have at
* least maxlen elements. */
if (s->length - entries >= maxlen) {
/* Check if we can remove the whole node. */
int remove_node;
streamID master_id = {0}; /* For MINID */
if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
remove_node = s->length - entries >= maxlen;
} else {
/* Read the master ID from the radix tree key. */
streamDecodeID(ri.key, &master_id);
/* Read last ID. */
streamID last_id;
lpGetEdgeStreamID(lp, 0, &master_id, &last_id);
/* We can remove the entire node id its last ID < 'id' */
remove_node = streamCompareID(&last_id, id) < 0;
}
if (remove_node) {
lpFree(lp);
raxRemove(s->rax,ri.key,ri.key_len,NULL);
raxSeek(&ri,">=",ri.key,ri.key_len);
@ -566,19 +721,15 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
* stop here. */
if (approx) break;
/* Otherwise, we have to mark single entries inside the listpack
* as deleted. We start by updating the entries/deleted counters. */
int64_t to_delete = s->length - maxlen;
serverAssert(to_delete < entries);
lp = lpReplaceInteger(lp,&p,entries-to_delete);
p = lpNext(lp,p); /* Seek deleted field. */
int64_t marked_deleted = lpGetInteger(p);
lp = lpReplaceInteger(lp,&p,marked_deleted+to_delete);
p = lpNext(lp,p); /* Seek num-of-fields in the master entry. */
/* Now we have to trim entries from within 'lp' */
int64_t deleted_from_lp = 0;
p = lpNext(lp, p); /* Skip deleted field. */
p = lpNext(lp, p); /* Skip num-of-fields in the master entry. */
/* Skip all the master fields. */
int64_t master_fields_count = lpGetInteger(p);
p = lpNext(lp,p); /* Seek the first field. */
p = lpNext(lp,p); /* Skip the first field. */
for (int64_t j = 0; j < master_fields_count; j++)
p = lpNext(lp,p); /* Skip all master fields. */
p = lpNext(lp,p); /* Skip the zero master entry terminator. */
@ -586,37 +737,72 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
/* 'p' is now pointing to the first entry inside the listpack.
* We have to run entry after entry, marking entries as deleted
* if they are already not deleted. */
while(p) {
while (p) {
/* We keep a copy of p (which point to flags part) in order to
* update it after (and if) we actually remove the entry */
unsigned char *pcopy = p;
int flags = lpGetInteger(p);
p = lpNext(lp, p); /* Skip flags. */
int to_skip;
/* Mark the entry as deleted. */
if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
flags |= STREAM_ITEM_FLAG_DELETED;
lp = lpReplaceInteger(lp,&p,flags);
deleted++;
s->length--;
if (s->length <= maxlen) break; /* Enough entries deleted. */
int ms_delta = lpGetInteger(p);
p = lpNext(lp, p); /* Skip ID ms delta */
int seq_delta = lpGetInteger(p);
p = lpNext(lp, p); /* Skip ID seq delta */
streamID currid = {0}; /* For MINID */
if (trim_strategy == TRIM_STRATEGY_MINID) {
currid.ms = master_id.ms + ms_delta;
currid.seq = master_id.seq + seq_delta;
}
p = lpNext(lp,p); /* Skip ID ms delta. */
p = lpNext(lp,p); /* Skip ID seq delta. */
p = lpNext(lp,p); /* Seek num-fields or values (if compressed). */
int stop;
if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
stop = s->length <= maxlen;
} else {
/* Following IDs will definitely be greater because the rax
* tree is sorted, no point of continuing. */
stop = streamCompareID(&currid, id) >= 0;
}
if (stop)
break;
if (flags & STREAM_ITEM_FLAG_SAMEFIELDS) {
to_skip = master_fields_count;
} else {
to_skip = lpGetInteger(p);
to_skip = 1+(to_skip*2);
to_skip = lpGetInteger(p); /* Get num-fields. */
p = lpNext(lp,p); /* Skip num-fields. */
to_skip *= 2; /* Fields and values. */
}
while(to_skip--) p = lpNext(lp,p); /* Skip the whole entry. */
p = lpNext(lp,p); /* Skip the final lp-count field. */
/* Mark the entry as deleted. */
if (!(flags & STREAM_ITEM_FLAG_DELETED)) {
intptr_t delta = p - lp;
flags |= STREAM_ITEM_FLAG_DELETED;
lp = lpReplaceInteger(lp, &pcopy, flags);
deleted_from_lp++;
s->length--;
p = lp + delta;
}
}
deleted += deleted_from_lp;
/* Now we the entries/deleted counters. */
p = lpFirst(lp);
lp = lpReplaceInteger(lp,&p,entries-deleted_from_lp);
p = lpNext(lp,p); /* Skip deleted field. */
int64_t marked_deleted = lpGetInteger(p);
lp = lpReplaceInteger(lp,&p,marked_deleted+deleted_from_lp);
p = lpNext(lp,p); /* Skip num-of-fields in the master entry. */
/* Here we should perform garbage collection in case at this point
* there are too many entries deleted inside the listpack. */
entries -= to_delete;
marked_deleted += to_delete;
entries -= deleted_from_lp;
marked_deleted += deleted_from_lp;
if (entries + marked_deleted > 10 && marked_deleted > entries/2) {
/* TODO: perform a garbage collection. */
}
@ -632,6 +818,142 @@ int64_t streamTrimByLength(stream *s, size_t maxlen, int approx) {
return deleted;
}
/* Parse the arguements of XADD/XTRIM.
*
* See streamAddTrimArgs for more details about the arguments handled.
*
* This function returns the position of the ID argument (relevant only to XADD).
* On error -1 is returned and a reply is sent. */
static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, int xadd) {
/* Initialize arguments to defaults */
memset(args, 0, sizeof(*args));
/* Parse options. */
int i = 2; /* This is the first argument position where we could
find an option, or the ID. */
int limit_given = 0;
for (; i < c->argc; i++) {
int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
char *opt = c->argv[i]->ptr;
if (xadd && opt[0] == '*' && opt[1] == '\0') {
/* This is just a fast path for the common case of auto-ID
* creation. */
break;
} else if (!strcasecmp(opt,"maxlen") && moreargs) {
if (args->trim_strategy != TRIM_STRATEGY_NONE) {
addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible");
return -1;
}
args->approx_trim = 0;
char *next = c->argv[i+1]->ptr;
/* Check for the form MAXLEN ~ <count>. */
if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
args->approx_trim = 1;
i++;
} else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
i++;
}
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->maxlen,NULL)
!= C_OK) return -1;
if (args->maxlen < 0) {
addReplyError(c,"The MAXLEN argument must be >= 0.");
return -1;
}
i++;
args->trim_strategy = TRIM_STRATEGY_MAXLEN;
args->trim_strategy_arg_idx = i;
} else if (!strcasecmp(opt,"minid") && moreargs) {
if (args->trim_strategy != TRIM_STRATEGY_NONE) {
addReplyError(c,"syntax error, MAXLEN and MINID options at the same time are not compatible");
return -1;
}
args->approx_trim = 0;
char *next = c->argv[i+1]->ptr;
/* Check for the form MINID ~ <id>|<age>. */
if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
args->approx_trim = 1;
i++;
} else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
i++;
}
if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0) != C_OK)
return -1;
i++;
args->trim_strategy = TRIM_STRATEGY_MINID;
args->trim_strategy_arg_idx = i;
} else if (!strcasecmp(opt,"limit") && moreargs) {
/* Note about LIMIT: If it was not provided by the caller we set
* it to 100*server.stream_node_max_entries, and that's to prevent the
* trimming from taking too long, on the expense of not deleting entries
* that should be trimmed.
* If user wanted exact trimming (i.e. no '~') we never limit the number
* of trimmed entries */
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&args->limit,NULL) != C_OK)
return -1;
if (args->limit < 0) {
addReplyError(c,"The LIMIT argument must be >= 0.");
return -1;
}
limit_given = 1;
i++;
} else if (xadd && !strcasecmp(opt,"nomkstream")) {
args->no_mkstream = 1;
} else if (xadd) {
/* If we are here is a syntax error or a valid ID. */
if (streamParseStrictIDOrReply(c,c->argv[i],&args->id,0) != C_OK)
return -1;
args->id_given = 1;
break;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return -1;
}
}
if (args->limit && args->trim_strategy == TRIM_STRATEGY_NONE) {
addReplyError(c,"syntax error, LIMIT cannot be used without specifying a trimming strategy");
return -1;
}
if (!xadd && args->trim_strategy == TRIM_STRATEGY_NONE) {
addReplyError(c,"syntax error, XTRIM must be called with a trimming strategy");
return -1;
}
if (c == server.master || c->id == CLIENT_ID_AOF) {
/* If command cam from master or from AOF we must not enforce maxnodes
* (The maxlen/minid argument was re-written to make sure there's no
* inconsistency). */
args->limit = 0;
} else {
/* We need to set the limit (only if we got '~') */
if (limit_given) {
if (!args->approx_trim) {
/* LIMIT was provided without ~ */
addReplyError(c,"syntax error, LIMIT cannot be used without the special ~ option");
return -1;
}
} else {
/* User didn't provide LIMIT, we must set it. */
if (args->approx_trim) {
/* In order to prevent from trimming to do too much work and cause
* latency spikes we limit the amount of work it can do */
args->limit = 100 * server.stream_node_max_entries; /* Maximum 100 rax nodes. */
} else {
/* No LIMIT for exact trimming */
args->limit = 0;
}
}
}
return i;
}
/* Initialize the stream iterator, so that we can call iterating functions
* to get the next items. This requires a corresponding streamIteratorStop()
* at the end. The 'rev' parameter controls the direction. If it's zero the
@ -1375,68 +1697,36 @@ int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude,
return C_OK;
}
/* We propagate MAXLEN ~ <count> as MAXLEN = <resulting-len-of-stream>
* otherwise trimming is no longer determinsitic on replicas / AOF. */
void streamRewriteApproxMaxlen(client *c, stream *s, int maxlen_arg_idx) {
robj *maxlen_obj = createStringObjectFromLongLong(s->length);
void streamRewriteApproxSpecifier(client *c, int idx) {
robj *equal_obj = createStringObject("=",1);
rewriteClientCommandArgument(c,maxlen_arg_idx,maxlen_obj);
rewriteClientCommandArgument(c,maxlen_arg_idx-1,equal_obj);
rewriteClientCommandArgument(c,idx,equal_obj);
decrRefCount(equal_obj);
decrRefCount(maxlen_obj);
}
/* XADD key [MAXLEN [~|=] <count>] [NOMKSTREAM] <ID or *> [field value] [field value] ... */
void xaddCommand(client *c) {
streamID id;
int id_given = 0; /* Was an ID different than "*" specified? */
long long maxlen = -1; /* If left to -1 no trimming is performed. */
int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so
the maximum length is not applied verbatim. */
int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
int no_mkstream = 0; /* if set to 1 do not create new stream */
/* Parse options. */
int i = 2; /* This is the first argument position where we could
find an option, or the ID. */
for (; i < c->argc; i++) {
int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
char *opt = c->argv[i]->ptr;
if (opt[0] == '*' && opt[1] == '\0') {
/* This is just a fast path for the common case of auto-ID
* creation. */
break;
} else if (!strcasecmp(opt,"maxlen") && moreargs) {
approx_maxlen = 0;
char *next = c->argv[i+1]->ptr;
/* Check for the form MAXLEN ~ <count>. */
if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
approx_maxlen = 1;
i++;
} else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
i++;
}
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
!= C_OK) return;
if (maxlen < 0) {
addReplyError(c,"The MAXLEN argument must be >= 0.");
return;
}
i++;
maxlen_arg_idx = i;
} else if (!strcasecmp(opt,"nomkstream")) {
no_mkstream = 1;
} else {
/* If we are here is a syntax error or a valid ID. */
if (streamParseStrictIDOrReply(c,c->argv[i],&id,0) != C_OK) return;
id_given = 1;
break;
}
/* We propagate MAXLEN/MINID ~ <count> as MAXLEN/MINID = <resulting-len-of-stream>
* otherwise trimming is no longer deterministic on replicas / AOF. */
void streamRewriteTrimArgument(client *c, stream *s, int trim_strategy, int idx) {
robj *arg;
if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
arg = createStringObjectFromLongLong(s->length);
} else {
streamID first_id;
streamGetEdgeID(s, 1, &first_id);
arg = createObjectFromStreamID(&first_id);
}
int field_pos = i+1;
rewriteClientCommandArgument(c,idx,arg);
decrRefCount(arg);
}
/* XADD key [(MAXLEN [~|=] <count> | MINID [~|=] <id>) [LIMIT <entries>]] [NOMKSTREAM] <ID or *> [field value] [field value] ... */
void xaddCommand(client *c) {
/* Parse options. */
streamAddTrimArgs parsed_args;
int idpos = streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1);
if (idpos < 0)
return; /* streamParseAddOrTrimArgsOrReply already replied. */
int field_pos = idpos+1; /* The ID is always one argument before the first field */
/* Check arity. */
if ((c->argc - field_pos) < 2 || ((c->argc-field_pos) % 2) == 1) {
@ -1447,7 +1737,9 @@ void xaddCommand(client *c) {
/* Return ASAP if minimal ID (0-0) was given so we avoid possibly creating
* a new stream and have streamAppendItem fail, leaving an empty key in the
* database. */
if (id_given && id.ms == 0 && id.seq == 0) {
if (parsed_args.id_given &&
parsed_args.id.ms == 0 && parsed_args.id.seq == 0)
{
addReplyError(c,"The ID specified in XADD must be greater than 0-0");
return;
}
@ -1455,7 +1747,7 @@ void xaddCommand(client *c) {
/* Lookup the stream at key. */
robj *o;
stream *s;
if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],no_mkstream)) == NULL) return;
if ((o = streamTypeLookupWriteOrCreate(c,c->argv[1],parsed_args.no_mkstream)) == NULL) return;
s = o->ptr;
/* Return ASAP if the stream has reached the last possible ID */
@ -1466,8 +1758,9 @@ void xaddCommand(client *c) {
}
/* Append using the low level function and return the ID. */
streamID id;
if (streamAppendItem(s,c->argv+field_pos,(c->argc-field_pos)/2,
&id, id_given ? &id : NULL)
&id, parsed_args.id_given ? &parsed_args.id : NULL)
== C_ERR)
{
addReplyError(c,"The ID specified in XADD is equal or smaller than the "
@ -1480,18 +1773,26 @@ void xaddCommand(client *c) {
notifyKeyspaceEvent(NOTIFY_STREAM,"xadd",c->argv[1],c->db->id);
server.dirty++;
if (maxlen >= 0) {
/* Notify xtrim event if needed. */
if (streamTrimByLength(s,maxlen,approx_maxlen)) {
/* Trim if needed. */
if (parsed_args.trim_strategy != TRIM_STRATEGY_NONE) {
if (streamTrim(s, &parsed_args)) {
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
}
if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
if (parsed_args.approx_trim) {
/* In case our trimming was limited (by LIMIT or by ~) we must
* re-write the relevant trim argument to make sure there will be
* no inconsistencies in AOF loading or in the replica.
* It's enough to check only args->approx because there is no
* way LIMIT is given without the ~ option. */
streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1);
streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx);
}
}
/* Let's rewrite the ID argument with the one actually generated for
* AOF/replication propagation. */
robj *idarg = createObjectFromStreamID(&id);
rewriteClientCommandArgument(c,i,idarg);
rewriteClientCommandArgument(c,idpos,idarg);
decrRefCount(idarg);
/* We need to signal to blocked clients that there is new data on this
@ -2871,14 +3172,25 @@ cleanup:
*
* List of options:
*
* Trim strategies:
*
* MAXLEN [~|=] <count> -- Trim so that the stream will be capped at
* the specified length. Use ~ before the
* count in order to demand approximated trimming
* (like XADD MAXLEN option).
* MINID [~|=] <id> -- Trim so that the stream will not contain entries
* with IDs smaller than 'id'. Use ~ before the
* count in order to demand approximated trimming
* (like XADD MINID option).
*
* Other options:
*
* LIMIT <entries> -- The maximum number of entries to trim.
* 0 means unlimited. Unless specified, it is set
* to a default of 100*server.stream_node_max_entries,
* and that's in order to keep the trimming time sane.
* Has meaning only if `~` was provided.
*/
#define TRIM_STRATEGY_NONE 0
#define TRIM_STRATEGY_MAXLEN 1
void xtrimCommand(client *c) {
robj *o;
@ -2889,58 +3201,27 @@ void xtrimCommand(client *c) {
stream *s = o->ptr;
/* Argument parsing. */
int trim_strategy = TRIM_STRATEGY_NONE;
long long maxlen = -1; /* If left to -1 no trimming is performed. */
int approx_maxlen = 0; /* If 1 only delete whole radix tree nodes, so
the maxium length is not applied verbatim. */
int maxlen_arg_idx = 0; /* Index of the count in MAXLEN, for rewriting. */
/* Parse options. */
int i = 2; /* Start of options. */
for (; i < c->argc; i++) {
int moreargs = (c->argc-1) - i; /* Number of additional arguments. */
char *opt = c->argv[i]->ptr;
if (!strcasecmp(opt,"maxlen") && moreargs) {
approx_maxlen = 0;
trim_strategy = TRIM_STRATEGY_MAXLEN;
char *next = c->argv[i+1]->ptr;
/* Check for the form MAXLEN ~ <count>. */
if (moreargs >= 2 && next[0] == '~' && next[1] == '\0') {
approx_maxlen = 1;
i++;
} else if (moreargs >= 2 && next[0] == '=' && next[1] == '\0') {
i++;
}
if (getLongLongFromObjectOrReply(c,c->argv[i+1],&maxlen,NULL)
!= C_OK) return;
if (maxlen < 0) {
addReplyError(c,"The MAXLEN argument must be >= 0.");
return;
}
i++;
maxlen_arg_idx = i;
} else {
addReplyErrorObject(c,shared.syntaxerr);
return;
}
}
streamAddTrimArgs parsed_args;
if (streamParseAddOrTrimArgsOrReply(c, &parsed_args, 1) < 0)
return; /* streamParseAddOrTrimArgsOrReply already replied. */
/* Perform the trimming. */
int64_t deleted = 0;
if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
deleted = streamTrimByLength(s,maxlen,approx_maxlen);
} else {
addReplyError(c,"XTRIM called without an option to trim the stream");
return;
}
/* Propagate the write if needed. */
int64_t deleted = streamTrim(s, &parsed_args);
if (deleted) {
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_STREAM,"xtrim",c->argv[1],c->db->id);
if (parsed_args.approx_trim) {
/* In case our trimming was limited (by LIMIT or by ~) we must
* re-write the relevant trim argument to make sure there will be
* no inconsistencies in AOF loading or in the replica.
* It's enough to check only args->approx because there is no
* way LIMIT is given without the ~ option. */
streamRewriteApproxSpecifier(c,parsed_args.trim_strategy_arg_idx-1);
streamRewriteTrimArgument(c,s,parsed_args.trim_strategy,parsed_args.trim_strategy_arg_idx);
}
/* Propagate the write. */
signalModifiedKey(c, c->db,c->argv[1]);
server.dirty += deleted;
if (approx_maxlen) streamRewriteApproxMaxlen(c,s,maxlen_arg_idx);
}
addReplyLongLong(c,deleted);
}

View File

@ -94,6 +94,7 @@ start_server {
r XADD mystream MAXLEN 5 * yitem $j
}
}
assert {[r xlen mystream] == 5}
set res [r xrange mystream - +]
set expected 995
foreach r $res {
@ -138,6 +139,39 @@ start_server {
assert_equal [lindex $items 1 1] {item 2 value b}
}
test {XADD with MINID option} {
r DEL mystream
for {set j 1} {$j < 1001} {incr j} {
set minid 1000
if {$j >= 5} {
set minid [expr {$j-5}]
}
if {rand() < 0.9} {
r XADD mystream MINID $minid $j xitem $j
} else {
r XADD mystream MINID $minid $j yitem $j
}
}
assert {[r xlen mystream] == 6}
set res [r xrange mystream - +]
set expected 995
foreach r $res {
assert {[lindex $r 1 1] == $expected}
incr expected
}
}
test {XTRIM with MINID option} {
r DEL mystream
r XADD mystream 1-0 f v
r XADD mystream 2-0 f v
r XADD mystream 3-0 f v
r XADD mystream 4-0 f v
r XADD mystream 5-0 f v
r XTRIM mystream MINID = 3-0
assert_equal [r XRANGE mystream - +] {{3-0 {f v}} {4-0 {f v}} {5-0 {f v}}}
}
test {XADD mass insertion and XLEN} {
r DEL mystream
r multi
@ -448,7 +482,49 @@ start_server {
assert {[r XLEN mystream] == 400}
}
test {XADD with LIMIT consecutive calls} {
r del mystream
r config set stream-node-max-entries 10
for {set j 0} {$j < 100} {incr j} {
r XADD mystream * xitem v
}
r XADD mystream MAXLEN ~ 55 LIMIT 30 * xitem v
assert {[r xlen mystream] == 71}
r XADD mystream MAXLEN ~ 55 LIMIT 30 * xitem v
assert {[r xlen mystream] == 62}
r config set stream-node-max-entries 100
}
test {XTRIM with ~ is limited} {
r del mystream
r config set stream-node-max-entries 1
for {set j 0} {$j < 102} {incr j} {
r XADD mystream * xitem v
}
r XTRIM mystream MAXLEN ~ 1
assert {[r xlen mystream] == 2}
r config set stream-node-max-entries 100
}
test {XTRIM without ~ is not limited} {
r del mystream
r config set stream-node-max-entries 1
for {set j 0} {$j < 102} {incr j} {
r XADD mystream * xitem v
}
r XTRIM mystream MAXLEN 1
assert {[r xlen mystream] == 1}
r config set stream-node-max-entries 100
}
test {XTRIM without ~ and with LIMIT} {
r del mystream
r config set stream-node-max-entries 1
for {set j 0} {$j < 102} {incr j} {
r XADD mystream * xitem v
}
assert_error ERR* {r XTRIM mystream MAXLEN 1 LIMIT 30}
}
}
start_server {tags {"stream"} overrides {appendonly yes}} {
@ -466,6 +542,22 @@ start_server {tags {"stream"} overrides {appendonly yes}} {
}
}
start_server {tags {"stream"} overrides {appendonly yes}} {
test {XADD with MINID > lastid can propagate correctly} {
for {set j 0} {$j < 100} {incr j} {
set id [expr {$j+1}]
r XADD mystream $id xitem v
}
r XADD mystream MINID 1 * xitem v
incr j
assert {[r xlen mystream] == $j}
r debug loadaof
r XADD mystream * xitem v
incr j
assert {[r xlen mystream] == $j}
}
}
start_server {tags {"stream"} overrides {appendonly yes}} {
test {XADD with ~ MAXLEN can propagate correctly} {
for {set j 0} {$j < 100} {incr j} {
@ -482,6 +574,52 @@ start_server {tags {"stream"} overrides {appendonly yes}} {
}
}
start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} {
test {XADD with ~ MAXLEN and LIMIT can propagate correctly} {
for {set j 0} {$j < 100} {incr j} {
r XADD mystream * xitem v
}
r XADD mystream MAXLEN ~ 55 LIMIT 30 * xitem v
assert {[r xlen mystream] == 71}
r config set stream-node-max-entries 1
r debug loadaof
r XADD mystream * xitem v
assert {[r xlen mystream] == 72}
}
}
start_server {tags {"stream"} overrides {appendonly yes}} {
test {XADD with ~ MINID can propagate correctly} {
for {set j 0} {$j < 100} {incr j} {
set id [expr {$j+1}]
r XADD mystream $id xitem v
}
r XADD mystream MINID ~ $j * xitem v
incr j
assert {[r xlen mystream] == $j}
r config set stream-node-max-entries 1
r debug loadaof
r XADD mystream * xitem v
incr j
assert {[r xlen mystream] == $j}
}
}
start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} {
test {XADD with ~ MINID and LIMIT can propagate correctly} {
for {set j 0} {$j < 100} {incr j} {
set id [expr {$j+1}]
r XADD mystream $id xitem v
}
r XADD mystream MINID ~ 55 LIMIT 30 * xitem v
assert {[r xlen mystream] == 71}
r config set stream-node-max-entries 1
r debug loadaof
r XADD mystream * xitem v
assert {[r xlen mystream] == 72}
}
}
start_server {tags {"stream"} overrides {appendonly yes stream-node-max-entries 10}} {
test {XTRIM with ~ MAXLEN can propagate correctly} {
for {set j 0} {$j < 100} {incr j} {