diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index 1cdf87984..ae8da27b8 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -205,11 +205,87 @@ start_server { $rd close } + test {Blocking XREADGROUP for stream that ran dry (issue #5299)} { + set rd [redis_deferring_client] + + # Add a entry then delete it, now stream's last_id is 666. + r DEL mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XADD mystream 666 key value + r XDEL mystream 666 + + # Pass a special `>` ID but without new entry, released on timeout. + $rd XREADGROUP GROUP mygroup myconsumer BLOCK 10 STREAMS mystream > + assert_equal [$rd read] {} + + # Throw an error if the ID equal or smaller than the last_id. + assert_error ERR*equal*smaller* {r XADD mystream 665 key value} + assert_error ERR*equal*smaller* {r XADD mystream 666 key value} + + # Entered blocking state and then release because of the new entry. + $rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream > + wait_for_blocked_clients_count 1 + r XADD mystream 667 key value + assert_equal [$rd read] {{mystream {{667-0 {key value}}}}} + + $rd close + } + + test "Blocking XREADGROUP will ignore BLOCK if ID is not >" { + set rd [redis_deferring_client] + + # Add a entry then delete it, now stream's last_id is 666. + r DEL mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + r XADD mystream 666 key value + r XDEL mystream 666 + + # Return right away instead of blocking, return the stream with an + # empty list instead of NIL if the ID specified is not the special `>` ID. + foreach id {0 600 666 700} { + $rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream $id + assert_equal [$rd read] {{mystream {}}} + } + + # After adding a new entry, `XREADGROUP BLOCK` still return the stream + # with an empty list because the pending list is empty. + r XADD mystream 667 key value + foreach id {0 600 666 667 700} { + $rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream $id + assert_equal [$rd read] {{mystream {}}} + } + + # After we read it once, the pending list is not empty at this time, + # pass any ID smaller than 667 will return one of the pending entry. + set res [r XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream >] + assert_equal $res {{mystream {{667-0 {key value}}}}} + foreach id {0 600 666} { + $rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream $id + assert_equal [$rd read] {{mystream {{667-0 {key value}}}}} + } + + # Pass ID equal or greater than 667 will return the stream with an empty list. + foreach id {667 700} { + $rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream $id + assert_equal [$rd read] {{mystream {}}} + } + + # After we ACK the pending entry, return the stream with an empty list. + r XACK mystream mygroup 667 + foreach id {0 600 666 667 700} { + $rd XREADGROUP GROUP mygroup myconsumer BLOCK 0 STREAMS mystream $id + assert_equal [$rd read] {{mystream {}}} + } + + $rd close + } + test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} { r del mystream r XGROUP CREATE mystream mygroup $ MKSTREAM set rd [redis_deferring_client] - $rd XREADGROUP GROUP mygroup Alice BLOCK 100 STREAMS mystream ">" + $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">" + wait_for_blocked_clients_count 1 r XGROUP DESTROY mystream mygroup assert_error "*NOGROUP*" {$rd read} $rd close @@ -220,6 +296,7 @@ start_server { r XGROUP CREATE mystream{t} mygroup $ MKSTREAM set rd [redis_deferring_client] $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream{t} ">" + wait_for_blocked_clients_count 1 r XGROUP CREATE mystream2{t} mygroup $ MKSTREAM r XADD mystream2{t} 100 f1 v1 r RENAME mystream2{t} mystream{t} @@ -232,6 +309,7 @@ start_server { r XGROUP CREATE mystream{t} mygroup $ MKSTREAM set rd [redis_deferring_client] $rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream{t} ">" + wait_for_blocked_clients_count 1 r XADD mystream2{t} 100 f1 v1 r RENAME mystream2{t} mystream{t} assert_error "*NOGROUP*" {$rd read} ;# mystream2{t} didn't have mygroup before RENAME @@ -560,6 +638,7 @@ start_server { r XREADGROUP GROUP mygroup Alice NOACK STREAMS mystream ">" set rd [redis_deferring_client] $rd XREADGROUP GROUP mygroup Bob BLOCK 0 NOACK STREAMS mystream ">" + wait_for_blocked_clients_count 1 r XADD mystream * f2 v2 set grpinfo [r xinfo groups mystream] @@ -580,6 +659,7 @@ start_server { r XREADGROUP GROUP mygroup Alice NOACK STREAMS mystream ">" set rd [redis_deferring_client] $rd XREADGROUP GROUP mygroup Bob BLOCK 0 NOACK STREAMS mystream ">" + wait_for_blocked_clients_count 1 r XGROUP CREATECONSUMER mystream mygroup Charlie set grpinfo [lindex [r xinfo groups mystream] 0] diff --git a/tests/unit/type/stream.tcl b/tests/unit/type/stream.tcl index 925d41501..474fe0e18 100644 --- a/tests/unit/type/stream.tcl +++ b/tests/unit/type/stream.tcl @@ -354,6 +354,31 @@ start_server { $rd close } + test "Blocking XREAD for stream that ran dry (issue #5299)" { + set rd [redis_deferring_client] + + # Add a entry then delete it, now stream's last_id is 666. + r DEL mystream + r XADD mystream 666 key value + r XDEL mystream 666 + + # Pass a ID smaller than stream's last_id, released on timeout. + $rd XREAD BLOCK 10 STREAMS mystream 665 + assert_equal [$rd read] {} + + # Throw an error if the ID equal or smaller than the last_id. + assert_error ERR*equal*smaller* {r XADD mystream 665 key value} + assert_error ERR*equal*smaller* {r XADD mystream 666 key value} + + # Entered blocking state and then release because of the new entry. + $rd XREAD BLOCK 0 STREAMS mystream 665 + wait_for_blocked_clients_count 1 + r XADD mystream 667 key value + assert_equal [$rd read] {{mystream {{667-0 {key value}}}}} + + $rd close + } + test "XREAD: XADD + DEL should not awake client" { set rd [redis_deferring_client] r del s1