Merge pull request #6703 from guybe7/blocking_xread_empty_reply
Blocking XREAD[GROUP] should always reply with valid data (or timeout)
This commit is contained in:
commit
bb93686754
@ -797,6 +797,16 @@ int streamDeleteItem(stream *s, streamID *id) {
|
|||||||
return deleted;
|
return deleted;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Get the last valid (non-tombstone) streamID of 's'. */
|
||||||
|
void streamLastValidID(stream *s, streamID *maxid)
|
||||||
|
{
|
||||||
|
streamIterator si;
|
||||||
|
streamIteratorStart(&si,s,NULL,NULL,1);
|
||||||
|
int64_t numfields;
|
||||||
|
streamIteratorGetID(&si,maxid,&numfields);
|
||||||
|
streamIteratorStop(&si);
|
||||||
|
}
|
||||||
|
|
||||||
/* Emit a reply in the client output buffer by formatting a Stream ID
|
/* Emit a reply in the client output buffer by formatting a Stream ID
|
||||||
* in the standard <ms>-<seq> format, using the simple string protocol
|
* in the standard <ms>-<seq> format, using the simple string protocol
|
||||||
* of REPL. */
|
* of REPL. */
|
||||||
@ -1506,20 +1516,23 @@ void xreadCommand(client *c) {
|
|||||||
{
|
{
|
||||||
serve_synchronously = 1;
|
serve_synchronously = 1;
|
||||||
serve_history = 1;
|
serve_history = 1;
|
||||||
} else {
|
} else if (s->length) {
|
||||||
/* We also want to serve a consumer in a consumer group
|
/* We also want to serve a consumer in a consumer group
|
||||||
* synchronously in case the group top item delivered is smaller
|
* synchronously in case the group top item delivered is smaller
|
||||||
* than what the stream has inside. */
|
* than what the stream has inside. */
|
||||||
streamID *last = &groups[i]->last_id;
|
streamID maxid, *last = &groups[i]->last_id;
|
||||||
if (s->length && (streamCompareID(&s->last_id, last) > 0)) {
|
streamLastValidID(s, &maxid);
|
||||||
|
if (streamCompareID(&maxid, last) > 0) {
|
||||||
serve_synchronously = 1;
|
serve_synchronously = 1;
|
||||||
*gt = *last;
|
*gt = *last;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else if (s->length) {
|
||||||
/* For consumers without a group, we serve synchronously if we can
|
/* For consumers without a group, we serve synchronously if we can
|
||||||
* actually provide at least one item from the stream. */
|
* actually provide at least one item from the stream. */
|
||||||
if (s->length && (streamCompareID(&s->last_id, gt) > 0)) {
|
streamID maxid;
|
||||||
|
streamLastValidID(s, &maxid);
|
||||||
|
if (streamCompareID(&maxid, gt) > 0) {
|
||||||
serve_synchronously = 1;
|
serve_synchronously = 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1871,11 +1884,7 @@ void xsetidCommand(client *c) {
|
|||||||
* item, otherwise the fundamental ID monotonicity assumption is violated. */
|
* item, otherwise the fundamental ID monotonicity assumption is violated. */
|
||||||
if (s->length > 0) {
|
if (s->length > 0) {
|
||||||
streamID maxid;
|
streamID maxid;
|
||||||
streamIterator si;
|
streamLastValidID(s,&maxid);
|
||||||
streamIteratorStart(&si,s,NULL,NULL,1);
|
|
||||||
int64_t numfields;
|
|
||||||
streamIteratorGetID(&si,&maxid,&numfields);
|
|
||||||
streamIteratorStop(&si);
|
|
||||||
|
|
||||||
if (streamCompareID(&id,&maxid) < 0) {
|
if (streamCompareID(&id,&maxid) < 0) {
|
||||||
addReplyError(c,"The ID specified in XSETID is smaller than the "
|
addReplyError(c,"The ID specified in XSETID is smaller than the "
|
||||||
|
@ -147,6 +147,20 @@ start_server {
|
|||||||
assert {[lindex $res 0 1 1] == {2-0 {field1 B}}}
|
assert {[lindex $res 0 1 1] == {2-0 {field1 B}}}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test {Blocking XREADGROUP will not reply with an empty array} {
|
||||||
|
r del mystream
|
||||||
|
r XGROUP CREATE mystream mygroup $ MKSTREAM
|
||||||
|
r XADD mystream 666 f v
|
||||||
|
set res [r XREADGROUP GROUP mygroup Alice BLOCK 10 STREAMS mystream ">"]
|
||||||
|
assert {[lindex $res 0 1 0] == {666-0 {f v}}}
|
||||||
|
r XADD mystream 667 f2 v2
|
||||||
|
r XDEL mystream 667
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
$rd XREADGROUP GROUP mygroup Alice BLOCK 10 STREAMS mystream ">"
|
||||||
|
after 20
|
||||||
|
assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {mystream {}}
|
||||||
|
}
|
||||||
|
|
||||||
test {XCLAIM can claim PEL items from another consumer} {
|
test {XCLAIM can claim PEL items from another consumer} {
|
||||||
# Add 3 items into the stream, and create a consumer group
|
# Add 3 items into the stream, and create a consumer group
|
||||||
r del mystream
|
r del mystream
|
||||||
|
@ -191,6 +191,17 @@ start_server {
|
|||||||
assert {[lindex $res 0 1 0 1] eq {old abcd1234}}
|
assert {[lindex $res 0 1 0 1] eq {old abcd1234}}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test {Blocking XREAD will not reply with an empty array} {
|
||||||
|
r del s1
|
||||||
|
r XADD s1 666 f v
|
||||||
|
r XADD s1 667 f2 v2
|
||||||
|
r XDEL s1 667
|
||||||
|
set rd [redis_deferring_client]
|
||||||
|
$rd XREAD BLOCK 10 STREAMS s1 666
|
||||||
|
after 20
|
||||||
|
assert {[$rd read] == {}} ;# before the fix, client didn't even block, but was served synchronously with {s1 {}}
|
||||||
|
}
|
||||||
|
|
||||||
test "XREAD: XADD + DEL should not awake client" {
|
test "XREAD: XADD + DEL should not awake client" {
|
||||||
set rd [redis_deferring_client]
|
set rd [redis_deferring_client]
|
||||||
r del s1
|
r del s1
|
||||||
|
Loading…
x
Reference in New Issue
Block a user