CG: creation of NACK entries in PELs.

This commit is contained in:
antirez 2018-01-23 18:52:24 +01:00
parent 1ffb6723f5
commit 41809fd969
3 changed files with 75 additions and 24 deletions

View File

@ -327,8 +327,7 @@ void handleClientsBlockedOnKeys(void) {
addReplyBulk(receiver,rl->key); addReplyBulk(receiver,rl->key);
streamReplyWithRange(receiver,s,&start,NULL, streamReplyWithRange(receiver,s,&start,NULL,
receiver->bpop.xread_count,0, receiver->bpop.xread_count,0,
NULL, NULL,NULL,0);
NULL);
} }
} }
} }

View File

@ -55,7 +55,7 @@ typedef struct streamCG {
the NOACK option) that was yet not acknowledged the NOACK option) that was yet not acknowledged
as processed. The key of the radix tree is the as processed. The key of the radix tree is the
ID as a 64 bit big endian number, while the ID as a 64 bit big endian number, while the
associated value is a streamNotAcked structure.*/ associated value is a streamNACK structure.*/
rax *consumers; /* A radix tree representing the consumers by name rax *consumers; /* A radix tree representing the consumers by name
and their associated representation in the form and their associated representation in the form
of streamConsumer structures. */ of streamConsumer structures. */
@ -71,25 +71,25 @@ typedef struct streamConsumer {
the pending messages delivered to this the pending messages delivered to this
consumer not yet acknowledged. Keys are consumer not yet acknowledged. Keys are
big endian message IDs, while values are big endian message IDs, while values are
the same streamNotAcked structure referenced the same streamNACK structure referenced
in the "pel" of the conumser group structure in the "pel" of the conumser group structure
itself, so the value is shared. */ itself, so the value is shared. */
} streamConsumer; } streamConsumer;
/* Pending (yet not acknowledged) message in a consumer group. */ /* Pending (yet not acknowledged) message in a consumer group. */
typedef struct streamNotAcked { typedef struct streamNACK {
mstime_t delivery_time; /* Last time this message was delivered. */ mstime_t delivery_time; /* Last time this message was delivered. */
uint64_t delivery_count; /* Number of times this message was delivered.*/ uint64_t delivery_count; /* Number of times this message was delivered.*/
streamConsumer *consumer; /* The consumer this message was delivered to streamConsumer *consumer; /* The consumer this message was delivered to
in the last delivery. */ in the last delivery. */
} streamNotAcked; } streamNACK;
/* Prototypes of exported APIs. */ /* Prototypes of exported APIs. */
struct client; struct client;
stream *streamNew(void); stream *streamNew(void);
void freeStream(stream *s); void freeStream(stream *s);
size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer); size_t streamReplyWithRange(struct client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int noack);
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev); void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);

View File

@ -43,6 +43,7 @@
void streamFreeCG(streamCG *cg); void streamFreeCG(streamCG *cg);
streamCG *streamLookupCG(stream *s, sds groupname); streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name); streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
streamNACK *streamCreateNACK(streamConsumer *consumer);
/* ----------------------------------------------------------------------- /* -----------------------------------------------------------------------
* Low level stream encoding: a radix tree of listpacks. * Low level stream encoding: a radix tree of listpacks.
@ -665,6 +666,18 @@ void streamIteratorStop(streamIterator *si) {
raxStop(&si->ri); raxStop(&si->ri);
} }
/* This is an helper function for streamReplyWithRange() when called with
* group and consumer arguments, but with a range that is referring to already
* delivered messages. In this case we just emit messages that are already
* in the history of the conusmer, fetching the IDs from its PEL.
*
* Note that this function does not have a 'rev' argument because it's not
* possible to iterate in reverse using a group. Basically this function
* is only called as a result of the XREADGROUP command. */
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamCG *group, streamConsumer *consumer) {
/* TODO: update the last time delivery and delivery count. */
}
/* Send the specified range to the client 'c'. The range the client will /* Send the specified range to the client 'c'. The range the client will
* receive is between start and end inclusive, if 'count' is non zero, no more * receive is between start and end inclusive, if 'count' is non zero, no more
* than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that * than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that
@ -678,24 +691,27 @@ void streamIteratorStop(streamIterator *si) {
* function will not return it to the client. * function will not return it to the client.
* 3. An entry in the pending list will be created for every entry delivered * 3. An entry in the pending list will be created for every entry delivered
* for the first time to this consumer. This is only performed if * for the first time to this consumer. This is only performed if
* consumer != NULL, so in order to implement the XREADGROUP NOACK option * 'noack' is non-zero.
* no consumer is passed to this function.
*/ */
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer) { size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int noack) {
void *arraylen_ptr = addDeferredMultiBulkLength(c); void *arraylen_ptr = addDeferredMultiBulkLength(c);
size_t arraylen = 0; size_t arraylen = 0;
streamIterator si; streamIterator si;
int64_t numfields; int64_t numfields;
streamID id; streamID id;
/* If a group was passed, as an optimization we check if the range /* If a group was passed, we check if the request is about messages
* specified is about messages that were never delivered. This is true if * never delivered so far (normally this happens when ">" ID is passed).
* the 'start' range is greater than the current last_id in the stream. *
* In that case, there is no need to check if the messages may already * If instead the client is asking for some history, we serve it
* have another owner before delivering the message. This speeds up * using a different function, so that we return entries *solely*
* the processing significantly. */ * from its own PEL. This ensures each consumer will always and only
int newmessages = group != NULL && * see the history of messages delivered to it and not yet confirmed
streamCompareID(start,&group->last_id) > 0; * as delivered. */
if (group && streamCompareID(start,&group->last_id) <= 0) {
return streamReplyWithRangeFromConsumerPEL(c,s,start,end,count,
group,consumer);
}
streamIteratorStart(&si,s,start,end,rev); streamIteratorStart(&si,s,start,end,rev);
while(streamIteratorGetID(&si,&id,&numfields)) { while(streamIteratorGetID(&si,&id,&numfields)) {
@ -720,6 +736,22 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
} }
arraylen++; arraylen++;
if (count && count == arraylen) break; if (count && count == arraylen) break;
/* If a group is passed, we need to create an entry in the
* PEL (pending entries list) of this group *and* this consumer.
* Note that we are sure about the fact the message is not already
* associated with some other consumer, because if we reached this
* loop the IDs the user is requesting are greater than any message
* delivered for this group. */
if (group && !noack) {
unsigned char buf[sizeof(streamID)];
streamEncodeID(buf,&id);
streamNACK *nack = streamCreateNACK(consumer);
int retval = 0;
retval += raxInsert(group->pel,buf,sizeof(buf),nack,NULL);
retval += raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
serverAssert(retval == 2); /* Make sure entry was inserted. */
}
} }
streamIteratorStop(&si); streamIteratorStop(&si);
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
@ -937,7 +969,7 @@ void xrangeGenericCommand(client *c, int rev) {
if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.emptymultibulk)) == NULL
|| checkType(c,o,OBJ_STREAM)) return; || checkType(c,o,OBJ_STREAM)) return;
s = o->ptr; s = o->ptr;
streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL); streamReplyWithRange(c,s,&startid,&endid,count,rev,NULL,NULL,0);
} }
/* XRANGE key start end [COUNT <n>] */ /* XRANGE key start end [COUNT <n>] */
@ -972,6 +1004,7 @@ void xreadCommand(client *c) {
long long count = 0; long long count = 0;
int streams_count = 0; int streams_count = 0;
int streams_arg = 0; int streams_arg = 0;
int noack = 0; /* True if NOACK option was specified. */
#define STREAMID_STATIC_VECTOR_LEN 8 #define STREAMID_STATIC_VECTOR_LEN 8
streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
streamID *ids = static_ids; streamID *ids = static_ids;
@ -1013,6 +1046,13 @@ void xreadCommand(client *c) {
groupname = c->argv[i+1]; groupname = c->argv[i+1];
consumername = c->argv[i+2]; consumername = c->argv[i+2];
i += 2; i += 2;
} else if (!strcasecmp(o,"NOACK")) {
if (!xreadgroup) {
addReplyError(c,"The NOACK option is only supported by "
"XREADGROUP. You called XREAD instead.");
return;
}
noack = 1;
} else { } else {
addReply(c,shared.syntaxerr); addReply(c,shared.syntaxerr);
return; return;
@ -1109,7 +1149,7 @@ void xreadCommand(client *c) {
consumername->ptr); consumername->ptr);
streamReplyWithRange(c,s,&start,NULL,count,0, streamReplyWithRange(c,s,&start,NULL,count,0,
groups ? groups[i] : NULL, groups ? groups[i] : NULL,
consumer); consumer, noack);
} }
} }
@ -1153,7 +1193,19 @@ cleanup:
* Low level implementation of consumer groups * Low level implementation of consumer groups
* ----------------------------------------------------------------------- */ * ----------------------------------------------------------------------- */
void streamNotAckedFree(streamNotAcked *na) { /* Create a NACK entry setting the delivery count to 1 and the delivery
* time to the current time. The NACK consumer will be set to the one
* specified as argument of the function. */
streamNACK *streamCreateNACK(streamConsumer *consumer) {
streamNACK *nack = zmalloc(sizeof(*nack));
nack->delivery_time = mstime();
nack->delivery_count = 1;
nack->consumer = consumer;
return nack;
}
/* Free a NACK entry. */
void streamFreeNACK(streamNACK *na) {
zfree(na); zfree(na);
} }
@ -1162,7 +1214,7 @@ void streamNotAckedFree(streamNotAcked *na) {
* nor will delete them from the stream, so when this function is called * nor will delete them from the stream, so when this function is called
* to delete a consumer, and not when the whole stream is destroyed, the caller * to delete a consumer, and not when the whole stream is destroyed, the caller
* should do some work before. */ * should do some work before. */
void streamConsumerFree(streamConsumer *sc) { void streamFreeConsumer(streamConsumer *sc) {
raxFree(sc->pel); /* No value free callback: the PEL entries are shared raxFree(sc->pel); /* No value free callback: the PEL entries are shared
between the consumer and the main stream PEL. */ between the consumer and the main stream PEL. */
sdsfree(sc->name); sdsfree(sc->name);
@ -1188,8 +1240,8 @@ streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID id) {
/* Free a consumer group and all its associated data. */ /* Free a consumer group and all its associated data. */
void streamFreeCG(streamCG *cg) { void streamFreeCG(streamCG *cg) {
raxFreeWithCallback(cg->pel,(void(*)(void*))streamNotAckedFree); raxFreeWithCallback(cg->pel,(void(*)(void*))streamFreeNACK);
raxFreeWithCallback(cg->consumers,(void(*)(void*))streamConsumerFree); raxFreeWithCallback(cg->consumers,(void(*)(void*))streamFreeConsumer);
zfree(cg); zfree(cg);
} }