Merge pull request #7134 from guybe7/xstate_command
Extend XINFO STREAM output
This commit is contained in:
commit
e0de7c0852
220
src/t_stream.c
220
src/t_stream.c
@ -2489,15 +2489,199 @@ void xtrimCommand(client *c) {
|
||||
addReplyLongLong(c,deleted);
|
||||
}
|
||||
|
||||
/* Helper function for xinfoCommand.
|
||||
* Handles the variants of XINFO STREAM */
|
||||
void xinfoReplyWithStreamInfo(client *c, stream *s) {
|
||||
int full = 1;
|
||||
long long count = 0;
|
||||
robj **optv = c->argv + 3; /* Options start after XINFO STREAM <key> */
|
||||
int optc = c->argc - 3;
|
||||
|
||||
/* Parse options. */
|
||||
if (optc == 0) {
|
||||
full = 0;
|
||||
} else {
|
||||
/* Valid options are [FULL] or [FULL COUNT <count>] */
|
||||
if (optc != 1 && optc != 3) {
|
||||
addReplySubcommandSyntaxError(c);
|
||||
return;
|
||||
}
|
||||
|
||||
/* First option must be "FULL" */
|
||||
if (strcasecmp(optv[0]->ptr,"full")) {
|
||||
addReplySubcommandSyntaxError(c);
|
||||
return;
|
||||
}
|
||||
|
||||
if (optc == 3) {
|
||||
/* First option must be "FULL" */
|
||||
if (strcasecmp(optv[1]->ptr,"count")) {
|
||||
addReplySubcommandSyntaxError(c);
|
||||
return;
|
||||
}
|
||||
if (getLongLongFromObjectOrReply(c,optv[2],&count,NULL) == C_ERR)
|
||||
return;
|
||||
if (count < 0) count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
addReplyMapLen(c,full ? 6 : 7);
|
||||
addReplyBulkCString(c,"length");
|
||||
addReplyLongLong(c,s->length);
|
||||
addReplyBulkCString(c,"radix-tree-keys");
|
||||
addReplyLongLong(c,raxSize(s->rax));
|
||||
addReplyBulkCString(c,"radix-tree-nodes");
|
||||
addReplyLongLong(c,s->rax->numnodes);
|
||||
addReplyBulkCString(c,"last-generated-id");
|
||||
addReplyStreamID(c,&s->last_id);
|
||||
|
||||
if (!full) {
|
||||
/* XINFO STREAM <key> */
|
||||
|
||||
addReplyBulkCString(c,"groups");
|
||||
addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
|
||||
|
||||
/* To emit the first/last entry we use streamReplyWithRange(). */
|
||||
int emitted;
|
||||
streamID start, end;
|
||||
start.ms = start.seq = 0;
|
||||
end.ms = end.seq = UINT64_MAX;
|
||||
addReplyBulkCString(c,"first-entry");
|
||||
emitted = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
|
||||
STREAM_RWR_RAWENTRIES,NULL);
|
||||
if (!emitted) addReplyNull(c);
|
||||
addReplyBulkCString(c,"last-entry");
|
||||
emitted = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
|
||||
STREAM_RWR_RAWENTRIES,NULL);
|
||||
if (!emitted) addReplyNull(c);
|
||||
} else {
|
||||
/* XINFO STREAM <key> FULL [COUNT <count>] */
|
||||
|
||||
/* Stream entries */
|
||||
addReplyBulkCString(c,"entries");
|
||||
streamReplyWithRange(c,s,NULL,NULL,count,0,NULL,NULL,0,NULL);
|
||||
|
||||
/* Consumer groups */
|
||||
addReplyBulkCString(c,"groups");
|
||||
if (s->cgroups == NULL) {
|
||||
addReplyArrayLen(c,0);
|
||||
} else {
|
||||
addReplyArrayLen(c,raxSize(s->cgroups));
|
||||
raxIterator ri_cgroups;
|
||||
raxStart(&ri_cgroups,s->cgroups);
|
||||
raxSeek(&ri_cgroups,"^",NULL,0);
|
||||
while(raxNext(&ri_cgroups)) {
|
||||
streamCG *cg = ri_cgroups.data;
|
||||
addReplyMapLen(c,5);
|
||||
|
||||
/* Name */
|
||||
addReplyBulkCString(c,"name");
|
||||
addReplyBulkCBuffer(c,ri_cgroups.key,ri_cgroups.key_len);
|
||||
|
||||
/* Last delivered ID */
|
||||
addReplyBulkCString(c,"last-delivered-id");
|
||||
addReplyStreamID(c,&cg->last_id);
|
||||
|
||||
/* Group PEL count */
|
||||
addReplyBulkCString(c,"pel-count");
|
||||
addReplyLongLong(c,raxSize(cg->pel));
|
||||
|
||||
/* Group PEL */
|
||||
addReplyBulkCString(c,"pending");
|
||||
long long arraylen_cg_pel = 0;
|
||||
void *arrayptr_cg_pel = addReplyDeferredLen(c);
|
||||
raxIterator ri_cg_pel;
|
||||
raxStart(&ri_cg_pel,cg->pel);
|
||||
raxSeek(&ri_cg_pel,"^",NULL,0);
|
||||
while(raxNext(&ri_cg_pel) && (!count || arraylen_cg_pel < count)) {
|
||||
streamNACK *nack = ri_cg_pel.data;
|
||||
addReplyArrayLen(c,4);
|
||||
|
||||
/* Entry ID. */
|
||||
streamID id;
|
||||
streamDecodeID(ri_cg_pel.key,&id);
|
||||
addReplyStreamID(c,&id);
|
||||
|
||||
/* Consumer name. */
|
||||
addReplyBulkCBuffer(c,nack->consumer->name,
|
||||
sdslen(nack->consumer->name));
|
||||
|
||||
/* Last delivery. */
|
||||
addReplyLongLong(c,nack->delivery_time);
|
||||
|
||||
/* Number of deliveries. */
|
||||
addReplyLongLong(c,nack->delivery_count);
|
||||
|
||||
arraylen_cg_pel++;
|
||||
}
|
||||
setDeferredArrayLen(c,arrayptr_cg_pel,arraylen_cg_pel);
|
||||
raxStop(&ri_cg_pel);
|
||||
|
||||
/* Consumers */
|
||||
addReplyBulkCString(c,"consumers");
|
||||
addReplyArrayLen(c,raxSize(cg->consumers));
|
||||
raxIterator ri_consumers;
|
||||
raxStart(&ri_consumers,cg->consumers);
|
||||
raxSeek(&ri_consumers,"^",NULL,0);
|
||||
while(raxNext(&ri_consumers)) {
|
||||
streamConsumer *consumer = ri_consumers.data;
|
||||
addReplyMapLen(c,4);
|
||||
|
||||
/* Consumer name */
|
||||
addReplyBulkCString(c,"name");
|
||||
addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name));
|
||||
|
||||
/* Seen-time */
|
||||
addReplyBulkCString(c,"seen-time");
|
||||
addReplyLongLong(c,consumer->seen_time);
|
||||
|
||||
/* Consumer PEL count */
|
||||
addReplyBulkCString(c,"pel-count");
|
||||
addReplyLongLong(c,raxSize(consumer->pel));
|
||||
|
||||
/* Consumer PEL */
|
||||
addReplyBulkCString(c,"pending");
|
||||
long long arraylen_cpel = 0;
|
||||
void *arrayptr_cpel = addReplyDeferredLen(c);
|
||||
raxIterator ri_cpel;
|
||||
raxStart(&ri_cpel,consumer->pel);
|
||||
raxSeek(&ri_cpel,"^",NULL,0);
|
||||
while(raxNext(&ri_cpel) && (!count || arraylen_cpel < count)) {
|
||||
streamNACK *nack = ri_cpel.data;
|
||||
addReplyArrayLen(c,3);
|
||||
|
||||
/* Entry ID. */
|
||||
streamID id;
|
||||
streamDecodeID(ri_cpel.key,&id);
|
||||
addReplyStreamID(c,&id);
|
||||
|
||||
/* Last delivery. */
|
||||
addReplyLongLong(c,nack->delivery_time);
|
||||
|
||||
/* Number of deliveries. */
|
||||
addReplyLongLong(c,nack->delivery_count);
|
||||
|
||||
arraylen_cpel++;
|
||||
}
|
||||
setDeferredArrayLen(c,arrayptr_cpel,arraylen_cpel);
|
||||
raxStop(&ri_cpel);
|
||||
}
|
||||
raxStop(&ri_consumers);
|
||||
}
|
||||
raxStop(&ri_cgroups);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* XINFO CONSUMERS <key> <group>
|
||||
* XINFO GROUPS <key>
|
||||
* XINFO STREAM <key>
|
||||
* XINFO STREAM <key> [FULL [COUNT <count>]]
|
||||
* XINFO HELP. */
|
||||
void xinfoCommand(client *c) {
|
||||
const char *help[] = {
|
||||
"CONSUMERS <key> <groupname> -- Show consumer groups of group <groupname>.",
|
||||
"GROUPS <key> -- Show the stream consumer groups.",
|
||||
"STREAM <key> -- Show information about the stream.",
|
||||
"STREAM <key> [FULL [COUNT <count>]] -- Show information about the stream.",
|
||||
"HELP -- Print this help.",
|
||||
NULL
|
||||
};
|
||||
@ -2578,36 +2762,10 @@ NULL
|
||||
addReplyStreamID(c,&cg->last_id);
|
||||
}
|
||||
raxStop(&ri);
|
||||
} else if (!strcasecmp(opt,"STREAM") && c->argc == 3) {
|
||||
/* XINFO STREAM <key> (or the alias XINFO <key>). */
|
||||
addReplyMapLen(c,7);
|
||||
addReplyBulkCString(c,"length");
|
||||
addReplyLongLong(c,s->length);
|
||||
addReplyBulkCString(c,"radix-tree-keys");
|
||||
addReplyLongLong(c,raxSize(s->rax));
|
||||
addReplyBulkCString(c,"radix-tree-nodes");
|
||||
addReplyLongLong(c,s->rax->numnodes);
|
||||
addReplyBulkCString(c,"groups");
|
||||
addReplyLongLong(c,s->cgroups ? raxSize(s->cgroups) : 0);
|
||||
addReplyBulkCString(c,"last-generated-id");
|
||||
addReplyStreamID(c,&s->last_id);
|
||||
|
||||
/* To emit the first/last entry we us the streamReplyWithRange()
|
||||
* API. */
|
||||
int count;
|
||||
streamID start, end;
|
||||
start.ms = start.seq = 0;
|
||||
end.ms = end.seq = UINT64_MAX;
|
||||
addReplyBulkCString(c,"first-entry");
|
||||
count = streamReplyWithRange(c,s,&start,&end,1,0,NULL,NULL,
|
||||
STREAM_RWR_RAWENTRIES,NULL);
|
||||
if (!count) addReplyNull(c);
|
||||
addReplyBulkCString(c,"last-entry");
|
||||
count = streamReplyWithRange(c,s,&start,&end,1,1,NULL,NULL,
|
||||
STREAM_RWR_RAWENTRIES,NULL);
|
||||
if (!count) addReplyNull(c);
|
||||
} else if (!strcasecmp(opt,"STREAM")) {
|
||||
/* XINFO STREAM <key> [FULL [COUNT <count>]]. */
|
||||
xinfoReplyWithStreamInfo(c,s);
|
||||
} else {
|
||||
addReplySubcommandSyntaxError(c);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -294,6 +294,40 @@ start_server {
|
||||
assert {[lindex $reply 0 3] == 2}
|
||||
}
|
||||
|
||||
test {XINFO FULL output} {
|
||||
r del x
|
||||
r XADD x 100 a 1
|
||||
r XADD x 101 b 1
|
||||
r XADD x 102 c 1
|
||||
r XADD x 103 e 1
|
||||
r XADD x 104 f 1
|
||||
r XGROUP CREATE x g1 0
|
||||
r XGROUP CREATE x g2 0
|
||||
r XREADGROUP GROUP g1 Alice COUNT 1 STREAMS x >
|
||||
r XREADGROUP GROUP g1 Bob COUNT 1 STREAMS x >
|
||||
r XREADGROUP GROUP g1 Bob NOACK COUNT 1 STREAMS x >
|
||||
r XREADGROUP GROUP g2 Charlie COUNT 4 STREAMS x >
|
||||
r XDEL x 103
|
||||
|
||||
set reply [r XINFO STREAM x FULL]
|
||||
assert_equal [llength $reply] 12
|
||||
assert_equal [lindex $reply 1] 4 ;# stream length
|
||||
assert_equal [lindex $reply 9] "{100-0 {a 1}} {101-0 {b 1}} {102-0 {c 1}} {104-0 {f 1}}" ;# entries
|
||||
assert_equal [lindex $reply 11 0 1] "g1" ;# first group name
|
||||
assert_equal [lindex $reply 11 0 7 0 0] "100-0" ;# first entry in group's PEL
|
||||
assert_equal [lindex $reply 11 0 9 0 1] "Alice" ;# first consumer
|
||||
assert_equal [lindex $reply 11 0 9 0 7 0 0] "100-0" ;# first entry in first consumer's PEL
|
||||
assert_equal [lindex $reply 11 1 1] "g2" ;# second group name
|
||||
assert_equal [lindex $reply 11 1 9 0 1] "Charlie" ;# first consumer
|
||||
assert_equal [lindex $reply 11 1 9 0 7 0 0] "100-0" ;# first entry in first consumer's PEL
|
||||
assert_equal [lindex $reply 11 1 9 0 7 1 0] "101-0" ;# second entry in first consumer's PEL
|
||||
|
||||
set reply [r XINFO STREAM x FULL COUNT 1]
|
||||
assert_equal [llength $reply] 12
|
||||
assert_equal [lindex $reply 1] 4
|
||||
assert_equal [lindex $reply 9] "{100-0 {a 1}}"
|
||||
}
|
||||
|
||||
start_server {} {
|
||||
set master [srv -1 client]
|
||||
set master_host [srv -1 host]
|
||||
|
Loading…
x
Reference in New Issue
Block a user