From 0f43a908f9eebfd32bbe3fd088a9f08426ef379a Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 19 Jan 2018 17:18:06 +0100 Subject: [PATCH] CG: consumer lookup + initial streamReplyWithRange() work to supprot CG. --- src/blocked.c | 4 +++- src/stream.h | 4 ++-- src/t_stream.c | 44 +++++++++++++++++++++++++++++++++++++++----- 3 files changed, 44 insertions(+), 8 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index d560a8f38..371f243bb 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -326,7 +326,9 @@ void handleClientsBlockedOnKeys(void) { addReplyMultiBulkLen(receiver,2); addReplyBulk(receiver,rl->key); streamReplyWithRange(receiver,s,&start,NULL, - receiver->bpop.xread_count,0); + receiver->bpop.xread_count,0, + NULL, + NULL); } } } diff --git a/src/stream.h b/src/stream.h index 4b9e68885..91bdbee5d 100644 --- a/src/stream.h +++ b/src/stream.h @@ -64,7 +64,7 @@ typedef struct streamCG { /* A specific consumer in a consumer group. */ typedef struct streamConsumer { mstime_t seen_time; /* Last time this consumer was active. */ - sds *name; /* Consumer name. This is how the consumer + sds name; /* Consumer name. This is how the consumer will be identified in the consumer group protocol. Case sensitive. */ rax *pel; /* Consumer specific pending entries list: all @@ -89,7 +89,7 @@ struct client; stream *streamNew(void); void freeStream(stream *s); -size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev); +size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer); void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); diff --git a/src/t_stream.c b/src/t_stream.c index 65a926ae1..c51dc94cb 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -42,6 +42,7 @@ void streamFreeCG(streamCG *cg); streamCG *streamLookupCG(stream *s, sds groupname); +streamConsumer *streamLookupConsumer(streamCG *cg, sds name); /* ----------------------------------------------------------------------- * Low level stream encoding: a radix tree of listpacks. @@ -659,8 +660,19 @@ void streamIteratorStop(streamIterator *si) { * receive is between start and end inclusive, if 'count' is non zero, no more * than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that * we want all the elements from 'start' till the end of the stream. If 'rev' - * is non zero, elements are produced in reversed order from end to start. */ -size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev) { + * is non zero, elements are produced in reversed order from end to start. + * + * If group and consumer are not NULL, the function performs additional work: + * 1. It updates the last delivered ID in the group in case we are + * sending IDs greater than the current last ID. + * 2. If the requested IDs are already assigned to some other consumer, the + * function will not return it to the client. + * 3. An entry in the pending list will be created for every entry delivered + * for the first time to this consumer. This is only performed if + * consumer != NULL, so in order to implement the XREADGROUP NOACK option + * no consumer is passed to this function. + */ +size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer) { void *arraylen_ptr = addDeferredMultiBulkLength(c); size_t arraylen = 0; streamIterator si; @@ -903,7 +915,7 @@ void xrangeGenericCommand(client *c, int rev) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL || checkType(c,o,OBJ_STREAM)) return; s = o->ptr; - streamReplyWithRange(c,s,&startid,&endid,count,rev); + streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL); } /* XRANGE key start end [COUNT ] */ @@ -1063,13 +1075,18 @@ void xreadCommand(client *c) { * so start from the next ID, since we want only messages with * IDs greater than start. */ streamID start = *gt; - start.seq++; /* Can't overflow, it's an uint64_t */ + start.seq++; /* uint64_t can't overflow in this context. */ /* Emit the two elements sub-array consisting of the name * of the stream and the data we extracted from it. */ addReplyMultiBulkLen(c,2); addReplyBulk(c,c->argv[i+streams_arg]); - streamReplyWithRange(c,s,&start,NULL,count,0); + streamConsumer *consumer = NULL; + if (groups) consumer = streamLookupConsumer(groups[i], + consumername->ptr); + streamReplyWithRange(c,s,&start,NULL,count,0, + groups ? groups[i] : NULL, + consumer); } } @@ -1118,6 +1135,7 @@ void streamNotAckedFree(streamNotAcked *na) { } void streamConsumerFree(streamConsumer *sc) { + zfree(sc->name); zfree(sc); } @@ -1154,6 +1172,22 @@ streamCG *streamLookupCG(stream *s, sds groupname) { return (cg == raxNotFound) ? NULL : cg; } +/* Lookup the consumer with the specified name in the group 'cg': if the + * consumer does not exist it is automatically created as a side effect + * of calling this function, otherwise its last seen time is updated and + * the existing consumer reference returned. */ +streamConsumer *streamLookupConsumer(streamCG *cg, sds name) { + streamConsumer *c = raxFind(cg->consumers,(unsigned char*)name, + sdslen(name)); + if (c == raxNotFound) { + c = zmalloc(sizeof(*c)); + c->name = sdsdup(name); + c->pel = raxNew(); + } + c->seen_time = mstime(); + return c; +} + /* ----------------------------------------------------------------------- * Consumer groups commands * ----------------------------------------------------------------------- */