Streams: stream iteration refactoring, WIP 1.
This commit is contained in:
parent
1a603e1a87
commit
b1ec333633
114
src/t_stream.c
114
src/t_stream.c
@ -196,6 +196,120 @@ int streamAppendItem(stream *s, robj **argv, int numfields, streamID *added_id,
|
|||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* We define an iterator to iterate stream items in an abstract way, without
|
||||||
|
* caring about the radix tree + listpack representation. Technically speaking
|
||||||
|
* the iterator is only used inside streamReplyWithRange(), so could just
|
||||||
|
* be implemented inside the function, but practically there is the AOF
|
||||||
|
* rewriting code that also needs to iterate the stream to emit the XADD
|
||||||
|
* commands. */
|
||||||
|
typedef struct streamIterator {
|
||||||
|
uint64_t start_key[2]; /* Start key as 128 bit big endian. */
|
||||||
|
uint64_t end_key[2]; /* End key as 128 bit big endian. */
|
||||||
|
raxIterator ri; /* Rax iterator. */
|
||||||
|
unsigned char *lp; /* Current listpack. */
|
||||||
|
unsigned char *lp_ele; /* Current listpack cursor. */
|
||||||
|
} streamIterator;
|
||||||
|
|
||||||
|
/* Initialize the stream iterator, so that we can call iterating functions
|
||||||
|
* to get the next items. This requires a corresponding streamIteratorStop()
|
||||||
|
* at the end.
|
||||||
|
*
|
||||||
|
* Once the iterator is initalized, we iterate like this:
|
||||||
|
*
|
||||||
|
* streamIterator myiterator;
|
||||||
|
* streamIteratorStart(&myiterator,...);
|
||||||
|
* size_t numfields;
|
||||||
|
* while(streamIteratorGetID(&myitereator,&ID,&numfields)) {
|
||||||
|
* while(numfields--) {
|
||||||
|
* unsigned char *key, *value;
|
||||||
|
* size_t key_len, value_len;
|
||||||
|
* streamIteratorGetField(&myiterator,&key,&value,&key_len,&value_len);
|
||||||
|
*
|
||||||
|
* ... do what you want with key and value ...
|
||||||
|
* }
|
||||||
|
* }
|
||||||
|
* streamIteratorStop(&myiterator); */
|
||||||
|
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end) {
|
||||||
|
/* Intialize the iterator and translates the iteration start/stop
|
||||||
|
* elements into a 128 big big-endian number. */
|
||||||
|
streamEncodeID(si->start_key,start);
|
||||||
|
if (end) {
|
||||||
|
streamEncodeID(si->end_key,end);
|
||||||
|
} else {
|
||||||
|
/* We assume that UINT64_MAX is the same in little and big
|
||||||
|
* endian, that is, all bits set. */
|
||||||
|
si->end_key[0] = UINT64_MAX;
|
||||||
|
si->end_key[0] = UINT64_MAX;
|
||||||
|
}
|
||||||
|
raxStart(&si->ri,s->rax);
|
||||||
|
|
||||||
|
/* Seek the correct node in the radix tree. */
|
||||||
|
if (start->ms || start->seq) {
|
||||||
|
raxSeek(&si->ri,"<=",(unsigned char*)si->start_key,
|
||||||
|
sizeof(si->start_key));
|
||||||
|
if (raxEOF(&si->ri))
|
||||||
|
raxSeek(&si->ri,">",(unsigned char*)si->start_key,
|
||||||
|
sizeof(si->start_key));
|
||||||
|
} else {
|
||||||
|
raxSeek(&si->ri,"^",NULL,0);
|
||||||
|
}
|
||||||
|
si->lp = NULL; /* There is no current listpack right now. */
|
||||||
|
si->lp_ele = NULL; /* Current listpack cursor. */
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Return 1 and store the current item ID at 'id' if there are still
|
||||||
|
* elements within the iteration range, otherwise return 0 in order to
|
||||||
|
* signal the iteration terminated. */
|
||||||
|
int streamIteratorGetID(streamIterator *si, streamID *id, size_t *numfields) {
|
||||||
|
while(1) { /* Will stop when element > stop_key or end of radix tree. */
|
||||||
|
/* If the current listpack is set to NULL, this is the start of the
|
||||||
|
* iteration or the previous listpack was completely iterated.
|
||||||
|
* Go to the next node. */
|
||||||
|
if (si->lp == NULL || si->lp_ele == NULL) {
|
||||||
|
if (!raxNext(&si->ri)) return 0;
|
||||||
|
serverAssert(si->ri.key_len == sizeof(streamID));
|
||||||
|
si->lp = si->ri.data;
|
||||||
|
si->lp_ele = lpFirst(si->lp);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* For every radix tree node, iterate the corresponding listpack,
|
||||||
|
* returning elements when they are within range. */
|
||||||
|
while(si->lp_ele) {
|
||||||
|
int64_t e_len;
|
||||||
|
unsigned char buf[LP_INTBUF_SIZE];
|
||||||
|
unsigned char *e = lpGet(si->lp_ele,&e_len,buf);
|
||||||
|
serverAssert(e_len == sizeof(streamID));
|
||||||
|
|
||||||
|
/* Go to next field: number of elements. */
|
||||||
|
si->lp_ele = lpNext(si->lp,si->lp_ele);
|
||||||
|
|
||||||
|
/* If current >= start */
|
||||||
|
if (memcmp(e,si->start_key,sizeof(streamID)) >= 0) {
|
||||||
|
if (memcmp(e,si->end_key,sizeof(streamID)) > 0)
|
||||||
|
return 0; /* We are already out of range. */
|
||||||
|
streamDecodeID(e,id);
|
||||||
|
*numfields = lpGetInteger(si->lp_ele);
|
||||||
|
return 1; /* Valid item returned. */
|
||||||
|
} else {
|
||||||
|
/* If we do not emit, we have to discard. */
|
||||||
|
int64_t numfields = lpGetInteger(si->lp_ele);
|
||||||
|
si->lp_ele = lpNext(si->lp,si->lp_ele);
|
||||||
|
for (int64_t i = 0; i < numfields*2; i++)
|
||||||
|
si->lp_ele = lpNext(si->lp,si->lp_ele);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* End of listpack reached. Try the next radix tree node. */
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Stop the stream iterator. The only cleanup we need is to free the rax
|
||||||
|
* itereator, since the stream iterator itself is supposed to be stack
|
||||||
|
* allocated. */
|
||||||
|
void streamIteratorStop(streamIterator *si) {
|
||||||
|
raxStop(&si->ri);
|
||||||
|
}
|
||||||
|
|
||||||
/* Send the specified range to the client 'c'. The range the client will
|
/* Send the specified range to the client 'c'. The range the client will
|
||||||
* receive is between start and end inclusive, if 'count' is non zero, no more
|
* receive is between start and end inclusive, if 'count' is non zero, no more
|
||||||
* than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that
|
* than 'count' elemnets are sent. The 'end' pointer can be NULL to mean that
|
||||||
|
Loading…
x
Reference in New Issue
Block a user