XPENDING should not update consumer's seen-time
Same goes for XGROUP DELCONSUMER (But in this case, it doesn't have any visible effect)
This commit is contained in:
parent
8b40f686fb
commit
3843eb300d
@ -371,9 +371,10 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
|
|||||||
int noack = 0;
|
int noack = 0;
|
||||||
|
|
||||||
if (group) {
|
if (group) {
|
||||||
consumer = streamLookupConsumer(group,
|
consumer =
|
||||||
|
streamLookupConsumer(group,
|
||||||
receiver->bpop.xread_consumer->ptr,
|
receiver->bpop.xread_consumer->ptr,
|
||||||
1);
|
SLC_NONE);
|
||||||
noack = receiver->bpop.xread_group_noack;
|
noack = receiver->bpop.xread_group_noack;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1851,8 +1851,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) {
|
|||||||
decrRefCount(o);
|
decrRefCount(o);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
streamConsumer *consumer = streamLookupConsumer(cgroup,cname,
|
streamConsumer *consumer =
|
||||||
1);
|
streamLookupConsumer(cgroup,cname,SLC_NONE);
|
||||||
sdsfree(cname);
|
sdsfree(cname);
|
||||||
consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
|
consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
|
||||||
if (rioGetReadError(rdb)) {
|
if (rioGetReadError(rdb)) {
|
||||||
|
@ -96,6 +96,11 @@ typedef struct streamPropInfo {
|
|||||||
/* Prototypes of exported APIs. */
|
/* Prototypes of exported APIs. */
|
||||||
struct client;
|
struct client;
|
||||||
|
|
||||||
|
/* Flags for streamLookupConsumer */
|
||||||
|
#define SLC_NONE 0
|
||||||
|
#define SLC_NOCREAT (1<<0) /* Do not create the consumer if it doesn't exist */
|
||||||
|
#define SLC_NOREFRESH (1<<1) /* Do not update consumer's seen-time */
|
||||||
|
|
||||||
stream *streamNew(void);
|
stream *streamNew(void);
|
||||||
void freeStream(stream *s);
|
void freeStream(stream *s);
|
||||||
unsigned long streamLength(const robj *subject);
|
unsigned long streamLength(const robj *subject);
|
||||||
@ -105,7 +110,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
|
|||||||
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
|
void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen);
|
||||||
void streamIteratorStop(streamIterator *si);
|
void streamIteratorStop(streamIterator *si);
|
||||||
streamCG *streamLookupCG(stream *s, sds groupname);
|
streamCG *streamLookupCG(stream *s, sds groupname);
|
||||||
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create);
|
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
|
||||||
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
|
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
|
||||||
streamNACK *streamCreateNACK(streamConsumer *consumer);
|
streamNACK *streamCreateNACK(streamConsumer *consumer);
|
||||||
void streamDecodeID(void *buf, streamID *id);
|
void streamDecodeID(void *buf, streamID *id);
|
||||||
|
@ -1570,7 +1570,8 @@ void xreadCommand(client *c) {
|
|||||||
addReplyBulk(c,c->argv[streams_arg+i]);
|
addReplyBulk(c,c->argv[streams_arg+i]);
|
||||||
streamConsumer *consumer = NULL;
|
streamConsumer *consumer = NULL;
|
||||||
if (groups) consumer = streamLookupConsumer(groups[i],
|
if (groups) consumer = streamLookupConsumer(groups[i],
|
||||||
consumername->ptr,1);
|
consumername->ptr,
|
||||||
|
SLC_NONE);
|
||||||
streamPropInfo spi = {c->argv[i+streams_arg],groupname};
|
streamPropInfo spi = {c->argv[i+streams_arg],groupname};
|
||||||
int flags = 0;
|
int flags = 0;
|
||||||
if (noack) flags |= STREAM_RWR_NOACK;
|
if (noack) flags |= STREAM_RWR_NOACK;
|
||||||
@ -1706,7 +1707,9 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
|
|||||||
* consumer does not exist it is automatically created as a side effect
|
* consumer does not exist it is automatically created as a side effect
|
||||||
* of calling this function, otherwise its last seen time is updated and
|
* of calling this function, otherwise its last seen time is updated and
|
||||||
* the existing consumer reference returned. */
|
* the existing consumer reference returned. */
|
||||||
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
|
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
|
||||||
|
int create = !(flags & SLC_NOCREAT);
|
||||||
|
int refresh = !(flags & SLC_NOREFRESH);
|
||||||
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
|
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
|
||||||
sdslen(name));
|
sdslen(name));
|
||||||
if (consumer == raxNotFound) {
|
if (consumer == raxNotFound) {
|
||||||
@ -1717,7 +1720,7 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
|
|||||||
raxInsert(cg->consumers,(unsigned char*)name,sdslen(name),
|
raxInsert(cg->consumers,(unsigned char*)name,sdslen(name),
|
||||||
consumer,NULL);
|
consumer,NULL);
|
||||||
}
|
}
|
||||||
consumer->seen_time = mstime();
|
if (refresh) consumer->seen_time = mstime();
|
||||||
return consumer;
|
return consumer;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1725,7 +1728,8 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) {
|
|||||||
* may have pending messages: they are removed from the PEL, and the number
|
* may have pending messages: they are removed from the PEL, and the number
|
||||||
* of pending messages "lost" is returned. */
|
* of pending messages "lost" is returned. */
|
||||||
uint64_t streamDelConsumer(streamCG *cg, sds name) {
|
uint64_t streamDelConsumer(streamCG *cg, sds name) {
|
||||||
streamConsumer *consumer = streamLookupConsumer(cg,name,0);
|
streamConsumer *consumer =
|
||||||
|
streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH);
|
||||||
if (consumer == NULL) return 0;
|
if (consumer == NULL) return 0;
|
||||||
|
|
||||||
uint64_t retval = raxSize(consumer->pel);
|
uint64_t retval = raxSize(consumer->pel);
|
||||||
@ -2068,16 +2072,19 @@ void xpendingCommand(client *c) {
|
|||||||
}
|
}
|
||||||
/* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */
|
/* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */
|
||||||
else {
|
else {
|
||||||
streamConsumer *consumer = consumername ?
|
streamConsumer *consumer = NULL;
|
||||||
streamLookupConsumer(group,consumername->ptr,0):
|
if (consumername) {
|
||||||
NULL;
|
consumer = streamLookupConsumer(group,
|
||||||
|
consumername->ptr,
|
||||||
|
SLC_NOCREAT|SLC_NOREFRESH);
|
||||||
|
|
||||||
/* If a consumer name was mentioned but it does not exist, we can
|
/* If a consumer name was mentioned but it does not exist, we can
|
||||||
* just return an empty array. */
|
* just return an empty array. */
|
||||||
if (consumername && consumer == NULL) {
|
if (consumer == NULL) {
|
||||||
addReplyArrayLen(c,0);
|
addReplyArrayLen(c,0);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rax *pel = consumer ? consumer->pel : group->pel;
|
rax *pel = consumer ? consumer->pel : group->pel;
|
||||||
unsigned char startkey[sizeof(streamID)];
|
unsigned char startkey[sizeof(streamID)];
|
||||||
@ -2338,7 +2345,7 @@ void xclaimCommand(client *c) {
|
|||||||
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
|
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
|
||||||
/* Update the consumer and idle time. */
|
/* Update the consumer and idle time. */
|
||||||
if (consumer == NULL)
|
if (consumer == NULL)
|
||||||
consumer = streamLookupConsumer(group,c->argv[3]->ptr,1);
|
consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE);
|
||||||
nack->consumer = consumer;
|
nack->consumer = consumer;
|
||||||
nack->delivery_time = deliverytime;
|
nack->delivery_time = deliverytime;
|
||||||
/* Set the delivery attempts counter if given, otherwise
|
/* Set the delivery attempts counter if given, otherwise
|
||||||
|
Loading…
x
Reference in New Issue
Block a user