Blocked module clients should be aware when a key is deleted (#11310)
The use case is a module that wants to implement a blocking command on a key that necessarily exists and wants to unblock the client in case the key is deleted (much like what we implemented for XREADGROUP in #10306) New module API: * RedisModule_BlockClientOnKeysWithFlags Flags: * REDISMODULE_BLOCK_UNBLOCK_NONE * REDISMODULE_BLOCK_UNBLOCK_DELETED ### Detailed description of code changes blocked.c: 1. Both module and stream functions are called whether the key exists or not, regardless of its type. We do that in order to allow modules/stream to unblock the client in case the key is no longer present or has changed type (the behavior for streams didn't change, just code that moved into serveClientsBlockedOnStreamKey) 2. Make sure afterCommand is called in serveClientsBlockedOnKeyByModule, in order to propagate actions from moduleTryServeClientBlockedOnKey. 3. handleClientsBlockedOnKeys: call propagatePendingCommands directly after lookupKeyReadWithFlags to prevent a possible lazy-expire DEL from being mixed with any command propagated by the preceding functions. 4. blockForKeys: Caller can specifiy that it wants to be awakened if key is deleted. Minor optimizations (use dictAddRaw). 5. signalKeyAsReady became signalKeyAsReadyLogic which can take a boolean in case the key is deleted. It will only signal if there's at least one client that awaits key deletion (to save calls to handleClientsBlockedOnKeys). Minor optimizations (use dictAddRaw) db.c: 1. scanDatabaseForDeletedStreams is now scanDatabaseForDeletedKeys and will signalKeyAsReady for any key that was removed from the database or changed type. It is the responsibility of the code in blocked.c to ignore or act on deleted/type-changed keys. 2. Use the new signalDeletedKeyAsReady where needed blockedonkey.c + tcl: 1. Added test of new capabilities (FSL.BPOPGT now requires the key to exist in order to work)
This commit is contained in:
parent
b43f254813
commit
b57fd01064
203
src/blocked.c
203
src/blocked.c
@ -398,14 +398,26 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) {
|
||||
|
||||
/* Helper function for handleClientsBlockedOnKeys(). This function is called
|
||||
* when there may be clients blocked on a stream key, and there may be new
|
||||
* data to fetch (the key is ready). */
|
||||
* data to fetch (the key is ready).
|
||||
* This function also handles the case where there may be clients blocked,
|
||||
* via XREADGROUP, on an existing stream which was deleted.
|
||||
* We need to unblock the clients in that case.
|
||||
* The idea is that a client that is blocked via XREADGROUP is different from
|
||||
* any other blocking type in the sense that it depends on the existence of both
|
||||
* the key and the group. Even if the key is deleted and then revived with XADD
|
||||
* it won't help any clients blocked on XREADGROUP because the group no longer
|
||||
* exist, so they would fail with -NOGROUP anyway.
|
||||
* The conclusion is that it's better to unblock these client (with error) upon
|
||||
* the deletion of the key, rather than waiting for the first XADD.*/
|
||||
void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
|
||||
/* Optimization: If no clients are in type BLOCKED_STREAM,
|
||||
* we can skip this loop. */
|
||||
if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return;
|
||||
|
||||
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
|
||||
stream *s = o->ptr;
|
||||
/* This function may be called with o=NULL (in order to unblock
|
||||
* XREADGROUP clients whose key was deleted) */
|
||||
stream *s = o? o->ptr : NULL;
|
||||
|
||||
/* We need to provide the new data arrived on the stream
|
||||
* to all the clients that are waiting for an offset smaller
|
||||
@ -422,6 +434,13 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
|
||||
bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key);
|
||||
streamID *gt = &bki->stream_id;
|
||||
|
||||
if (!receiver->bpop.xread_group && (!o || o->type != OBJ_STREAM)) {
|
||||
/* If it's a blocking XREAD and the stream was either deleted
|
||||
* or replaced with another key, we don't do anything (it's ok
|
||||
* the the client blocks on a non-existing key). */
|
||||
continue;
|
||||
}
|
||||
|
||||
long long prev_error_replies = server.stat_total_error_replies;
|
||||
client *old_client = server.current_client;
|
||||
server.current_client = receiver;
|
||||
@ -439,6 +458,12 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
|
||||
* otherwise. */
|
||||
streamCG *group = NULL;
|
||||
if (receiver->bpop.xread_group) {
|
||||
/* If it's a blocking XREADGROUP and the stream was either deleted
|
||||
* or replaced with another key, we unblock the client */
|
||||
if (!o || o->type != OBJ_STREAM) {
|
||||
addReplyError(receiver, "-UNBLOCKED the stream key no longer exists");
|
||||
goto unblock_receiver;
|
||||
}
|
||||
group = streamLookupCG(s,
|
||||
receiver->bpop.xread_group->ptr);
|
||||
/* If the group was not found, send an error
|
||||
@ -545,52 +570,13 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
|
||||
server.current_client = receiver;
|
||||
monotime replyTimer;
|
||||
elapsedStart(&replyTimer);
|
||||
if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
|
||||
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
|
||||
moduleUnblockClient(receiver);
|
||||
afterCommand(receiver);
|
||||
server.current_client = old_client;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Helper function for handleClientsBlockedOnKeys(). This function is called
|
||||
* when there may be clients blocked, via XREADGROUP, on an existing stream which
|
||||
* was deleted. We need to unblock the clients in that case.
|
||||
* The idea is that a client that is blocked via XREADGROUP is different from
|
||||
* any other blocking type in the sense that it depends on the existence of both
|
||||
* the key and the group. Even if the key is deleted and then revived with XADD
|
||||
* it won't help any clients blocked on XREADGROUP because the group no longer
|
||||
* exist, so they would fail with -NOGROUP anyway.
|
||||
* The conclusion is that it's better to unblock these client (with error) upon
|
||||
* the deletion of the key, rather than waiting for the first XADD. */
|
||||
void unblockDeletedStreamReadgroupClients(readyList *rl) {
|
||||
/* Optimization: If no clients are in type BLOCKED_STREAM,
|
||||
* we can skip this loop. */
|
||||
if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return;
|
||||
|
||||
/* We serve clients in the same order they blocked for
|
||||
* this key, from the first blocked to the last. */
|
||||
dictEntry *de = dictFind(rl->db->blocking_keys,rl->key);
|
||||
if (de) {
|
||||
list *clients = dictGetVal(de);
|
||||
listNode *ln;
|
||||
listIter li;
|
||||
listRewind(clients,&li);
|
||||
|
||||
while((ln = listNext(&li))) {
|
||||
client *receiver = listNodeValue(ln);
|
||||
if (receiver->btype != BLOCKED_STREAM || !receiver->bpop.xread_group)
|
||||
continue;
|
||||
|
||||
long long prev_error_replies = server.stat_total_error_replies;
|
||||
client *old_client = server.current_client;
|
||||
server.current_client = receiver;
|
||||
monotime replyTimer;
|
||||
elapsedStart(&replyTimer);
|
||||
addReplyError(receiver, "-UNBLOCKED the stream key no longer exists");
|
||||
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
|
||||
unblockClient(receiver);
|
||||
if (moduleTryServeClientBlockedOnKey(receiver, rl->key)) {
|
||||
updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies);
|
||||
moduleUnblockClient(receiver);
|
||||
}
|
||||
/* We need to call afterCommand even if the client was not unblocked
|
||||
* in order to propagate any changes that could have been done inside
|
||||
* moduleTryServeClientBlockedOnKey */
|
||||
afterCommand(receiver);
|
||||
server.current_client = old_client;
|
||||
}
|
||||
@ -646,33 +632,35 @@ void handleClientsBlockedOnKeys(void) {
|
||||
|
||||
/* Serve clients blocked on the key. */
|
||||
robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NONOTIFY | LOOKUP_NOSTATS);
|
||||
if (o != NULL) {
|
||||
int objtype = o->type;
|
||||
if (objtype == OBJ_LIST)
|
||||
serveClientsBlockedOnListKey(o,rl);
|
||||
else if (objtype == OBJ_ZSET)
|
||||
serveClientsBlockedOnSortedSetKey(o,rl);
|
||||
else if (objtype == OBJ_STREAM)
|
||||
serveClientsBlockedOnStreamKey(o,rl);
|
||||
/* We want to serve clients blocked on module keys
|
||||
* regardless of the object type: we don't know what the
|
||||
* module is trying to accomplish right now. */
|
||||
serveClientsBlockedOnKeyByModule(rl);
|
||||
/* If we have XREADGROUP clients blocked on this key, and
|
||||
* the key is not a stream, it must mean that the key was
|
||||
* overwritten by either SET or something like
|
||||
* (MULTI, DEL key, SADD key e, EXEC).
|
||||
* In this case we need to unblock all these clients. */
|
||||
if (objtype != OBJ_STREAM)
|
||||
unblockDeletedStreamReadgroupClients(rl);
|
||||
} else {
|
||||
/* Unblock all XREADGROUP clients of this deleted key */
|
||||
unblockDeletedStreamReadgroupClients(rl);
|
||||
if (!o) {
|
||||
/* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to
|
||||
* take care of the propagation here, because afterCommand wasn't called */
|
||||
if (server.also_propagate.numops > 0)
|
||||
propagatePendingCommands();
|
||||
propagatePendingCommands();
|
||||
} else {
|
||||
if (o->type == OBJ_LIST)
|
||||
serveClientsBlockedOnListKey(o,rl);
|
||||
else if (o->type == OBJ_ZSET)
|
||||
serveClientsBlockedOnSortedSetKey(o,rl);
|
||||
}
|
||||
/* We need to try to serve stream clients even if the key no longer exists because
|
||||
* XREADGROUP clients need to be unblocked in case the key is missing, either deleted
|
||||
* or replaced by SET or something like {MULTI, DEL key, SADD key e, EXEC}.
|
||||
* In this case we need to unblock all these clients. */
|
||||
serveClientsBlockedOnStreamKey(o,rl);
|
||||
/* We want to serve clients blocked on module keys regardless of the object type, or
|
||||
* whether the object exists or not: we don't know what the module is trying to
|
||||
* accomplish right now.
|
||||
* Please note that this function must be called only after handling non-module
|
||||
* clients, since moduleTryServeClientBlockedOnKey may delete the key, causing `o`
|
||||
* to be stale.
|
||||
* The scenario is that we have one client blocked on BLPOP while another module
|
||||
* client is blocked by MODULE.SAME-AS-BLPOP on the same key.
|
||||
* Of course we can call lookupKeyReadWithFlags again, but:
|
||||
* 1) It takes CPU
|
||||
* 2) It makes more sense to give priority to "native" blocking clients rather
|
||||
* than module blocking clients
|
||||
* */
|
||||
serveClientsBlockedOnKeyByModule(rl);
|
||||
|
||||
/* Free this item. */
|
||||
decrRefCount(rl->key);
|
||||
@ -717,8 +705,7 @@ void handleClientsBlockedOnKeys(void) {
|
||||
*
|
||||
* 'count' for those commands that support the optional count argument.
|
||||
* Otherwise the value is 0. */
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids) {
|
||||
dictEntry *de;
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids, int unblock_on_nokey) {
|
||||
list *l;
|
||||
int j;
|
||||
|
||||
@ -745,20 +732,27 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, ms
|
||||
incrRefCount(keys[j]);
|
||||
|
||||
/* And in the other "side", to map keys -> clients */
|
||||
de = dictFind(c->db->blocking_keys,keys[j]);
|
||||
if (de == NULL) {
|
||||
int retval;
|
||||
|
||||
dictEntry *de, *existing;
|
||||
de = dictAddRaw(c->db->blocking_keys, keys[j], &existing);
|
||||
if (de) {
|
||||
incrRefCount(keys[j]);
|
||||
/* For every key we take a list of clients blocked for it */
|
||||
l = listCreate();
|
||||
retval = dictAdd(c->db->blocking_keys,keys[j],l);
|
||||
incrRefCount(keys[j]);
|
||||
serverAssertWithInfo(c,keys[j],retval == DICT_OK);
|
||||
dictSetVal(c->db->blocking_keys, de, l);
|
||||
} else {
|
||||
l = dictGetVal(de);
|
||||
l = dictGetVal(existing);
|
||||
}
|
||||
listAddNodeTail(l,c);
|
||||
bki->listnode = listLast(l);
|
||||
|
||||
/* We need to add the key to blocking_keys_unblock_on_nokey, if the client
|
||||
* wants to be awakened if key is deleted (like XREADGROUP) */
|
||||
if (unblock_on_nokey) {
|
||||
de = dictAddRaw(c->db->blocking_keys_unblock_on_nokey, keys[j], NULL);
|
||||
if (de) {
|
||||
incrRefCount(keys[j]);
|
||||
}
|
||||
}
|
||||
}
|
||||
blockClient(c,btype);
|
||||
}
|
||||
@ -782,8 +776,10 @@ void unblockClientWaitingData(client *c) {
|
||||
serverAssertWithInfo(c,key,l != NULL);
|
||||
listDelNode(l,bki->listnode);
|
||||
/* If the list is empty we need to remove it to avoid wasting memory */
|
||||
if (listLength(l) == 0)
|
||||
if (listLength(l) == 0) {
|
||||
dictDelete(c->db->blocking_keys,key);
|
||||
dictDelete(c->db->blocking_keys_unblock_on_nokey,key);
|
||||
}
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
|
||||
@ -818,7 +814,7 @@ static int getBlockedTypeByType(int type) {
|
||||
* made by a script or in the context of MULTI/EXEC.
|
||||
*
|
||||
* The list will be finally processed by handleClientsBlockedOnKeys() */
|
||||
void signalKeyAsReady(redisDb *db, robj *key, int type) {
|
||||
static void signalKeyAsReadyLogic(redisDb *db, robj *key, int type, int deleted) {
|
||||
readyList *rl;
|
||||
|
||||
/* Quick returns. */
|
||||
@ -836,11 +832,28 @@ void signalKeyAsReady(redisDb *db, robj *key, int type) {
|
||||
return;
|
||||
}
|
||||
|
||||
/* No clients blocking for this key? No need to queue it. */
|
||||
if (dictFind(db->blocking_keys,key) == NULL) return;
|
||||
if (deleted) {
|
||||
/* Key deleted and no clients blocking for this key? No need to queue it. */
|
||||
if (dictFind(db->blocking_keys_unblock_on_nokey,key) == NULL)
|
||||
return;
|
||||
/* Note: if we made it here it means the key is also present in db->blocking_keys */
|
||||
} else {
|
||||
/* No clients blocking for this key? No need to queue it. */
|
||||
if (dictFind(db->blocking_keys,key) == NULL)
|
||||
return;
|
||||
}
|
||||
|
||||
/* Key was already signaled? No need to queue it again. */
|
||||
if (dictFind(db->ready_keys,key) != NULL) return;
|
||||
dictEntry *de, *existing;
|
||||
de = dictAddRaw(db->ready_keys, key, &existing);
|
||||
if (de) {
|
||||
/* We add the key in the db->ready_keys dictionary in order
|
||||
* to avoid adding it multiple times into a list with a simple O(1)
|
||||
* check. */
|
||||
incrRefCount(key);
|
||||
} else {
|
||||
/* Key was already signaled? No need to queue it again. */
|
||||
return;
|
||||
}
|
||||
|
||||
/* Ok, we need to queue this key into server.ready_keys. */
|
||||
rl = zmalloc(sizeof(*rl));
|
||||
@ -848,10 +861,12 @@ void signalKeyAsReady(redisDb *db, robj *key, int type) {
|
||||
rl->db = db;
|
||||
incrRefCount(key);
|
||||
listAddNodeTail(server.ready_keys,rl);
|
||||
|
||||
/* We also add the key in the db->ready_keys dictionary in order
|
||||
* to avoid adding it multiple times into a list with a simple O(1)
|
||||
* check. */
|
||||
incrRefCount(key);
|
||||
serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
|
||||
}
|
||||
|
||||
void signalKeyAsReady(redisDb *db, robj *key, int type) {
|
||||
signalKeyAsReadyLogic(db, key, type, 0);
|
||||
}
|
||||
|
||||
void signalDeletedKeyAsReady(redisDb *db, robj *key, int type) {
|
||||
signalKeyAsReadyLogic(db, key, type, 1);
|
||||
}
|
||||
|
38
src/db.c
38
src/db.c
@ -232,9 +232,8 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
|
||||
* overwrite as two steps of unlink+add, so we still need to call the unlink
|
||||
* callback of the module. */
|
||||
moduleNotifyKeyUnlink(key,old,db->id);
|
||||
/* We want to try to unblock any client using a blocking XREADGROUP */
|
||||
if (old->type == OBJ_STREAM)
|
||||
signalKeyAsReady(db,key,old->type);
|
||||
/* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
|
||||
signalDeletedKeyAsReady(db,key,old->type);
|
||||
dictSetVal(db->dict, de, val);
|
||||
|
||||
if (server.lazyfree_lazy_server_del) {
|
||||
@ -325,9 +324,8 @@ static int dbGenericDelete(redisDb *db, robj *key, int async) {
|
||||
robj *val = dictGetVal(de);
|
||||
/* Tells the module that the key has been unlinked from the database. */
|
||||
moduleNotifyKeyUnlink(key,val,db->id);
|
||||
/* We want to try to unblock any client using a blocking XREADGROUP */
|
||||
if (val->type == OBJ_STREAM)
|
||||
signalKeyAsReady(db,key,val->type);
|
||||
/* We want to try to unblock any module clients or clients using a blocking XREADGROUP */
|
||||
signalDeletedKeyAsReady(db,key,val->type);
|
||||
if (async) {
|
||||
freeObjAsync(key, val, db->id);
|
||||
dictSetVal(db->dict, de, NULL);
|
||||
@ -568,7 +566,7 @@ void signalFlushedDb(int dbid, int async) {
|
||||
}
|
||||
|
||||
for (int j = startdb; j <= enddb; j++) {
|
||||
scanDatabaseForDeletedStreams(&server.db[j], NULL);
|
||||
scanDatabaseForDeletedKeys(&server.db[j], NULL);
|
||||
touchAllWatchedKeysInDb(&server.db[j], NULL);
|
||||
}
|
||||
|
||||
@ -1350,32 +1348,32 @@ void scanDatabaseForReadyKeys(redisDb *db) {
|
||||
/* Since we are unblocking XREADGROUP clients in the event the
|
||||
* key was deleted/overwritten we must do the same in case the
|
||||
* database was flushed/swapped. */
|
||||
void scanDatabaseForDeletedStreams(redisDb *emptied, redisDb *replaced_with) {
|
||||
/* Optimization: If no clients are in type BLOCKED_STREAM,
|
||||
* we can skip this loop. */
|
||||
if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return;
|
||||
|
||||
void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with) {
|
||||
dictEntry *de;
|
||||
dictIterator *di = dictGetSafeIterator(emptied->blocking_keys);
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
robj *key = dictGetKey(de);
|
||||
int was_stream = 0, is_stream = 0;
|
||||
int existed = 0, exists = 0;
|
||||
int original_type = -1, curr_type = -1;
|
||||
|
||||
dictEntry *kde = dictFind(emptied->dict, key->ptr);
|
||||
if (kde) {
|
||||
robj *value = dictGetVal(kde);
|
||||
was_stream = value->type == OBJ_STREAM;
|
||||
original_type = value->type;
|
||||
existed = 1;
|
||||
}
|
||||
|
||||
if (replaced_with) {
|
||||
dictEntry *kde = dictFind(replaced_with->dict, key->ptr);
|
||||
if (kde) {
|
||||
robj *value = dictGetVal(kde);
|
||||
is_stream = value->type == OBJ_STREAM;
|
||||
curr_type = value->type;
|
||||
exists = 1;
|
||||
}
|
||||
}
|
||||
/* We want to try to unblock any client using a blocking XREADGROUP */
|
||||
if (was_stream && !is_stream)
|
||||
signalKeyAsReady(emptied, key, OBJ_STREAM);
|
||||
if ((existed && !exists) || original_type != curr_type)
|
||||
signalDeletedKeyAsReady(emptied, key, original_type);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
}
|
||||
@ -1401,8 +1399,8 @@ int dbSwapDatabases(int id1, int id2) {
|
||||
touchAllWatchedKeysInDb(db2, db1);
|
||||
|
||||
/* Try to unblock any XREADGROUP clients if the key no longer exists. */
|
||||
scanDatabaseForDeletedStreams(db1, db2);
|
||||
scanDatabaseForDeletedStreams(db2, db1);
|
||||
scanDatabaseForDeletedKeys(db1, db2);
|
||||
scanDatabaseForDeletedKeys(db2, db1);
|
||||
|
||||
/* Swap hash tables. Note that we don't swap blocking_keys,
|
||||
* ready_keys and watched_keys, since we want clients to
|
||||
@ -1451,7 +1449,7 @@ void swapMainDbWithTempDb(redisDb *tempDb) {
|
||||
touchAllWatchedKeysInDb(activedb, newdb);
|
||||
|
||||
/* Try to unblock any XREADGROUP clients if the key no longer exists. */
|
||||
scanDatabaseForDeletedStreams(activedb, newdb);
|
||||
scanDatabaseForDeletedKeys(activedb, newdb);
|
||||
|
||||
/* Swap hash tables. Note that we don't swap blocking_keys,
|
||||
* ready_keys and watched_keys, since clients
|
||||
|
40
src/module.c
40
src/module.c
@ -7243,7 +7243,10 @@ void unblockClientFromModule(client *c) {
|
||||
* reply callback the privdata that is set here while blocking.
|
||||
*
|
||||
*/
|
||||
RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
|
||||
RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback,
|
||||
RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*),
|
||||
long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata,
|
||||
int flags) {
|
||||
client *c = ctx->client;
|
||||
int islua = scriptIsRunning();
|
||||
int ismulti = server.in_exec;
|
||||
@ -7282,7 +7285,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
|
||||
"Blocking module command called from transaction");
|
||||
} else {
|
||||
if (keys) {
|
||||
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,-1,timeout,NULL,NULL,NULL);
|
||||
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,-1,timeout,NULL,NULL,NULL,flags&REDISMODULE_BLOCK_UNBLOCK_DELETED);
|
||||
} else {
|
||||
blockClient(c,BLOCKED_MODULE);
|
||||
}
|
||||
@ -7357,8 +7360,10 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
|
||||
* use RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd() one,
|
||||
* or multiple times within the blocking command background work.
|
||||
*/
|
||||
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
|
||||
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL);
|
||||
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback,
|
||||
RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*),
|
||||
long long timeout_ms) {
|
||||
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL,0);
|
||||
}
|
||||
|
||||
/* This call is similar to RedisModule_BlockClient(), however in this case we
|
||||
@ -7419,15 +7424,31 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
|
||||
* handled as if it were timed-out (You must implement the timeout
|
||||
* callback in that case).
|
||||
*/
|
||||
RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
|
||||
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata);
|
||||
RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback,
|
||||
RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*),
|
||||
long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
|
||||
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata,0);
|
||||
}
|
||||
|
||||
/* Same as RedisModule_BlockClientOnKeys, but can take REDISMODULE_BLOCK_* flags
|
||||
* Can be either REDISMODULE_BLOCK_UNBLOCK_DEFAULT, which means default behavior (same
|
||||
* as calling RedisModule_BlockClientOnKeys)
|
||||
*
|
||||
* The flags is a bit mask of these:
|
||||
*
|
||||
* - `REDISMODULE_BLOCK_UNBLOCK_DELETED`: The clients should to be awakened in case any of `keys` are deleted.
|
||||
* Mostly useful for commands that require the key to exist (like XREADGROUP)
|
||||
*/
|
||||
RedisModuleBlockedClient *RM_BlockClientOnKeysWithFlags(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback,
|
||||
RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*),
|
||||
long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata,
|
||||
int flags) {
|
||||
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata,flags);
|
||||
}
|
||||
|
||||
/* This function is used in order to potentially unblock a client blocked
|
||||
* on keys with RedisModule_BlockClientOnKeys(). When this function is called,
|
||||
* all the clients blocked for this key will get their reply_callback called.
|
||||
*
|
||||
* Note: The function has no effect if the signaled key doesn't exist. */
|
||||
* all the clients blocked for this key will get their reply_callback called. */
|
||||
void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) {
|
||||
signalKeyAsReady(ctx->client->db, key, OBJ_MODULE);
|
||||
}
|
||||
@ -12826,6 +12847,7 @@ void moduleRegisterCoreAPI(void) {
|
||||
REGISTER_API(SetLFU);
|
||||
REGISTER_API(GetLFU);
|
||||
REGISTER_API(BlockClientOnKeys);
|
||||
REGISTER_API(BlockClientOnKeysWithFlags);
|
||||
REGISTER_API(SignalKeyAsReady);
|
||||
REGISTER_API(GetBlockedClientReadyKey);
|
||||
REGISTER_API(GetUsedMemoryRatio);
|
||||
|
@ -235,6 +235,10 @@ This flag should not be used directly by the module.
|
||||
#define REDISMODULE_YIELD_FLAG_NONE (1<<0)
|
||||
#define REDISMODULE_YIELD_FLAG_CLIENTS (1<<1)
|
||||
|
||||
/* RM_BlockClientOnKeysWithFlags flags */
|
||||
#define REDISMODULE_BLOCK_UNBLOCK_DEFAULT (0)
|
||||
#define REDISMODULE_BLOCK_UNBLOCK_DELETED (1<<0)
|
||||
|
||||
/* This type represents a timer handle, and is returned when a timer is
|
||||
* registered and used in order to invalidate a timer. It's just a 64 bit
|
||||
* number, because this is how each timer is represented inside the radix tree
|
||||
@ -1133,6 +1137,7 @@ REDISMODULE_API int (*RedisModule_GetLRU)(RedisModuleKey *key, mstime_t *lru_idl
|
||||
REDISMODULE_API int (*RedisModule_SetLFU)(RedisModuleKey *key, long long lfu_freq) REDISMODULE_ATTR;
|
||||
REDISMODULE_API int (*RedisModule_GetLFU)(RedisModuleKey *key, long long *lfu_freq) REDISMODULE_ATTR;
|
||||
REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) REDISMODULE_ATTR;
|
||||
REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClientOnKeysWithFlags)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata, int flags) REDISMODULE_ATTR;
|
||||
REDISMODULE_API void (*RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key) REDISMODULE_ATTR;
|
||||
REDISMODULE_API RedisModuleString * (*RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx) REDISMODULE_ATTR;
|
||||
REDISMODULE_API RedisModuleScanCursor * (*RedisModule_ScanCursorCreate)() REDISMODULE_ATTR;
|
||||
@ -1476,6 +1481,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
||||
REDISMODULE_GET_API(SetLFU);
|
||||
REDISMODULE_GET_API(GetLFU);
|
||||
REDISMODULE_GET_API(BlockClientOnKeys);
|
||||
REDISMODULE_GET_API(BlockClientOnKeysWithFlags);
|
||||
REDISMODULE_GET_API(SignalKeyAsReady);
|
||||
REDISMODULE_GET_API(GetBlockedClientReadyKey);
|
||||
REDISMODULE_GET_API(ScanCursorCreate);
|
||||
|
@ -2495,6 +2495,7 @@ void initServer(void) {
|
||||
server.db[j].expires = dictCreate(&dbExpiresDictType);
|
||||
server.db[j].expires_cursor = 0;
|
||||
server.db[j].blocking_keys = dictCreate(&keylistDictType);
|
||||
server.db[j].blocking_keys_unblock_on_nokey = dictCreate(&objectKeyPointerValueDictType);
|
||||
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType);
|
||||
server.db[j].watched_keys = dictCreate(&keylistDictType);
|
||||
server.db[j].id = j;
|
||||
|
@ -917,6 +917,9 @@ typedef struct redisDb {
|
||||
dict *dict; /* The keyspace for this DB */
|
||||
dict *expires; /* Timeout of keys with a timeout set */
|
||||
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/
|
||||
dict *blocking_keys_unblock_on_nokey; /* Keys with clients waiting for
|
||||
* data, and should be unblocked if key is deleted (XREADEDGROUP).
|
||||
* This is a subset of blocking_keys*/
|
||||
dict *ready_keys; /* Blocked keys that received a PUSH */
|
||||
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
|
||||
int id; /* Database ID */
|
||||
@ -3234,9 +3237,10 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int
|
||||
void disconnectAllBlockedClients(void);
|
||||
void handleClientsBlockedOnKeys(void);
|
||||
void signalKeyAsReady(redisDb *db, robj *key, int type);
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids);
|
||||
void signalDeletedKeyAsReady(redisDb *db, robj *key, int type);
|
||||
void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids, int unblock_on_nokey);
|
||||
void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors);
|
||||
void scanDatabaseForDeletedStreams(redisDb *emptied, redisDb *replaced_with);
|
||||
void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with);
|
||||
|
||||
/* timeout.c -- Blocked clients timeout and connections timeout. */
|
||||
void addClientToTimeoutTable(client *c);
|
||||
|
@ -1074,7 +1074,7 @@ void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, i
|
||||
|
||||
/* If the keys do not exist we must block */
|
||||
struct blockPos pos = {where};
|
||||
blockForKeys(c,BLOCKED_LIST,keys,numkeys,count,timeout,NULL,&pos,NULL);
|
||||
blockForKeys(c,BLOCKED_LIST,keys,numkeys,count,timeout,NULL,&pos,NULL,0);
|
||||
}
|
||||
|
||||
/* BLPOP <key> [<key> ...] <timeout> */
|
||||
@ -1099,7 +1099,7 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou
|
||||
} else {
|
||||
/* The list is empty and the client blocks. */
|
||||
struct blockPos pos = {wherefrom, whereto};
|
||||
blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,-1,timeout,c->argv[2],&pos,NULL);
|
||||
blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,-1,timeout,c->argv[2],&pos,NULL,0);
|
||||
}
|
||||
} else {
|
||||
/* The list exists and has elements, so
|
||||
|
@ -2404,7 +2404,7 @@ void xreadCommand(client *c) {
|
||||
goto cleanup;
|
||||
}
|
||||
blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count,
|
||||
-1, timeout, NULL, NULL, ids);
|
||||
-1, timeout, NULL, NULL, ids, xreadgroup);
|
||||
/* If no COUNT is given and we block, set a relatively small count:
|
||||
* in case the ID provided is too low, we do not want the server to
|
||||
* block just to serve this client a huge stream of messages. */
|
||||
|
@ -4044,7 +4044,7 @@ void blockingGenericZpopCommand(client *c, robj **keys, int numkeys, int where,
|
||||
|
||||
/* If the keys do not exist we must block */
|
||||
struct blockPos pos = {where};
|
||||
blockForKeys(c,BLOCKED_ZSET,keys,numkeys,count,timeout,NULL,&pos,NULL);
|
||||
blockForKeys(c,BLOCKED_ZSET,keys,numkeys,count,timeout,NULL,&pos,NULL,0);
|
||||
}
|
||||
|
||||
// BZPOPMIN key [key ...] timeout
|
||||
|
@ -5,6 +5,8 @@
|
||||
#include <assert.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#define UNUSED(V) ((void) V)
|
||||
|
||||
#define LIST_SIZE 1024
|
||||
|
||||
typedef struct {
|
||||
@ -174,7 +176,7 @@ int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int arg
|
||||
|
||||
fsl_t *fsl;
|
||||
if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl)
|
||||
return REDISMODULE_ERR;
|
||||
return RedisModule_ReplyWithError(ctx,"UNBLOCKED key no longer exists");
|
||||
|
||||
if (fsl->list[fsl->length-1] <= *pgt)
|
||||
return REDISMODULE_ERR;
|
||||
@ -212,12 +214,17 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1))
|
||||
return REDISMODULE_OK;
|
||||
|
||||
if (!fsl || fsl->list[fsl->length-1] <= gt) {
|
||||
if (!fsl)
|
||||
return RedisModule_ReplyWithError(ctx,"ERR key must exist");
|
||||
|
||||
if (fsl->list[fsl->length-1] <= gt) {
|
||||
/* We use malloc so the tests in blockedonkeys.tcl can check for memory leaks */
|
||||
long long *pgt = RedisModule_Alloc(sizeof(long long));
|
||||
*pgt = gt;
|
||||
RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback,
|
||||
bpopgt_free_privdata, timeout, &argv[1], 1, pgt);
|
||||
RedisModule_BlockClientOnKeysWithFlags(
|
||||
ctx, bpopgt_reply_callback, bpopgt_timeout_callback,
|
||||
bpopgt_free_privdata, timeout, &argv[1], 1, pgt,
|
||||
REDISMODULE_BLOCK_UNBLOCK_DELETED);
|
||||
} else {
|
||||
RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]);
|
||||
}
|
||||
@ -469,7 +476,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
.aof_rewrite = fsl_aofrw,
|
||||
.mem_usage = NULL,
|
||||
.free = fsl_free,
|
||||
.digest = NULL
|
||||
.digest = NULL,
|
||||
};
|
||||
|
||||
fsltype = RedisModule_CreateDataType(ctx, "fsltype_t", 0, &tm);
|
||||
|
@ -106,6 +106,7 @@ start_server {tags {"modules"}} {
|
||||
|
||||
test {Module client blocked on keys (with metadata): Blocked, case 2} {
|
||||
r del k
|
||||
r fsl.push k 32
|
||||
set rd [redis_deferring_client]
|
||||
$rd fsl.bpopgt k 35 0
|
||||
;# wait until clients are actually blocked
|
||||
@ -121,8 +122,120 @@ start_server {tags {"modules"}} {
|
||||
assert_equal {36} [$rd read]
|
||||
}
|
||||
|
||||
test {Module client blocked on keys (with metadata): Blocked, DEL} {
|
||||
r del k
|
||||
r fsl.push k 32
|
||||
set rd [redis_deferring_client]
|
||||
$rd fsl.bpopgt k 35 0
|
||||
;# wait until clients are actually blocked
|
||||
wait_for_condition 50 100 {
|
||||
[s 0 blocked_clients] eq {1}
|
||||
} else {
|
||||
fail "Clients are not blocked"
|
||||
}
|
||||
r del k
|
||||
assert_error {*UNBLOCKED key no longer exists*} {$rd read}
|
||||
}
|
||||
|
||||
test {Module client blocked on keys (with metadata): Blocked, FLUSHALL} {
|
||||
r del k
|
||||
r fsl.push k 32
|
||||
set rd [redis_deferring_client]
|
||||
$rd fsl.bpopgt k 35 0
|
||||
;# wait until clients are actually blocked
|
||||
wait_for_condition 50 100 {
|
||||
[s 0 blocked_clients] eq {1}
|
||||
} else {
|
||||
fail "Clients are not blocked"
|
||||
}
|
||||
r flushall
|
||||
assert_error {*UNBLOCKED key no longer exists*} {$rd read}
|
||||
}
|
||||
|
||||
test {Module client blocked on keys (with metadata): Blocked, SWAPDB, no key} {
|
||||
r select 9
|
||||
r del k
|
||||
r fsl.push k 32
|
||||
set rd [redis_deferring_client]
|
||||
$rd fsl.bpopgt k 35 0
|
||||
;# wait until clients are actually blocked
|
||||
wait_for_condition 50 100 {
|
||||
[s 0 blocked_clients] eq {1}
|
||||
} else {
|
||||
fail "Clients are not blocked"
|
||||
}
|
||||
r swapdb 0 9
|
||||
assert_error {*UNBLOCKED key no longer exists*} {$rd read}
|
||||
}
|
||||
|
||||
test {Module client blocked on keys (with metadata): Blocked, SWAPDB, key exists, case 1} {
|
||||
;# Key exists on other db, but wrong type
|
||||
r flushall
|
||||
r select 9
|
||||
r fsl.push k 32
|
||||
r select 0
|
||||
r lpush k 38
|
||||
r select 9
|
||||
set rd [redis_deferring_client]
|
||||
$rd fsl.bpopgt k 35 0
|
||||
;# wait until clients are actually blocked
|
||||
wait_for_condition 50 100 {
|
||||
[s 0 blocked_clients] eq {1}
|
||||
} else {
|
||||
fail "Clients are not blocked"
|
||||
}
|
||||
r swapdb 0 9
|
||||
assert_error {*UNBLOCKED key no longer exists*} {$rd read}
|
||||
r select 9
|
||||
}
|
||||
|
||||
test {Module client blocked on keys (with metadata): Blocked, SWAPDB, key exists, case 2} {
|
||||
;# Key exists on other db, with the right type, but the value doesn't allow to unblock
|
||||
r flushall
|
||||
r select 9
|
||||
r fsl.push k 32
|
||||
r select 0
|
||||
r fsl.push k 34
|
||||
r select 9
|
||||
set rd [redis_deferring_client]
|
||||
$rd fsl.bpopgt k 35 0
|
||||
;# wait until clients are actually blocked
|
||||
wait_for_condition 50 100 {
|
||||
[s 0 blocked_clients] eq {1}
|
||||
} else {
|
||||
fail "Clients are not blocked"
|
||||
}
|
||||
r swapdb 0 9
|
||||
assert_equal {1} [s 0 blocked_clients]
|
||||
r fsl.push k 38
|
||||
assert_equal {38} [$rd read]
|
||||
r select 9
|
||||
}
|
||||
|
||||
test {Module client blocked on keys (with metadata): Blocked, SWAPDB, key exists, case 3} {
|
||||
;# Key exists on other db, with the right type, the value allows to unblock
|
||||
r flushall
|
||||
r select 9
|
||||
r fsl.push k 32
|
||||
r select 0
|
||||
r fsl.push k 38
|
||||
r select 9
|
||||
set rd [redis_deferring_client]
|
||||
$rd fsl.bpopgt k 35 0
|
||||
;# wait until clients are actually blocked
|
||||
wait_for_condition 50 100 {
|
||||
[s 0 blocked_clients] eq {1}
|
||||
} else {
|
||||
fail "Clients are not blocked"
|
||||
}
|
||||
r swapdb 0 9
|
||||
assert_equal {38} [$rd read]
|
||||
r select 9
|
||||
}
|
||||
|
||||
test {Module client blocked on keys (with metadata): Blocked, CLIENT KILL} {
|
||||
r del k
|
||||
r fsl.push k 32
|
||||
set rd [redis_deferring_client]
|
||||
$rd client id
|
||||
set cid [$rd read]
|
||||
@ -138,6 +251,7 @@ start_server {tags {"modules"}} {
|
||||
|
||||
test {Module client blocked on keys (with metadata): Blocked, CLIENT UNBLOCK TIMEOUT} {
|
||||
r del k
|
||||
r fsl.push k 32
|
||||
set rd [redis_deferring_client]
|
||||
$rd client id
|
||||
set cid [$rd read]
|
||||
@ -154,6 +268,7 @@ start_server {tags {"modules"}} {
|
||||
|
||||
test {Module client blocked on keys (with metadata): Blocked, CLIENT UNBLOCK ERROR} {
|
||||
r del k
|
||||
r fsl.push k 32
|
||||
set rd [redis_deferring_client]
|
||||
$rd client id
|
||||
set cid [$rd read]
|
||||
|
Loading…
x
Reference in New Issue
Block a user