diff --git a/src/module.c b/src/module.c index 4c609e4ff..c8cdf72a8 100644 --- a/src/module.c +++ b/src/module.c @@ -518,7 +518,8 @@ int moduleDelKeyIfEmpty(RedisModuleKey *key) { case OBJ_LIST: isempty = listTypeLength(o) == 0; break; case OBJ_SET: isempty = setTypeSize(o) == 0; break; case OBJ_ZSET: isempty = zsetLength(o) == 0; break; - case OBJ_HASH : isempty = hashTypeLength(o) == 0; break; + case OBJ_HASH: isempty = hashTypeLength(o) == 0; break; + case OBJ_STREAM: isempty = streamLength(o) == 0; break; default: isempty = 0; } @@ -1965,6 +1966,7 @@ int RM_KeyType(RedisModuleKey *key) { case OBJ_ZSET: return REDISMODULE_KEYTYPE_ZSET; case OBJ_HASH: return REDISMODULE_KEYTYPE_HASH; case OBJ_MODULE: return REDISMODULE_KEYTYPE_MODULE; + case OBJ_STREAM: return REDISMODULE_KEYTYPE_STREAM; default: return 0; } } @@ -1982,6 +1984,7 @@ size_t RM_ValueLength(RedisModuleKey *key) { case OBJ_SET: return setTypeSize(key->value); case OBJ_ZSET: return zsetLength(key->value); case OBJ_HASH: return hashTypeLength(key->value); + case OBJ_STREAM: return streamLength(key->value); default: return 0; } } diff --git a/src/redismodule.h b/src/redismodule.h index 40b5ccf20..4f4f9abd5 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -33,6 +33,7 @@ #define REDISMODULE_KEYTYPE_SET 4 #define REDISMODULE_KEYTYPE_ZSET 5 #define REDISMODULE_KEYTYPE_MODULE 6 +#define REDISMODULE_KEYTYPE_STREAM 7 /* Reply types. */ #define REDISMODULE_REPLY_UNKNOWN -1 diff --git a/src/stream.h b/src/stream.h index 1163b3527..7de769ba1 100644 --- a/src/stream.h +++ b/src/stream.h @@ -98,6 +98,7 @@ struct client; stream *streamNew(void); void freeStream(stream *s); +unsigned long streamLength(const robj *subject); size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi); void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); diff --git a/src/t_stream.c b/src/t_stream.c index 0b07d7110..4533853da 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -67,6 +67,12 @@ void freeStream(stream *s) { zfree(s); } +/* Return the length of a stream. */ +unsigned long streamLength(const robj *subject) { + stream *s = subject->ptr; + return s->length; +} + /* Generate the next stream item ID given the previous one. If the current * milliseconds Unix time is greater than the previous one, just use this * as time part and start with sequence part of zero. Otherwise we use the