Support streams in general module API functions
Fixes GitHub issue #6492 Added stream support in RM_KeyType and RM_ValueLength. Also moduleDelKeyIfEmpty was updated, even though it has no effect now (It will be relevant when stream type direct API will be coded - i.e. RM_StreamAdd)
This commit is contained in:
parent
a15a5d7097
commit
1833d008b3
@ -518,7 +518,8 @@ int moduleDelKeyIfEmpty(RedisModuleKey *key) {
|
|||||||
case OBJ_LIST: isempty = listTypeLength(o) == 0; break;
|
case OBJ_LIST: isempty = listTypeLength(o) == 0; break;
|
||||||
case OBJ_SET: isempty = setTypeSize(o) == 0; break;
|
case OBJ_SET: isempty = setTypeSize(o) == 0; break;
|
||||||
case OBJ_ZSET: isempty = zsetLength(o) == 0; break;
|
case OBJ_ZSET: isempty = zsetLength(o) == 0; break;
|
||||||
case OBJ_HASH : isempty = hashTypeLength(o) == 0; break;
|
case OBJ_HASH: isempty = hashTypeLength(o) == 0; break;
|
||||||
|
case OBJ_STREAM: isempty = streamLength(o) == 0; break;
|
||||||
default: isempty = 0;
|
default: isempty = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1916,6 +1917,7 @@ int RM_KeyType(RedisModuleKey *key) {
|
|||||||
case OBJ_ZSET: return REDISMODULE_KEYTYPE_ZSET;
|
case OBJ_ZSET: return REDISMODULE_KEYTYPE_ZSET;
|
||||||
case OBJ_HASH: return REDISMODULE_KEYTYPE_HASH;
|
case OBJ_HASH: return REDISMODULE_KEYTYPE_HASH;
|
||||||
case OBJ_MODULE: return REDISMODULE_KEYTYPE_MODULE;
|
case OBJ_MODULE: return REDISMODULE_KEYTYPE_MODULE;
|
||||||
|
case OBJ_STREAM: return REDISMODULE_KEYTYPE_STREAM;
|
||||||
default: return 0;
|
default: return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1933,6 +1935,7 @@ size_t RM_ValueLength(RedisModuleKey *key) {
|
|||||||
case OBJ_SET: return setTypeSize(key->value);
|
case OBJ_SET: return setTypeSize(key->value);
|
||||||
case OBJ_ZSET: return zsetLength(key->value);
|
case OBJ_ZSET: return zsetLength(key->value);
|
||||||
case OBJ_HASH: return hashTypeLength(key->value);
|
case OBJ_HASH: return hashTypeLength(key->value);
|
||||||
|
case OBJ_STREAM: return streamLength(key->value);
|
||||||
default: return 0;
|
default: return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -33,6 +33,7 @@
|
|||||||
#define REDISMODULE_KEYTYPE_SET 4
|
#define REDISMODULE_KEYTYPE_SET 4
|
||||||
#define REDISMODULE_KEYTYPE_ZSET 5
|
#define REDISMODULE_KEYTYPE_ZSET 5
|
||||||
#define REDISMODULE_KEYTYPE_MODULE 6
|
#define REDISMODULE_KEYTYPE_MODULE 6
|
||||||
|
#define REDISMODULE_KEYTYPE_STREAM 7
|
||||||
|
|
||||||
/* Reply types. */
|
/* Reply types. */
|
||||||
#define REDISMODULE_REPLY_UNKNOWN -1
|
#define REDISMODULE_REPLY_UNKNOWN -1
|
||||||
|
@ -98,6 +98,7 @@ struct client;
|
|||||||
|
|
||||||
stream *streamNew(void);
|
stream *streamNew(void);
|
||||||
void freeStream(stream *s);
|
void freeStream(stream *s);
|
||||||
|
unsigned long streamLength(const robj *subject);
|
||||||
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi);
|
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi);
|
||||||
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
|
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
|
||||||
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
|
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
|
||||||
|
@ -67,6 +67,12 @@ void freeStream(stream *s) {
|
|||||||
zfree(s);
|
zfree(s);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Return the length of a stream. */
|
||||||
|
unsigned long streamLength(const robj *subject) {
|
||||||
|
stream *s = subject->ptr;
|
||||||
|
return s->length;
|
||||||
|
}
|
||||||
|
|
||||||
/* Generate the next stream item ID given the previous one. If the current
|
/* Generate the next stream item ID given the previous one. If the current
|
||||||
* milliseconds Unix time is greater than the previous one, just use this
|
* milliseconds Unix time is greater than the previous one, just use this
|
||||||
* as time part and start with sequence part of zero. Otherwise we use the
|
* as time part and start with sequence part of zero. Otherwise we use the
|
||||||
|
Loading…
x
Reference in New Issue
Block a user