Streams: XREAD, first draft. Handling of blocked clients still missing.
This commit is contained in:
parent
7ccc80d503
commit
004a8b8712
@ -398,10 +398,10 @@ void xlenCommand(client *c) {
|
|||||||
* [RETRY <milliseconds> <ttl>] STREAMS key_1 ID_1 key_2 ID_2 ...
|
* [RETRY <milliseconds> <ttl>] STREAMS key_1 ID_1 key_2 ID_2 ...
|
||||||
* key_N ID_N */
|
* key_N ID_N */
|
||||||
void xreadCommand(client *c) {
|
void xreadCommand(client *c) {
|
||||||
long long block = 0;
|
long long timeout = 0;
|
||||||
long long count = 0;
|
long long count = 0;
|
||||||
int streams_count = 0;
|
int streams_count = 0;
|
||||||
int streams_argc = 0;
|
int streams_arg = 0;
|
||||||
#define STREAMID_STATIC_VECTOR_LEN 8
|
#define STREAMID_STATIC_VECTOR_LEN 8
|
||||||
streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
|
streamID static_ids[STREAMID_STATIC_VECTOR_LEN];
|
||||||
streamID *ids = static_ids;
|
streamID *ids = static_ids;
|
||||||
@ -412,17 +412,17 @@ void xreadCommand(client *c) {
|
|||||||
char *o = c->argv[i]->ptr;
|
char *o = c->argv[i]->ptr;
|
||||||
if (!strcasecmp(o,"BLOCK") && moreargs) {
|
if (!strcasecmp(o,"BLOCK") && moreargs) {
|
||||||
i++;
|
i++;
|
||||||
if (getLongLongFromObjectOrReply(c,c->argv[i],&block,NULL) != C_OK)
|
if (getLongLongFromObjectOrReply(c,c->argv[i],&timeout,NULL)
|
||||||
return;
|
!= C_OK) return;
|
||||||
if (block < 0) block = 0;
|
if (timeout < 0) timeout = 0;
|
||||||
} else if (!strcasecmp(o,"COUNT") && moreargs) {
|
} else if (!strcasecmp(o,"COUNT") && moreargs) {
|
||||||
i++;
|
i++;
|
||||||
if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK)
|
if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK)
|
||||||
return;
|
return;
|
||||||
if (count < 0) count = 0;
|
if (count < 0) count = 0;
|
||||||
} else if (!strcasecmp(o,"STREAMS") && moreargs) {
|
} else if (!strcasecmp(o,"STREAMS") && moreargs) {
|
||||||
streams_argc = i+1;
|
streams_arg = i+1;
|
||||||
streams_count = (c->argc-streams_argc);
|
streams_count = (c->argc-streams_arg);
|
||||||
if ((streams_count % 2) != 0) {
|
if ((streams_count % 2) != 0) {
|
||||||
addReplyError(c,"Unbalanced XREAD list of streams: "
|
addReplyError(c,"Unbalanced XREAD list of streams: "
|
||||||
"for each stream key an ID or '$' must be "
|
"for each stream key an ID or '$' must be "
|
||||||
@ -438,7 +438,7 @@ void xreadCommand(client *c) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* STREAMS option is mandatory. */
|
/* STREAMS option is mandatory. */
|
||||||
if (streams_argc == 0) {
|
if (streams_arg == 0) {
|
||||||
addReply(c,shared.syntaxerr);
|
addReply(c,shared.syntaxerr);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -447,8 +447,7 @@ void xreadCommand(client *c) {
|
|||||||
if (streams_count > STREAMID_STATIC_VECTOR_LEN)
|
if (streams_count > STREAMID_STATIC_VECTOR_LEN)
|
||||||
ids = zmalloc(sizeof(streamID)*streams_count);
|
ids = zmalloc(sizeof(streamID)*streams_count);
|
||||||
|
|
||||||
/* Try to serve the client synchronously. */
|
for (int i = streams_arg + streams_count; i < c->argc; i++) {
|
||||||
for (int i = streams_argc + streams_count; i < c->argc; i++) {
|
|
||||||
/* Specifying "$" as last-known-id means that the client wants to be
|
/* Specifying "$" as last-known-id means that the client wants to be
|
||||||
* served with just the messages that will arrive into the stream
|
* served with just the messages that will arrive into the stream
|
||||||
* starting from now. */
|
* starting from now. */
|
||||||
@ -466,6 +465,43 @@ void xreadCommand(client *c) {
|
|||||||
if (streamParseIDOrReply(c,c->argv[i],ids+i,0) != C_OK) goto cleanup;
|
if (streamParseIDOrReply(c,c->argv[i],ids+i,0) != C_OK) goto cleanup;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Try to serve the client synchronously. */
|
||||||
|
for (int i = 0; i < streams_count; i++) {
|
||||||
|
robj *o = lookupKeyRead(c->db,c->argv[i+streams_arg]);
|
||||||
|
if (o == NULL) continue;
|
||||||
|
stream *s = o->ptr;
|
||||||
|
streamID *gt = ids+i; /* ID must be greater than this. */
|
||||||
|
if (s->last_id.ms > gt->ms ||
|
||||||
|
(s->last_id.ms == gt->ms && s->last_id.seq > gt->seq))
|
||||||
|
{
|
||||||
|
/* streamReplyWithRange() handles the 'start' ID as inclusive,
|
||||||
|
* so start from the next ID, since we want only messages with
|
||||||
|
* IDs greater than start. */
|
||||||
|
streamID start = *gt;
|
||||||
|
start.seq++; /* Can't overflow, it's an uint64_t */
|
||||||
|
streamReplyWithRange(c,s,&start,NULL,count);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Block if needed. */
|
||||||
|
if (timeout) {
|
||||||
|
/* If we are inside a MULTI/EXEC and the list is empty the only thing
|
||||||
|
* we can do is treating it as a timeout (even with timeout 0). */
|
||||||
|
if (c->flags & CLIENT_MULTI) {
|
||||||
|
addReply(c,shared.nullmultibulk);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
|
||||||
|
timeout, NULL, ids);
|
||||||
|
goto cleanup;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* No BLOCK option, nor any stream we can serve. Reply as with a
|
||||||
|
* timeout happened. */
|
||||||
|
addReply(c,shared.nullmultibulk);
|
||||||
|
/* Continue to cleanup... */
|
||||||
|
|
||||||
cleanup:
|
cleanup:
|
||||||
/* Cleanup. */
|
/* Cleanup. */
|
||||||
if (ids != static_ids) zfree(ids);
|
if (ids != static_ids) zfree(ids);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user