Streams: implement streamReplyWithRange() in terms of the iterator.
This commit is contained in:
parent
0f48b7436d
commit
84af6ec436
@ -223,7 +223,7 @@ typedef struct streamIterator {
|
|||||||
*
|
*
|
||||||
* streamIterator myiterator;
|
* streamIterator myiterator;
|
||||||
* streamIteratorStart(&myiterator,...);
|
* streamIteratorStart(&myiterator,...);
|
||||||
* size_t numfields;
|
* int64_t numfields;
|
||||||
* while(streamIteratorGetID(&myitereator,&ID,&numfields)) {
|
* while(streamIteratorGetID(&myitereator,&ID,&numfields)) {
|
||||||
* while(numfields--) {
|
* while(numfields--) {
|
||||||
* unsigned char *key, *value;
|
* unsigned char *key, *value;
|
||||||
@ -265,7 +265,7 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI
|
|||||||
/* Return 1 and store the current item ID at 'id' if there are still
|
/* Return 1 and store the current item ID at 'id' if there are still
|
||||||
* elements within the iteration range, otherwise return 0 in order to
|
* elements within the iteration range, otherwise return 0 in order to
|
||||||
* signal the iteration terminated. */
|
* signal the iteration terminated. */
|
||||||
int streamIteratorGetID(streamIterator *si, streamID *id, size_t *numfields) {
|
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields) {
|
||||||
while(1) { /* Will stop when element > stop_key or end of radix tree. */
|
while(1) { /* Will stop when element > stop_key or end of radix tree. */
|
||||||
/* If the current listpack is set to NULL, this is the start of the
|
/* If the current listpack is set to NULL, this is the start of the
|
||||||
* iteration or the previous listpack was completely iterated.
|
* iteration or the previous listpack was completely iterated.
|
||||||
@ -336,74 +336,31 @@ void streamIteratorStop(streamIterator *si) {
|
|||||||
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count) {
|
size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count) {
|
||||||
void *arraylen_ptr = addDeferredMultiBulkLength(c);
|
void *arraylen_ptr = addDeferredMultiBulkLength(c);
|
||||||
size_t arraylen = 0;
|
size_t arraylen = 0;
|
||||||
|
streamIterator si;
|
||||||
|
int64_t numfields;
|
||||||
|
streamID id;
|
||||||
|
|
||||||
/* Seek the radix tree node that contains our start item. */
|
streamIteratorStart(&si,s,start,end);
|
||||||
uint64_t key[2];
|
while(streamIteratorGetID(&si,&id,&numfields)) {
|
||||||
uint64_t end_key[2];
|
/* Emit a two elements array for each item. The first is
|
||||||
streamEncodeID(key,start);
|
* the ID, the second is an array of field-value pairs. */
|
||||||
if (end) streamEncodeID(end_key,end);
|
sds replyid = sdscatfmt(sdsempty(),"+%U.%U\r\n",id.ms,id.seq);
|
||||||
raxIterator ri;
|
addReplyMultiBulkLen(c,2);
|
||||||
raxStart(&ri,s->rax);
|
addReplySds(c,replyid);
|
||||||
|
addReplyMultiBulkLen(c,numfields*2);
|
||||||
|
|
||||||
/* Seek the correct node in the radix tree. */
|
/* Emit the field-value pairs. */
|
||||||
if (start->ms || start->seq) {
|
while(numfields--) {
|
||||||
raxSeek(&ri,"<=",(unsigned char*)key,sizeof(key));
|
unsigned char *key, *value;
|
||||||
if (raxEOF(&ri)) raxSeek(&ri,">",(unsigned char*)key,sizeof(key));
|
int64_t key_len, value_len;
|
||||||
} else {
|
streamIteratorGetField(&si,&key,&value,&key_len,&value_len);
|
||||||
raxSeek(&ri,"^",NULL,0);
|
addReplyBulkCBuffer(c,key,key_len);
|
||||||
}
|
addReplyBulkCBuffer(c,value,value_len);
|
||||||
|
|
||||||
/* For every radix tree node, iterate the corresponding listpack,
|
|
||||||
* returning elmeents when they are within range. */
|
|
||||||
while (raxNext(&ri)) {
|
|
||||||
serverAssert(ri.key_len == sizeof(key));
|
|
||||||
unsigned char *lp = ri.data;
|
|
||||||
unsigned char *lp_ele = lpFirst(lp);
|
|
||||||
while(lp_ele) {
|
|
||||||
int64_t e_len;
|
|
||||||
unsigned char buf[LP_INTBUF_SIZE];
|
|
||||||
unsigned char *e = lpGet(lp_ele,&e_len,buf);
|
|
||||||
serverAssert(e_len == sizeof(streamID));
|
|
||||||
|
|
||||||
/* Seek next field: number of elements. */
|
|
||||||
lp_ele = lpNext(lp,lp_ele);
|
|
||||||
if (memcmp(e,key,sizeof(key)) >= 0) { /* If current >= start */
|
|
||||||
if (end && memcmp(e,end_key,sizeof(key)) > 0) {
|
|
||||||
break; /* We are already out of range. */
|
|
||||||
}
|
|
||||||
streamID thisid;
|
|
||||||
streamDecodeID(e,&thisid);
|
|
||||||
sds replyid = sdscatfmt(sdsempty(),"+%U.%U\r\n",
|
|
||||||
thisid.ms,thisid.seq);
|
|
||||||
|
|
||||||
/* Emit this stream entry in the client output. */
|
|
||||||
addReplyMultiBulkLen(c,2);
|
|
||||||
addReplySds(c,replyid);
|
|
||||||
int64_t numfields = lpGetInteger(lp_ele);
|
|
||||||
lp_ele = lpNext(lp,lp_ele);
|
|
||||||
addReplyMultiBulkLen(c,numfields*2);
|
|
||||||
for (int64_t i = 0; i < numfields; i++) {
|
|
||||||
/* Emit two items (key-value) per iteration. */
|
|
||||||
for (int k = 0; k < 2; k++) {
|
|
||||||
e = lpGet(lp_ele,&e_len,buf);
|
|
||||||
addReplyBulkCBuffer(c,e,e_len);
|
|
||||||
lp_ele = lpNext(lp,lp_ele);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
arraylen++;
|
|
||||||
if (count && count == arraylen) break;
|
|
||||||
} else {
|
|
||||||
/* If we do not emit, we have to discard. */
|
|
||||||
int64_t numfields = lpGetInteger(lp_ele);
|
|
||||||
lp_ele = lpNext(lp,lp_ele);
|
|
||||||
for (int64_t i = 0; i < numfields*2; i++)
|
|
||||||
lp_ele = lpNext(lp,lp_ele);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
arraylen++;
|
||||||
if (count && count == arraylen) break;
|
if (count && count == arraylen) break;
|
||||||
}
|
}
|
||||||
raxStop(&ri);
|
streamIteratorStop(&si);
|
||||||
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
|
setDeferredMultiBulkLength(c,arraylen_ptr,arraylen);
|
||||||
return arraylen;
|
return arraylen;
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user