From 7104c334c0743101014ec139e2ddf5be9b2c051c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Thu, 28 Jan 2021 15:19:43 +0100 Subject: [PATCH] Add modules API for streams (#8288) APIs added for these stream operations: add, delete, iterate and trim (by ID or maxlength). The functions are prefixed by RM_Stream. * RM_StreamAdd * RM_StreamDelete * RM_StreamIteratorStart * RM_StreamIteratorStop * RM_StreamIteratorNextID * RM_StreamIteratorNextField * RM_StreamIteratorDelete * RM_StreamTrimByLength * RM_StreamTrimByID The type RedisModuleStreamID is added and functions for converting from and to RedisModuleString. * RM_CreateStringFromStreamID * RM_StringToStreamID Whenever the stream functions return REDISMODULE_ERR, errno is set to provide additional error information. Refactoring: The zset iterator fields in the RedisModuleKey struct are wrapped in a union, to allow the same space to be used for type- specific info for streams and allow future use for other key types. --- runtest-moduleapi | 1 + src/module.c | 676 ++++++++++++++++++++++++++++---- src/redismodule.h | 36 ++ src/stream.h | 7 + src/t_stream.c | 29 +- tests/modules/Makefile | 3 +- tests/modules/stream.c | 258 ++++++++++++ tests/unit/moduleapi/stream.tcl | 155 ++++++++ 8 files changed, 1089 insertions(+), 76 deletions(-) create mode 100644 tests/modules/stream.c create mode 100644 tests/unit/moduleapi/stream.tcl diff --git a/runtest-moduleapi b/runtest-moduleapi index 9a48867d2..878b5c6ad 100755 --- a/runtest-moduleapi +++ b/runtest-moduleapi @@ -31,4 +31,5 @@ $TCLSH tests/test_helper.tcl \ --single unit/moduleapi/getkeys \ --single unit/moduleapi/test_lazyfree \ --single unit/moduleapi/defrag \ +--single unit/moduleapi/stream \ "${@}" diff --git a/src/module.c b/src/module.c index 56a3ef98a..cabccfb7a 100644 --- a/src/module.c +++ b/src/module.c @@ -177,15 +177,25 @@ struct RedisModuleKey { void *iter; /* Iterator. */ int mode; /* Opening mode. */ - /* Zset iterator. */ - uint32_t ztype; /* REDISMODULE_ZSET_RANGE_* */ - zrangespec zrs; /* Score range. */ - zlexrangespec zlrs; /* Lex range. */ - uint32_t zstart; /* Start pos for positional ranges. */ - uint32_t zend; /* End pos for positional ranges. */ - void *zcurrent; /* Zset iterator current node. */ - int zer; /* Zset iterator end reached flag - (true if end was reached). */ + union { + struct { + /* Zset iterator, use only if value->type == OBJ_ZSET */ + uint32_t type; /* REDISMODULE_ZSET_RANGE_* */ + zrangespec rs; /* Score range. */ + zlexrangespec lrs; /* Lex range. */ + uint32_t start; /* Start pos for positional ranges. */ + uint32_t end; /* End pos for positional ranges. */ + void *current; /* Zset iterator current node. */ + int er; /* Zset iterator end reached flag + (true if end was reached). */ + } zset; + struct { + /* Stream, use only if value->type == OBJ_STREAM */ + streamID currentid; /* Current entry while iterating. */ + int64_t numfieldsleft; /* Fields left to fetch for current entry. */ + int signalready; /* Flag that signalKeyAsReady() is needed. */ + } stream; + } u; }; typedef struct RedisModuleKey RedisModuleKey; @@ -376,6 +386,7 @@ robj **moduleCreateArgvFromUserFormat(const char *cmdname, const char *fmt, int void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx); void RM_ZsetRangeStop(RedisModuleKey *kp); static void zsetKeyReset(RedisModuleKey *key); +static void moduleInitKeyTypeSpecific(RedisModuleKey *key); void RM_FreeDict(RedisModuleCtx *ctx, RedisModuleDict *d); void RM_FreeServerInfo(RedisModuleCtx *ctx, RedisModuleServerInfoData *data); @@ -509,10 +520,14 @@ int moduleCreateEmptyKey(RedisModuleKey *key, int type) { case REDISMODULE_KEYTYPE_HASH: obj = createHashObject(); break; + case REDISMODULE_KEYTYPE_STREAM: + obj = createStreamObject(); + break; default: return REDISMODULE_ERR; } dbAdd(key->db,key->key,obj); key->value = obj; + moduleInitKeyTypeSpecific(key); return REDISMODULE_OK; } @@ -1113,6 +1128,18 @@ RedisModuleString *RM_CreateStringFromString(RedisModuleCtx *ctx, const RedisMod return o; } +/* Creates a string from a stream ID. The returned string must be released with + * RedisModule_FreeString(), unless automatic memory is enabled. + * + * The passed context `ctx` may be NULL if necessary. See the + * RedisModule_CreateString() documentation for more info. */ +RedisModuleString *RM_CreateStringFromStreamID(RedisModuleCtx *ctx, const RedisModuleStreamID *id) { + streamID streamid = {id->ms, id->seq}; + RedisModuleString *o = createObjectFromStreamID(&streamid); + if (ctx != NULL) autoMemoryAdd(ctx, REDISMODULE_AM_STRING, o); + return o; +} + /* Free a module string object obtained with one of the Redis modules API calls * that return new string objects. * @@ -1270,6 +1297,30 @@ int RM_StringToLongDouble(const RedisModuleString *str, long double *ld) { return retval ? REDISMODULE_OK : REDISMODULE_ERR; } +/* Convert the string into a stream ID, storing it at `*id`. + * Returns REDISMODULE_OK on success and returns REDISMODULE_ERR if the string + * is not a valid string representation of a stream ID. The special IDs "+" and + * "-" are allowed. + * + * RedisModuleStreamID is a struct with two 64-bit fields, which is used in + * stream functions and defined as + * + * typedef struct RedisModuleStreamID { + * uint64_t ms; + * uint64_t seq; + * } RedisModuleStreamID; + */ +int RM_StringToStreamID(const RedisModuleString *str, RedisModuleStreamID *id) { + streamID streamid; + if (streamParseID(str, &streamid) == C_OK) { + id->ms = streamid.ms; + id->seq = streamid.seq; + return REDISMODULE_OK; + } else { + return REDISMODULE_ERR; + } +} + /* Compare two string objects, returning -1, 0 or 1 respectively if * a < b, a == b, a > b. Strings are compared byte by byte as two * binary blobs without any encoding care / collation attempt. */ @@ -2072,7 +2123,15 @@ static void moduleInitKey(RedisModuleKey *kp, RedisModuleCtx *ctx, robj *keyname kp->value = value; kp->iter = NULL; kp->mode = mode; - zsetKeyReset(kp); + if (kp->value) moduleInitKeyTypeSpecific(kp); +} + +/* Initialize the type-specific part of the key. Only when key has a value. */ +static void moduleInitKeyTypeSpecific(RedisModuleKey *key) { + switch (key->value->type) { + case OBJ_ZSET: zsetKeyReset(key); break; + case OBJ_STREAM: key->u.stream.signalready = 0; break; + } } /* Return an handle representing a Redis key, so that it is possible @@ -2115,8 +2174,13 @@ static void moduleCloseKey(RedisModuleKey *key) { int signal = SHOULD_SIGNAL_MODIFIED_KEYS(key->ctx); if ((key->mode & REDISMODULE_WRITE) && signal) signalModifiedKey(key->ctx->client,key->db,key->key); - /* TODO: if (key->iter) RM_KeyIteratorStop(kp); */ + if (key->iter) zfree(key->iter); RM_ZsetRangeStop(key); + if (key && key->value && key->value->type == OBJ_STREAM && + key->u.stream.signalready) { + /* One of more RM_StreamAdd() have been done. */ + signalKeyAsReady(key->db, key->key, OBJ_STREAM); + } decrRefCount(key->key); } @@ -2545,16 +2609,17 @@ int RM_ZsetScore(RedisModuleKey *key, RedisModuleString *ele, double *score) { * -------------------------------------------------------------------------- */ void zsetKeyReset(RedisModuleKey *key) { - key->ztype = REDISMODULE_ZSET_RANGE_NONE; - key->zcurrent = NULL; - key->zer = 1; + key->u.zset.type = REDISMODULE_ZSET_RANGE_NONE; + key->u.zset.current = NULL; + key->u.zset.er = 1; } /* Stop a sorted set iteration. */ void RM_ZsetRangeStop(RedisModuleKey *key) { + if (!key->value || key->value->type != OBJ_ZSET) return; /* Free resources if needed. */ - if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) - zslFreeLexRange(&key->zlrs); + if (key->u.zset.type == REDISMODULE_ZSET_RANGE_LEX) + zslFreeLexRange(&key->u.zset.lrs); /* Setup sensible values so that misused iteration API calls when an * iterator is not active will result into something more sensible * than crashing. */ @@ -2563,7 +2628,7 @@ void RM_ZsetRangeStop(RedisModuleKey *key) { /* Return the "End of range" flag value to signal the end of the iteration. */ int RM_ZsetRangeEndReached(RedisModuleKey *key) { - return key->zer; + return key->u.zset.er; } /* Helper function for RM_ZsetFirstInScoreRange() and RM_ZsetLastInScoreRange(). @@ -2576,29 +2641,29 @@ int zsetInitScoreRange(RedisModuleKey *key, double min, double max, int minex, i if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR; RM_ZsetRangeStop(key); - key->ztype = REDISMODULE_ZSET_RANGE_SCORE; - key->zer = 0; + key->u.zset.type = REDISMODULE_ZSET_RANGE_SCORE; + key->u.zset.er = 0; /* Setup the range structure used by the sorted set core implementation * in order to seek at the specified element. */ - zrangespec *zrs = &key->zrs; + zrangespec *zrs = &key->u.zset.rs; zrs->min = min; zrs->max = max; zrs->minex = minex; zrs->maxex = maxex; if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { - key->zcurrent = first ? zzlFirstInRange(key->value->ptr,zrs) : - zzlLastInRange(key->value->ptr,zrs); + key->u.zset.current = first ? zzlFirstInRange(key->value->ptr,zrs) : + zzlLastInRange(key->value->ptr,zrs); } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = key->value->ptr; zskiplist *zsl = zs->zsl; - key->zcurrent = first ? zslFirstInRange(zsl,zrs) : - zslLastInRange(zsl,zrs); + key->u.zset.current = first ? zslFirstInRange(zsl,zrs) : + zslLastInRange(zsl,zrs); } else { serverPanic("Unsupported zset encoding"); } - if (key->zcurrent == NULL) key->zer = 1; + if (key->u.zset.current == NULL) key->u.zset.er = 1; return REDISMODULE_OK; } @@ -2640,29 +2705,29 @@ int zsetInitLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModuleStr if (!key->value || key->value->type != OBJ_ZSET) return REDISMODULE_ERR; RM_ZsetRangeStop(key); - key->zer = 0; + key->u.zset.er = 0; /* Setup the range structure used by the sorted set core implementation * in order to seek at the specified element. */ - zlexrangespec *zlrs = &key->zlrs; + zlexrangespec *zlrs = &key->u.zset.lrs; if (zslParseLexRange(min, max, zlrs) == C_ERR) return REDISMODULE_ERR; /* Set the range type to lex only after successfully parsing the range, * otherwise we don't want the zlexrangespec to be freed. */ - key->ztype = REDISMODULE_ZSET_RANGE_LEX; + key->u.zset.type = REDISMODULE_ZSET_RANGE_LEX; if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { - key->zcurrent = first ? zzlFirstInLexRange(key->value->ptr,zlrs) : - zzlLastInLexRange(key->value->ptr,zlrs); + key->u.zset.current = first ? zzlFirstInLexRange(key->value->ptr,zlrs) : + zzlLastInLexRange(key->value->ptr,zlrs); } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { zset *zs = key->value->ptr; zskiplist *zsl = zs->zsl; - key->zcurrent = first ? zslFirstInLexRange(zsl,zlrs) : - zslLastInLexRange(zsl,zlrs); + key->u.zset.current = first ? zslFirstInLexRange(zsl,zlrs) : + zslLastInLexRange(zsl,zlrs); } else { serverPanic("Unsupported zset encoding"); } - if (key->zcurrent == NULL) key->zer = 1; + if (key->u.zset.current == NULL) key->u.zset.er = 1; return REDISMODULE_OK; } @@ -2695,10 +2760,11 @@ int RM_ZsetLastInLexRange(RedisModuleKey *key, RedisModuleString *min, RedisModu RedisModuleString *RM_ZsetRangeCurrentElement(RedisModuleKey *key, double *score) { RedisModuleString *str; - if (key->zcurrent == NULL) return NULL; + if (!key->value || key->value->type != OBJ_ZSET) return NULL; + if (key->u.zset.current == NULL) return NULL; if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *eptr, *sptr; - eptr = key->zcurrent; + eptr = key->u.zset.current; sds ele = ziplistGetObject(eptr); if (score) { sptr = ziplistNext(key->value->ptr,eptr); @@ -2706,7 +2772,7 @@ RedisModuleString *RM_ZsetRangeCurrentElement(RedisModuleKey *key, double *score } str = createObject(OBJ_STRING,ele); } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { - zskiplistNode *ln = key->zcurrent; + zskiplistNode *ln = key->u.zset.current; if (score) *score = ln->score; str = createStringObject(ln->ele,sdslen(ln->ele)); } else { @@ -2720,58 +2786,59 @@ RedisModuleString *RM_ZsetRangeCurrentElement(RedisModuleKey *key, double *score * a next element, 0 if we are already at the latest element or the range * does not include any item at all. */ int RM_ZsetRangeNext(RedisModuleKey *key) { - if (!key->ztype || !key->zcurrent) return 0; /* No active iterator. */ + if (!key->value || key->value->type != OBJ_ZSET) return 0; + if (!key->u.zset.type || !key->u.zset.current) return 0; /* No active iterator. */ if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *zl = key->value->ptr; - unsigned char *eptr = key->zcurrent; + unsigned char *eptr = key->u.zset.current; unsigned char *next; next = ziplistNext(zl,eptr); /* Skip element. */ if (next) next = ziplistNext(zl,next); /* Skip score. */ if (next == NULL) { - key->zer = 1; + key->u.zset.er = 1; return 0; } else { /* Are we still within the range? */ - if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE) { + if (key->u.zset.type == REDISMODULE_ZSET_RANGE_SCORE) { /* Fetch the next element score for the * range check. */ unsigned char *saved_next = next; next = ziplistNext(zl,next); /* Skip next element. */ double score = zzlGetScore(next); /* Obtain the next score. */ - if (!zslValueLteMax(score,&key->zrs)) { - key->zer = 1; + if (!zslValueLteMax(score,&key->u.zset.rs)) { + key->u.zset.er = 1; return 0; } next = saved_next; - } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) { - if (!zzlLexValueLteMax(next,&key->zlrs)) { - key->zer = 1; + } else if (key->u.zset.type == REDISMODULE_ZSET_RANGE_LEX) { + if (!zzlLexValueLteMax(next,&key->u.zset.lrs)) { + key->u.zset.er = 1; return 0; } } - key->zcurrent = next; + key->u.zset.current = next; return 1; } } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { - zskiplistNode *ln = key->zcurrent, *next = ln->level[0].forward; + zskiplistNode *ln = key->u.zset.current, *next = ln->level[0].forward; if (next == NULL) { - key->zer = 1; + key->u.zset.er = 1; return 0; } else { /* Are we still within the range? */ - if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE && - !zslValueLteMax(next->score,&key->zrs)) + if (key->u.zset.type == REDISMODULE_ZSET_RANGE_SCORE && + !zslValueLteMax(next->score,&key->u.zset.rs)) { - key->zer = 1; + key->u.zset.er = 1; return 0; - } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) { - if (!zslLexValueLteMax(next->ele,&key->zlrs)) { - key->zer = 1; + } else if (key->u.zset.type == REDISMODULE_ZSET_RANGE_LEX) { + if (!zslLexValueLteMax(next->ele,&key->u.zset.lrs)) { + key->u.zset.er = 1; return 0; } } - key->zcurrent = next; + key->u.zset.current = next; return 1; } } else { @@ -2783,58 +2850,59 @@ int RM_ZsetRangeNext(RedisModuleKey *key) { * a previous element, 0 if we are already at the first element or the range * does not include any item at all. */ int RM_ZsetRangePrev(RedisModuleKey *key) { - if (!key->ztype || !key->zcurrent) return 0; /* No active iterator. */ + if (!key->value || key->value->type != OBJ_ZSET) return 0; + if (!key->u.zset.type || !key->u.zset.current) return 0; /* No active iterator. */ if (key->value->encoding == OBJ_ENCODING_ZIPLIST) { unsigned char *zl = key->value->ptr; - unsigned char *eptr = key->zcurrent; + unsigned char *eptr = key->u.zset.current; unsigned char *prev; prev = ziplistPrev(zl,eptr); /* Go back to previous score. */ if (prev) prev = ziplistPrev(zl,prev); /* Back to previous ele. */ if (prev == NULL) { - key->zer = 1; + key->u.zset.er = 1; return 0; } else { /* Are we still within the range? */ - if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE) { + if (key->u.zset.type == REDISMODULE_ZSET_RANGE_SCORE) { /* Fetch the previous element score for the * range check. */ unsigned char *saved_prev = prev; prev = ziplistNext(zl,prev); /* Skip element to get the score.*/ double score = zzlGetScore(prev); /* Obtain the prev score. */ - if (!zslValueGteMin(score,&key->zrs)) { - key->zer = 1; + if (!zslValueGteMin(score,&key->u.zset.rs)) { + key->u.zset.er = 1; return 0; } prev = saved_prev; - } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) { - if (!zzlLexValueGteMin(prev,&key->zlrs)) { - key->zer = 1; + } else if (key->u.zset.type == REDISMODULE_ZSET_RANGE_LEX) { + if (!zzlLexValueGteMin(prev,&key->u.zset.lrs)) { + key->u.zset.er = 1; return 0; } } - key->zcurrent = prev; + key->u.zset.current = prev; return 1; } } else if (key->value->encoding == OBJ_ENCODING_SKIPLIST) { - zskiplistNode *ln = key->zcurrent, *prev = ln->backward; + zskiplistNode *ln = key->u.zset.current, *prev = ln->backward; if (prev == NULL) { - key->zer = 1; + key->u.zset.er = 1; return 0; } else { /* Are we still within the range? */ - if (key->ztype == REDISMODULE_ZSET_RANGE_SCORE && - !zslValueGteMin(prev->score,&key->zrs)) + if (key->u.zset.type == REDISMODULE_ZSET_RANGE_SCORE && + !zslValueGteMin(prev->score,&key->u.zset.rs)) { - key->zer = 1; + key->u.zset.er = 1; return 0; - } else if (key->ztype == REDISMODULE_ZSET_RANGE_LEX) { - if (!zslLexValueGteMin(prev->ele,&key->zlrs)) { - key->zer = 1; + } else if (key->u.zset.type == REDISMODULE_ZSET_RANGE_LEX) { + if (!zslLexValueGteMin(prev->ele,&key->u.zset.lrs)) { + key->u.zset.er = 1; return 0; } } - key->zcurrent = prev; + key->u.zset.current = prev; return 1; } } else { @@ -3049,6 +3117,455 @@ int RM_HashGet(RedisModuleKey *key, int flags, ...) { return REDISMODULE_OK; } +/* -------------------------------------------------------------------------- + * Key API for the stream type. + * -------------------------------------------------------------------------- */ + +/* Adds an entry to a stream. Like XADD without trimming. + * + * - `key`: The key where the stream is (or will be) stored + * - `flags`: A bit field of + * - `REDISMODULE_STREAM_ADD_AUTOID`: Assign a stream ID automatically, like + * `*` in the XADD command. + * - `id`: If the `AUTOID` flag is set, this is where the assigned ID is + * returned. Can be NULL if `AUTOID` is set, if you don't care to receive the + * ID. If `AUTOID` is not set, this is the requested ID. + * - `argv`: A pointer to an array of size `numfields * 2` containing the + * fields and values. + * - `numfields`: The number of field-value pairs in `argv`. + * + * Returns REDISMODULE_OK if an entry has been added. On failure, + * REDISMODULE_ERR is returned and `errno` is set as follows: + * + * - EINVAL if called with invalid arguments + * - ENOTSUP if the key refers to a value of a type other than stream + * - EBADF if the key was not opened for writing + * - EDOM if the given ID was 0-0 or not greater than all other IDs in the + * stream (only if the AUTOID flag is unset) + * - EFBIG if the stream has reached the last possible ID + */ +int RM_StreamAdd(RedisModuleKey *key, int flags, RedisModuleStreamID *id, RedisModuleString **argv, long numfields) { + /* Validate args */ + if (!key || (numfields != 0 && !argv) || /* invalid key or argv */ + (flags & ~(REDISMODULE_STREAM_ADD_AUTOID)) || /* invalid flags */ + (!(flags & REDISMODULE_STREAM_ADD_AUTOID) && !id)) { /* id required */ + errno = EINVAL; + return REDISMODULE_ERR; + } else if (key->value && key->value->type != OBJ_STREAM) { + errno = ENOTSUP; /* wrong type */ + return REDISMODULE_ERR; + } else if (!(key->mode & REDISMODULE_WRITE)) { + errno = EBADF; /* key not open for writing */ + return REDISMODULE_ERR; + } else if (!(flags & REDISMODULE_STREAM_ADD_AUTOID) && + id->ms == 0 && id->seq == 0) { + errno = EDOM; /* ID out of range */ + return REDISMODULE_ERR; + } + + /* Create key if necessery */ + int created = 0; + if (key->value == NULL) { + moduleCreateEmptyKey(key, REDISMODULE_KEYTYPE_STREAM); + created = 1; + } + + stream *s = key->value->ptr; + if (s->last_id.ms == UINT64_MAX && s->last_id.seq == UINT64_MAX) { + /* The stream has reached the last possible ID */ + errno = EFBIG; + return REDISMODULE_ERR; + } + + streamID added_id; + streamID use_id; + streamID *use_id_ptr = NULL; + if (!(flags & REDISMODULE_STREAM_ADD_AUTOID)) { + use_id.ms = id->ms; + use_id.seq = id->seq; + use_id_ptr = &use_id; + } + if (streamAppendItem(s, argv, numfields, &added_id, use_id_ptr) == C_ERR) { + /* ID not greater than all existing IDs in the stream */ + errno = EDOM; + return REDISMODULE_ERR; + } + /* Postponed signalKeyAsReady(). Done implicitly by moduleCreateEmptyKey() + * so not needed if the stream has just been created. */ + if (!created) key->u.stream.signalready = 1; + + if (id != NULL) { + id->ms = added_id.ms; + id->seq = added_id.seq; + } + + return REDISMODULE_OK; +} + +/* Deletes an entry from a stream. + * + * - `key`: A key opened for writing, with no stream iterator started. + * - `id`: The stream ID of the entry to delete. + * + * Returns REDISMODULE_OK on success. On failure, REDISMODULE_ERR is returned + * and `errno` is set as follows: + * + * - EINVAL if called with invalid arguments + * - ENOTSUP if the key refers to a value of a type other than stream or if the + * key is empty + * - EBADF if the key was not opened for writing or if a stream iterator is + * associated with the key + * - ENOENT if no entry with the given stream ID exists + * + * See also RM_StreamIteratorDelete() for deleting the current entry while + * iterating using a stream iterator. + */ +int RM_StreamDelete(RedisModuleKey *key, RedisModuleStreamID *id) { + if (!key || !id) { + errno = EINVAL; + return REDISMODULE_ERR; + } else if (!key->value || key->value->type != OBJ_STREAM) { + errno = ENOTSUP; /* wrong type */ + return REDISMODULE_ERR; + } else if (!(key->mode & REDISMODULE_WRITE) || + key->iter != NULL) { + errno = EBADF; /* key not opened for writing or iterator started */ + return REDISMODULE_ERR; + } + stream *s = key->value->ptr; + streamID streamid = {id->ms, id->seq}; + if (streamDeleteItem(s, &streamid)) { + return REDISMODULE_OK; + } else { + errno = ENOENT; /* no entry with this id */ + return REDISMODULE_ERR; + } +} + +/* Sets up a stream iterator. + * + * - `key`: The stream key opened for reading using RedisModule_OpenKey(). + * - `flags`: + * - `REDISMODULE_STREAM_ITERATOR_EXCLUSIVE`: Don't include `start` and `end` + * in the iterated range. + * - `REDISMODULE_STREAM_ITERATOR_REVERSE`: Iterate in reverse order, starting + * from the `end` of the range. + * - `start`: The lower bound of the range. Use NULL for the beginning of the + * stream. + * - `end`: The upper bound of the range. Use NULL for the end of the stream. + * + * Returns REDISMODULE_OK on success. On failure, REDISMODULE_ERR is returned + * and `errno` is set as follows: + * + * - EINVAL if called with invalid arguments + * - ENOTSUP if the key refers to a value of a type other than stream or if the + * key is empty + * - EBADF if the key was not opened for writing or if a stream iterator is + * already associated with the key + * - EDOM if `start` or `end` is outside the valid range + * + * Returns REDISMODULE_OK on success and REDISMODULE_ERR if the key doesn't + * refer to a stream or if invalid arguments were given. + * + * The stream IDs are retrieved using RedisModule_StreamIteratorNextID() and + * for each stream ID, the fields and values are retrieved using + * RedisModule_StreamIteratorNextField(). The iterator is freed by calling + * RedisModule_StreamIteratorStop(). + * + * Example (error handling omitted): + * + * RedisModule_StreamIteratorStart(key, 0, startid_ptr, endid_ptr); + * RedisModuleStreamID id; + * long numfields; + * while (RedisModule_StreamIteratorNextID(key, &id, &numfields) == + * REDISMODULE_OK) { + * RedisModuleString *field, *value; + * while (RedisModule_StreamIteratorNextField(key, &field, &value) == + * REDISMODULE_OK) { + * // + * // ... Do stuff ... + * // + * RedisModule_Free(field); + * RedisModule_Free(value); + * } + * } + * RedisModule_StreamIteratorStop(key); + */ +int RM_StreamIteratorStart(RedisModuleKey *key, int flags, RedisModuleStreamID *start, RedisModuleStreamID *end) { + /* check args */ + if (!key || + (flags & ~(REDISMODULE_STREAM_ITERATOR_EXCLUSIVE | + REDISMODULE_STREAM_ITERATOR_REVERSE))) { + errno = EINVAL; /* key missing or invalid flags */ + return REDISMODULE_ERR; + } else if (!key->value || key->value->type != OBJ_STREAM) { + errno = ENOTSUP; + return REDISMODULE_ERR; /* not a stream */ + } else if (key->iter) { + errno = EBADF; /* iterator already started */ + return REDISMODULE_ERR; + } + + /* define range for streamIteratorStart() */ + streamID lower, upper; + if (start) lower = (streamID){start->ms, start->seq}; + if (end) upper = (streamID){end->ms, end->seq}; + if (flags & REDISMODULE_STREAM_ITERATOR_EXCLUSIVE) { + if ((start && streamIncrID(&lower) != C_OK) || + (end && streamDecrID(&upper) != C_OK)) { + errno = EDOM; /* end is 0-0 or start is MAX-MAX? */ + return REDISMODULE_ERR; + } + } + + /* create iterator */ + stream *s = key->value->ptr; + int rev = flags & REDISMODULE_STREAM_ITERATOR_REVERSE; + streamIterator *si = zmalloc(sizeof(*si)); + streamIteratorStart(si, s, start ? &lower : NULL, end ? &upper : NULL, rev); + key->iter = si; + key->u.stream.currentid.ms = 0; /* for RM_StreamIteratorDelete() */ + key->u.stream.currentid.seq = 0; + key->u.stream.numfieldsleft = 0; /* for RM_StreamIteratorNextField() */ + return REDISMODULE_OK; +} + +/* Stops a stream iterator created using RedisModule_StreamIteratorStart() and + * reclaims its memory. + * + * Returns REDISMODULE_OK on success. On failure, REDISMODULE_ERR is returned + * and `errno` is set as follows: + * + * - EINVAL if called with a NULL key + * - ENOTSUP if the key refers to a value of a type other than stream or if the + * key is empty + * - EBADF if the key was not opened for writing or if no stream iterator is + * associated with the key + */ +int RM_StreamIteratorStop(RedisModuleKey *key) { + if (!key) { + errno = EINVAL; + return REDISMODULE_ERR; + } else if (!key->value || key->value->type != OBJ_STREAM) { + errno = ENOTSUP; + return REDISMODULE_ERR; + } else if (!key->iter) { + errno = EBADF; + return REDISMODULE_ERR; + } + zfree(key->iter); + key->iter = NULL; + return REDISMODULE_OK; +} + +/* Finds the next stream entry and returns its stream ID and the number of + * fields. + * + * - `key`: Key for which a stream iterator has been started using + * RedisModule_StreamIteratorStart(). + * - `id`: The stream ID returned. NULL if you don't care. + * - `numfields`: The number of fields in the found stream entry. NULL if you + * don't care. + * + * Returns REDISMODULE_OK and sets `*id` and `*numfields` if an entry was found. + * On failure, REDISMODULE_ERR is returned and `errno` is set as follows: + * + * - EINVAL if called with a NULL key + * - ENOTSUP if the key refers to a value of a type other than stream or if the + * key is empty + * - EBADF if no stream iterator is associated with the key + * - ENOENT if there are no more entries in the range of the iterator + * + * In practice, if RM_StreamIteratorNextID() is called after a successful call + * to RM_StreamIteratorStart() and with the same key, it is safe to assume that + * an REDISMODULE_ERR return value means that there are no more entries. + * + * Use RedisModule_StreamIteratorNextField() to retrieve the fields and values. + * See the example at RedisModule_StreamIteratorStart(). + */ +int RM_StreamIteratorNextID(RedisModuleKey *key, RedisModuleStreamID *id, long *numfields) { + if (!key) { + errno = EINVAL; + return REDISMODULE_ERR; + } else if (!key->value || key->value->type != OBJ_STREAM) { + errno = ENOTSUP; + return REDISMODULE_ERR; + } else if (!key->iter) { + errno = EBADF; + return REDISMODULE_ERR; + } + streamIterator *si = key->iter; + int64_t *num_ptr = &key->u.stream.numfieldsleft; + streamID *streamid_ptr = &key->u.stream.currentid; + if (streamIteratorGetID(si, streamid_ptr, num_ptr)) { + if (id) { + id->ms = streamid_ptr->ms; + id->seq = streamid_ptr->seq; + } + if (numfields) *numfields = *num_ptr; + return REDISMODULE_OK; + } else { + /* No entry found. */ + key->u.stream.currentid.ms = 0; /* for RM_StreamIteratorDelete() */ + key->u.stream.currentid.seq = 0; + key->u.stream.numfieldsleft = 0; /* for RM_StreamIteratorNextField() */ + errno = ENOENT; + return REDISMODULE_ERR; + } +} + +/* Retrieves the next field of the current stream ID and its corresponding value + * in a stream iteration. This function should be called repeatedly after calling + * RedisModule_StreamIteratorNextID() to fetch each field-value pair. + * + * - `key`: Key where a stream iterator has been started. + * - `field_ptr`: This is where the field is returned. + * - `value_ptr`: This is where the value is returned. + * + * Returns REDISMODULE_OK and points `*field_ptr` and `*value_ptr` to freshly + * allocated RedisModuleString objects. The string objects are freed + * automatically when the callback finishes if automatic memory is enabled. On + * failure, REDISMODULE_ERR is returned and `errno` is set as follows: + * + * - EINVAL if called with a NULL key + * - ENOTSUP if the key refers to a value of a type other than stream or if the + * key is empty + * - EBADF if no stream iterator is associated with the key + * - ENOENT if there are no more fields in the current stream entry + * + * In practice, if RM_StreamIteratorNextField() is called after a successful + * call to RM_StreamIteratorNextID() and with the same key, it is safe to assume + * that an REDISMODULE_ERR return value means that there are no more fields. + * + * See the example at RedisModule_StreamIteratorStart(). + */ +int RM_StreamIteratorNextField(RedisModuleKey *key, RedisModuleString **field_ptr, RedisModuleString **value_ptr) { + if (!key) { + errno = EINVAL; + return REDISMODULE_ERR; + } else if (!key->value || key->value->type != OBJ_STREAM) { + errno = ENOTSUP; + return REDISMODULE_ERR; + } else if (!key->iter) { + errno = EBADF; + return REDISMODULE_ERR; + } else if (key->u.stream.numfieldsleft <= 0) { + errno = ENOENT; + return REDISMODULE_ERR; + } + streamIterator *si = key->iter; + unsigned char *field, *value; + int64_t field_len, value_len; + streamIteratorGetField(si, &field, &value, &field_len, &value_len); + if (field_ptr) { + *field_ptr = createRawStringObject((char *)field, field_len); + autoMemoryAdd(key->ctx, REDISMODULE_AM_STRING, *field_ptr); + } + if (value_ptr) { + *value_ptr = createRawStringObject((char *)value, value_len); + autoMemoryAdd(key->ctx, REDISMODULE_AM_STRING, *value_ptr); + } + key->u.stream.numfieldsleft--; + return REDISMODULE_OK; +} + +/* Deletes the current stream entry while iterating. + * + * This function can be called after RM_StreamIteratorNextID() or after any + * calls to RM_StreamIteratorNextField(). + * + * Returns REDISMODULE_OK on success. On failure, REDISMODULE_ERR is returned + * and `errno` is set as follows: + * + * - EINVAL if key is NULL + * - ENOTSUP if the key is empty or is of another type than stream + * - EBADF if the key is not opened for writing, if no iterator has been started + * - ENOENT if the iterator has no current stream entry + */ +int RM_StreamIteratorDelete(RedisModuleKey *key) { + if (!key) { + errno = EINVAL; + return REDISMODULE_ERR; + } else if (!key->value || key->value->type != OBJ_STREAM) { + errno = ENOTSUP; + return REDISMODULE_ERR; + } else if (!(key->mode & REDISMODULE_WRITE) || !key->iter) { + errno = EBADF; + return REDISMODULE_ERR; + } else if (key->u.stream.currentid.ms == 0 && + key->u.stream.currentid.seq == 0) { + errno = ENOENT; + return REDISMODULE_ERR; + } + streamIterator *si = key->iter; + streamIteratorRemoveEntry(si, &key->u.stream.currentid); + key->u.stream.currentid.ms = 0; /* Make sure repeated Delete() fails */ + key->u.stream.currentid.seq = 0; + key->u.stream.numfieldsleft = 0; /* Make sure NextField() fails */ + return REDISMODULE_OK; +} + +/* Trim a stream by length, similar to XTRIM with MAXLEN. + * + * - `key`: Key opened for writing. + * - `flags`: A bitfield of + * - `REDISMODULE_STREAM_TRIM_APPROX`: Trim less if it improves performance, + * like XTRIM with `~`. + * - `length`: The number of stream entries to keep after trimming. + * + * Returns the number of entries deleted. On failure, a negative value is + * returned and `errno` is set as follows: + * + * - EINVAL if called with invalid arguments + * - ENOTSUP if the key is empty or of a type other than stream + * - EBADF if the key is not opened for writing + */ +long long RM_StreamTrimByLength(RedisModuleKey *key, int flags, long long length) { + if (!key || (flags & ~(REDISMODULE_STREAM_TRIM_APPROX)) || length < 0) { + errno = EINVAL; + return -1; + } else if (!key->value || key->value->type != OBJ_STREAM) { + errno = ENOTSUP; + return -1; + } else if (!(key->mode & REDISMODULE_WRITE)) { + errno = EBADF; + return -1; + } + int approx = flags & REDISMODULE_STREAM_TRIM_APPROX ? 1 : 0; + return streamTrimByLength((stream *)key->value->ptr, length, approx); +} + +/* Trim a stream by ID, similar to XTRIM with MINID. + * + * - `key`: Key opened for writing. + * - `flags`: A bitfield of + * - `REDISMODULE_STREAM_TRIM_APPROX`: Trim less if it improves performance, + * like XTRIM with `~`. + * - `id`: The smallest stream ID to keep after trimming. + * + * Returns the number of entries deleted. On failure, a negative value is + * returned and `errno` is set as follows: + * + * - EINVAL if called with invalid arguments + * - ENOTSUP if the key is empty or of a type other than stream + * - EBADF if the key is not opened for writing + */ +long long RM_StreamTrimByID(RedisModuleKey *key, int flags, RedisModuleStreamID *id) { + if (!key || (flags & ~(REDISMODULE_STREAM_TRIM_APPROX)) || !id) { + errno = EINVAL; + return -1; + } else if (!key->value || key->value->type != OBJ_STREAM) { + errno = ENOTSUP; + return -1; + } else if (!(key->mode & REDISMODULE_WRITE)) { + errno = EBADF; + return -1; + } + int approx = flags & REDISMODULE_STREAM_TRIM_APPROX ? 1 : 0; + streamID minid = (streamID){id->ms, id->seq}; + return streamTrimByID((stream *)key->value->ptr, minid, approx); +} + /* -------------------------------------------------------------------------- * Redis <-> Modules generic Call() API * -------------------------------------------------------------------------- */ @@ -8462,6 +8979,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(StringToLongLong); REGISTER_API(StringToDouble); REGISTER_API(StringToLongDouble); + REGISTER_API(StringToStreamID); REGISTER_API(Call); REGISTER_API(CallReplyProto); REGISTER_API(FreeCallReply); @@ -8476,6 +8994,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(CreateStringFromDouble); REGISTER_API(CreateStringFromLongDouble); REGISTER_API(CreateStringFromString); + REGISTER_API(CreateStringFromStreamID); REGISTER_API(CreateStringPrintf); REGISTER_API(FreeString); REGISTER_API(StringPtrLen); @@ -8507,6 +9026,15 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(ZsetRangeEndReached); REGISTER_API(HashSet); REGISTER_API(HashGet); + REGISTER_API(StreamAdd); + REGISTER_API(StreamDelete); + REGISTER_API(StreamIteratorStart); + REGISTER_API(StreamIteratorStop); + REGISTER_API(StreamIteratorNextID); + REGISTER_API(StreamIteratorNextField); + REGISTER_API(StreamIteratorDelete); + REGISTER_API(StreamTrimByLength); + REGISTER_API(StreamTrimByID); REGISTER_API(IsKeysPositionRequest); REGISTER_API(KeyAtPos); REGISTER_API(GetClientId); diff --git a/src/redismodule.h b/src/redismodule.h index 36c566bb3..1b1304172 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -69,6 +69,20 @@ #define REDISMODULE_HASH_CFIELDS (1<<2) #define REDISMODULE_HASH_EXISTS (1<<3) +/* StreamID type. */ +typedef struct RedisModuleStreamID { + uint64_t ms; + uint64_t seq; +} RedisModuleStreamID; + +/* StreamAdd() flags. */ +#define REDISMODULE_STREAM_ADD_AUTOID (1<<0) +/* StreamIteratorStart() flags. */ +#define REDISMODULE_STREAM_ITERATOR_EXCLUSIVE (1<<0) +#define REDISMODULE_STREAM_ITERATOR_REVERSE (1<<1) +/* StreamIteratorTrim*() flags. */ +#define REDISMODULE_STREAM_TRIM_APPROX (1<<0) + /* Context Flags: Info about the current context returned by * RM_GetContextFlags(). */ @@ -578,6 +592,7 @@ REDISMODULE_API RedisModuleString * (*RedisModule_CreateStringFromLongLong)(Redi REDISMODULE_API RedisModuleString * (*RedisModule_CreateStringFromDouble)(RedisModuleCtx *ctx, double d) REDISMODULE_ATTR; REDISMODULE_API RedisModuleString * (*RedisModule_CreateStringFromLongDouble)(RedisModuleCtx *ctx, long double ld, int humanfriendly) REDISMODULE_ATTR; REDISMODULE_API RedisModuleString * (*RedisModule_CreateStringFromString)(RedisModuleCtx *ctx, const RedisModuleString *str) REDISMODULE_ATTR; +REDISMODULE_API RedisModuleString * (*RedisModule_CreateStringFromStreamID)(RedisModuleCtx *ctx, const RedisModuleStreamID *id) REDISMODULE_ATTR; REDISMODULE_API RedisModuleString * (*RedisModule_CreateStringPrintf)(RedisModuleCtx *ctx, const char *fmt, ...) REDISMODULE_ATTR_PRINTF(2,3) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_FreeString)(RedisModuleCtx *ctx, RedisModuleString *str) REDISMODULE_ATTR; REDISMODULE_API const char * (*RedisModule_StringPtrLen)(const RedisModuleString *str, size_t *len) REDISMODULE_ATTR; @@ -599,6 +614,7 @@ REDISMODULE_API int (*RedisModule_ReplyWithCallReply)(RedisModuleCtx *ctx, Redis REDISMODULE_API int (*RedisModule_StringToLongLong)(const RedisModuleString *str, long long *ll) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_StringToDouble)(const RedisModuleString *str, double *d) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_StringToLongDouble)(const RedisModuleString *str, long double *d) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_StringToStreamID)(const RedisModuleString *str, RedisModuleStreamID *id) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_AutoMemory)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_Replicate)(RedisModuleCtx *ctx, const char *cmdname, const char *fmt, ...) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_ReplicateVerbatim)(RedisModuleCtx *ctx) REDISMODULE_ATTR; @@ -629,6 +645,15 @@ REDISMODULE_API int (*RedisModule_ZsetRangePrev)(RedisModuleKey *key) REDISMODUL REDISMODULE_API int (*RedisModule_ZsetRangeEndReached)(RedisModuleKey *key) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_HashSet)(RedisModuleKey *key, int flags, ...) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_HashGet)(RedisModuleKey *key, int flags, ...) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_StreamAdd)(RedisModuleKey *key, int flags, RedisModuleStreamID *id, RedisModuleString **argv, int64_t numfields) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_StreamDelete)(RedisModuleKey *key, RedisModuleStreamID *id) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_StreamIteratorStart)(RedisModuleKey *key, int flags, RedisModuleStreamID *startid, RedisModuleStreamID *endid) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_StreamIteratorStop)(RedisModuleKey *key) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_StreamIteratorNextID)(RedisModuleKey *key, RedisModuleStreamID *id, long *numfields) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_StreamIteratorNextField)(RedisModuleKey *key, RedisModuleString **field_ptr, RedisModuleString **value_ptr) REDISMODULE_ATTR; +REDISMODULE_API int (*RedisModule_StreamIteratorDelete)(RedisModuleKey *key) REDISMODULE_ATTR; +REDISMODULE_API long long (*RedisModule_StreamTrimByLength)(RedisModuleKey *key, int flags, long long length) REDISMODULE_ATTR; +REDISMODULE_API long long (*RedisModule_StreamTrimByID)(RedisModuleKey *key, int flags, RedisModuleStreamID *id) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_IsKeysPositionRequest)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_KeyAtPos)(RedisModuleCtx *ctx, int pos) REDISMODULE_ATTR; REDISMODULE_API unsigned long long (*RedisModule_GetClientId)(RedisModuleCtx *ctx) REDISMODULE_ATTR; @@ -842,6 +867,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(StringToLongLong); REDISMODULE_GET_API(StringToDouble); REDISMODULE_GET_API(StringToLongDouble); + REDISMODULE_GET_API(StringToStreamID); REDISMODULE_GET_API(Call); REDISMODULE_GET_API(CallReplyProto); REDISMODULE_GET_API(FreeCallReply); @@ -856,6 +882,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(CreateStringFromDouble); REDISMODULE_GET_API(CreateStringFromLongDouble); REDISMODULE_GET_API(CreateStringFromString); + REDISMODULE_GET_API(CreateStringFromStreamID); REDISMODULE_GET_API(CreateStringPrintf); REDISMODULE_GET_API(FreeString); REDISMODULE_GET_API(StringPtrLen); @@ -887,6 +914,15 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(ZsetRangeEndReached); REDISMODULE_GET_API(HashSet); REDISMODULE_GET_API(HashGet); + REDISMODULE_GET_API(StreamAdd); + REDISMODULE_GET_API(StreamDelete); + REDISMODULE_GET_API(StreamIteratorStart); + REDISMODULE_GET_API(StreamIteratorStop); + REDISMODULE_GET_API(StreamIteratorNextID); + REDISMODULE_GET_API(StreamIteratorNextField); + REDISMODULE_GET_API(StreamIteratorDelete); + REDISMODULE_GET_API(StreamTrimByLength); + REDISMODULE_GET_API(StreamTrimByID); REDISMODULE_GET_API(IsKeysPositionRequest); REDISMODULE_GET_API(KeyAtPos); REDISMODULE_GET_API(GetClientId); diff --git a/src/stream.h b/src/stream.h index c7acee719..1f2132365 100644 --- a/src/stream.h +++ b/src/stream.h @@ -108,6 +108,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); +void streamIteratorRemoveEntry(streamIterator *si, streamID *current); void streamIteratorStop(streamIterator *si); streamCG *streamLookupCG(stream *s, sds groupname); streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags, int *created); @@ -121,5 +122,11 @@ int streamDecrID(streamID *id); void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername); robj *streamDup(robj *o); int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep); +int streamParseID(const robj *o, streamID *id); +robj *createObjectFromStreamID(streamID *id); +int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id); +int streamDeleteItem(stream *s, streamID *id); +int64_t streamTrimByLength(stream *s, long long maxlen, int approx); +int64_t streamTrimByID(stream *s, streamID minid, int approx); #endif diff --git a/src/t_stream.c b/src/t_stream.c index f991765eb..197b7d4f7 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -818,6 +818,28 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) { return deleted; } +/* Trims a stream by length. Returns the number of deleted items. */ +int64_t streamTrimByLength(stream *s, long long maxlen, int approx) { + streamAddTrimArgs args = { + .trim_strategy = TRIM_STRATEGY_MAXLEN, + .approx_trim = approx, + .limit = approx ? 100 * server.stream_node_max_entries : 0, + .maxlen = maxlen + }; + return streamTrim(s, &args); +} + +/* Trims a stream by minimum ID. Returns the number of deleted items. */ +int64_t streamTrimByID(stream *s, streamID minid, int approx) { + streamAddTrimArgs args = { + .trim_strategy = TRIM_STRATEGY_MINID, + .approx_trim = approx, + .limit = approx ? 100 * server.stream_node_max_entries : 0, + .minid = minid + }; + return streamTrim(s, &args); +} + /* Parse the arguements of XADD/XTRIM. * * See streamAddTrimArgs for more details about the arguments handled. @@ -1625,7 +1647,7 @@ robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) { * treated as an invalid ID. * * If 'c' is set to NULL, no reply is sent to the client. */ -int streamGenericParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int strict) { +int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict) { char buf[128]; if (sdslen(o->ptr) > sizeof(buf)-1) goto invalid; memcpy(buf,o->ptr,sdslen(o->ptr)+1); @@ -1661,6 +1683,11 @@ invalid: return C_ERR; } +/* Wrapper for streamGenericParseIDOrReply() used by module API. */ +int streamParseID(const robj *o, streamID *id) { + return streamGenericParseIDOrReply(NULL, o, id, 0, 0); +} + /* Wrapper for streamGenericParseIDOrReply() with 'strict' argument set to * 0, to be used when - and + are acceptable IDs. */ int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq) { diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 7363c98bc..febed82d0 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -27,7 +27,8 @@ TEST_MODULES = \ getkeys.so \ test_lazyfree.so \ timer.so \ - defragtest.so + defragtest.so \ + stream.so .PHONY: all diff --git a/tests/modules/stream.c b/tests/modules/stream.c new file mode 100644 index 000000000..abfbb1faf --- /dev/null +++ b/tests/modules/stream.c @@ -0,0 +1,258 @@ +#include "redismodule.h" + +#include +#include +#include +#include +#include + +/* Command which adds a stream entry with automatic ID, like XADD *. + * + * Syntax: STREAM.ADD key field1 value1 [ field2 value2 ... ] + * + * The response is the ID of the added stream entry or an error message. + */ +int stream_add(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc < 2 || argc % 2 != 0) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + RedisModuleStreamID id; + if (RedisModule_StreamAdd(key, REDISMODULE_STREAM_ADD_AUTOID, &id, + &argv[2], (argc-2)/2) == REDISMODULE_OK) { + RedisModuleString *id_str = RedisModule_CreateStringFromStreamID(ctx, &id); + RedisModule_ReplyWithString(ctx, id_str); + RedisModule_FreeString(ctx, id_str); + } else { + RedisModule_ReplyWithError(ctx, "ERR StreamAdd failed"); + } + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +/* Command which adds a stream entry N times. + * + * Syntax: STREAM.ADD key N field1 value1 [ field2 value2 ... ] + * + * Returns the number of successfully added entries. + */ +int stream_addn(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc < 3 || argc % 2 == 0) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + long long n, i; + if (RedisModule_StringToLongLong(argv[2], &n) == REDISMODULE_ERR) { + RedisModule_ReplyWithError(ctx, "N must be a number"); + return REDISMODULE_OK; + } + + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + for (i = 0; i < n; i++) { + if (RedisModule_StreamAdd(key, REDISMODULE_STREAM_ADD_AUTOID, NULL, + &argv[3], (argc-3)/2) == REDISMODULE_ERR) + break; + } + RedisModule_ReplyWithLongLong(ctx, i); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +/* STREAM.DELETE key stream-id */ +int stream_delete(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 3) return RedisModule_WrongArity(ctx); + RedisModuleStreamID id; + if (RedisModule_StringToStreamID(argv[2], &id) != REDISMODULE_OK) { + return RedisModule_ReplyWithError(ctx, "Invalid stream ID"); + } + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + if (RedisModule_StreamDelete(key, &id) == REDISMODULE_OK) { + RedisModule_ReplyWithSimpleString(ctx, "OK"); + } else { + RedisModule_ReplyWithError(ctx, "ERR StreamDelete failed"); + } + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +/* STREAM.RANGE key start-id end-id + * + * Returns an array of stream items. Each item is an array on the form + * [stream-id, [field1, value1, field2, value2, ...]]. + * + * A funny side-effect used for testing RM_StreamIteratorDelete() is that if any + * entry has a field named "selfdestruct", the stream entry is deleted. It is + * however included in the results of this command. + */ +int stream_range(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 4) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + RedisModuleStreamID startid, endid; + if (RedisModule_StringToStreamID(argv[2], &startid) != REDISMODULE_OK || + RedisModule_StringToStreamID(argv[3], &endid) != REDISMODULE_OK) { + RedisModule_ReplyWithError(ctx, "Invalid stream ID"); + return REDISMODULE_OK; + } + + /* If startid > endid, we swap and set the reverse flag. */ + int flags = 0; + if (startid.ms > endid.ms || + (startid.ms == endid.ms && startid.seq > endid.seq)) { + RedisModuleStreamID tmp = startid; + startid = endid; + endid = tmp; + flags |= REDISMODULE_STREAM_ITERATOR_REVERSE; + } + + /* Open key and start iterator. */ + int openflags = REDISMODULE_READ | REDISMODULE_WRITE; + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], openflags); + if (RedisModule_StreamIteratorStart(key, flags, + &startid, &endid) != REDISMODULE_OK) { + /* Key is not a stream, etc. */ + RedisModule_ReplyWithError(ctx, "ERR StreamIteratorStart failed"); + RedisModule_CloseKey(key); + return REDISMODULE_OK; + } + + /* Check error handling: Delete current entry when no current entry. */ + assert(RedisModule_StreamIteratorDelete(key) == + REDISMODULE_ERR); + assert(errno == ENOENT); + + /* Check error handling: Fetch fields when no current entry. */ + assert(RedisModule_StreamIteratorNextField(key, NULL, NULL) == + REDISMODULE_ERR); + assert(errno == ENOENT); + + /* Return array. */ + RedisModule_ReplyWithArray(ctx, REDISMODULE_POSTPONED_ARRAY_LEN); + RedisModule_AutoMemory(ctx); + RedisModuleStreamID id; + long numfields; + long len = 0; + while (RedisModule_StreamIteratorNextID(key, &id, + &numfields) == REDISMODULE_OK) { + RedisModule_ReplyWithArray(ctx, 2); + RedisModuleString *id_str = RedisModule_CreateStringFromStreamID(ctx, &id); + RedisModule_ReplyWithString(ctx, id_str); + RedisModule_ReplyWithArray(ctx, numfields * 2); + int delete = 0; + RedisModuleString *field, *value; + for (long i = 0; i < numfields; i++) { + assert(RedisModule_StreamIteratorNextField(key, &field, &value) == + REDISMODULE_OK); + RedisModule_ReplyWithString(ctx, field); + RedisModule_ReplyWithString(ctx, value); + /* check if this is a "selfdestruct" field */ + size_t field_len; + const char *field_str = RedisModule_StringPtrLen(field, &field_len); + if (!strncmp(field_str, "selfdestruct", field_len)) delete = 1; + } + if (delete) { + assert(RedisModule_StreamIteratorDelete(key) == REDISMODULE_OK); + } + /* check error handling: no more fields to fetch */ + assert(RedisModule_StreamIteratorNextField(key, &field, &value) == + REDISMODULE_ERR); + assert(errno == ENOENT); + len++; + } + RedisModule_ReplySetArrayLength(ctx, len); + RedisModule_StreamIteratorStop(key); + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +/* + * STREAM.TRIM key (MAXLEN (=|~) length | MINID (=|~) id) + */ +int stream_trim(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + if (argc != 5) { + RedisModule_WrongArity(ctx); + return REDISMODULE_OK; + } + + /* Parse args */ + int trim_by_id = 0; /* 0 = maxlen, 1 = minid */ + long long maxlen; + RedisModuleStreamID minid; + size_t arg_len; + const char *arg = RedisModule_StringPtrLen(argv[2], &arg_len); + if (!strcasecmp(arg, "minid")) { + trim_by_id = 1; + if (RedisModule_StringToStreamID(argv[4], &minid) != REDISMODULE_OK) { + RedisModule_ReplyWithError(ctx, "ERR Invalid stream ID"); + return REDISMODULE_OK; + } + } else if (!strcasecmp(arg, "maxlen")) { + if (RedisModule_StringToLongLong(argv[4], &maxlen) == REDISMODULE_ERR) { + RedisModule_ReplyWithError(ctx, "ERR Maxlen must be a number"); + return REDISMODULE_OK; + } + } else { + RedisModule_ReplyWithError(ctx, "ERR Invalid arguments"); + return REDISMODULE_OK; + } + + /* Approx or exact */ + int flags; + arg = RedisModule_StringPtrLen(argv[3], &arg_len); + if (arg_len == 1 && arg[0] == '~') { + flags = REDISMODULE_STREAM_TRIM_APPROX; + } else if (arg_len == 1 && arg[0] == '=') { + flags = 0; + } else { + RedisModule_ReplyWithError(ctx, "ERR Invalid approx-or-exact mark"); + return REDISMODULE_OK; + } + + /* Trim */ + RedisModuleKey *key = RedisModule_OpenKey(ctx, argv[1], REDISMODULE_WRITE); + long long trimmed; + if (trim_by_id) { + trimmed = RedisModule_StreamTrimByID(key, flags, &minid); + } else { + trimmed = RedisModule_StreamTrimByLength(key, flags, maxlen); + } + + /* Return result */ + if (trimmed < 0) { + RedisModule_ReplyWithError(ctx, "ERR Trimming failed"); + } else { + RedisModule_ReplyWithLongLong(ctx, trimmed); + } + RedisModule_CloseKey(key); + return REDISMODULE_OK; +} + +int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); + if (RedisModule_Init(ctx, "stream", 1, REDISMODULE_APIVER_1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + if (RedisModule_CreateCommand(ctx, "stream.add", stream_add, "", + 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "stream.addn", stream_addn, "", + 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "stream.delete", stream_delete, "", + 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "stream.range", stream_range, "", + 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + if (RedisModule_CreateCommand(ctx, "stream.trim", stream_trim, "", + 1, 1, 1) == REDISMODULE_ERR) + return REDISMODULE_ERR; + + return REDISMODULE_OK; +} diff --git a/tests/unit/moduleapi/stream.tcl b/tests/unit/moduleapi/stream.tcl new file mode 100644 index 000000000..15e97c183 --- /dev/null +++ b/tests/unit/moduleapi/stream.tcl @@ -0,0 +1,155 @@ +set testmodule [file normalize tests/modules/stream.so] + +start_server {tags {"modules"}} { + r module load $testmodule + + test {Module stream add and delete} { + r del mystream + # add to empty key + set streamid1 [r stream.add mystream item 1 value a] + # add to existing stream + set streamid2 [r stream.add mystream item 2 value b] + # check result + assert { [string match "*-*" $streamid1] } + set items [r XRANGE mystream - +] + assert_equal $items \ + "{$streamid1 {item 1 value a}} {$streamid2 {item 2 value b}}" + # delete one of them and try deleting non-existing ID + assert_equal OK [r stream.delete mystream $streamid1] + assert_error "ERR StreamDelete*" {r stream.delete mystream 123-456} + assert_error "Invalid stream ID*" {r stream.delete mystream foo} + assert_equal "{$streamid2 {item 2 value b}}" [r XRANGE mystream - +] + # check error condition: wrong type + r del mystream + r set mystream mystring + assert_error "ERR StreamAdd*" {r stream.add mystream item 1 value a} + assert_error "ERR StreamDelete*" {r stream.delete mystream 123-456} + } + + test {Module stream add unblocks blocking xread} { + r del mystream + + # Blocking XREAD on an empty key + set rd1 [redis_deferring_client] + $rd1 XREAD BLOCK 3000 STREAMS mystream $ + # wait until client is actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Client is not blocked" + } + set id [r stream.add mystream field 1 value a] + assert_equal "{mystream {{$id {field 1 value a}}}}" [$rd1 read] + + # Blocking XREAD on an existing stream + set rd2 [redis_deferring_client] + $rd2 XREAD BLOCK 3000 STREAMS mystream $ + # wait until client is actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Client is not blocked" + } + set id [r stream.add mystream field 2 value b] + assert_equal "{mystream {{$id {field 2 value b}}}}" [$rd2 read] + } + + test {Module stream add benchmark (1M stream add)} { + set n 1000000 + r del mystream + set result [r stream.addn mystream $n field value] + assert_equal $result $n + } + + test {Module stream iterator} { + r del mystream + set streamid1 [r xadd mystream * item 1 value a] + set streamid2 [r xadd mystream * item 2 value b] + # range result + set result1 [r stream.range mystream "-" "+"] + set expect1 [r xrange mystream "-" "+"] + assert_equal $result1 $expect1 + # reverse range + set result_rev [r stream.range mystream "+" "-"] + set expect_rev [r xrevrange mystream "+" "-"] + assert_equal $result_rev $expect_rev + + # only one item: range with startid = endid + set result2 [r stream.range mystream "-" $streamid1] + assert_equal $result2 "{$streamid1 {item 1 value a}}" + assert_equal $result2 [list [list $streamid1 {item 1 value a}]] + # only one item: range with startid = endid + set result3 [r stream.range mystream $streamid2 $streamid2] + assert_equal $result3 "{$streamid2 {item 2 value b}}" + assert_equal $result3 [list [list $streamid2 {item 2 value b}]] + } + + test {Module stream iterator delete} { + r del mystream + set id1 [r xadd mystream * normal item] + set id2 [r xadd mystream * selfdestruct yes] + set id3 [r xadd mystream * another item] + # stream.range deletes the "selfdestruct" item after returning it + assert_equal \ + "{$id1 {normal item}} {$id2 {selfdestruct yes}} {$id3 {another item}}" \ + [r stream.range mystream - +] + # now, the "selfdestruct" item is gone + assert_equal \ + "{$id1 {normal item}} {$id3 {another item}}" \ + [r stream.range mystream - +] + } + + test {Module stream trim by length} { + r del mystream + # exact maxlen + r xadd mystream * item 1 value a + r xadd mystream * item 2 value b + r xadd mystream * item 3 value c + assert_equal 3 [r xlen mystream] + assert_equal 0 [r stream.trim mystream maxlen = 5] + assert_equal 3 [r xlen mystream] + assert_equal 2 [r stream.trim mystream maxlen = 1] + assert_equal 1 [r xlen mystream] + assert_equal 1 [r stream.trim mystream maxlen = 0] + # check that there is no limit for exact maxlen + r stream.addn mystream 20000 item x value y + assert_equal 20000 [r stream.trim mystream maxlen = 0] + # approx maxlen (100 items per node implies default limit 10K items) + r stream.addn mystream 20000 item x value y + assert_equal 20000 [r xlen mystream] + assert_equal 10000 [r stream.trim mystream maxlen ~ 2] + assert_equal 9900 [r stream.trim mystream maxlen ~ 2] + assert_equal 0 [r stream.trim mystream maxlen ~ 2] + assert_equal 100 [r xlen mystream] + assert_equal 100 [r stream.trim mystream maxlen ~ 0] + assert_equal 0 [r xlen mystream] + } + + test {Module stream trim by ID} { + r del mystream + # exact minid + r xadd mystream * item 1 value a + r xadd mystream * item 2 value b + set minid [r xadd mystream * item 3 value c] + assert_equal 3 [r xlen mystream] + assert_equal 0 [r stream.trim mystream minid = -] + assert_equal 3 [r xlen mystream] + assert_equal 2 [r stream.trim mystream minid = $minid] + assert_equal 1 [r xlen mystream] + assert_equal 1 [r stream.trim mystream minid = +] + # check that there is no limit for exact minid + r stream.addn mystream 20000 item x value y + assert_equal 20000 [r stream.trim mystream minid = +] + # approx minid (100 items per node implies default limit 10K items) + r stream.addn mystream 19980 item x value y + set minid [r xadd mystream * item x value y] + r stream.addn mystream 19 item x value y + assert_equal 20000 [r xlen mystream] + assert_equal 10000 [r stream.trim mystream minid ~ $minid] + assert_equal 9900 [r stream.trim mystream minid ~ $minid] + assert_equal 0 [r stream.trim mystream minid ~ $minid] + assert_equal 100 [r xlen mystream] + assert_equal 100 [r stream.trim mystream minid ~ +] + assert_equal 0 [r xlen mystream] + } +}