From 4a5eaaeb7633b863ec45d6e8b7c0ec20a3327e27 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 23 Jan 2018 18:52:24 +0100 Subject: [PATCH] CG: creation of NACK entries in PELs. --- src/blocked.c | 3 +- src/stream.h | 10 +++--- src/t_stream.c | 86 ++++++++++++++++++++++++++++++++++++++++---------- 3 files changed, 75 insertions(+), 24 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 371f243bb..0bbbe6c6a 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -327,8 +327,7 @@ void handleClientsBlockedOnKeys(void) { addReplyBulk(receiver,rl->key); streamReplyWithRange(receiver,s,&start,NULL, receiver->bpop.xread_count,0, - NULL, - NULL); + NULL,NULL,0); } } } diff --git a/src/stream.h b/src/stream.h index 91bdbee5d..fa6947482 100644 --- a/src/stream.h +++ b/src/stream.h @@ -55,7 +55,7 @@ typedef struct streamCG { the NOACK option) that was yet not acknowledged as processed. The key of the radix tree is the ID as a 64 bit big endian number, while the - associated value is a streamNotAcked structure.*/ + associated value is a streamNACK structure.*/ rax *consumers; /* A radix tree representing the consumers by name and their associated representation in the form of streamConsumer structures. */ @@ -71,25 +71,25 @@ typedef struct streamConsumer { the pending messages delivered to this consumer not yet acknowledged. Keys are big endian message IDs, while values are - the same streamNotAcked structure referenced + the same streamNACK structure referenced in the "pel" of the conumser group structure itself, so the value is shared. */ } streamConsumer; /* Pending (yet not acknowledged) message in a consumer group. */ -typedef struct streamNotAcked { +typedef struct streamNACK { mstime_t delivery_time; /* Last time this message was delivered. */ uint64_t delivery_count; /* Number of times this message was delivered.*/ streamConsumer *consumer; /* The consumer this message was delivered to in the last delivery. */ -} streamNotAcked; +} streamNACK; /* Prototypes of exported APIs. */ struct client; stream *streamNew(void); void freeStream(stream *s); -size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer); +size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int noack); void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); diff --git a/src/t_stream.c b/src/t_stream.c index 4d9c45548..605d3251b 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -43,6 +43,7 @@ void streamFreeCG(streamCG *cg); streamCG *streamLookupCG(stream *s, sds groupname); streamConsumer *streamLookupConsumer(streamCG *cg, sds name); +streamNACK *streamCreateNACK(streamConsumer *consumer); /* ----------------------------------------------------------------------- * Low level stream encoding: a radix tree of listpacks. @@ -665,6 +666,18 @@ void streamIteratorStop(streamIterator *si) { raxStop(&si->ri); } +/* This is an helper function for streamReplyWithRange() when called with + * group and consumer arguments, but with a range that is referring to already + * delivered messages. In this case we just emit messages that are already + * in the history of the conusmer, fetching the IDs from its PEL. + * + * Note that this function does not have a 'rev' argument because it's not + * possible to iterate in reverse using a group. Basically this function + * is only called as a result of the XREADGROUP command. */ +size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer) { + /* TODO: update the last time delivery and delivery count. */ +} + /* Send the specified range to the client 'c'. The range the client will * receive is between start and end inclusive, if 'count' is non zero, no more * than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that @@ -678,24 +691,27 @@ void streamIteratorStop(streamIterator *si) { * function will not return it to the client. * 3. An entry in the pending list will be created for every entry delivered * for the first time to this consumer. This is only performed if - * consumer != NULL, so in order to implement the XREADGROUP NOACK option - * no consumer is passed to this function. + * 'noack' is non-zero. */ -size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer) { +size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int noack) { void *arraylen_ptr = addDeferredMultiBulkLength(c); size_t arraylen = 0; streamIterator si; int64_t numfields; streamID id; - /* If a group was passed, as an optimization we check if the range - * specified is about messages that were never delivered. This is true if - * the 'start' range is greater than the current last_id in the stream. - * In that case, there is no need to check if the messages may already - * have another owner before delivering the message. This speeds up - * the processing significantly. */ - int newmessages = group != NULL && - streamCompareID(start,&group->last_id) > 0; + /* If a group was passed, we check if the request is about messages + * never delivered so far (normally this happens when ">" ID is passed). + * + * If instead the client is asking for some history, we serve it + * using a different function, so that we return entries *solely* + * from its own PEL. This ensures each consumer will always and only + * see the history of messages delivered to it and not yet confirmed + * as delivered. */ + if (group && streamCompareID(start,&group->last_id) <= 0) { + return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count, + group,consumer); + } streamIteratorStart(&si,s,start,end,rev); while(streamIteratorGetID(&si,&id,&numfields)) { @@ -720,6 +736,22 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end } arraylen++; if (count && count == arraylen) break; + + /* If a group is passed, we need to create an entry in the + * PEL (pending entries list) of this group *and* this consumer. + * Note that we are sure about the fact the message is not already + * associated with some other consumer, because if we reached this + * loop the IDs the user is requesting are greater than any message + * delivered for this group. */ + if (group && !noack) { + unsigned char buf[sizeof(streamID)]; + streamEncodeID(buf,&id); + streamNACK *nack = streamCreateNACK(consumer); + int retval = 0; + retval += raxInsert(group->pel,buf,sizeof(buf),nack,NULL); + retval += raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL); + serverAssert(retval == 2); /* Make sure entry was inserted. */ + } } streamIteratorStop(&si); setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); @@ -937,7 +969,7 @@ void xrangeGenericCommand(client *c, int rev) { if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL || checkType(c,o,OBJ_STREAM)) return; s = o->ptr; - streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL); + streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0); } /* XRANGE key start end [COUNT ] */ @@ -972,6 +1004,7 @@ void xreadCommand(client *c) { long long count = 0; int streams_count = 0; int streams_arg = 0; + int noack = 0; /* True if NOACK option was specified. */ #define STREAMID_STATIC_VECTOR_LEN 8 streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; streamID *ids = static_ids; @@ -1013,6 +1046,13 @@ void xreadCommand(client *c) { groupname = c->argv[i+1]; consumername = c->argv[i+2]; i += 2; + } else if (!strcasecmp(o,"NOACK")) { + if (!xreadgroup) { + addReplyError(c,"The NOACK option is only supported by " + "XREADGROUP. You called XREAD instead."); + return; + } + noack = 1; } else { addReply(c,shared.syntaxerr); return; @@ -1109,7 +1149,7 @@ void xreadCommand(client *c) { consumername->ptr); streamReplyWithRange(c,s,&start,NULL,count,0, groups ? groups[i] : NULL, - consumer); + consumer, noack); } } @@ -1153,7 +1193,19 @@ cleanup: * Low level implementation of consumer groups * ----------------------------------------------------------------------- */ -void streamNotAckedFree(streamNotAcked *na) { +/* Create a NACK entry setting the delivery count to 1 and the delivery + * time to the current time. The NACK consumer will be set to the one + * specified as argument of the function. */ +streamNACK *streamCreateNACK(streamConsumer *consumer) { + streamNACK *nack = zmalloc(sizeof(*nack)); + nack->delivery_time = mstime(); + nack->delivery_count = 1; + nack->consumer = consumer; + return nack; +} + +/* Free a NACK entry. */ +void streamFreeNACK(streamNACK *na) { zfree(na); } @@ -1162,7 +1214,7 @@ void streamNotAckedFree(streamNotAcked *na) { * nor will delete them from the stream, so when this function is called * to delete a consumer, and not when the whole stream is destroyed, the caller * should do some work before. */ -void streamConsumerFree(streamConsumer *sc) { +void streamFreeConsumer(streamConsumer *sc) { raxFree(sc->pel); /* No value free callback: the PEL entries are shared between the consumer and the main stream PEL. */ sdsfree(sc->name); @@ -1188,8 +1240,8 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID id) { /* Free a consumer group and all its associated data. */ void streamFreeCG(streamCG *cg) { - raxFreeWithCallback(cg->pel,(void(*)(void*))streamNotAckedFree); - raxFreeWithCallback(cg->consumers,(void(*)(void*))streamConsumerFree); + raxFreeWithCallback(cg->pel,(void(*)(void*))streamFreeNACK); + raxFreeWithCallback(cg->consumers,(void(*)(void*))streamFreeConsumer); zfree(cg); }