XREADGROUP with NOACK should propagate only one XGROUP SETID command

This commit is contained in:
Valentino Geron 2020-04-21 20:55:43 +03:00
parent c49fb47fbe
commit 6fd2d7cfee

View File

@ -935,6 +935,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
streamIterator si; streamIterator si;
int64_t numfields; int64_t numfields;
streamID id; streamID id;
int propagate_last_id = 0;
int noack = flags & STREAM_RWR_NOACK;
/* If the client is asking for some history, we serve it using a /* If the client is asking for some history, we serve it using a
* different function, so that we return entries *solely* from its * different function, so that we return entries *solely* from its
@ -950,12 +952,14 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
arraylen_ptr = addReplyDeferredLen(c); arraylen_ptr = addReplyDeferredLen(c);
streamIteratorStart(&si,s,start,end,rev); streamIteratorStart(&si,s,start,end,rev);
while(streamIteratorGetID(&si,&id,&numfields)) { while(streamIteratorGetID(&si,&id,&numfields)) {
int propagate_last_id = 0;
/* Update the group last_id if needed. */ /* Update the group last_id if needed. */
if (group && streamCompareID(&id,&group->last_id) > 0) { if (group && streamCompareID(&id,&group->last_id) > 0) {
group->last_id = id; group->last_id = id;
propagate_last_id = 1; /* Group last id should be propagated only if NOACK was
* specified, otherwise the last id would be included
* in XCLAIM. */
if (noack)
propagate_last_id = 1;
} }
/* Emit a two elements array for each item. The first is /* Emit a two elements array for each item. The first is
@ -983,7 +987,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
* XGROUP SETID command. So if we find that there is already * XGROUP SETID command. So if we find that there is already
* a NACK for the entry, we need to associate it to the new * a NACK for the entry, we need to associate it to the new
* consumer. */ * consumer. */
if (group && !(flags & STREAM_RWR_NOACK)) { if (group && !noack) {
unsigned char buf[sizeof(streamID)]; unsigned char buf[sizeof(streamID)];
streamEncodeID(buf,&id); streamEncodeID(buf,&id);
@ -1020,14 +1024,16 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack); streamPropagateXCLAIM(c,spi->keyname,group,spi->groupname,idarg,nack);
decrRefCount(idarg); decrRefCount(idarg);
} }
} else {
if (propagate_last_id)
streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
} }
arraylen++; arraylen++;
if (count && count == arraylen) break; if (count && count == arraylen) break;
} }
if (spi && propagate_last_id) {
streamPropagateGroupID(c,spi->keyname,group,spi->groupname);
}
streamIteratorStop(&si); streamIteratorStop(&si);
if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen); if (arraylen_ptr) setDeferredArrayLen(c,arraylen_ptr,arraylen);
return arraylen; return arraylen;