CG: XPENDING without start/stop variant implemented.
This commit is contained in:
parent
b65fe09bb8
commit
1bc31666da
@ -310,6 +310,7 @@ struct redisCommand redisCommandTable[] = {
|
|||||||
{"xreadgroup",xreadCommand,-3,"ws",0,xreadGetKeys,1,1,1,0,0},
|
{"xreadgroup",xreadCommand,-3,"ws",0,xreadGetKeys,1,1,1,0,0},
|
||||||
{"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0},
|
{"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0},
|
||||||
{"xack",xackCommand,-3,"wF",0,NULL,1,1,1,0,0},
|
{"xack",xackCommand,-3,"wF",0,NULL,1,1,1,0,0},
|
||||||
|
{"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0},
|
||||||
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
|
{"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
|
||||||
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
|
{"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0},
|
||||||
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
|
{"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0}
|
||||||
|
@ -2027,6 +2027,7 @@ void xlenCommand(client *c);
|
|||||||
void xreadCommand(client *c);
|
void xreadCommand(client *c);
|
||||||
void xgroupCommand(client *c);
|
void xgroupCommand(client *c);
|
||||||
void xackCommand(client *c);
|
void xackCommand(client *c);
|
||||||
|
void xpendingCommand(client *c);
|
||||||
|
|
||||||
#if defined(__GNUC__)
|
#if defined(__GNUC__)
|
||||||
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
|
void *calloc(size_t count, size_t size) __attribute__ ((deprecated));
|
||||||
|
@ -1447,7 +1447,7 @@ void xackCommand(client *c) {
|
|||||||
addReplyLongLong(c,acknowledged);
|
addReplyLongLong(c,acknowledged);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* XPENDING <key> <group> [<start> <stop>]
|
/* XPENDING <key> <group> [<start> <stop> <max>] [<consumer>]
|
||||||
*
|
*
|
||||||
* If start and stop are omitted, the command just outputs information about
|
* If start and stop are omitted, the command just outputs information about
|
||||||
* the amount of pending messages for the key/group pair, together with
|
* the amount of pending messages for the key/group pair, together with
|
||||||
@ -1456,6 +1456,82 @@ void xackCommand(client *c) {
|
|||||||
* If start and stop are provided instead, the pending messages are returned
|
* If start and stop are provided instead, the pending messages are returned
|
||||||
* with informations about the current owner, number of deliveries and last
|
* with informations about the current owner, number of deliveries and last
|
||||||
* delivery time and so forth. */
|
* delivery time and so forth. */
|
||||||
|
void xpendingCommand(client *c) {
|
||||||
|
int justinfo = c->argc == 3; /* Without the range just outputs general
|
||||||
|
informations about the PEL. */
|
||||||
|
robj *key = c->argv[1];
|
||||||
|
robj *groupname = c->argv[2];
|
||||||
|
robj *consumername = (c->argc == 7) ? c->argv[6] : NULL;
|
||||||
|
|
||||||
|
/* Start and stop, and the consumer, can be omitted. */
|
||||||
|
if (c->argc != 3 && c->argc != 6 && c->argc != 7) {
|
||||||
|
addReply(c,shared.syntaxerr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Lookup the key and the group inside the stream. */
|
||||||
|
robj *o = lookupKeyRead(c->db,c->argv[1]);
|
||||||
|
streamCG *group;
|
||||||
|
|
||||||
|
if (o && checkType(c,o,OBJ_STREAM)) return;
|
||||||
|
if (o == NULL ||
|
||||||
|
(group = streamLookupCG(o->ptr,groupname->ptr)) == NULL)
|
||||||
|
{
|
||||||
|
addReplyErrorFormat(c, "No such key '%s' or consumer "
|
||||||
|
"group '%s'",
|
||||||
|
key->ptr,groupname->ptr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* XPENDING <key> <group> variant. */
|
||||||
|
if (justinfo) {
|
||||||
|
addReplyMultiBulkLen(c,4);
|
||||||
|
/* Total number of messages in the PEL. */
|
||||||
|
addReplyLongLong(c,raxSize(group->pel));
|
||||||
|
/* First and last IDs. */
|
||||||
|
if (raxSize(group->pel) == 0) {
|
||||||
|
addReply(c,shared.nullbulk); /* Start. */
|
||||||
|
addReply(c,shared.nullbulk); /* End. */
|
||||||
|
addReply(c,shared.nullmultibulk); /* Clients. */
|
||||||
|
} else {
|
||||||
|
streamID startid,endid;
|
||||||
|
|
||||||
|
/* Start. */
|
||||||
|
raxIterator ri;
|
||||||
|
raxStart(&ri,group->pel);
|
||||||
|
raxSeek(&ri,"^",NULL,0);
|
||||||
|
raxNext(&ri);
|
||||||
|
streamDecodeID(ri.key,&startid);
|
||||||
|
addReplyStreamID(c,&startid);
|
||||||
|
|
||||||
|
/* End. */
|
||||||
|
raxSeek(&ri,"$",NULL,0);
|
||||||
|
raxNext(&ri);
|
||||||
|
streamDecodeID(ri.key,&endid);
|
||||||
|
addReplyStreamID(c,&endid);
|
||||||
|
raxStop(&ri);
|
||||||
|
|
||||||
|
/* Consumers with pending messages. */
|
||||||
|
raxStart(&ri,group->consumers);
|
||||||
|
raxSeek(&ri,"^",NULL,0);
|
||||||
|
void *arraylen_ptr = addDeferredMultiBulkLength(c);
|
||||||
|
size_t arraylen = 0;
|
||||||
|
while(raxNext(&ri)) {
|
||||||
|
streamConsumer *consumer = ri.data;
|
||||||
|
if (raxSize(consumer->pel) == 0) continue;
|
||||||
|
addReplyMultiBulkLen(c,2);
|
||||||
|
addReplyBulkCBuffer(c,ri.key,ri.key_len);
|
||||||
|
addReplyBulkLongLong(c,raxSize(consumer->pel));
|
||||||
|
arraylen++;
|
||||||
|
}
|
||||||
|
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
|
||||||
|
raxStop(&ri);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
/* XPENDING <key> <group> <start> <stop> <count> [<consumer>] variant. */
|
||||||
|
else {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ...*/
|
/* XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ...*/
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user