diff --git a/src/server.c b/src/server.c index c39acfce2..bebc67fa0 100644 --- a/src/server.c +++ b/src/server.c @@ -310,6 +310,7 @@ struct redisCommand redisCommandTable[] = { {"xreadgroup",xreadCommand,-3,"ws",0,xreadGetKeys,1,1,1,0,0}, {"xgroup",xgroupCommand,-2,"wm",0,NULL,2,2,1,0,0}, {"xack",xackCommand,-3,"wF",0,NULL,1,1,1,0,0}, + {"xpending",xpendingCommand,-3,"r",0,NULL,1,1,1,0,0}, {"post",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"host:",securityWarningCommand,-1,"lt",0,NULL,0,0,0,0,0}, {"latency",latencyCommand,-2,"aslt",0,NULL,0,0,0,0,0} diff --git a/src/server.h b/src/server.h index d2ef36983..5863e0c79 100644 --- a/src/server.h +++ b/src/server.h @@ -2027,6 +2027,7 @@ void xlenCommand(client *c); void xreadCommand(client *c); void xgroupCommand(client *c); void xackCommand(client *c); +void xpendingCommand(client *c); #if defined(__GNUC__) void *calloc(size_t count, size_t size) __attribute__ ((deprecated)); diff --git a/src/t_stream.c b/src/t_stream.c index f4babd3fc..0ca1cea03 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1447,7 +1447,7 @@ void xackCommand(client *c) { addReplyLongLong(c,acknowledged); } -/* XPENDING [ ] +/* XPENDING [ ] [] * * If start and stop are omitted, the command just outputs information about * the amount of pending messages for the key/group pair, together with @@ -1456,6 +1456,82 @@ void xackCommand(client *c) { * If start and stop are provided instead, the pending messages are returned * with informations about the current owner, number of deliveries and last * delivery time and so forth. */ +void xpendingCommand(client *c) { + int justinfo = c->argc == 3; /* Without the range just outputs general + informations about the PEL. */ + robj *key = c->argv[1]; + robj *groupname = c->argv[2]; + robj *consumername = (c->argc == 7) ? c->argv[6] : NULL; + + /* Start and stop, and the consumer, can be omitted. */ + if (c->argc != 3 && c->argc != 6 && c->argc != 7) { + addReply(c,shared.syntaxerr); + return; + } + + /* Lookup the key and the group inside the stream. */ + robj *o = lookupKeyRead(c->db,c->argv[1]); + streamCG *group; + + if (o && checkType(c,o,OBJ_STREAM)) return; + if (o == NULL || + (group = streamLookupCG(o->ptr,groupname->ptr)) == NULL) + { + addReplyErrorFormat(c, "No such key '%s' or consumer " + "group '%s'", + key->ptr,groupname->ptr); + return; + } + + /* XPENDING variant. */ + if (justinfo) { + addReplyMultiBulkLen(c,4); + /* Total number of messages in the PEL. */ + addReplyLongLong(c,raxSize(group->pel)); + /* First and last IDs. */ + if (raxSize(group->pel) == 0) { + addReply(c,shared.nullbulk); /* Start. */ + addReply(c,shared.nullbulk); /* End. */ + addReply(c,shared.nullmultibulk); /* Clients. */ + } else { + streamID startid,endid; + + /* Start. */ + raxIterator ri; + raxStart(&ri,group->pel); + raxSeek(&ri,"^",NULL,0); + raxNext(&ri); + streamDecodeID(ri.key,&startid); + addReplyStreamID(c,&startid); + + /* End. */ + raxSeek(&ri,"$",NULL,0); + raxNext(&ri); + streamDecodeID(ri.key,&endid); + addReplyStreamID(c,&endid); + raxStop(&ri); + + /* Consumers with pending messages. */ + raxStart(&ri,group->consumers); + raxSeek(&ri,"^",NULL,0); + void *arraylen_ptr = addDeferredMultiBulkLength(c); + size_t arraylen = 0; + while(raxNext(&ri)) { + streamConsumer *consumer = ri.data; + if (raxSize(consumer->pel) == 0) continue; + addReplyMultiBulkLen(c,2); + addReplyBulkCBuffer(c,ri.key,ri.key_len); + addReplyBulkLongLong(c,raxSize(consumer->pel)); + arraylen++; + } + setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); + raxStop(&ri); + } + } + /* XPENDING [] variant. */ + else { + } +} /* XCLAIM ...*/