diff --git a/src/t_stream.c b/src/t_stream.c index 4f9b0eb4b..ee37f9780 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2296,6 +2296,28 @@ void xreadCommand(client *c) { ids[id_idx].seq = 0; } continue; + } else if (strcmp(c->argv[i]->ptr,"+") == 0) { + if (xreadgroup) { + addReplyError(c,"The + ID is meaningless in the context of " + "XREADGROUP: you want to read the history of " + "this consumer by specifying a proper ID, or " + "use the > ID to get new messages. The + ID would " + "just return an empty result set."); + goto cleanup; + } + if (o) { + stream *s = o->ptr; + ids[id_idx] = s->last_id; + if (streamDecrID(&ids[id_idx]) != C_OK) { + /* shouldn't happen */ + addReplyError(c,"the stream last element ID is 0-0"); + goto cleanup; + } + } else { + ids[id_idx].ms = 0; + ids[id_idx].seq = 0; + } + continue; } else if (strcmp(c->argv[i]->ptr,">") == 0) { if (!xreadgroup) { addReplyError(c,"The > ID can be specified only when calling " diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 3081c40d1..06f58c8a2 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -394,6 +394,122 @@ start_server { $rd close } + test {XREAD last element from non-empty stream} { + # should return last entry + + # add 3 entries to a stream + r DEL lestream + r XADD lestream 1-0 k1 v1 + r XADD lestream 2-0 k2 v2 + r XADD lestream 3-0 k3 v3 + + # read the last entry + set res [r XREAD STREAMS lestream +] + + # verify it's the last entry + assert_equal $res {{lestream {{3-0 {k3 v3}}}}} + + # two more entries, with MAX_UINT64 for sequence number for the last one + r XADD lestream 3-18446744073709551614 k4 v4 + r XADD lestream 3-18446744073709551615 k5 v5 + + # read the new last entry + set res [r XREAD STREAMS lestream +] + + # verify it's the last entry + assert_equal $res {{lestream {{3-18446744073709551615 {k5 v5}}}}} + } + + test {XREAD last element from empty stream} { + # should return nil + + # make sure the stream is empty + r DEL lestream + + # read last entry and verify nil is received + assert_equal [r XREAD STREAMS lestream +] {} + + # add an element to the stream, than delete it + r XADD lestream 1-0 k1 v1 + r XDEL lestream 1-0 + + # verify nil is still received when reading last entry + assert_equal [r XREAD STREAMS lestream +] {} + } + + test {XREAD last element blocking from empty stream} { + # should block until a new entry is available + + # make sure there is no stream + r DEL lestream + + # read last entry from stream, blocking + set rd [redis_deferring_client] + $rd XREAD BLOCK 20000 STREAMS lestream + + wait_for_blocked_client + + # add an entry to the stream + r XADD lestream 1-0 k1 v1 + + # read and verify result + set res [$rd read] + assert_equal $res {{lestream {{1-0 {k1 v1}}}}} + $rd close + } + + test {XREAD last element blocking from non-empty stream} { + # should return last element immediately, w/o blocking + + # add 3 entries to a stream + r DEL lestream + r XADD lestream 1-0 k1 v1 + r XADD lestream 2-0 k2 v2 + r XADD lestream 3-0 k3 v3 + + # read the last entry + set res [r XREAD BLOCK 1000000 STREAMS lestream +] + + # verify it's the last entry + assert_equal $res {{lestream {{3-0 {k3 v3}}}}} + } + + test {XREAD last element from multiple streams} { + # should return last element only from non-empty streams + + # add 3 entries to one stream + r DEL "\{lestream\}1" + r XADD "\{lestream\}1" 1-0 k1 v1 + r XADD "\{lestream\}1" 2-0 k2 v2 + r XADD "\{lestream\}1" 3-0 k3 v3 + + # add 3 entries to another stream + r DEL "\{lestream\}2" + r XADD "\{lestream\}2" 1-0 k1 v4 + r XADD "\{lestream\}2" 2-0 k2 v5 + r XADD "\{lestream\}2" 3-0 k3 v6 + + # read last element from 3 streams (2 with enetries, 1 non-existent) + # verify the last element from the two existing streams were returned + set res [r XREAD STREAMS "\{lestream\}1" "\{lestream\}2" "\{lestream\}3" + + +] + assert_equal $res {{{{lestream}1} {{3-0 {k3 v3}}}} {{{lestream}2} {{3-0 {k3 v6}}}}} + } + + test {XREAD last element with count > 1} { + # Should return only the last element - count has no affect here + + # add 3 entries to a stream + r DEL lestream + r XADD lestream 1-0 k1 v1 + r XADD lestream 2-0 k2 v2 + r XADD lestream 3-0 k3 v3 + + # read the last entry + set res [r XREAD COUNT 3 STREAMS lestream +] + + # verify only last entry was read, even though COUNT > 1 + assert_equal $res {{lestream {{3-0 {k3 v3}}}}} + } + test "XREAD: XADD + DEL should not awake client" { set rd [redis_deferring_client] r del s1