From 1f10a99efdeb01ec89783f64090f2502ac5614d6 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 13 Mar 2018 16:26:48 +0100 Subject: [PATCH] CG: XINFO GROUPS + output format changes. XINFO is mainly an observability command that will be used more by humans than computers, and even when used by computers it will be a very low traffic command. For this reason the format was changed in order to have field names. They'll consume some bandwidth and CPU cycles, but in this context this is much better than having to understand what the numbers in the output array are. --- src/t_stream.c | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index 7944ab398..ad6d1c79a 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1863,6 +1863,7 @@ void xinfoCommand(client *c) { " CONSUMERS -- Show consumer groups of group .", " GROUPS -- Show the stream consumer groups.", " STREAM -- Show information about the stream.", +" (without subcommand) -- Alias for STREAM.", " HELP -- Prints this help.", NULL }; @@ -1878,6 +1879,8 @@ NULL /* Dispatch the different subcommands. */ if (!strcasecmp(opt,"CONSUMERS") && c->argc == 4) { + /* XINFO CONSUMERS . */ + streamCG *cg = streamLookupCG(s,c->argv[3]->ptr); if (cg == NULL) { addReplyErrorFormat(c, "-NOGROUP No such consumer group '%s' " @@ -1896,16 +1899,42 @@ NULL mstime_t idle = now - consumer->seen_time; if (idle < 0) idle = 0; - addReplyMultiBulkLen(c,3); + addReplyMultiBulkLen(c,6); + addReplyStatus(c,"name"); addReplyBulkCBuffer(c,consumer->name,sdslen(consumer->name)); + addReplyStatus(c,"pending"); addReplyLongLong(c,raxSize(consumer->pel)); + addReplyStatus(c,"idle"); addReplyLongLong(c,idle); } raxStop(&ri); + } else if (!strcasecmp(opt,"GROUPS") && c->argc == 3) { + /* XINFO GROUPS. */ + + if (s->cgroups == NULL) { + addReplyMultiBulkLen(c,0); + return; + } + + addReplyMultiBulkLen(c,raxSize(s->cgroups)); + raxIterator ri; + raxStart(&ri,s->cgroups); + raxSeek(&ri,"^",NULL,0); + while(raxNext(&ri)) { + streamCG *cg = ri.data; + addReplyMultiBulkLen(c,6); + addReplyStatus(c,"name"); + addReplyBulkCBuffer(c,ri.key,ri.key_len); + addReplyStatus(c,"consumers"); + addReplyLongLong(c,raxSize(cg->consumers)); + addReplyStatus(c,"pending"); + addReplyLongLong(c,raxSize(cg->pel)); + } + raxStop(&ri); } else if (!strcasecmp(opt,"HELP")) { addReplyHelp(c, help); } else { - addReply(c,shared.syntaxerr); + addReplyError(c,"syntax error, try 'XINFO anykey HELP'"); } }