diff --git a/src/aof.c b/src/aof.c index f1490a39c..c51fdbeff 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1185,6 +1185,20 @@ int rioWriteStreamPendingEntry(rio *r, robj *key, const char *groupname, size_t return 1; } +/* Helper for rewriteStreamObject(): emit the XGROUP CREATECONSUMER is + * needed in order to create consumers that do not have any pending entries. + * All this in the context of the specified key and group. */ +int rioWriteStreamEmptyConsumer(rio *r, robj *key, const char *groupname, size_t groupname_len, streamConsumer *consumer) { + /* XGROUP CREATECONSUMER */ + if (rioWriteBulkCount(r,'*',5) == 0) return 0; + if (rioWriteBulkString(r,"XGROUP",6) == 0) return 0; + if (rioWriteBulkString(r,"CREATECONSUMER",14) == 0) return 0; + if (rioWriteBulkObject(r,key) == 0) return 0; + if (rioWriteBulkString(r,groupname,groupname_len) == 0) return 0; + if (rioWriteBulkString(r,consumer->name,sdslen(consumer->name)) == 0) return 0; + return 1; +} + /* Emit the commands needed to rebuild a stream object. * The function returns 0 on error, 1 on success. */ int rewriteStreamObject(rio *r, robj *key, robj *o) { @@ -1273,13 +1287,22 @@ int rewriteStreamObject(rio *r, robj *key, robj *o) { } /* Generate XCLAIMs for each consumer that happens to - * have pending entries. Empty consumers have no semantical - * value so they are discarded. */ + * have pending entries. Empty consumers would be generated with + * XGROUP CREATECONSUMER. */ raxIterator ri_cons; raxStart(&ri_cons,group->consumers); raxSeek(&ri_cons,"^",NULL,0); while(raxNext(&ri_cons)) { streamConsumer *consumer = ri_cons.data; + /* If there are no pending entries, just emit XGROUP CREATECONSUMER */ + if (raxSize(consumer->pel) == 0) { + if (rioWriteStreamEmptyConsumer(r,key,(char*)ri.key, + ri.key_len,consumer) == 0) + { + return 0; + } + continue; + } /* For the current consumer, iterate all the PEL entries * to emit the XCLAIM protocol. */ raxIterator ri_pel; diff --git a/src/blocked.c b/src/blocked.c index 08de63485..87f816faa 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -371,11 +371,18 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { int noack = 0; if (group) { + int created = 0; consumer = streamLookupConsumer(group, receiver->bpop.xread_consumer->ptr, - SLC_NONE); + SLC_NONE, + &created); noack = receiver->bpop.xread_group_noack; + if (created && noack) { + streamPropagateConsumerCreation(receiver,rl->key, + receiver->bpop.xread_group, + consumer->name); + } } /* Emit the two elements sub-array consisting of diff --git a/src/rdb.c b/src/rdb.c index 00cc184af..6420e9924 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1912,7 +1912,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { return NULL; } streamConsumer *consumer = - streamLookupConsumer(cgroup,cname,SLC_NONE); + streamLookupConsumer(cgroup,cname,SLC_NONE,NULL); sdsfree(cname); consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); if (rioGetReadError(rdb)) { diff --git a/src/stream.h b/src/stream.h index e4c5ff78d..3d692cd9d 100644 --- a/src/stream.h +++ b/src/stream.h @@ -110,12 +110,13 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); void streamIteratorStop(streamIterator *si); streamCG *streamLookupCG(stream *s, sds groupname); -streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags); +streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags, int *created); streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); int streamCompareID(streamID *a, streamID *b); void streamFreeNACK(streamNACK *na); void streamIncrID(streamID *id); +void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername); #endif diff --git a/src/t_stream.c b/src/t_stream.c index d565dd1f6..285684987 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -894,6 +894,30 @@ void streamPropagateGroupID(client *c, robj *key, streamCG *group, robj *groupna decrRefCount(argv[4]); } +/* We need this when we want to propagate creation of consumer that was created + * by XREADGROUP with the NOACK option. In that case, the only way to create + * the consumer at the replica is by using XGROUP CREATECONSUMER (see issue #7140) + * + * XGROUP CREATECONSUMER + */ +void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername) { + robj *argv[5]; + argv[0] = createStringObject("XGROUP",6); + argv[1] = createStringObject("CREATECONSUMER",14); + argv[2] = key; + argv[3] = groupname; + argv[4] = createObject(OBJ_STRING,sdsdup(consumername)); + + /* We use progagate() because this code path is not always called from + * the command execution context. Moreover this will just alter the + * consumer group state, and we don't need MULTI/EXEC wrapping because + * there is no message state cross-message atomicity required. */ + propagate(server.xgroupCommand,c->db->id,argv,5,PROPAGATE_AOF|PROPAGATE_REPL); + decrRefCount(argv[0]); + decrRefCount(argv[1]); + decrRefCount(argv[4]); +} + /* Send the stream items in the specified range to the client 'c'. The range * the client will receive is between start and end inclusive, if 'count' is * non zero, no more than 'count' elements are sent. @@ -1565,11 +1589,17 @@ void xreadCommand(client *c) { * of the stream and the data we extracted from it. */ if (c->resp == 2) addReplyArrayLen(c,2); addReplyBulk(c,c->argv[streams_arg+i]); + int created = 0; streamConsumer *consumer = NULL; if (groups) consumer = streamLookupConsumer(groups[i], consumername->ptr, - SLC_NONE); + SLC_NONE, + &created); streamPropInfo spi = {c->argv[i+streams_arg],groupname}; + if (created && noack) + streamPropagateConsumerCreation(c,spi.keyname, + spi.groupname, + consumer->name); int flags = 0; if (noack) flags |= STREAM_RWR_NOACK; if (serve_history) flags |= STREAM_RWR_HISTORY; @@ -1701,10 +1731,10 @@ streamCG *streamLookupCG(stream *s, sds groupname) { } /* Lookup the consumer with the specified name in the group 'cg': if the - * consumer does not exist it is automatically created as a side effect - * of calling this function, otherwise its last seen time is updated and - * the existing consumer reference returned. */ -streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) { + * consumer does not exist it is created unless SLC_NOCREAT flag was specified. + * Its last seen time is updated unless SLC_NOREFRESH flag was specified. */ +streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags, int *created) { + if (created) *created = 0; int create = !(flags & SLC_NOCREAT); int refresh = !(flags & SLC_NOREFRESH); streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, @@ -1716,8 +1746,10 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) { consumer->pel = raxNew(); raxInsert(cg->consumers,(unsigned char*)name,sdslen(name), consumer,NULL); - } - if (refresh) consumer->seen_time = mstime(); + consumer->seen_time = mstime(); + if (created) *created = 1; + } else if (refresh) + consumer->seen_time = mstime(); return consumer; } @@ -1726,7 +1758,7 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) { * of pending messages "lost" is returned. */ uint64_t streamDelConsumer(streamCG *cg, sds name) { streamConsumer *consumer = - streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH); + streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH,NULL); if (consumer == NULL) return 0; uint64_t retval = raxSize(consumer->pel); @@ -1756,15 +1788,17 @@ uint64_t streamDelConsumer(streamCG *cg, sds name) { /* XGROUP CREATE [MKSTREAM] * XGROUP SETID * XGROUP DESTROY + * CREATECONSUMER * XGROUP DELCONSUMER */ void xgroupCommand(client *c) { const char *help[] = { -"CREATE [opt] -- Create a new consumer group.", -" option MKSTREAM: create the empty stream if it does not exist.", -"SETID -- Set the current group ID.", -"DESTROY -- Remove the specified group.", -"DELCONSUMER -- Remove the specified consumer.", -"HELP -- Prints this help.", +"CREATE [opt] -- Create a new consumer group.", +" option MKSTREAM: create the empty stream if it does not exist.", +"SETID -- Set the current group ID.", +"DESTROY -- Remove the specified group.", +"CREATECONSUMER -- Create new consumer in the specified group.", +"DELCONSUMER -- Remove the specified consumer.", +"HELP -- Prints this help.", NULL }; stream *s = NULL; @@ -1809,6 +1843,7 @@ NULL /* Certain subcommands require the group to exist. */ if ((cg = streamLookupCG(s,grpname)) == NULL && (!strcasecmp(opt,"SETID") || + !strcasecmp(opt,"CREATECONSUMER") || !strcasecmp(opt,"DELCONSUMER"))) { addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " @@ -1875,6 +1910,15 @@ NULL } else { addReply(c,shared.czero); } + } else if (!strcasecmp(opt,"CREATECONSUMER") && c->argc == 5) { + int created = 0; + streamLookupConsumer(cg,c->argv[4]->ptr,SLC_NOREFRESH,&created); + if (created) { + server.dirty++; + notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-createconsumer", + c->argv[2],c->db->id); + } + addReplyLongLong(c,created); } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) { /* Delete the consumer and returns the number of pending messages * that were yet associated with such a consumer. */ @@ -2077,7 +2121,8 @@ void xpendingCommand(client *c) { if (consumername) { consumer = streamLookupConsumer(group, consumername->ptr, - SLC_NOCREAT|SLC_NOREFRESH); + SLC_NOCREAT|SLC_NOREFRESH, + NULL); /* If a consumer name was mentioned but it does not exist, we can * just return an empty array. */ @@ -2348,7 +2393,7 @@ void xclaimCommand(client *c) { raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); /* Update the consumer and idle time. */ if (consumer == NULL) - consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE); + consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE,NULL); nack->consumer = consumer; nack->delivery_time = deliverytime; /* Set the delivery attempts counter if given, otherwise diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index dfcd735f6..74ddc98ba 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -328,6 +328,102 @@ start_server { assert_equal [lindex $reply 9] "{100-0 {a 1}}" } + test {XGROUP CREATECONSUMER: create consumer if does not exist} { + r del mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XADD mystream * f v + + set reply [r xinfo groups mystream] + set group_info [lindex $reply 0] + set n_consumers [lindex $group_info 3] + assert_equal $n_consumers 0 ;# consumers number in cg + + # create consumer using XREADGROUP + r XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > + + set reply [r xinfo groups mystream] + set group_info [lindex $reply 0] + set n_consumers [lindex $group_info 3] + assert_equal $n_consumers 1 ;# consumers number in cg + + set reply [r xinfo consumers mystream mygroup] + set consumer_info [lindex $reply 0] + assert_equal [lindex $consumer_info 1] "Alice" ;# consumer name + + # create group using XGROUP CREATECONSUMER when Alice already exists + set created [r XGROUP CREATECONSUMER mystream mygroup Alice] + assert_equal $created 0 + + # create group using XGROUP CREATECONSUMER when Bob does not exist + set created [r XGROUP CREATECONSUMER mystream mygroup Bob] + assert_equal $created 1 + + set reply [r xinfo groups mystream] + set group_info [lindex $reply 0] + set n_consumers [lindex $group_info 3] + assert_equal $n_consumers 2 ;# consumers number in cg + + set reply [r xinfo consumers mystream mygroup] + set consumer_info [lindex $reply 0] + assert_equal [lindex $consumer_info 1] "Alice" ;# consumer name + set consumer_info [lindex $reply 1] + assert_equal [lindex $consumer_info 1] "Bob" ;# consumer name + } + + test {XGROUP CREATECONSUMER: group must exist} { + r del mystream + r XADD mystream * f v + assert_error "*NOGROUP*" {r XGROUP CREATECONSUMER mystream mygroup consumer} + } + + start_server {tags {"stream"} overrides {appendonly yes aof-use-rdb-preamble no appendfsync always}} { + test {XREADGROUP with NOACK creates consumer} { + r del mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XADD mystream * f1 v1 + r XREADGROUP GROUP mygroup Alice NOACK STREAMS mystream ">" + set rd [redis_deferring_client] + $rd XREADGROUP GROUP mygroup Bob BLOCK 0 NOACK STREAMS mystream ">" + r XADD mystream * f2 v2 + set grpinfo [r xinfo groups mystream] + + r debug loadaof + assert {[r xinfo groups mystream] == $grpinfo} + set reply [r xinfo consumers mystream mygroup] + set consumer_info [lindex $reply 0] + assert_equal [lindex $consumer_info 1] "Alice" ;# consumer name + set consumer_info [lindex $reply 1] + assert_equal [lindex $consumer_info 1] "Bob" ;# consumer name + } + + test {Consumer without PEL is present in AOF after AOFRW} { + r del mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XADD mystream * f v + r XREADGROUP GROUP mygroup Alice NOACK STREAMS mystream ">" + set rd [redis_deferring_client] + $rd XREADGROUP GROUP mygroup Bob BLOCK 0 NOACK STREAMS mystream ">" + r XGROUP CREATECONSUMER mystream mygroup Charlie + set grpinfo [lindex [r xinfo groups mystream] 0] + + r bgrewriteaof + waitForBgrewriteaof r + r debug loadaof + + set curr_grpinfo [lindex [r xinfo groups mystream] 0] + assert {$curr_grpinfo == $grpinfo} + set n_consumers [lindex $grpinfo 3] + + # Bob should be created only when there will be new data for this client + assert_equal $n_consumers 2 + set reply [r xinfo consumers mystream mygroup] + set consumer_info [lindex $reply 0] + assert_equal [lindex $consumer_info 1] "Alice" + set consumer_info [lindex $reply 1] + assert_equal [lindex $consumer_info 1] "Charlie" + } + } + start_server {} { set master [srv -1 client] set master_host [srv -1 host]