diff --git a/src/blocked.c b/src/blocked.c index 069d3ba95..5d6d0c800 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -489,7 +489,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { if (group) { noack = receiver->bpop.xread_group_noack; sds name = receiver->bpop.xread_consumer->ptr; - consumer = streamLookupConsumer(group,name,SLC_DEFAULT); + consumer = streamLookupConsumer(group,name); if (consumer == NULL) { consumer = streamCreateConsumer(group,name,rl->key, rl->db->id,SCC_DEFAULT); diff --git a/src/rdb.c b/src/rdb.c index 3bb8a4cc8..733181ac6 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -684,7 +684,7 @@ int rdbSaveObjectType(rio *rdb, robj *o) { else serverPanic("Unknown hash encoding"); case OBJ_STREAM: - return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS_2); + return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS_3); case OBJ_MODULE: return rdbSaveType(rdb,RDB_TYPE_MODULE_2); default: @@ -774,13 +774,20 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) { } nwritten += n; - /* Last seen time. */ + /* Seen time. */ if ((n = rdbSaveMillisecondTime(rdb,consumer->seen_time)) == -1) { raxStop(&ri); return -1; } nwritten += n; + /* Active time. */ + if ((n = rdbSaveMillisecondTime(rdb,consumer->active_time)) == -1) { + raxStop(&ri); + return -1; + } + nwritten += n; + /* Consumer PEL, without the ACKs (see last parameter of the function * passed with value of 0), at loading time we'll lookup the ID * in the consumer group global PEL and will put a reference in the @@ -2426,7 +2433,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) { rdbReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); break; } - } else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS || rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) { + } else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS || + rdbtype == RDB_TYPE_STREAM_LISTPACKS_2 || + rdbtype == RDB_TYPE_STREAM_LISTPACKS_3) + { o = createStreamObject(); stream *s = o->ptr; uint64_t listpacks = rdbLoadLen(rdb,NULL); @@ -2503,7 +2513,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) { s->last_id.ms = rdbLoadLen(rdb,NULL); s->last_id.seq = rdbLoadLen(rdb,NULL); - if (rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) { + if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) { /* Load the first entry ID. */ s->first_id.ms = rdbLoadLen(rdb,NULL); s->first_id.seq = rdbLoadLen(rdb,NULL); @@ -2570,7 +2580,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) { /* Load group offset. */ uint64_t cg_offset; - if (rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) { + if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) { cg_offset = rdbLoadLen(rdb,NULL); if (rioGetReadError(rdb)) { rdbReportReadError("Stream cgroup offset loading failed."); @@ -2652,6 +2662,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) { decrRefCount(o); return NULL; } + consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); if (rioGetReadError(rdb)) { rdbReportReadError("Stream short read reading seen time."); @@ -2659,6 +2670,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) { return NULL; } + if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_3) { + consumer->active_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); + if (rioGetReadError(rdb)) { + rdbReportReadError("Stream short read reading active time."); + decrRefCount(o); + return NULL; + } + } else { + /* That's the best estimate we got */ + consumer->active_time = consumer->seen_time; + } + /* Load the PEL about entries owned by this specific * consumer. */ pel_size = rdbLoadLen(rdb,NULL); diff --git a/src/rdb.h b/src/rdb.h index b8c25865c..491f643f9 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -96,10 +96,11 @@ #define RDB_TYPE_LIST_QUICKLIST_2 18 #define RDB_TYPE_STREAM_LISTPACKS_2 19 #define RDB_TYPE_SET_LISTPACK 20 +#define RDB_TYPE_STREAM_LISTPACKS_3 21 /* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */ /* Test if a type is an object type. */ -#define rdbIsObjectType(t) (((t) >= 0 && (t) <= 7) || ((t) >= 9 && (t) <= 20)) +#define rdbIsObjectType(t) (((t) >= 0 && (t) <= 7) || ((t) >= 9 && (t) <= 21)) /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */ #define RDB_OPCODE_FUNCTION2 245 /* function library data */ diff --git a/src/stream.h b/src/stream.h index 2d4997919..bfc165440 100644 --- a/src/stream.h +++ b/src/stream.h @@ -74,7 +74,8 @@ typedef struct streamCG { /* A specific consumer in a consumer group. */ typedef struct streamConsumer { - mstime_t seen_time; /* Last time this consumer was active. */ + mstime_t seen_time; /* Last time this consumer tried to perform an action (attempted reading/claiming). */ + mstime_t active_time; /* Last time this consumer was active (successful reading/claiming). */ sds name; /* Consumer name. This is how the consumer will be identified in the consumer group protocol. Case sensitive. */ @@ -105,10 +106,6 @@ typedef struct streamPropInfo { /* Prototypes of exported APIs. */ struct client; -/* Flags for streamLookupConsumer */ -#define SLC_DEFAULT 0 -#define SLC_NO_REFRESH (1<<0) /* Do not update consumer's seen-time */ - /* Flags for streamCreateConsumer */ #define SCC_DEFAULT 0 #define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */ @@ -126,7 +123,7 @@ void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsign void streamIteratorRemoveEntry(streamIterator *si, streamID *current); void streamIteratorStop(streamIterator *si); streamCG *streamLookupCG(stream *s, sds groupname); -streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags); +streamConsumer *streamLookupConsumer(streamCG *cg, sds name); streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags); streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read); streamNACK *streamCreateNACK(streamConsumer *consumer); diff --git a/src/t_stream.c b/src/t_stream.c index 91ffdbbd4..b2e58dc15 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -235,6 +235,7 @@ robj *streamDup(robj *o) { raxInsert(new_cg->consumers,(unsigned char *)new_consumer->name, sdslen(new_consumer->name), new_consumer, NULL); new_consumer->seen_time = consumer->seen_time; + new_consumer->active_time = consumer->active_time; /* Consumer PEL */ raxIterator ri_cpel; @@ -1769,6 +1770,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end serverPanic("NACK half-created. Should not be possible."); } + consumer->active_time = commandTimeSnapshot(); + /* Propagate as XCLAIM. */ if (spi) { robj *idarg = createObjectFromStreamID(&id); @@ -2319,6 +2322,8 @@ void xreadCommand(client *c) { streamID *gt = ids+i; /* ID must be greater than this. */ int serve_synchronously = 0; int serve_history = 0; /* True for XREADGROUP with ID != ">". */ + streamConsumer *consumer = NULL; /* Unused if XREAD */ + streamPropInfo spi = {c->argv[streams_arg+i],groupname}; /* Unused if XREAD */ /* Check if there are the conditions to serve the client * synchronously. */ @@ -2342,6 +2347,17 @@ void xreadCommand(client *c) { *gt = *last; } } + consumer = streamLookupConsumer(groups[i],consumername->ptr); + if (consumer == NULL) { + consumer = streamCreateConsumer(groups[i],consumername->ptr, + c->argv[streams_arg+i], + c->db->id,SCC_DEFAULT); + if (noack) + streamPropagateConsumerCreation(c,spi.keyname, + spi.groupname, + consumer->name); + } + consumer->seen_time = commandTimeSnapshot(); } else if (s->length) { /* For consumers without a group, we serve synchronously if we can * actually provide at least one item from the stream. */ @@ -2365,20 +2381,7 @@ void xreadCommand(client *c) { * of the stream and the data we extracted from it. */ if (c->resp == 2) addReplyArrayLen(c,2); addReplyBulk(c,c->argv[streams_arg+i]); - streamConsumer *consumer = NULL; - streamPropInfo spi = {c->argv[i+streams_arg],groupname}; - if (groups) { - consumer = streamLookupConsumer(groups[i],consumername->ptr,SLC_DEFAULT); - if (consumer == NULL) { - consumer = streamCreateConsumer(groups[i],consumername->ptr, - c->argv[streams_arg+i], - c->db->id,SCC_DEFAULT); - if (noack) - streamPropagateConsumerCreation(c,spi.keyname, - spi.groupname, - consumer->name); - } - } + int flags = 0; if (noack) flags |= STREAM_RWR_NOACK; if (serve_history) flags |= STREAM_RWR_HISTORY; @@ -2527,21 +2530,19 @@ streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid } consumer->name = sdsdup(name); consumer->pel = raxNew(); + consumer->active_time = -1; consumer->seen_time = commandTimeSnapshot(); if (dirty) server.dirty++; if (notify) notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-createconsumer",key,dbid); return consumer; } -/* Lookup the consumer with the specified name in the group 'cg'. Its last - * seen time is updated unless the SLC_NO_REFRESH flag is specified. */ -streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) { +/* Lookup the consumer with the specified name in the group 'cg'. */ +streamConsumer *streamLookupConsumer(streamCG *cg, sds name) { if (cg == NULL) return NULL; - int refresh = !(flags & SLC_NO_REFRESH); streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, sdslen(name)); if (consumer == raxNotFound) return NULL; - if (refresh) consumer->seen_time = commandTimeSnapshot(); return consumer; } @@ -2721,7 +2722,7 @@ NULL addReplyLongLong(c,created ? 1 : 0); } else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) { long long pending = 0; - streamConsumer *consumer = streamLookupConsumer(cg,c->argv[4]->ptr,SLC_NO_REFRESH); + streamConsumer *consumer = streamLookupConsumer(cg,c->argv[4]->ptr); if (consumer) { /* Delete the consumer and returns the number of pending messages * that were yet associated with such a consumer. */ @@ -2996,7 +2997,7 @@ void xpendingCommand(client *c) { } else { /* , and provided, return actual pending entries (not just info) */ streamConsumer *consumer = NULL; if (consumername) { - consumer = streamLookupConsumer(group,consumername->ptr,SLC_NO_REFRESH); + consumer = streamLookupConsumer(group,consumername->ptr); /* If a consumer name was mentioned but it does not exist, we can * just return an empty array. */ @@ -3221,10 +3222,14 @@ void xclaimCommand(client *c) { } /* Do the actual claiming. */ - streamConsumer *consumer = NULL; + streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr); + if (consumer == NULL) { + consumer = streamCreateConsumer(group,c->argv[3]->ptr,c->argv[1],c->db->id,SCC_DEFAULT); + } + consumer->seen_time = commandTimeSnapshot(); + void *arraylenptr = addReplyDeferredLen(c); size_t arraylen = 0; - sds name = c->argv[3]->ptr; for (int j = 5; j <= last_id_arg; j++) { streamID id = ids[j-5]; unsigned char buf[sizeof(streamID)]; @@ -3272,11 +3277,6 @@ void xclaimCommand(client *c) { if (this_idle < minidle) continue; } - if (consumer == NULL && - (consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL) - { - consumer = streamCreateConsumer(group,name,c->argv[1],c->db->id,SCC_DEFAULT); - } if (nack->consumer != consumer) { /* Remove the entry from the old consumer. * Note that nack->consumer is NULL if we created the @@ -3305,6 +3305,8 @@ void xclaimCommand(client *c) { } arraylen++; + consumer->active_time = commandTimeSnapshot(); + /* Propagate this change. */ streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack); propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */ @@ -3400,7 +3402,12 @@ void xautoclaimCommand(client *c) { } /* Do the actual claiming. */ - streamConsumer *consumer = NULL; + streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr); + if (consumer == NULL) { + consumer = streamCreateConsumer(group,c->argv[3]->ptr,c->argv[1],c->db->id,SCC_DEFAULT); + } + consumer->seen_time = commandTimeSnapshot(); + long long attempts = count * attempts_factor; addReplyArrayLen(c, 3); /* We add another reply later */ @@ -3414,7 +3421,6 @@ void xautoclaimCommand(client *c) { raxSeek(&ri,">=",startkey,sizeof(startkey)); size_t arraylen = 0; mstime_t now = commandTimeSnapshot(); - sds name = c->argv[3]->ptr; int deleted_id_num = 0; while (attempts-- && count && raxNext(&ri)) { streamNACK *nack = ri.data; @@ -3446,11 +3452,6 @@ void xautoclaimCommand(client *c) { continue; } - if (consumer == NULL && - (consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL) - { - consumer = streamCreateConsumer(group,name,c->argv[1],c->db->id,SCC_DEFAULT); - } if (nack->consumer != consumer) { /* Remove the entry from the old consumer. * Note that nack->consumer is NULL if we created the @@ -3480,6 +3481,8 @@ void xautoclaimCommand(client *c) { arraylen++; count--; + consumer->active_time = commandTimeSnapshot(); + /* Propagate this change. */ robj *idstr = createObjectFromStreamID(&id); streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack); @@ -3787,7 +3790,7 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) { raxSeek(&ri_consumers,"^",NULL,0); while(raxNext(&ri_consumers)) { streamConsumer *consumer = ri_consumers.data; - addReplyMapLen(c,4); + addReplyMapLen(c,5); /* Consumer name */ addReplyBulkCString(c,"name"); @@ -3797,6 +3800,10 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) { addReplyBulkCString(c,"seen-time"); addReplyLongLong(c,consumer->seen_time); + /* Active-time */ + addReplyBulkCString(c,"active-time"); + addReplyLongLong(c,consumer->active_time); + /* Consumer PEL count */ addReplyBulkCString(c,"pel-count"); addReplyLongLong(c,raxSize(consumer->pel)); @@ -3887,16 +3894,19 @@ NULL mstime_t now = commandTimeSnapshot(); while(raxNext(&ri)) { streamConsumer *consumer = ri.data; + mstime_t inactive = consumer->active_time != -1 ? now - consumer->active_time : consumer->active_time; mstime_t idle = now - consumer->seen_time; if (idle < 0) idle = 0; - addReplyMapLen(c,3); + addReplyMapLen(c,4); addReplyBulkCString(c,"name"); addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); addReplyBulkCString(c,"pending"); addReplyLongLong(c,raxSize(consumer->pel)); addReplyBulkCString(c,"idle"); addReplyLongLong(c,idle); + addReplyBulkCString(c,"inactive"); + addReplyLongLong(c,inactive); } raxStop(&ri); } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) { diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 928a83b07..6a09fba67 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -800,6 +800,51 @@ start_server { assert_equal [dict get $reply entries] "{100-0 {a 1}}" } + test {Consumer seen-time and active-time} { + r DEL mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > + after 100 + set reply [r xinfo consumers mystream mygroup] + set consumer_info [lindex $reply 0] + assert {[dict get $consumer_info idle] >= 100} ;# consumer idle (seen-time) + assert_equal [dict get $consumer_info inactive] "-1" ;# consumer inactive (active-time) + + r XADD mystream * f v + r XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > + set reply [r xinfo consumers mystream mygroup] + set consumer_info [lindex $reply 0] + assert_equal [lindex $consumer_info 1] "Alice" ;# consumer name + assert {[dict get $consumer_info idle] < 80} ;# consumer idle (seen-time) + assert {[dict get $consumer_info inactive] < 80} ;# consumer inactive (active-time) + + after 100 + r XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream > + set reply [r xinfo consumers mystream mygroup] + set consumer_info [lindex $reply 0] + assert {[dict get $consumer_info idle] < 80} ;# consumer idle (seen-time) + assert {[dict get $consumer_info inactive] >= 100} ;# consumer inactive (active-time) + + + # Simulate loading from RDB + + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + set consumer [lindex [dict get $group consumers] 0] + set prev_seen [dict get $consumer seen-time] + set prev_active [dict get $consumer active-time] + + set dump [r DUMP mystream] + r DEL mystream + r RESTORE mystream 0 $dump + + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + set consumer [lindex [dict get $group consumers] 0] + assert_equal $prev_seen [dict get $consumer seen-time] + assert_equal $prev_active [dict get $consumer active-time] + } + test {XGROUP CREATECONSUMER: create consumer if does not exist} { r del mystream r XGROUP CREATE mystream mygroup $ MKSTREAM @@ -889,13 +934,9 @@ start_server { assert {$curr_grpinfo == $grpinfo} set n_consumers [lindex $grpinfo 3] - # Bob should be created only when there will be new data for this consumer - assert_equal $n_consumers 2 - set reply [r xinfo consumers mystream mygroup] - set consumer_info [lindex $reply 0] - assert_equal [lindex $consumer_info 1] "Alice" - set consumer_info [lindex $reply 1] - assert_equal [lindex $consumer_info 1] "Charlie" + # All consumers are created via XREADGROUP, regardless of whether they managed + # to read any entries ot not + assert_equal $n_consumers 3 $rd close } } @@ -1048,6 +1089,21 @@ start_server { assert_equal [dict get $group lag] 2 } + test {Loading from legacy (Redis <= v7.0.x, rdb_ver < 11) persistence} { + # The payload was DUMPed from a v7 instance after: + # XGROUP CREATE x g $ MKSTREAM + # XADD x 1-1 f v + # XREADGROUP GROUP g Alice STREAMS x > + + r DEL x + r RESTORE x 0 "\x13\x01\x10\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x1d\x1d\x00\x00\x00\n\x00\x01\x01\x00\x01\x01\x01\x81f\x02\x00\x01\x02\x01\x00\x01\x00\x01\x81v\x02\x04\x01\xff\x01\x01\x01\x01\x01\x00\x00\x01\x01\x01g\x01\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\xf5Zq\xc7\x84\x01\x00\x00\x01\x01\x05Alice\xf5Zq\xc7\x84\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\n\x00\xcev\xa9\xd6\xda`r\x12" + + set reply [r XINFO STREAM x FULL] + set group [lindex [dict get $reply groups] 0] + set consumer [lindex [dict get $group consumers] 0] + assert_equal [dict get $consumer seen-time] [dict get $consumer active-time] + } + start_server {tags {"external:skip"}} { set master [srv -1 client] set master_host [srv -1 host]