CG: consumer lookup + initial streamReplyWithRange() work to supprot CG.
This commit is contained in:
parent
23dc98ac52
commit
0f43a908f9
@ -326,7 +326,9 @@ void handleClientsBlockedOnKeys(void) {
|
||||
addReplyMultiBulkLen(receiver,2);
|
||||
addReplyBulk(receiver,rl->key);
|
||||
streamReplyWithRange(receiver,s,&start,NULL,
|
||||
receiver->bpop.xread_count,0);
|
||||
receiver->bpop.xread_count,0,
|
||||
NULL,
|
||||
NULL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -64,7 +64,7 @@ typedef struct streamCG {
|
||||
/* A specific consumer in a consumer group. */
|
||||
typedef struct streamConsumer {
|
||||
mstime_t seen_time; /* Last time this consumer was active. */
|
||||
sds *name; /* Consumer name. This is how the consumer
|
||||
sds name; /* Consumer name. This is how the consumer
|
||||
will be identified in the consumer group
|
||||
protocol. Case sensitive. */
|
||||
rax *pel; /* Consumer specific pending entries list: all
|
||||
@ -89,7 +89,7 @@ struct client;
|
||||
|
||||
stream *streamNew(void);
|
||||
void freeStream(stream *s);
|
||||
size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev);
|
||||
size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer);
|
||||
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
|
||||
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
|
||||
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
|
||||
|
@ -42,6 +42,7 @@
|
||||
|
||||
void streamFreeCG(streamCG *cg);
|
||||
streamCG *streamLookupCG(stream *s, sds groupname);
|
||||
streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
|
||||
|
||||
/* -----------------------------------------------------------------------
|
||||
* Low level stream encoding: a radix tree of listpacks.
|
||||
@ -659,8 +660,19 @@ void streamIteratorStop(streamIterator *si) {
|
||||
* receive is between start and end inclusive, if 'count' is non zero, no more
|
||||
* than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that
|
||||
* we want all the elements from 'start' till the end of the stream. If 'rev'
|
||||
* is non zero, elements are produced in reversed order from end to start. */
|
||||
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev) {
|
||||
* is non zero, elements are produced in reversed order from end to start.
|
||||
*
|
||||
* If group and consumer are not NULL, the function performs additional work:
|
||||
* 1. It updates the last delivered ID in the group in case we are
|
||||
* sending IDs greater than the current last ID.
|
||||
* 2. If the requested IDs are already assigned to some other consumer, the
|
||||
* function will not return it to the client.
|
||||
* 3. An entry in the pending list will be created for every entry delivered
|
||||
* for the first time to this consumer. This is only performed if
|
||||
* consumer != NULL, so in order to implement the XREADGROUP NOACK option
|
||||
* no consumer is passed to this function.
|
||||
*/
|
||||
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer) {
|
||||
void *arraylen_ptr = addDeferredMultiBulkLength(c);
|
||||
size_t arraylen = 0;
|
||||
streamIterator si;
|
||||
@ -903,7 +915,7 @@ void xrangeGenericCommand(client *c, int rev) {
|
||||
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
|
||||
|| checkType(c,o,OBJ_STREAM)) return;
|
||||
s = o->ptr;
|
||||
streamReplyWithRange(c,s,&startid,&endid,count,rev);
|
||||
streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL);
|
||||
}
|
||||
|
||||
/* XRANGE key start end [COUNT <n>] */
|
||||
@ -1063,13 +1075,18 @@ void xreadCommand(client *c) {
|
||||
* so start from the next ID, since we want only messages with
|
||||
* IDs greater than start. */
|
||||
streamID start = *gt;
|
||||
start.seq++; /* Can't overflow, it's an uint64_t */
|
||||
start.seq++; /* uint64_t can't overflow in this context. */
|
||||
|
||||
/* Emit the two elements sub-array consisting of the name
|
||||
* of the stream and the data we extracted from it. */
|
||||
addReplyMultiBulkLen(c,2);
|
||||
addReplyBulk(c,c->argv[i+streams_arg]);
|
||||
streamReplyWithRange(c,s,&start,NULL,count,0);
|
||||
streamConsumer *consumer = NULL;
|
||||
if (groups) consumer = streamLookupConsumer(groups[i],
|
||||
consumername->ptr);
|
||||
streamReplyWithRange(c,s,&start,NULL,count,0,
|
||||
groups ? groups[i] : NULL,
|
||||
consumer);
|
||||
}
|
||||
}
|
||||
|
||||
@ -1118,6 +1135,7 @@ void streamNotAckedFree(streamNotAcked *na) {
|
||||
}
|
||||
|
||||
void streamConsumerFree(streamConsumer *sc) {
|
||||
zfree(sc->name);
|
||||
zfree(sc);
|
||||
}
|
||||
|
||||
@ -1154,6 +1172,22 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
|
||||
return (cg == raxNotFound) ? NULL : cg;
|
||||
}
|
||||
|
||||
/* Lookup the consumer with the specified name in the group 'cg': if the
|
||||
* consumer does not exist it is automatically created as a side effect
|
||||
* of calling this function, otherwise its last seen time is updated and
|
||||
* the existing consumer reference returned. */
|
||||
streamConsumer *streamLookupConsumer(streamCG *cg, sds name) {
|
||||
streamConsumer *c = raxFind(cg->consumers,(unsigned char*)name,
|
||||
sdslen(name));
|
||||
if (c == raxNotFound) {
|
||||
c = zmalloc(sizeof(*c));
|
||||
c->name = sdsdup(name);
|
||||
c->pel = raxNew();
|
||||
}
|
||||
c->seen_time = mstime();
|
||||
return c;
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------
|
||||
* Consumer groups commands
|
||||
* ----------------------------------------------------------------------- */
|
||||
|
Loading…
x
Reference in New Issue
Block a user