XREADGROUP: Unblock client if stream is deleted (#10306)

Deleting a stream while a client is blocked XREADGROUP should unblock the client.

The idea is that if a client is blocked via XREADGROUP is different from
any other blocking type in the sense that it depends on the existence of both
the key and the group. Even if the key is deleted and then revived with XADD
it won't help any clients blocked on XREADGROUP because the group no longer
exist, so they would fail with -NOGROUP anyway.
The conclusion is that it's better to unblock these clients (with error) upon
the deletion of the key, rather than waiting for the first XADD. 

Other changes:
1. Slightly optimize all `serveClientsBlockedOn*` functions by checking `server.blocked_clients_by_type`
2. All `serveClientsBlockedOn*` functions now use a list iterator rather than looking at `listFirst`, relying
  on `unblockClient` to delete the head of the list. Before this commit, only `serveClientsBlockedOnStreams`
  used to work like that.
3. bugfix: CLIENT UNBLOCK ERROR should work even if the command doesn't have a timeout_callback
  (only relevant to module commands)
This commit is contained in:
guybe7 2022-03-08 17:10:36 +02:00 committed by GitHub
parent 728e62523e
commit 2a2954086a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 295 additions and 52 deletions

View File

@ -288,25 +288,24 @@ void disconnectAllBlockedClients(void) {
* when there may be clients blocked on a list key, and there may be new
* data to fetch (the key is ready). */
void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
/* Optimization: If no clients are in type BLOCKED_LIST,
* we can skip this loop. */
if (!server.blocked_clients_by_type[BLOCKED_LIST]) return;
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);
int deleted = 0;
listNode *ln;
listIter li;
listRewind(clients,&li);
while(numclients--) {
listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value;
if (receiver->btype != BLOCKED_LIST) {
/* Put at the tail, so that at the next call
* we'll not run into it again. */
listRotateHeadToTail(clients);
continue;
}
while((ln = listNext(&li))) {
client *receiver = listNodeValue(ln);
if (receiver->btype != BLOCKED_LIST) continue;
int deleted = 0;
robj *dstkey = receiver->bpop.target;
int wherefrom = receiver->bpop.blockpos.wherefrom;
int whereto = receiver->bpop.blockpos.whereto;
@ -342,25 +341,24 @@ void serveClientsBlockedOnListKey(robj *o, readyList *rl) {
* when there may be clients blocked on a sorted set key, and there may be new
* data to fetch (the key is ready). */
void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
/* Optimization: If no clients are in type BLOCKED_ZSET,
* we can skip this loop. */
if (!server.blocked_clients_by_type[BLOCKED_ZSET]) return;
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);
int deleted = 0;
listNode *ln;
listIter li;
listRewind(clients,&li);
while (numclients--) {
listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value;
if (receiver->btype != BLOCKED_ZSET) {
/* Put at the tail, so that at the next call
* we'll not run into it again. */
listRotateHeadToTail(clients);
continue;
}
while((ln = listNext(&li))) {
client *receiver = listNodeValue(ln);
if (receiver->btype != BLOCKED_ZSET) continue;
int deleted = 0;
long llen = zsetLength(o);
long count = receiver->bpop.count;
int where = receiver->bpop.blockpos.wherefrom;
@ -407,6 +405,10 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
* when there may be clients blocked on a stream key, and there may be new
* data to fetch (the key is ready). */
void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
/* Optimization: If no clients are in type BLOCKED_STREAM,
* we can skip this loop. */
if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return;
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
stream *s = o->ptr;
@ -520,30 +522,21 @@ unblock_receiver:
* see if the key is really able to serve the client, and in that case,
* unblock it. */
void serveClientsBlockedOnKeyByModule(readyList *rl) {
dictEntry *de;
/* Optimization: If no clients are in type BLOCKED_MODULE,
* we can skip this loop. */
if (!server.blocked_clients_by_type[BLOCKED_MODULE]) return;
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
de = dictFind(rl->db->blocking_keys,rl->key);
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
int numclients = listLength(clients);
while(numclients--) {
listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value;
/* Put at the tail, so that at the next call
* we'll not run into it again: clients here may not be
* ready to be served, so they'll remain in the list
* sometimes. We want also be able to skip clients that are
* not blocked for the MODULE type safely. */
listRotateHeadToTail(clients);
listNode *ln;
listIter li;
listRewind(clients,&li);
while((ln = listNext(&li))) {
client *receiver = listNodeValue(ln);
if (receiver->btype != BLOCKED_MODULE) continue;
/* Note that if *this* client cannot be served by this key,
@ -566,6 +559,49 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
}
}
/* Helper function for handleClientsBlockedOnKeys(). This function is called
* when there may be clients blocked, via XREADGROUP, on an existing stream which
* was deleted. We need to unblock the clients in that case.
* The idea is that a client that is blocked via XREADGROUP is different from
* any other blocking type in the sense that it depends on the existence of both
* the key and the group. Even if the key is deleted and then revived with XADD
* it won't help any clients blocked on XREADGROUP because the group no longer
* exist, so they would fail with -NOGROUP anyway.
* The conclusion is that it's better to unblock these client (with error) upon
* the deletion of the key, rather than waiting for the first XADD. */
void unblockDeletedStreamReadgroupClients(readyList *rl) {
/* Optimization: If no clients are in type BLOCKED_STREAM,
* we can skip this loop. */
if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return;
/* We serve clients in the same order they blocked for
* this key, from the first blocked to the last. */
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
if (de) {
list *clients = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(clients,&li);
while((ln = listNext(&li))) {
client *receiver = listNodeValue(ln);
if (receiver->btype != BLOCKED_STREAM || !receiver->bpop.xread_group)
continue;
long long prev_error_replies = server.stat_total_error_replies;
client *old_client = server.current_client;
server.current_client = receiver;
monotime replyTimer;
elapsedStart(&replyTimer);
addReplyError(receiver, "-UNBLOCKED the stream key no longer exists");
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
unblockClient(receiver);
afterCommand(receiver);
server.current_client = old_client;
}
}
}
/* This function should be called by Redis every time a single command,
* a MULTI/EXEC block, or a Lua script, terminated its execution after
* being called by a client. It handles serving clients blocked in
@ -624,17 +660,27 @@ void handleClientsBlockedOnKeys(void) {
/* Serve clients blocked on the key. */
robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NONOTIFY | LOOKUP_NOSTATS);
if (o != NULL) {
if (o->type == OBJ_LIST)
int objtype = o->type;
if (objtype == OBJ_LIST)
serveClientsBlockedOnListKey(o,rl);
else if (o->type == OBJ_ZSET)
else if (objtype == OBJ_ZSET)
serveClientsBlockedOnSortedSetKey(o,rl);
else if (o->type == OBJ_STREAM)
else if (objtype == OBJ_STREAM)
serveClientsBlockedOnStreamKey(o,rl);
/* We want to serve clients blocked on module keys
* regardless of the object type: we don't know what the
* module is trying to accomplish right now. */
serveClientsBlockedOnKeyByModule(rl);
/* If we have XREADGROUP clients blocked on this key, and
* the key is not a stream, it must mean that the key was
* overwritten by either SET or something like
* (MULTI, DEL key, SADD key e, EXEC).
* In this case we need to unblock all these clients. */
if (objtype != OBJ_STREAM)
unblockDeletedStreamReadgroupClients(rl);
} else {
/* Unblock all XREADGROUP clients of this deleted key */
unblockDeletedStreamReadgroupClients(rl);
/* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to
* take care of the propagation here, because afterCommand wasn't called */
if (server.also_propagate.numops > 0)
@ -823,4 +869,3 @@ void signalKeyAsReady(redisDb *db, robj *key, int type) {
incrRefCount(key);
serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}

View File

@ -218,9 +218,12 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
val->lru = old->lru;
}
/* Although the key is not really deleted from the database, we regard
overwrite as two steps of unlink+add, so we still need to call the unlink
callback of the module. */
* overwrite as two steps of unlink+add, so we still need to call the unlink
* callback of the module. */
moduleNotifyKeyUnlink(key,old,db->id);
/* We want to try to unblock any client using a blocking XREADGROUP */
if (old->type == OBJ_STREAM)
signalKeyAsReady(db,key,old->type);
dictSetVal(db->dict, de, val);
if (server.lazyfree_lazy_server_del) {
@ -311,6 +314,9 @@ static int dbGenericDelete(redisDb *db, robj *key, int async) {
robj *val = dictGetVal(de);
/* Tells the module that the key has been unlinked from the database. */
moduleNotifyKeyUnlink(key,val,db->id);
/* We want to try to unblock any client using a blocking XREADGROUP */
if (val->type == OBJ_STREAM)
signalKeyAsReady(db,key,val->type);
if (async) {
freeObjAsync(key, val, db->id);
dictSetVal(db->dict, de, NULL);
@ -551,6 +557,7 @@ void signalFlushedDb(int dbid, int async) {
}
for (int j = startdb; j <= enddb; j++) {
scanDatabaseForDeletedStreams(&server.db[j], NULL);
touchAllWatchedKeysInDb(&server.db[j], NULL);
}
@ -1311,7 +1318,7 @@ void copyCommand(client *c) {
* one or more blocked clients for B[LR]POP or other blocking commands
* and signal the keys as ready if they are of the right type. See the comment
* where the function is used for more info. */
void scanDatabaseForReadyLists(redisDb *db) {
void scanDatabaseForReadyKeys(redisDb *db) {
dictEntry *de;
dictIterator *di = dictGetSafeIterator(db->blocking_keys);
while((de = dictNext(di)) != NULL) {
@ -1325,6 +1332,39 @@ void scanDatabaseForReadyLists(redisDb *db) {
dictReleaseIterator(di);
}
/* Since we are unblocking XREADGROUP clients in the event the
* key was deleted/overwritten we must do the same in case the
* database was flushed/swapped. */
void scanDatabaseForDeletedStreams(redisDb *emptied, redisDb *replaced_with) {
/* Optimization: If no clients are in type BLOCKED_STREAM,
* we can skip this loop. */
if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return;
dictEntry *de;
dictIterator *di = dictGetSafeIterator(emptied->blocking_keys);
while((de = dictNext(di)) != NULL) {
robj *key = dictGetKey(de);
int was_stream = 0, is_stream = 0;
dictEntry *kde = dictFind(emptied->dict, key->ptr);
if (kde) {
robj *value = dictGetVal(kde);
was_stream = value->type == OBJ_STREAM;
}
if (replaced_with) {
dictEntry *kde = dictFind(replaced_with->dict, key->ptr);
if (kde) {
robj *value = dictGetVal(kde);
is_stream = value->type == OBJ_STREAM;
}
}
/* We want to try to unblock any client using a blocking XREADGROUP */
if (was_stream && !is_stream)
signalKeyAsReady(emptied, key, OBJ_STREAM);
}
dictReleaseIterator(di);
}
/* Swap two databases at runtime so that all clients will magically see
* the new database even if already connected. Note that the client
* structure c->db points to a given DB, so we need to be smarter and
@ -1345,6 +1385,10 @@ int dbSwapDatabases(int id1, int id2) {
touchAllWatchedKeysInDb(db1, db2);
touchAllWatchedKeysInDb(db2, db1);
/* Try to unblock any XREADGROUP clients if the key no longer exists. */
scanDatabaseForDeletedStreams(db1, db2);
scanDatabaseForDeletedStreams(db2, db1);
/* Swap hash tables. Note that we don't swap blocking_keys,
* ready_keys and watched_keys, since we want clients to
* remain in the same DB they were. */
@ -1367,8 +1411,8 @@ int dbSwapDatabases(int id1, int id2) {
* in dbAdd() when a list is created. So here we need to rescan
* the list of clients blocked on lists and signal lists as ready
* if needed. */
scanDatabaseForReadyLists(db1);
scanDatabaseForReadyLists(db2);
scanDatabaseForReadyKeys(db1);
scanDatabaseForReadyKeys(db2);
return C_OK;
}
@ -1391,6 +1435,9 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
* client watching keys. */
touchAllWatchedKeysInDb(activedb, newdb);
/* Try to unblock any XREADGROUP clients if the key no longer exists. */
scanDatabaseForDeletedStreams(activedb, newdb);
/* Swap hash tables. Note that we don't swap blocking_keys,
* ready_keys and watched_keys, since clients
* remain in the same DB they were. */
@ -1413,7 +1460,7 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
* in dbAdd() when a list is created. So here we need to rescan
* the list of clients blocked on lists and signal lists as ready
* if needed. */
scanDatabaseForReadyLists(activedb);
scanDatabaseForReadyKeys(activedb);
}
trackingInvalidateKeysOnFlush(1);

View File

@ -3075,6 +3075,10 @@ NULL
if (getLongLongFromObjectOrReply(c,c->argv[2],&id,NULL)
!= C_OK) return;
struct client *target = lookupClientByID(id);
/* Note that we never try to unblock a client blocked on a module command, which
* doesn't have a timeout callback (even in the case of UNBLOCK ERROR).
* The reason is that we assume that if a command doesn't expect to be timedout,
* it also doesn't expect to be unblocked by CLIENT UNBLOCK */
if (target && target->flags & CLIENT_BLOCKED && moduleBlockedClientMayTimeout(target)) {
if (unblock_error)
addReplyError(target,

View File

@ -3131,6 +3131,7 @@ void handleClientsBlockedOnKeys(void);
void signalKeyAsReady(redisDb *db, robj *key, int type);
void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids);
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors);
void scanDatabaseForDeletedStreams(redisDb *emptied, redisDb *replaced_with);
/* timeout.c -- Blocked clients timeout and connections timeout. */
void addClientToTimeoutTable(client *c);

View File

@ -135,22 +135,29 @@ int bpop_timeout_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int arg
return RedisModule_ReplyWithSimpleString(ctx, "Request timedout");
}
/* FSL.BPOP <key> <timeout> - Block clients until list has two or more elements.
/* FSL.BPOP <key> <timeout> [NO_TO_CB]- Block clients until list has two or more elements.
* When that happens, unblock client and pop the last two elements (from the right). */
int fsl_bpop(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
if (argc != 3)
if (argc < 3)
return RedisModule_WrongArity(ctx);
long long timeout;
if (RedisModule_StringToLongLong(argv[2],&timeout) != REDISMODULE_OK || timeout < 0)
return RedisModule_ReplyWithError(ctx,"ERR invalid timeout");
int to_cb = 1;
if (argc == 4) {
if (strcasecmp("NO_TO_CB", RedisModule_StringPtrLen(argv[3], NULL)))
return RedisModule_ReplyWithError(ctx,"ERR invalid argument");
to_cb = 0;
}
fsl_t *fsl;
if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
return REDISMODULE_OK;
if (!fsl) {
RedisModule_BlockClientOnKeys(ctx, bpop_reply_callback, bpop_timeout_callback,
RedisModule_BlockClientOnKeys(ctx, bpop_reply_callback, to_cb ? bpop_timeout_callback : NULL,
NULL, timeout, &argv[1], 1, NULL);
} else {
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);

View File

@ -168,6 +168,38 @@ start_server {tags {"modules"}} {
assert_error "*unblocked*" {$rd read}
}
test {Module client blocked on keys, no timeout CB, CLIENT UNBLOCK TIMEOUT} {
r del k
set rd [redis_deferring_client]
$rd client id
set cid [$rd read]
$rd fsl.bpop k 0 NO_TO_CB
;# wait until clients are actually blocked
wait_for_condition 50 100 {
[s 0 blocked_clients] eq {1}
} else {
fail "Clients are not blocked"
}
assert_equal [r client unblock $cid timeout] {0}
$rd close
}
test {Module client blocked on keys, no timeout CB, CLIENT UNBLOCK ERROR} {
r del k
set rd [redis_deferring_client]
$rd client id
set cid [$rd read]
$rd fsl.bpop k 0 NO_TO_CB
;# wait until clients are actually blocked
wait_for_condition 50 100 {
[s 0 blocked_clients] eq {1}
} else {
fail "Clients are not blocked"
}
assert_equal [r client unblock $cid error] {0}
$rd close
}
test {Module client re-blocked on keys after woke up on wrong type} {
r del k
set rd [redis_deferring_client]

View File

@ -205,6 +205,113 @@ start_server {
$rd close
}
test {Blocking XREADGROUP: key deleted} {
r DEL mystream
r XADD mystream 666 f v
r XGROUP CREATE mystream mygroup $
set rd [redis_deferring_client]
$rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">"
r DEL mystream
assert_error "*no longer exists*" {$rd read}
$rd close
}
test {Blocking XREADGROUP: key type changed with SET} {
r DEL mystream
r XADD mystream 666 f v
r XGROUP CREATE mystream mygroup $
set rd [redis_deferring_client]
$rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">"
r SET mystream val1
assert_error "*no longer exists*" {$rd read}
$rd close
}
test {Blocking XREADGROUP: key type changed with transaction} {
r DEL mystream
r XADD mystream 666 f v
r XGROUP CREATE mystream mygroup $
set rd [redis_deferring_client]
$rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">"
r MULTI
r DEL mystream
r SADD mystream e1
r EXEC
assert_error "*no longer exists*" {$rd read}
$rd close
}
test {Blocking XREADGROUP: flushed DB} {
r DEL mystream
r XADD mystream 666 f v
r XGROUP CREATE mystream mygroup $
set rd [redis_deferring_client]
$rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">"
r FLUSHALL
assert_error "*no longer exists*" {$rd read}
$rd close
}
test {Blocking XREADGROUP: swapped DB, key doesn't exist} {
r SELECT 4
r FLUSHDB
r SELECT 9
r DEL mystream
r XADD mystream 666 f v
r XGROUP CREATE mystream mygroup $
set rd [redis_deferring_client]
$rd SELECT 9
$rd read
$rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">"
r SWAPDB 4 9
assert_error "*no longer exists*" {$rd read}
$rd close
} {0} {external:skip}
test {Blocking XREADGROUP: swapped DB, key is not a stream} {
r SELECT 4
r FLUSHDB
r LPUSH mystream e1
r SELECT 9
r DEL mystream
r XADD mystream 666 f v
r XGROUP CREATE mystream mygroup $
set rd [redis_deferring_client]
$rd SELECT 9
$rd read
$rd XREADGROUP GROUP mygroup Alice BLOCK 0 STREAMS mystream ">"
r SWAPDB 4 9
assert_error "*no longer exists*" {$rd read}
$rd close
} {0} {external:skip}
test {Blocking XREAD: key deleted} {
r DEL mystream
r XADD mystream 666 f v
set rd [redis_deferring_client]
$rd XREAD BLOCK 0 STREAMS mystream "$"
r DEL mystream
r XADD mystream 667 f v
set res [$rd read]
assert_equal [lindex $res 0 1 0] {667-0 {f v}}
$rd close
}
test {Blocking XREAD: key type changed with SET} {
r DEL mystream
r XADD mystream 666 f v
set rd [redis_deferring_client]
$rd XREAD BLOCK 0 STREAMS mystream "$"
r SET mystream val1
r DEL mystream
r XADD mystream 667 f v
set res [$rd read]
assert_equal [lindex $res 0 1 0] {667-0 {f v}}
$rd close
}
test {Blocking XREADGROUP for stream that ran dry (issue #5299)} {
set rd [redis_deferring_client]