Fix if consumer is created as a side effect without notify and dirty++ (#9263)
Fixes: - When a consumer is created as a side effect, redis didn't issue a keyspace notification, nor incremented the server.dirty (affects periodic snapshots). this was a bug in XREADGROUP, XCLAIM, and XAUTOCLAIM. - When attempting to delete a non-existent consumer, don't issue a keyspace notification and don't increment server.dirty this was a bug in XGROUP DELCONSUMER Other changes: - Changed streamLookupConsumer() to always only do lookup consumer (never do implicit creation), Its last seen time is updated unless the SLC_NO_REFRESH flag is specified. - Added streamCreateConsumer() to create a new consumer. When the creation is successful, it will notify and dirty++ unless the SCC_NO_NOTIFY or SCC_NO_DIRTIFY flags is specified. - Changed streamDelConsumer() to always only do delete consumer. - Added keyspace notifications tests about stream events.
This commit is contained in:
parent
27a68a4d1b
commit
82c3158ad5
@ -424,17 +424,17 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
|
||||
int noack = 0;
|
||||
|
||||
if (group) {
|
||||
int created = 0;
|
||||
consumer =
|
||||
streamLookupConsumer(group,
|
||||
receiver->bpop.xread_consumer->ptr,
|
||||
SLC_NONE,
|
||||
&created);
|
||||
noack = receiver->bpop.xread_group_noack;
|
||||
if (created && noack) {
|
||||
streamPropagateConsumerCreation(receiver,rl->key,
|
||||
receiver->bpop.xread_group,
|
||||
consumer->name);
|
||||
sds name = receiver->bpop.xread_consumer->ptr;
|
||||
consumer = streamLookupConsumer(group,name,SLC_DEFAULT);
|
||||
if (consumer == NULL) {
|
||||
consumer = streamCreateConsumer(group,name,rl->key,
|
||||
rl->db->id,SCC_DEFAULT);
|
||||
if (noack) {
|
||||
streamPropagateConsumerCreation(receiver,rl->key,
|
||||
receiver->bpop.xread_group,
|
||||
consumer->name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2112,8 +2112,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid) {
|
||||
decrRefCount(o);
|
||||
return NULL;
|
||||
}
|
||||
streamConsumer *consumer =
|
||||
streamLookupConsumer(cgroup,cname,SLC_NONE,NULL);
|
||||
streamConsumer *consumer = streamCreateConsumer(cgroup,cname,NULL,0,
|
||||
SCC_NO_NOTIFY|SCC_NO_DIRTIFY);
|
||||
sdsfree(cname);
|
||||
consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
|
||||
if (rioGetReadError(rdb)) {
|
||||
|
13
src/stream.h
13
src/stream.h
@ -97,9 +97,13 @@ typedef struct streamPropInfo {
|
||||
struct client;
|
||||
|
||||
/* Flags for streamLookupConsumer */
|
||||
#define SLC_NONE 0
|
||||
#define SLC_NOCREAT (1<<0) /* Do not create the consumer if it doesn't exist */
|
||||
#define SLC_NOREFRESH (1<<1) /* Do not update consumer's seen-time */
|
||||
#define SLC_DEFAULT 0
|
||||
#define SLC_NO_REFRESH (1<<0) /* Do not update consumer's seen-time */
|
||||
|
||||
/* Flags for streamCreateConsumer */
|
||||
#define SCC_DEFAULT 0
|
||||
#define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */
|
||||
#define SCC_NO_DIRTIFY (1<<1) /* Do not dirty++ if consumer created */
|
||||
|
||||
stream *streamNew(void);
|
||||
void freeStream(stream *s);
|
||||
@ -111,7 +115,8 @@ void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsign
|
||||
void streamIteratorRemoveEntry(streamIterator *si, streamID *current);
|
||||
void streamIteratorStop(streamIterator *si);
|
||||
streamCG *streamLookupCG(stream *s, sds groupname);
|
||||
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags, int *created);
|
||||
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
|
||||
streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags);
|
||||
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id);
|
||||
streamNACK *streamCreateNACK(streamConsumer *consumer);
|
||||
void streamDecodeID(void *buf, streamID *id);
|
||||
|
132
src/t_stream.c
132
src/t_stream.c
@ -2128,17 +2128,20 @@ void xreadCommand(client *c) {
|
||||
* of the stream and the data we extracted from it. */
|
||||
if (c->resp == 2) addReplyArrayLen(c,2);
|
||||
addReplyBulk(c,c->argv[streams_arg+i]);
|
||||
int created = 0;
|
||||
streamConsumer *consumer = NULL;
|
||||
if (groups) consumer = streamLookupConsumer(groups[i],
|
||||
consumername->ptr,
|
||||
SLC_NONE,
|
||||
&created);
|
||||
streamPropInfo spi = {c->argv[i+streams_arg],groupname};
|
||||
if (created && noack)
|
||||
streamPropagateConsumerCreation(c,spi.keyname,
|
||||
spi.groupname,
|
||||
consumer->name);
|
||||
if (groups) {
|
||||
consumer = streamLookupConsumer(groups[i],consumername->ptr,SLC_DEFAULT);
|
||||
if (consumer == NULL) {
|
||||
consumer = streamCreateConsumer(groups[i],consumername->ptr,
|
||||
c->argv[streams_arg+i],
|
||||
c->db->id,SCC_DEFAULT);
|
||||
if (noack)
|
||||
streamPropagateConsumerCreation(c,spi.keyname,
|
||||
spi.groupname,
|
||||
consumer->name);
|
||||
}
|
||||
}
|
||||
int flags = 0;
|
||||
if (noack) flags |= STREAM_RWR_NOACK;
|
||||
if (serve_history) flags |= STREAM_RWR_HISTORY;
|
||||
@ -2269,39 +2272,43 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
|
||||
return (cg == raxNotFound) ? NULL : cg;
|
||||
}
|
||||
|
||||
/* Lookup the consumer with the specified name in the group 'cg': if the
|
||||
* consumer does not exist it is created unless SLC_NOCREAT flag was specified.
|
||||
* Its last seen time is updated unless SLC_NOREFRESH flag was specified. */
|
||||
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags, int *created) {
|
||||
if (created) *created = 0;
|
||||
int create = !(flags & SLC_NOCREAT);
|
||||
int refresh = !(flags & SLC_NOREFRESH);
|
||||
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
|
||||
sdslen(name));
|
||||
if (consumer == raxNotFound) {
|
||||
if (!create) return NULL;
|
||||
consumer = zmalloc(sizeof(*consumer));
|
||||
consumer->name = sdsdup(name);
|
||||
consumer->pel = raxNew();
|
||||
raxInsert(cg->consumers,(unsigned char*)name,sdslen(name),
|
||||
consumer,NULL);
|
||||
consumer->seen_time = mstime();
|
||||
if (created) *created = 1;
|
||||
} else if (refresh)
|
||||
consumer->seen_time = mstime();
|
||||
/* Create a consumer with the specified name in the group 'cg' and return.
|
||||
* If the consumer exists, return NULL. As a side effect, when the consumer
|
||||
* is successfully created, the key space will be notified and dirty++ unless
|
||||
* the SCC_NO_NOTIFY or SCC_NO_DIRTIFY flags is specified. */
|
||||
streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags) {
|
||||
if (cg == NULL) return NULL;
|
||||
int notify = !(flags & SCC_NO_NOTIFY);
|
||||
int dirty = !(flags & SCC_NO_DIRTIFY);
|
||||
streamConsumer *consumer = zmalloc(sizeof(*consumer));
|
||||
int success = raxTryInsert(cg->consumers,(unsigned char*)name,
|
||||
sdslen(name),consumer,NULL);
|
||||
if (!success) {
|
||||
zfree(consumer);
|
||||
return NULL;
|
||||
}
|
||||
consumer->name = sdsdup(name);
|
||||
consumer->pel = raxNew();
|
||||
consumer->seen_time = mstime();
|
||||
if (dirty) server.dirty++;
|
||||
if (notify) notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-createconsumer",key,dbid);
|
||||
return consumer;
|
||||
}
|
||||
|
||||
/* Delete the consumer specified in the consumer group 'cg'. The consumer
|
||||
* may have pending messages: they are removed from the PEL, and the number
|
||||
* of pending messages "lost" is returned. */
|
||||
uint64_t streamDelConsumer(streamCG *cg, sds name) {
|
||||
streamConsumer *consumer =
|
||||
streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH,NULL);
|
||||
if (consumer == NULL) return 0;
|
||||
|
||||
uint64_t retval = raxSize(consumer->pel);
|
||||
/* Lookup the consumer with the specified name in the group 'cg'. Its last
|
||||
* seen time is updated unless the SLC_NO_REFRESH flag is specified. */
|
||||
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
|
||||
if (cg == NULL) return NULL;
|
||||
int refresh = !(flags & SLC_NO_REFRESH);
|
||||
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
|
||||
sdslen(name));
|
||||
if (consumer == raxNotFound) return NULL;
|
||||
if (refresh) consumer->seen_time = mstime();
|
||||
return consumer;
|
||||
}
|
||||
|
||||
/* Delete the consumer specified in the consumer group 'cg'. */
|
||||
void streamDelConsumer(streamCG *cg, streamConsumer *consumer) {
|
||||
/* Iterate all the consumer pending messages, deleting every corresponding
|
||||
* entry from the global entry. */
|
||||
raxIterator ri;
|
||||
@ -2315,9 +2322,9 @@ uint64_t streamDelConsumer(streamCG *cg, sds name) {
|
||||
raxStop(&ri);
|
||||
|
||||
/* Deallocate the consumer. */
|
||||
raxRemove(cg->consumers,(unsigned char*)name,sdslen(name),NULL);
|
||||
raxRemove(cg->consumers,(unsigned char*)consumer->name,
|
||||
sdslen(consumer->name),NULL);
|
||||
streamFreeConsumer(consumer);
|
||||
return retval;
|
||||
}
|
||||
|
||||
/* -----------------------------------------------------------------------
|
||||
@ -2456,22 +2463,22 @@ NULL
|
||||
addReply(c,shared.czero);
|
||||
}
|
||||
} else if (!strcasecmp(opt,"CREATECONSUMER") && c->argc == 5) {
|
||||
int created = 0;
|
||||
streamLookupConsumer(cg,c->argv[4]->ptr,SLC_NOREFRESH,&created);
|
||||
if (created) {
|
||||
streamConsumer *created = streamCreateConsumer(cg,c->argv[4]->ptr,c->argv[2],
|
||||
c->db->id,SCC_DEFAULT);
|
||||
addReplyLongLong(c,created ? 1 : 0);
|
||||
} else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) {
|
||||
long long pending = 0;
|
||||
streamConsumer *consumer = streamLookupConsumer(cg,c->argv[4]->ptr,SLC_NO_REFRESH);
|
||||
if (consumer) {
|
||||
/* Delete the consumer and returns the number of pending messages
|
||||
* that were yet associated with such a consumer. */
|
||||
pending = raxSize(consumer->pel);
|
||||
streamDelConsumer(cg,consumer);
|
||||
server.dirty++;
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-createconsumer",
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-delconsumer",
|
||||
c->argv[2],c->db->id);
|
||||
}
|
||||
addReplyLongLong(c,created);
|
||||
} else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) {
|
||||
/* Delete the consumer and returns the number of pending messages
|
||||
* that were yet associated with such a consumer. */
|
||||
long long pending = streamDelConsumer(cg,c->argv[4]->ptr);
|
||||
addReplyLongLong(c,pending);
|
||||
server.dirty++;
|
||||
notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-delconsumer",
|
||||
c->argv[2],c->db->id);
|
||||
} else {
|
||||
addReplySubcommandSyntaxError(c);
|
||||
}
|
||||
@ -2692,10 +2699,7 @@ void xpendingCommand(client *c) {
|
||||
} else { /* <start>, <stop> and <count> provided, return actual pending entries (not just info) */
|
||||
streamConsumer *consumer = NULL;
|
||||
if (consumername) {
|
||||
consumer = streamLookupConsumer(group,
|
||||
consumername->ptr,
|
||||
SLC_NOCREAT|SLC_NOREFRESH,
|
||||
NULL);
|
||||
consumer = streamLookupConsumer(group,consumername->ptr,SLC_NO_REFRESH);
|
||||
|
||||
/* If a consumer name was mentioned but it does not exist, we can
|
||||
* just return an empty array. */
|
||||
@ -2923,6 +2927,7 @@ void xclaimCommand(client *c) {
|
||||
streamConsumer *consumer = NULL;
|
||||
void *arraylenptr = addReplyDeferredLen(c);
|
||||
size_t arraylen = 0;
|
||||
sds name = c->argv[3]->ptr;
|
||||
for (int j = 5; j <= last_id_arg; j++) {
|
||||
streamID id = ids[j-5];
|
||||
unsigned char buf[sizeof(streamID)];
|
||||
@ -2964,8 +2969,11 @@ void xclaimCommand(client *c) {
|
||||
mstime_t this_idle = now - nack->delivery_time;
|
||||
if (this_idle < minidle) continue;
|
||||
}
|
||||
if (consumer == NULL)
|
||||
consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE,NULL);
|
||||
if (consumer == NULL &&
|
||||
(consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL)
|
||||
{
|
||||
consumer = streamCreateConsumer(group,name,c->argv[1],c->db->id,SCC_DEFAULT);
|
||||
}
|
||||
if (nack->consumer != consumer) {
|
||||
/* Remove the entry from the old consumer.
|
||||
* Note that nack->consumer is NULL if we created the
|
||||
@ -3097,6 +3105,7 @@ void xautoclaimCommand(client *c) {
|
||||
raxSeek(&ri,">=",startkey,sizeof(startkey));
|
||||
size_t arraylen = 0;
|
||||
mstime_t now = mstime();
|
||||
sds name = c->argv[3]->ptr;
|
||||
while (attempts-- && count && raxNext(&ri)) {
|
||||
streamNACK *nack = ri.data;
|
||||
|
||||
@ -3109,8 +3118,11 @@ void xautoclaimCommand(client *c) {
|
||||
streamID id;
|
||||
streamDecodeID(ri.key, &id);
|
||||
|
||||
if (consumer == NULL)
|
||||
consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE,NULL);
|
||||
if (consumer == NULL &&
|
||||
(consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL)
|
||||
{
|
||||
consumer = streamCreateConsumer(group,name,c->argv[1],c->db->id,SCC_DEFAULT);
|
||||
}
|
||||
if (nack->consumer != consumer) {
|
||||
/* Remove the entry from the old consumer.
|
||||
* Note that nack->consumer is NULL if we created the
|
||||
|
@ -297,6 +297,30 @@ start_server {tags {"pubsub network"}} {
|
||||
$rd1 close
|
||||
}
|
||||
|
||||
test "Keyspace notifications: stream events test" {
|
||||
r config set notify-keyspace-events Kt
|
||||
r del mystream
|
||||
set rd1 [redis_deferring_client]
|
||||
assert_equal {1} [psubscribe $rd1 *]
|
||||
r xgroup create mystream mygroup $ mkstream
|
||||
r xgroup createconsumer mystream mygroup Bob
|
||||
set id [r xadd mystream 1 field1 A]
|
||||
r xreadgroup group mygroup Alice STREAMS mystream >
|
||||
r xclaim mystream mygroup Mike 0 $id force
|
||||
# Not notify because of "Lee" not exists.
|
||||
r xgroup delconsumer mystream mygroup Lee
|
||||
# Not notify because of "Bob" exists.
|
||||
r xautoclaim mystream mygroup Bob 0 $id
|
||||
r xgroup delconsumer mystream mygroup Bob
|
||||
assert_equal "pmessage * __keyspace@${db}__:mystream xgroup-create" [$rd1 read]
|
||||
assert_equal "pmessage * __keyspace@${db}__:mystream xgroup-createconsumer" [$rd1 read]
|
||||
assert_equal "pmessage * __keyspace@${db}__:mystream xadd" [$rd1 read]
|
||||
assert_equal "pmessage * __keyspace@${db}__:mystream xgroup-createconsumer" [$rd1 read]
|
||||
assert_equal "pmessage * __keyspace@${db}__:mystream xgroup-createconsumer" [$rd1 read]
|
||||
assert_equal "pmessage * __keyspace@${db}__:mystream xgroup-delconsumer" [$rd1 read]
|
||||
$rd1 close
|
||||
}
|
||||
|
||||
test "Keyspace notifications: expired events (triggered expire)" {
|
||||
r config set notify-keyspace-events Ex
|
||||
r del foo
|
||||
|
Loading…
x
Reference in New Issue
Block a user