diff --git a/src/t_stream.c b/src/t_stream.c index 485ea29aa..0820a7438 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -398,10 +398,10 @@ void xlenCommand(client *c) { * [RETRY ] STREAMS key_1 ID_1 key_2 ID_2 ... * key_N ID_N */ void xreadCommand(client *c) { - long long block = 0; + long long timeout = 0; long long count = 0; int streams_count = 0; - int streams_argc = 0; + int streams_arg = 0; #define STREAMID_STATIC_VECTOR_LEN 8 streamID static_ids[STREAMID_STATIC_VECTOR_LEN]; streamID *ids = static_ids; @@ -412,17 +412,17 @@ void xreadCommand(client *c) { char *o = c->argv[i]->ptr; if (!strcasecmp(o,"BLOCK") && moreargs) { i++; - if (getLongLongFromObjectOrReply(c,c->argv[i],&block,NULL) != C_OK) - return; - if (block < 0) block = 0; + if (getLongLongFromObjectOrReply(c,c->argv[i],&timeout,NULL) + != C_OK) return; + if (timeout < 0) timeout = 0; } else if (!strcasecmp(o,"COUNT") && moreargs) { i++; if (getLongLongFromObjectOrReply(c,c->argv[i],&count,NULL) != C_OK) return; if (count < 0) count = 0; } else if (!strcasecmp(o,"STREAMS") && moreargs) { - streams_argc = i+1; - streams_count = (c->argc-streams_argc); + streams_arg = i+1; + streams_count = (c->argc-streams_arg); if ((streams_count % 2) != 0) { addReplyError(c,"Unbalanced XREAD list of streams: " "for each stream key an ID or '$' must be " @@ -438,7 +438,7 @@ void xreadCommand(client *c) { } /* STREAMS option is mandatory. */ - if (streams_argc == 0) { + if (streams_arg == 0) { addReply(c,shared.syntaxerr); return; } @@ -447,8 +447,7 @@ void xreadCommand(client *c) { if (streams_count > STREAMID_STATIC_VECTOR_LEN) ids = zmalloc(sizeof(streamID)*streams_count); - /* Try to serve the client synchronously. */ - for (int i = streams_argc + streams_count; i < c->argc; i++) { + for (int i = streams_arg + streams_count; i < c->argc; i++) { /* Specifying "$" as last-known-id means that the client wants to be * served with just the messages that will arrive into the stream * starting from now. */ @@ -466,6 +465,43 @@ void xreadCommand(client *c) { if (streamParseIDOrReply(c,c->argv[i],ids+i,0) != C_OK) goto cleanup; } + /* Try to serve the client synchronously. */ + for (int i = 0; i < streams_count; i++) { + robj *o = lookupKeyRead(c->db,c->argv[i+streams_arg]); + if (o == NULL) continue; + stream *s = o->ptr; + streamID *gt = ids+i; /* ID must be greater than this. */ + if (s->last_id.ms > gt->ms || + (s->last_id.ms == gt->ms && s->last_id.seq > gt->seq)) + { + /* streamReplyWithRange() handles the 'start' ID as inclusive, + * so start from the next ID, since we want only messages with + * IDs greater than start. */ + streamID start = *gt; + start.seq++; /* Can't overflow, it's an uint64_t */ + streamReplyWithRange(c,s,&start,NULL,count); + goto cleanup; + } + } + + /* Block if needed. */ + if (timeout) { + /* If we are inside a MULTI/EXEC and the list is empty the only thing + * we can do is treating it as a timeout (even with timeout 0). */ + if (c->flags & CLIENT_MULTI) { + addReply(c,shared.nullmultibulk); + goto cleanup; + } + blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, + timeout, NULL, ids); + goto cleanup; + } + + /* No BLOCK option, nor any stream we can serve. Reply as with a + * timeout happened. */ + addReply(c,shared.nullmultibulk); + /* Continue to cleanup... */ + cleanup: /* Cleanup. */ if (ids != static_ids) zfree(ids);