diff --git a/src/t_stream.c b/src/t_stream.c index 03860b8e4..de9561a51 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -223,7 +223,7 @@ typedef struct streamIterator { * * streamIterator myiterator; * streamIteratorStart(&myiterator,...); - * size_t numfields; + * int64_t numfields; * while(streamIteratorGetID(&myitereator,&ID,&numfields)) { * while(numfields--) { * unsigned char *key, *value; @@ -265,7 +265,7 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI /* Return 1 and store the current item ID at 'id' if there are still * elements within the iteration range, otherwise return 0 in order to * signal the iteration terminated. */ -int streamIteratorGetID(streamIterator *si, streamID *id, size_t *numfields) { +int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) { while(1) { /* Will stop when element > stop_key or end of radix tree. */ /* If the current listpack is set to NULL, this is the start of the * iteration or the previous listpack was completely iterated. @@ -336,74 +336,31 @@ void streamIteratorStop(streamIterator *si) { size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count) { void *arraylen_ptr = addDeferredMultiBulkLength(c); size_t arraylen = 0; + streamIterator si; + int64_t numfields; + streamID id; - /* Seek the radix tree node that contains our start item. */ - uint64_t key[2]; - uint64_t end_key[2]; - streamEncodeID(key,start); - if (end) streamEncodeID(end_key,end); - raxIterator ri; - raxStart(&ri,s->rax); + streamIteratorStart(&si,s,start,end); + while(streamIteratorGetID(&si,&id,&numfields)) { + /* Emit a two elements array for each item. The first is + * the ID, the second is an array of field-value pairs. */ + sds replyid = sdscatfmt(sdsempty(),"+%U.%U\r\n",id.ms,id.seq); + addReplyMultiBulkLen(c,2); + addReplySds(c,replyid); + addReplyMultiBulkLen(c,numfields*2); - /* Seek the correct node in the radix tree. */ - if (start->ms || start->seq) { - raxSeek(&ri,"<=",(unsigned char*)key,sizeof(key)); - if (raxEOF(&ri)) raxSeek(&ri,">",(unsigned char*)key,sizeof(key)); - } else { - raxSeek(&ri,"^",NULL,0); - } - - /* For every radix tree node, iterate the corresponding listpack, - * returning elmeents when they are within range. */ - while (raxNext(&ri)) { - serverAssert(ri.key_len == sizeof(key)); - unsigned char *lp = ri.data; - unsigned char *lp_ele = lpFirst(lp); - while(lp_ele) { - int64_t e_len; - unsigned char buf[LP_INTBUF_SIZE]; - unsigned char *e = lpGet(lp_ele,&e_len,buf); - serverAssert(e_len == sizeof(streamID)); - - /* Seek next field: number of elements. */ - lp_ele = lpNext(lp,lp_ele); - if (memcmp(e,key,sizeof(key)) >= 0) { /* If current >= start */ - if (end && memcmp(e,end_key,sizeof(key)) > 0) { - break; /* We are already out of range. */ - } - streamID thisid; - streamDecodeID(e,&thisid); - sds replyid = sdscatfmt(sdsempty(),"+%U.%U\r\n", - thisid.ms,thisid.seq); - - /* Emit this stream entry in the client output. */ - addReplyMultiBulkLen(c,2); - addReplySds(c,replyid); - int64_t numfields = lpGetInteger(lp_ele); - lp_ele = lpNext(lp,lp_ele); - addReplyMultiBulkLen(c,numfields*2); - for (int64_t i = 0; i < numfields; i++) { - /* Emit two items (key-value) per iteration. */ - for (int k = 0; k < 2; k++) { - e = lpGet(lp_ele,&e_len,buf); - addReplyBulkCBuffer(c,e,e_len); - lp_ele = lpNext(lp,lp_ele); - } - } - - arraylen++; - if (count && count == arraylen) break; - } else { - /* If we do not emit, we have to discard. */ - int64_t numfields = lpGetInteger(lp_ele); - lp_ele = lpNext(lp,lp_ele); - for (int64_t i = 0; i < numfields*2; i++) - lp_ele = lpNext(lp,lp_ele); - } + /* Emit the field-value pairs. */ + while(numfields--) { + unsigned char *key, *value; + int64_t key_len, value_len; + streamIteratorGetField(&si,&key,&value,&key_len,&value_len); + addReplyBulkCBuffer(c,key,key_len); + addReplyBulkCBuffer(c,value,value_len); } + arraylen++; if (count && count == arraylen) break; } - raxStop(&ri); + streamIteratorStop(&si); setDeferredMultiBulkLength(c,arraylen_ptr,arraylen); return arraylen; }