Streams: implement streamReplyWithRange() in terms of the iterator.

This commit is contained in:
antirez 2017-09-14 14:46:31 +02:00
parent a58733cacf
commit 9ed40f0fc3

View File

@ -223,7 +223,7 @@ typedef struct streamIterator {
*
* streamIterator myiterator;
* streamIteratorStart(&myiterator,...);
* size_t numfields;
* int64_t numfields;
* while(streamIteratorGetID(&myitereator,&ID,&numfields)) {
* while(numfields--) {
* unsigned char *key, *value;
@ -265,7 +265,7 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI
/* Return 1 and store the current item ID at 'id' if there are still
* elements within the iteration range, otherwise return 0 in order to
* signal the iteration terminated. */
int streamIteratorGetID(streamIterator *si, streamID *id, size_t *numfields) {
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
while(1) { /* Will stop when element > stop_key or end of radix tree. */
/* If the current listpack is set to NULL, this is the start of the
* iteration or the previous listpack was completely iterated.
@ -336,74 +336,31 @@ void streamIteratorStop(streamIterator *si) {
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count) {
void *arraylen_ptr = addDeferredMultiBulkLength(c);
size_t arraylen = 0;
streamIterator si;
int64_t numfields;
streamID id;
/* Seek the radix tree node that contains our start item. */
uint64_t key[2];
uint64_t end_key[2];
streamEncodeID(key,start);
if (end) streamEncodeID(end_key,end);
raxIterator ri;
raxStart(&ri,s->rax);
streamIteratorStart(&si,s,start,end);
while(streamIteratorGetID(&si,&id,&numfields)) {
/* Emit a two elements array for each item. The first is
* the ID, the second is an array of field-value pairs. */
sds replyid = sdscatfmt(sdsempty(),"+%U.%U\r\n",id.ms,id.seq);
addReplyMultiBulkLen(c,2);
addReplySds(c,replyid);
addReplyMultiBulkLen(c,numfields*2);
/* Seek the correct node in the radix tree. */
if (start->ms || start->seq) {
raxSeek(&ri,"<=",(unsigned char*)key,sizeof(key));
if (raxEOF(&ri)) raxSeek(&ri,">",(unsigned char*)key,sizeof(key));
} else {
raxSeek(&ri,"^",NULL,0);
}
/* For every radix tree node, iterate the corresponding listpack,
* returning elmeents when they are within range. */
while (raxNext(&ri)) {
serverAssert(ri.key_len == sizeof(key));
unsigned char *lp = ri.data;
unsigned char *lp_ele = lpFirst(lp);
while(lp_ele) {
int64_t e_len;
unsigned char buf[LP_INTBUF_SIZE];
unsigned char *e = lpGet(lp_ele,&e_len,buf);
serverAssert(e_len == sizeof(streamID));
/* Seek next field: number of elements. */
lp_ele = lpNext(lp,lp_ele);
if (memcmp(e,key,sizeof(key)) >= 0) { /* If current >= start */
if (end && memcmp(e,end_key,sizeof(key)) > 0) {
break; /* We are already out of range. */
}
streamID thisid;
streamDecodeID(e,&thisid);
sds replyid = sdscatfmt(sdsempty(),"+%U.%U\r\n",
thisid.ms,thisid.seq);
/* Emit this stream entry in the client output. */
addReplyMultiBulkLen(c,2);
addReplySds(c,replyid);
int64_t numfields = lpGetInteger(lp_ele);
lp_ele = lpNext(lp,lp_ele);
addReplyMultiBulkLen(c,numfields*2);
for (int64_t i = 0; i < numfields; i++) {
/* Emit two items (key-value) per iteration. */
for (int k = 0; k < 2; k++) {
e = lpGet(lp_ele,&e_len,buf);
addReplyBulkCBuffer(c,e,e_len);
lp_ele = lpNext(lp,lp_ele);
}
}
arraylen++;
if (count && count == arraylen) break;
} else {
/* If we do not emit, we have to discard. */
int64_t numfields = lpGetInteger(lp_ele);
lp_ele = lpNext(lp,lp_ele);
for (int64_t i = 0; i < numfields*2; i++)
lp_ele = lpNext(lp,lp_ele);
}
/* Emit the field-value pairs. */
while(numfields--) {
unsigned char *key, *value;
int64_t key_len, value_len;
streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
addReplyBulkCBuffer(c,key,key_len);
addReplyBulkCBuffer(c,value,value_len);
}
arraylen++;
if (count && count == arraylen) break;
}
raxStop(&ri);
streamIteratorStop(&si);
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
return arraylen;
}