Stream consumers: Re-purpose seen-time, add active-time (#11099)

1. "Fixed" the current code so that seen-time/idle actually refers to interaction
  attempts (as documented; breaking change)
2. Added active-time/inactive to refer to successful interaction (what
  seen-time/idle used to be)

At first, I tried to avoid changing the behavior of seen-time/idle but then realized
that, in this case, the odds are the people read the docs and implemented their
code based on the docs (which didn't match the behavior).
For the most part, that would work fine, except that issue #9996 was found.

I was working under the assumption that people relied on the docs, and for
the most part, it could have worked well enough. so instead of fixing the docs,
as I would usually do, I fixed the code to match the docs in this particular case.

Note that, in case the consumer has never read any entries, the values
for both "active-time" (XINFO FULL) and "inactive" (XINFO CONSUMERS) will
be -1, meaning here that the consumer was never active.

Note that seen/active time is only affected by XREADGROUP / X[AUTO]CLAIM, not
by XPENDING, XINFO, and other "read-only" stream CG commands (always has been,
even before this PR)

Other changes:
* Another behavioral change (arguably a bugfix) is that XREADGROUP and X[AUTO]CLAIM
  create the consumer regardless of whether it was able to perform some reading/claiming
* RDB format change to save the `active_time`, and set it to the same value of `seen_time` in old rdb files.
This commit is contained in:
guybe7 2022-11-30 17:51:31 +05:30 committed by GitHub
parent c81813148b
commit 72e90695ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 144 additions and 57 deletions

View File

@ -489,7 +489,7 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
if (group) {
noack = receiver->bpop.xread_group_noack;
sds name = receiver->bpop.xread_consumer->ptr;
consumer = streamLookupConsumer(group,name,SLC_DEFAULT);
consumer = streamLookupConsumer(group,name);
if (consumer == NULL) {
consumer = streamCreateConsumer(group,name,rl->key,
rl->db->id,SCC_DEFAULT);

View File

@ -684,7 +684,7 @@ int rdbSaveObjectType(rio *rdb, robj *o) {
else
serverPanic("Unknown hash encoding");
case OBJ_STREAM:
return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS_2);
return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS_3);
case OBJ_MODULE:
return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
default:
@ -774,13 +774,20 @@ size_t rdbSaveStreamConsumers(rio *rdb, streamCG *cg) {
}
nwritten += n;
/* Last seen time. */
/* Seen time. */
if ((n = rdbSaveMillisecondTime(rdb,consumer->seen_time)) == -1) {
raxStop(&ri);
return -1;
}
nwritten += n;
/* Active time. */
if ((n = rdbSaveMillisecondTime(rdb,consumer->active_time)) == -1) {
raxStop(&ri);
return -1;
}
nwritten += n;
/* Consumer PEL, without the ACKs (see last parameter of the function
* passed with value of 0), at loading time we'll lookup the ID
* in the consumer group global PEL and will put a reference in the
@ -2426,7 +2433,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
rdbReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
break;
}
} else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS || rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) {
} else if (rdbtype == RDB_TYPE_STREAM_LISTPACKS ||
rdbtype == RDB_TYPE_STREAM_LISTPACKS_2 ||
rdbtype == RDB_TYPE_STREAM_LISTPACKS_3)
{
o = createStreamObject();
stream *s = o->ptr;
uint64_t listpacks = rdbLoadLen(rdb,NULL);
@ -2503,7 +2513,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
s->last_id.ms = rdbLoadLen(rdb,NULL);
s->last_id.seq = rdbLoadLen(rdb,NULL);
if (rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) {
if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
/* Load the first entry ID. */
s->first_id.ms = rdbLoadLen(rdb,NULL);
s->first_id.seq = rdbLoadLen(rdb,NULL);
@ -2570,7 +2580,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
/* Load group offset. */
uint64_t cg_offset;
if (rdbtype == RDB_TYPE_STREAM_LISTPACKS_2) {
if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
cg_offset = rdbLoadLen(rdb,NULL);
if (rioGetReadError(rdb)) {
rdbReportReadError("Stream cgroup offset loading failed.");
@ -2652,6 +2662,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
decrRefCount(o);
return NULL;
}
consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
if (rioGetReadError(rdb)) {
rdbReportReadError("Stream short read reading seen time.");
@ -2659,6 +2670,18 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
return NULL;
}
if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_3) {
consumer->active_time = rdbLoadMillisecondTime(rdb,RDB_VERSION);
if (rioGetReadError(rdb)) {
rdbReportReadError("Stream short read reading active time.");
decrRefCount(o);
return NULL;
}
} else {
/* That's the best estimate we got */
consumer->active_time = consumer->seen_time;
}
/* Load the PEL about entries owned by this specific
* consumer. */
pel_size = rdbLoadLen(rdb,NULL);

View File

@ -96,10 +96,11 @@
#define RDB_TYPE_LIST_QUICKLIST_2 18
#define RDB_TYPE_STREAM_LISTPACKS_2 19
#define RDB_TYPE_SET_LISTPACK 20
#define RDB_TYPE_STREAM_LISTPACKS_3 21
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
/* Test if a type is an object type. */
#define rdbIsObjectType(t) (((t) >= 0 && (t) <= 7) || ((t) >= 9 && (t) <= 20))
#define rdbIsObjectType(t) (((t) >= 0 && (t) <= 7) || ((t) >= 9 && (t) <= 21))
/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_FUNCTION2 245 /* function library data */

View File

@ -74,7 +74,8 @@ typedef struct streamCG {
/* A specific consumer in a consumer group. */
typedef struct streamConsumer {
mstime_t seen_time; /* Last time this consumer was active. */
mstime_t seen_time; /* Last time this consumer tried to perform an action (attempted reading/claiming). */
mstime_t active_time; /* Last time this consumer was active (successful reading/claiming). */
sds name; /* Consumer name. This is how the consumer
will be identified in the consumer group
protocol. Case sensitive. */
@ -105,10 +106,6 @@ typedef struct streamPropInfo {
/* Prototypes of exported APIs. */
struct client;
/* Flags for streamLookupConsumer */
#define SLC_DEFAULT 0
#define SLC_NO_REFRESH (1<<0) /* Do not update consumer's seen-time */
/* Flags for streamCreateConsumer */
#define SCC_DEFAULT 0
#define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */
@ -126,7 +123,7 @@ void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsign
void streamIteratorRemoveEntry(streamIterator *si, streamID *current);
void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name);
streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags);
streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id, long long entries_read);
streamNACK *streamCreateNACK(streamConsumer *consumer);

View File

@ -235,6 +235,7 @@ robj *streamDup(robj *o) {
raxInsert(new_cg->consumers,(unsigned char *)new_consumer->name,
sdslen(new_consumer->name), new_consumer, NULL);
new_consumer->seen_time = consumer->seen_time;
new_consumer->active_time = consumer->active_time;
/* Consumer PEL */
raxIterator ri_cpel;
@ -1769,6 +1770,8 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
serverPanic("NACK half-created. Should not be possible.");
}
consumer->active_time = commandTimeSnapshot();
/* Propagate as XCLAIM. */
if (spi) {
robj *idarg = createObjectFromStreamID(&id);
@ -2319,6 +2322,8 @@ void xreadCommand(client *c) {
streamID *gt = ids+i; /* ID must be greater than this. */
int serve_synchronously = 0;
int serve_history = 0; /* True for XREADGROUP with ID != ">". */
streamConsumer *consumer = NULL; /* Unused if XREAD */
streamPropInfo spi = {c->argv[streams_arg+i],groupname}; /* Unused if XREAD */
/* Check if there are the conditions to serve the client
* synchronously. */
@ -2342,6 +2347,17 @@ void xreadCommand(client *c) {
*gt = *last;
}
}
consumer = streamLookupConsumer(groups[i],consumername->ptr);
if (consumer == NULL) {
consumer = streamCreateConsumer(groups[i],consumername->ptr,
c->argv[streams_arg+i],
c->db->id,SCC_DEFAULT);
if (noack)
streamPropagateConsumerCreation(c,spi.keyname,
spi.groupname,
consumer->name);
}
consumer->seen_time = commandTimeSnapshot();
} else if (s->length) {
/* For consumers without a group, we serve synchronously if we can
* actually provide at least one item from the stream. */
@ -2365,20 +2381,7 @@ void xreadCommand(client *c) {
* of the stream and the data we extracted from it. */
if (c->resp == 2) addReplyArrayLen(c,2);
addReplyBulk(c,c->argv[streams_arg+i]);
streamConsumer *consumer = NULL;
streamPropInfo spi = {c->argv[i+streams_arg],groupname};
if (groups) {
consumer = streamLookupConsumer(groups[i],consumername->ptr,SLC_DEFAULT);
if (consumer == NULL) {
consumer = streamCreateConsumer(groups[i],consumername->ptr,
c->argv[streams_arg+i],
c->db->id,SCC_DEFAULT);
if (noack)
streamPropagateConsumerCreation(c,spi.keyname,
spi.groupname,
consumer->name);
}
}
int flags = 0;
if (noack) flags |= STREAM_RWR_NOACK;
if (serve_history) flags |= STREAM_RWR_HISTORY;
@ -2527,21 +2530,19 @@ streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid
}
consumer->name = sdsdup(name);
consumer->pel = raxNew();
consumer->active_time = -1;
consumer->seen_time = commandTimeSnapshot();
if (dirty) server.dirty++;
if (notify) notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-createconsumer",key,dbid);
return consumer;
}
/* Lookup the consumer with the specified name in the group 'cg'. Its last
* seen time is updated unless the SLC_NO_REFRESH flag is specified. */
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
/* Lookup the consumer with the specified name in the group 'cg'. */
streamConsumer *streamLookupConsumer(streamCG *cg, sds name) {
if (cg == NULL) return NULL;
int refresh = !(flags & SLC_NO_REFRESH);
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
sdslen(name));
if (consumer == raxNotFound) return NULL;
if (refresh) consumer->seen_time = commandTimeSnapshot();
return consumer;
}
@ -2721,7 +2722,7 @@ NULL
addReplyLongLong(c,created ? 1 : 0);
} else if (!strcasecmp(opt,"DELCONSUMER") && c->argc == 5) {
long long pending = 0;
streamConsumer *consumer = streamLookupConsumer(cg,c->argv[4]->ptr,SLC_NO_REFRESH);
streamConsumer *consumer = streamLookupConsumer(cg,c->argv[4]->ptr);
if (consumer) {
/* Delete the consumer and returns the number of pending messages
* that were yet associated with such a consumer. */
@ -2996,7 +2997,7 @@ void xpendingCommand(client *c) {
} else { /* <start>, <stop> and <count> provided, return actual pending entries (not just info) */
streamConsumer *consumer = NULL;
if (consumername) {
consumer = streamLookupConsumer(group,consumername->ptr,SLC_NO_REFRESH);
consumer = streamLookupConsumer(group,consumername->ptr);
/* If a consumer name was mentioned but it does not exist, we can
* just return an empty array. */
@ -3221,10 +3222,14 @@ void xclaimCommand(client *c) {
}
/* Do the actual claiming. */
streamConsumer *consumer = NULL;
streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr);
if (consumer == NULL) {
consumer = streamCreateConsumer(group,c->argv[3]->ptr,c->argv[1],c->db->id,SCC_DEFAULT);
}
consumer->seen_time = commandTimeSnapshot();
void *arraylenptr = addReplyDeferredLen(c);
size_t arraylen = 0;
sds name = c->argv[3]->ptr;
for (int j = 5; j <= last_id_arg; j++) {
streamID id = ids[j-5];
unsigned char buf[sizeof(streamID)];
@ -3272,11 +3277,6 @@ void xclaimCommand(client *c) {
if (this_idle < minidle) continue;
}
if (consumer == NULL &&
(consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL)
{
consumer = streamCreateConsumer(group,name,c->argv[1],c->db->id,SCC_DEFAULT);
}
if (nack->consumer != consumer) {
/* Remove the entry from the old consumer.
* Note that nack->consumer is NULL if we created the
@ -3305,6 +3305,8 @@ void xclaimCommand(client *c) {
}
arraylen++;
consumer->active_time = commandTimeSnapshot();
/* Propagate this change. */
streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],c->argv[j],nack);
propagate_last_id = 0; /* Will be propagated by XCLAIM itself. */
@ -3400,7 +3402,12 @@ void xautoclaimCommand(client *c) {
}
/* Do the actual claiming. */
streamConsumer *consumer = NULL;
streamConsumer *consumer = streamLookupConsumer(group,c->argv[3]->ptr);
if (consumer == NULL) {
consumer = streamCreateConsumer(group,c->argv[3]->ptr,c->argv[1],c->db->id,SCC_DEFAULT);
}
consumer->seen_time = commandTimeSnapshot();
long long attempts = count * attempts_factor;
addReplyArrayLen(c, 3); /* We add another reply later */
@ -3414,7 +3421,6 @@ void xautoclaimCommand(client *c) {
raxSeek(&ri,">=",startkey,sizeof(startkey));
size_t arraylen = 0;
mstime_t now = commandTimeSnapshot();
sds name = c->argv[3]->ptr;
int deleted_id_num = 0;
while (attempts-- && count && raxNext(&ri)) {
streamNACK *nack = ri.data;
@ -3446,11 +3452,6 @@ void xautoclaimCommand(client *c) {
continue;
}
if (consumer == NULL &&
(consumer = streamLookupConsumer(group,name,SLC_DEFAULT)) == NULL)
{
consumer = streamCreateConsumer(group,name,c->argv[1],c->db->id,SCC_DEFAULT);
}
if (nack->consumer != consumer) {
/* Remove the entry from the old consumer.
* Note that nack->consumer is NULL if we created the
@ -3480,6 +3481,8 @@ void xautoclaimCommand(client *c) {
arraylen++;
count--;
consumer->active_time = commandTimeSnapshot();
/* Propagate this change. */
robj *idstr = createObjectFromStreamID(&id);
streamPropagateXCLAIM(c,c->argv[1],group,c->argv[2],idstr,nack);
@ -3787,7 +3790,7 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
raxSeek(&ri_consumers,"^",NULL,0);
while(raxNext(&ri_consumers)) {
streamConsumer *consumer = ri_consumers.data;
addReplyMapLen(c,4);
addReplyMapLen(c,5);
/* Consumer name */
addReplyBulkCString(c,"name");
@ -3797,6 +3800,10 @@ void xinfoReplyWithStreamInfo(client *c, stream *s) {
addReplyBulkCString(c,"seen-time");
addReplyLongLong(c,consumer->seen_time);
/* Active-time */
addReplyBulkCString(c,"active-time");
addReplyLongLong(c,consumer->active_time);
/* Consumer PEL count */
addReplyBulkCString(c,"pel-count");
addReplyLongLong(c,raxSize(consumer->pel));
@ -3887,16 +3894,19 @@ NULL
mstime_t now = commandTimeSnapshot();
while(raxNext(&ri)) {
streamConsumer *consumer = ri.data;
mstime_t inactive = consumer->active_time != -1 ? now - consumer->active_time : consumer->active_time;
mstime_t idle = now - consumer->seen_time;
if (idle < 0) idle = 0;
addReplyMapLen(c,3);
addReplyMapLen(c,4);
addReplyBulkCString(c,"name");
addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
addReplyBulkCString(c,"pending");
addReplyLongLong(c,raxSize(consumer->pel));
addReplyBulkCString(c,"idle");
addReplyLongLong(c,idle);
addReplyBulkCString(c,"inactive");
addReplyLongLong(c,inactive);
}
raxStop(&ri);
} else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) {

View File

@ -800,6 +800,51 @@ start_server {
assert_equal [dict get $reply entries] "{100-0 {a 1}}"
}
test {Consumer seen-time and active-time} {
r DEL mystream
r XGROUP CREATE mystream mygroup $ MKSTREAM
r XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
after 100
set reply [r xinfo consumers mystream mygroup]
set consumer_info [lindex $reply 0]
assert {[dict get $consumer_info idle] >= 100} ;# consumer idle (seen-time)
assert_equal [dict get $consumer_info inactive] "-1" ;# consumer inactive (active-time)
r XADD mystream * f v
r XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
set reply [r xinfo consumers mystream mygroup]
set consumer_info [lindex $reply 0]
assert_equal [lindex $consumer_info 1] "Alice" ;# consumer name
assert {[dict get $consumer_info idle] < 80} ;# consumer idle (seen-time)
assert {[dict get $consumer_info inactive] < 80} ;# consumer inactive (active-time)
after 100
r XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream >
set reply [r xinfo consumers mystream mygroup]
set consumer_info [lindex $reply 0]
assert {[dict get $consumer_info idle] < 80} ;# consumer idle (seen-time)
assert {[dict get $consumer_info inactive] >= 100} ;# consumer inactive (active-time)
# Simulate loading from RDB
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
set consumer [lindex [dict get $group consumers] 0]
set prev_seen [dict get $consumer seen-time]
set prev_active [dict get $consumer active-time]
set dump [r DUMP mystream]
r DEL mystream
r RESTORE mystream 0 $dump
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
set consumer [lindex [dict get $group consumers] 0]
assert_equal $prev_seen [dict get $consumer seen-time]
assert_equal $prev_active [dict get $consumer active-time]
}
test {XGROUP CREATECONSUMER: create consumer if does not exist} {
r del mystream
r XGROUP CREATE mystream mygroup $ MKSTREAM
@ -889,13 +934,9 @@ start_server {
assert {$curr_grpinfo == $grpinfo}
set n_consumers [lindex $grpinfo 3]
# Bob should be created only when there will be new data for this consumer
assert_equal $n_consumers 2
set reply [r xinfo consumers mystream mygroup]
set consumer_info [lindex $reply 0]
assert_equal [lindex $consumer_info 1] "Alice"
set consumer_info [lindex $reply 1]
assert_equal [lindex $consumer_info 1] "Charlie"
# All consumers are created via XREADGROUP, regardless of whether they managed
# to read any entries ot not
assert_equal $n_consumers 3
$rd close
}
}
@ -1048,6 +1089,21 @@ start_server {
assert_equal [dict get $group lag] 2
}
test {Loading from legacy (Redis <= v7.0.x, rdb_ver < 11) persistence} {
# The payload was DUMPed from a v7 instance after:
# XGROUP CREATE x g $ MKSTREAM
# XADD x 1-1 f v
# XREADGROUP GROUP g Alice STREAMS x >
r DEL x
r RESTORE x 0 "\x13\x01\x10\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x1d\x1d\x00\x00\x00\n\x00\x01\x01\x00\x01\x01\x01\x81f\x02\x00\x01\x02\x01\x00\x01\x00\x01\x81v\x02\x04\x01\xff\x01\x01\x01\x01\x01\x00\x00\x01\x01\x01g\x01\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\xf5Zq\xc7\x84\x01\x00\x00\x01\x01\x05Alice\xf5Zq\xc7\x84\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x01\n\x00\xcev\xa9\xd6\xda`r\x12"
set reply [r XINFO STREAM x FULL]
set group [lindex [dict get $reply groups] 0]
set consumer [lindex [dict get $group consumers] 0]
assert_equal [dict get $consumer seen-time] [dict get $consumer active-time]
}
start_server {tags {"external:skip"}} {
set master [srv -1 client]
set master_host [srv -1 host]