diff --git a/src/blocked.c b/src/blocked.c index 1b3a804b1..a7548ce98 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -564,7 +564,10 @@ static void handleClientsBlockedOnKey(readyList *rl) { listIter li; listRewind(clients,&li); - while((ln = listNext(&li))) { + /* Avoid processing more than the initial count so that we're not stuck + * in an endless loop in case the reprocessing of the command blocks again. */ + long count = listLength(clients); + while ((ln = listNext(&li)) && count--) { client *receiver = listNodeValue(ln); robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NOEFFECTS); /* 1. In case new key was added/touched we need to verify it satisfy the diff --git a/tests/unit/type/stream-cgroups.tcl b/tests/unit/type/stream-cgroups.tcl index bb1cd133e..a6cc5da7d 100644 --- a/tests/unit/type/stream-cgroups.tcl +++ b/tests/unit/type/stream-cgroups.tcl @@ -474,7 +474,30 @@ start_server { $rd close } - + + test {Blocking XREADGROUP for stream key that has clients blocked on list - avoid endless loop} { + r DEL mystream + r XGROUP CREATE mystream mygroup $ MKSTREAM + + set rd1 [redis_deferring_client] + set rd2 [redis_deferring_client] + set rd3 [redis_deferring_client] + + $rd1 xreadgroup GROUP mygroup myuser COUNT 10 BLOCK 10000 STREAMS mystream > + $rd2 xreadgroup GROUP mygroup myuser COUNT 10 BLOCK 10000 STREAMS mystream > + $rd3 xreadgroup GROUP mygroup myuser COUNT 10 BLOCK 10000 STREAMS mystream > + + wait_for_blocked_clients_count 3 + + r xadd mystream MAXLEN 5000 * field1 value1 field2 value2 field3 value3 + + $rd1 close + $rd2 close + $rd3 close + + assert_equal [r ping] {PONG} + } + test {XGROUP DESTROY should unblock XREADGROUP with -NOGROUP} { r config resetstat r del mystream