diff --git a/src/blocked.c b/src/blocked.c index c4aded0c5..497ffe4ce 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -398,14 +398,26 @@ void serveClientsBlockedOnSortedSetKey(robj *o, readyList *rl) { /* Helper function for handleClientsBlockedOnKeys(). This function is called * when there may be clients blocked on a stream key, and there may be new - * data to fetch (the key is ready). */ + * data to fetch (the key is ready). + * This function also handles the case where there may be clients blocked, + * via XREADGROUP, on an existing stream which was deleted. + * We need to unblock the clients in that case. + * The idea is that a client that is blocked via XREADGROUP is different from + * any other blocking type in the sense that it depends on the existence of both + * the key and the group. Even if the key is deleted and then revived with XADD + * it won't help any clients blocked on XREADGROUP because the group no longer + * exist, so they would fail with -NOGROUP anyway. + * The conclusion is that it's better to unblock these client (with error) upon + * the deletion of the key, rather than waiting for the first XADD.*/ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { /* Optimization: If no clients are in type BLOCKED_STREAM, * we can skip this loop. */ if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return; dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); - stream *s = o->ptr; + /* This function may be called with o=NULL (in order to unblock + * XREADGROUP clients whose key was deleted) */ + stream *s = o? o->ptr : NULL; /* We need to provide the new data arrived on the stream * to all the clients that are waiting for an offset smaller @@ -422,6 +434,13 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { bkinfo *bki = dictFetchValue(receiver->bpop.keys,rl->key); streamID *gt = &bki->stream_id; + if (!receiver->bpop.xread_group && (!o || o->type != OBJ_STREAM)) { + /* If it's a blocking XREAD and the stream was either deleted + * or replaced with another key, we don't do anything (it's ok + * the the client blocks on a non-existing key). */ + continue; + } + long long prev_error_replies = server.stat_total_error_replies; client *old_client = server.current_client; server.current_client = receiver; @@ -439,6 +458,12 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { * otherwise. */ streamCG *group = NULL; if (receiver->bpop.xread_group) { + /* If it's a blocking XREADGROUP and the stream was either deleted + * or replaced with another key, we unblock the client */ + if (!o || o->type != OBJ_STREAM) { + addReplyError(receiver, "-UNBLOCKED the stream key no longer exists"); + goto unblock_receiver; + } group = streamLookupCG(s, receiver->bpop.xread_group->ptr); /* If the group was not found, send an error @@ -545,52 +570,13 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) { server.current_client = receiver; monotime replyTimer; elapsedStart(&replyTimer); - if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue; - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); - moduleUnblockClient(receiver); - afterCommand(receiver); - server.current_client = old_client; - } - } -} - -/* Helper function for handleClientsBlockedOnKeys(). This function is called - * when there may be clients blocked, via XREADGROUP, on an existing stream which - * was deleted. We need to unblock the clients in that case. - * The idea is that a client that is blocked via XREADGROUP is different from - * any other blocking type in the sense that it depends on the existence of both - * the key and the group. Even if the key is deleted and then revived with XADD - * it won't help any clients blocked on XREADGROUP because the group no longer - * exist, so they would fail with -NOGROUP anyway. - * The conclusion is that it's better to unblock these client (with error) upon - * the deletion of the key, rather than waiting for the first XADD. */ -void unblockDeletedStreamReadgroupClients(readyList *rl) { - /* Optimization: If no clients are in type BLOCKED_STREAM, - * we can skip this loop. */ - if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return; - - /* We serve clients in the same order they blocked for - * this key, from the first blocked to the last. */ - dictEntry *de = dictFind(rl->db->blocking_keys,rl->key); - if (de) { - list *clients = dictGetVal(de); - listNode *ln; - listIter li; - listRewind(clients,&li); - - while((ln = listNext(&li))) { - client *receiver = listNodeValue(ln); - if (receiver->btype != BLOCKED_STREAM || !receiver->bpop.xread_group) - continue; - - long long prev_error_replies = server.stat_total_error_replies; - client *old_client = server.current_client; - server.current_client = receiver; - monotime replyTimer; - elapsedStart(&replyTimer); - addReplyError(receiver, "-UNBLOCKED the stream key no longer exists"); - updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); - unblockClient(receiver); + if (moduleTryServeClientBlockedOnKey(receiver, rl->key)) { + updateStatsOnUnblock(receiver, 0, elapsedUs(replyTimer), server.stat_total_error_replies != prev_error_replies); + moduleUnblockClient(receiver); + } + /* We need to call afterCommand even if the client was not unblocked + * in order to propagate any changes that could have been done inside + * moduleTryServeClientBlockedOnKey */ afterCommand(receiver); server.current_client = old_client; } @@ -646,33 +632,35 @@ void handleClientsBlockedOnKeys(void) { /* Serve clients blocked on the key. */ robj *o = lookupKeyReadWithFlags(rl->db, rl->key, LOOKUP_NONOTIFY | LOOKUP_NOSTATS); - if (o != NULL) { - int objtype = o->type; - if (objtype == OBJ_LIST) - serveClientsBlockedOnListKey(o,rl); - else if (objtype == OBJ_ZSET) - serveClientsBlockedOnSortedSetKey(o,rl); - else if (objtype == OBJ_STREAM) - serveClientsBlockedOnStreamKey(o,rl); - /* We want to serve clients blocked on module keys - * regardless of the object type: we don't know what the - * module is trying to accomplish right now. */ - serveClientsBlockedOnKeyByModule(rl); - /* If we have XREADGROUP clients blocked on this key, and - * the key is not a stream, it must mean that the key was - * overwritten by either SET or something like - * (MULTI, DEL key, SADD key e, EXEC). - * In this case we need to unblock all these clients. */ - if (objtype != OBJ_STREAM) - unblockDeletedStreamReadgroupClients(rl); - } else { - /* Unblock all XREADGROUP clients of this deleted key */ - unblockDeletedStreamReadgroupClients(rl); + if (!o) { /* Edge case: If lookupKeyReadWithFlags decides to expire the key we have to * take care of the propagation here, because afterCommand wasn't called */ - if (server.also_propagate.numops > 0) - propagatePendingCommands(); + propagatePendingCommands(); + } else { + if (o->type == OBJ_LIST) + serveClientsBlockedOnListKey(o,rl); + else if (o->type == OBJ_ZSET) + serveClientsBlockedOnSortedSetKey(o,rl); } + /* We need to try to serve stream clients even if the key no longer exists because + * XREADGROUP clients need to be unblocked in case the key is missing, either deleted + * or replaced by SET or something like {MULTI, DEL key, SADD key e, EXEC}. + * In this case we need to unblock all these clients. */ + serveClientsBlockedOnStreamKey(o,rl); + /* We want to serve clients blocked on module keys regardless of the object type, or + * whether the object exists or not: we don't know what the module is trying to + * accomplish right now. + * Please note that this function must be called only after handling non-module + * clients, since moduleTryServeClientBlockedOnKey may delete the key, causing `o` + * to be stale. + * The scenario is that we have one client blocked on BLPOP while another module + * client is blocked by MODULE.SAME-AS-BLPOP on the same key. + * Of course we can call lookupKeyReadWithFlags again, but: + * 1) It takes CPU + * 2) It makes more sense to give priority to "native" blocking clients rather + * than module blocking clients + * */ + serveClientsBlockedOnKeyByModule(rl); /* Free this item. */ decrRefCount(rl->key); @@ -717,8 +705,7 @@ void handleClientsBlockedOnKeys(void) { * * 'count' for those commands that support the optional count argument. * Otherwise the value is 0. */ -void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids) { - dictEntry *de; +void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids, int unblock_on_nokey) { list *l; int j; @@ -745,20 +732,27 @@ void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, ms incrRefCount(keys[j]); /* And in the other "side", to map keys -> clients */ - de = dictFind(c->db->blocking_keys,keys[j]); - if (de == NULL) { - int retval; - + dictEntry *de, *existing; + de = dictAddRaw(c->db->blocking_keys, keys[j], &existing); + if (de) { + incrRefCount(keys[j]); /* For every key we take a list of clients blocked for it */ l = listCreate(); - retval = dictAdd(c->db->blocking_keys,keys[j],l); - incrRefCount(keys[j]); - serverAssertWithInfo(c,keys[j],retval == DICT_OK); + dictSetVal(c->db->blocking_keys, de, l); } else { - l = dictGetVal(de); + l = dictGetVal(existing); } listAddNodeTail(l,c); bki->listnode = listLast(l); + + /* We need to add the key to blocking_keys_unblock_on_nokey, if the client + * wants to be awakened if key is deleted (like XREADGROUP) */ + if (unblock_on_nokey) { + de = dictAddRaw(c->db->blocking_keys_unblock_on_nokey, keys[j], NULL); + if (de) { + incrRefCount(keys[j]); + } + } } blockClient(c,btype); } @@ -782,8 +776,10 @@ void unblockClientWaitingData(client *c) { serverAssertWithInfo(c,key,l != NULL); listDelNode(l,bki->listnode); /* If the list is empty we need to remove it to avoid wasting memory */ - if (listLength(l) == 0) + if (listLength(l) == 0) { dictDelete(c->db->blocking_keys,key); + dictDelete(c->db->blocking_keys_unblock_on_nokey,key); + } } dictReleaseIterator(di); @@ -818,7 +814,7 @@ static int getBlockedTypeByType(int type) { * made by a script or in the context of MULTI/EXEC. * * The list will be finally processed by handleClientsBlockedOnKeys() */ -void signalKeyAsReady(redisDb *db, robj *key, int type) { +static void signalKeyAsReadyLogic(redisDb *db, robj *key, int type, int deleted) { readyList *rl; /* Quick returns. */ @@ -836,11 +832,28 @@ void signalKeyAsReady(redisDb *db, robj *key, int type) { return; } - /* No clients blocking for this key? No need to queue it. */ - if (dictFind(db->blocking_keys,key) == NULL) return; + if (deleted) { + /* Key deleted and no clients blocking for this key? No need to queue it. */ + if (dictFind(db->blocking_keys_unblock_on_nokey,key) == NULL) + return; + /* Note: if we made it here it means the key is also present in db->blocking_keys */ + } else { + /* No clients blocking for this key? No need to queue it. */ + if (dictFind(db->blocking_keys,key) == NULL) + return; + } - /* Key was already signaled? No need to queue it again. */ - if (dictFind(db->ready_keys,key) != NULL) return; + dictEntry *de, *existing; + de = dictAddRaw(db->ready_keys, key, &existing); + if (de) { + /* We add the key in the db->ready_keys dictionary in order + * to avoid adding it multiple times into a list with a simple O(1) + * check. */ + incrRefCount(key); + } else { + /* Key was already signaled? No need to queue it again. */ + return; + } /* Ok, we need to queue this key into server.ready_keys. */ rl = zmalloc(sizeof(*rl)); @@ -848,10 +861,12 @@ void signalKeyAsReady(redisDb *db, robj *key, int type) { rl->db = db; incrRefCount(key); listAddNodeTail(server.ready_keys,rl); - - /* We also add the key in the db->ready_keys dictionary in order - * to avoid adding it multiple times into a list with a simple O(1) - * check. */ - incrRefCount(key); - serverAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK); +} + +void signalKeyAsReady(redisDb *db, robj *key, int type) { + signalKeyAsReadyLogic(db, key, type, 0); +} + +void signalDeletedKeyAsReady(redisDb *db, robj *key, int type) { + signalKeyAsReadyLogic(db, key, type, 1); } diff --git a/src/db.c b/src/db.c index 63705cd01..58febe0d9 100644 --- a/src/db.c +++ b/src/db.c @@ -232,9 +232,8 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { * overwrite as two steps of unlink+add, so we still need to call the unlink * callback of the module. */ moduleNotifyKeyUnlink(key,old,db->id); - /* We want to try to unblock any client using a blocking XREADGROUP */ - if (old->type == OBJ_STREAM) - signalKeyAsReady(db,key,old->type); + /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */ + signalDeletedKeyAsReady(db,key,old->type); dictSetVal(db->dict, de, val); if (server.lazyfree_lazy_server_del) { @@ -325,9 +324,8 @@ static int dbGenericDelete(redisDb *db, robj *key, int async) { robj *val = dictGetVal(de); /* Tells the module that the key has been unlinked from the database. */ moduleNotifyKeyUnlink(key,val,db->id); - /* We want to try to unblock any client using a blocking XREADGROUP */ - if (val->type == OBJ_STREAM) - signalKeyAsReady(db,key,val->type); + /* We want to try to unblock any module clients or clients using a blocking XREADGROUP */ + signalDeletedKeyAsReady(db,key,val->type); if (async) { freeObjAsync(key, val, db->id); dictSetVal(db->dict, de, NULL); @@ -568,7 +566,7 @@ void signalFlushedDb(int dbid, int async) { } for (int j = startdb; j <= enddb; j++) { - scanDatabaseForDeletedStreams(&server.db[j], NULL); + scanDatabaseForDeletedKeys(&server.db[j], NULL); touchAllWatchedKeysInDb(&server.db[j], NULL); } @@ -1350,32 +1348,32 @@ void scanDatabaseForReadyKeys(redisDb *db) { /* Since we are unblocking XREADGROUP clients in the event the * key was deleted/overwritten we must do the same in case the * database was flushed/swapped. */ -void scanDatabaseForDeletedStreams(redisDb *emptied, redisDb *replaced_with) { - /* Optimization: If no clients are in type BLOCKED_STREAM, - * we can skip this loop. */ - if (!server.blocked_clients_by_type[BLOCKED_STREAM]) return; - +void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with) { dictEntry *de; dictIterator *di = dictGetSafeIterator(emptied->blocking_keys); while((de = dictNext(di)) != NULL) { robj *key = dictGetKey(de); - int was_stream = 0, is_stream = 0; + int existed = 0, exists = 0; + int original_type = -1, curr_type = -1; dictEntry *kde = dictFind(emptied->dict, key->ptr); if (kde) { robj *value = dictGetVal(kde); - was_stream = value->type == OBJ_STREAM; + original_type = value->type; + existed = 1; } + if (replaced_with) { dictEntry *kde = dictFind(replaced_with->dict, key->ptr); if (kde) { robj *value = dictGetVal(kde); - is_stream = value->type == OBJ_STREAM; + curr_type = value->type; + exists = 1; } } /* We want to try to unblock any client using a blocking XREADGROUP */ - if (was_stream && !is_stream) - signalKeyAsReady(emptied, key, OBJ_STREAM); + if ((existed && !exists) || original_type != curr_type) + signalDeletedKeyAsReady(emptied, key, original_type); } dictReleaseIterator(di); } @@ -1401,8 +1399,8 @@ int dbSwapDatabases(int id1, int id2) { touchAllWatchedKeysInDb(db2, db1); /* Try to unblock any XREADGROUP clients if the key no longer exists. */ - scanDatabaseForDeletedStreams(db1, db2); - scanDatabaseForDeletedStreams(db2, db1); + scanDatabaseForDeletedKeys(db1, db2); + scanDatabaseForDeletedKeys(db2, db1); /* Swap hash tables. Note that we don't swap blocking_keys, * ready_keys and watched_keys, since we want clients to @@ -1451,7 +1449,7 @@ void swapMainDbWithTempDb(redisDb *tempDb) { touchAllWatchedKeysInDb(activedb, newdb); /* Try to unblock any XREADGROUP clients if the key no longer exists. */ - scanDatabaseForDeletedStreams(activedb, newdb); + scanDatabaseForDeletedKeys(activedb, newdb); /* Swap hash tables. Note that we don't swap blocking_keys, * ready_keys and watched_keys, since clients diff --git a/src/module.c b/src/module.c index 33ac151a2..2854d157a 100644 --- a/src/module.c +++ b/src/module.c @@ -7243,7 +7243,10 @@ void unblockClientFromModule(client *c) { * reply callback the privdata that is set here while blocking. * */ -RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) { +RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, + RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), + long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata, + int flags) { client *c = ctx->client; int islua = scriptIsRunning(); int ismulti = server.in_exec; @@ -7282,7 +7285,7 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF "Blocking module command called from transaction"); } else { if (keys) { - blockForKeys(c,BLOCKED_MODULE,keys,numkeys,-1,timeout,NULL,NULL,NULL); + blockForKeys(c,BLOCKED_MODULE,keys,numkeys,-1,timeout,NULL,NULL,NULL,flags&REDISMODULE_BLOCK_UNBLOCK_DELETED); } else { blockClient(c,BLOCKED_MODULE); } @@ -7357,8 +7360,10 @@ int moduleTryServeClientBlockedOnKey(client *c, robj *key) { * use RM_BlockedClientMeasureTimeStart() and RM_BlockedClientMeasureTimeEnd() one, * or multiple times within the blocking command background work. */ -RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) { - return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL); +RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, + RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), + long long timeout_ms) { + return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL,0); } /* This call is similar to RedisModule_BlockClient(), however in this case we @@ -7419,15 +7424,31 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc * handled as if it were timed-out (You must implement the timeout * callback in that case). */ -RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) { - return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata); +RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, + RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), + long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) { + return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata,0); +} + +/* Same as RedisModule_BlockClientOnKeys, but can take REDISMODULE_BLOCK_* flags + * Can be either REDISMODULE_BLOCK_UNBLOCK_DEFAULT, which means default behavior (same + * as calling RedisModule_BlockClientOnKeys) + * + * The flags is a bit mask of these: + * + * - `REDISMODULE_BLOCK_UNBLOCK_DELETED`: The clients should to be awakened in case any of `keys` are deleted. + * Mostly useful for commands that require the key to exist (like XREADGROUP) + */ +RedisModuleBlockedClient *RM_BlockClientOnKeysWithFlags(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, + RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), + long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata, + int flags) { + return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata,flags); } /* This function is used in order to potentially unblock a client blocked * on keys with RedisModule_BlockClientOnKeys(). When this function is called, - * all the clients blocked for this key will get their reply_callback called. - * - * Note: The function has no effect if the signaled key doesn't exist. */ + * all the clients blocked for this key will get their reply_callback called. */ void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) { signalKeyAsReady(ctx->client->db, key, OBJ_MODULE); } @@ -12826,6 +12847,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(SetLFU); REGISTER_API(GetLFU); REGISTER_API(BlockClientOnKeys); + REGISTER_API(BlockClientOnKeysWithFlags); REGISTER_API(SignalKeyAsReady); REGISTER_API(GetBlockedClientReadyKey); REGISTER_API(GetUsedMemoryRatio); diff --git a/src/redismodule.h b/src/redismodule.h index a2256e365..0515af61e 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -235,6 +235,10 @@ This flag should not be used directly by the module. #define REDISMODULE_YIELD_FLAG_NONE (1<<0) #define REDISMODULE_YIELD_FLAG_CLIENTS (1<<1) +/* RM_BlockClientOnKeysWithFlags flags */ +#define REDISMODULE_BLOCK_UNBLOCK_DEFAULT (0) +#define REDISMODULE_BLOCK_UNBLOCK_DELETED (1<<0) + /* This type represents a timer handle, and is returned when a timer is * registered and used in order to invalidate a timer. It's just a 64 bit * number, because this is how each timer is represented inside the radix tree @@ -1133,6 +1137,7 @@ REDISMODULE_API int (*RedisModule_GetLRU)(RedisModuleKey *key, mstime_t *lru_idl REDISMODULE_API int (*RedisModule_SetLFU)(RedisModuleKey *key, long long lfu_freq) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_GetLFU)(RedisModuleKey *key, long long *lfu_freq) REDISMODULE_ATTR; REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) REDISMODULE_ATTR; +REDISMODULE_API RedisModuleBlockedClient * (*RedisModule_BlockClientOnKeysWithFlags)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata, int flags) REDISMODULE_ATTR; REDISMODULE_API void (*RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key) REDISMODULE_ATTR; REDISMODULE_API RedisModuleString * (*RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx) REDISMODULE_ATTR; REDISMODULE_API RedisModuleScanCursor * (*RedisModule_ScanCursorCreate)() REDISMODULE_ATTR; @@ -1476,6 +1481,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(SetLFU); REDISMODULE_GET_API(GetLFU); REDISMODULE_GET_API(BlockClientOnKeys); + REDISMODULE_GET_API(BlockClientOnKeysWithFlags); REDISMODULE_GET_API(SignalKeyAsReady); REDISMODULE_GET_API(GetBlockedClientReadyKey); REDISMODULE_GET_API(ScanCursorCreate); diff --git a/src/server.c b/src/server.c index 4962b4fba..844ed4111 100644 --- a/src/server.c +++ b/src/server.c @@ -2495,6 +2495,7 @@ void initServer(void) { server.db[j].expires = dictCreate(&dbExpiresDictType); server.db[j].expires_cursor = 0; server.db[j].blocking_keys = dictCreate(&keylistDictType); + server.db[j].blocking_keys_unblock_on_nokey = dictCreate(&objectKeyPointerValueDictType); server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType); server.db[j].watched_keys = dictCreate(&keylistDictType); server.db[j].id = j; diff --git a/src/server.h b/src/server.h index d5c0b4917..092ee5758 100644 --- a/src/server.h +++ b/src/server.h @@ -917,6 +917,9 @@ typedef struct redisDb { dict *dict; /* The keyspace for this DB */ dict *expires; /* Timeout of keys with a timeout set */ dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/ + dict *blocking_keys_unblock_on_nokey; /* Keys with clients waiting for + * data, and should be unblocked if key is deleted (XREADEDGROUP). + * This is a subset of blocking_keys*/ dict *ready_keys; /* Blocked keys that received a PUSH */ dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */ int id; /* Database ID */ @@ -3234,9 +3237,10 @@ int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int void disconnectAllBlockedClients(void); void handleClientsBlockedOnKeys(void); void signalKeyAsReady(redisDb *db, robj *key, int type); -void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids); +void signalDeletedKeyAsReady(redisDb *db, robj *key, int type); +void blockForKeys(client *c, int btype, robj **keys, int numkeys, long count, mstime_t timeout, robj *target, struct blockPos *blockpos, streamID *ids, int unblock_on_nokey); void updateStatsOnUnblock(client *c, long blocked_us, long reply_us, int had_errors); -void scanDatabaseForDeletedStreams(redisDb *emptied, redisDb *replaced_with); +void scanDatabaseForDeletedKeys(redisDb *emptied, redisDb *replaced_with); /* timeout.c -- Blocked clients timeout and connections timeout. */ void addClientToTimeoutTable(client *c); diff --git a/src/t_list.c b/src/t_list.c index 64f1a71dd..e0d324309 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -1074,7 +1074,7 @@ void blockingPopGenericCommand(client *c, robj **keys, int numkeys, int where, i /* If the keys do not exist we must block */ struct blockPos pos = {where}; - blockForKeys(c,BLOCKED_LIST,keys,numkeys,count,timeout,NULL,&pos,NULL); + blockForKeys(c,BLOCKED_LIST,keys,numkeys,count,timeout,NULL,&pos,NULL,0); } /* BLPOP [ ...] */ @@ -1099,7 +1099,7 @@ void blmoveGenericCommand(client *c, int wherefrom, int whereto, mstime_t timeou } else { /* The list is empty and the client blocks. */ struct blockPos pos = {wherefrom, whereto}; - blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,-1,timeout,c->argv[2],&pos,NULL); + blockForKeys(c,BLOCKED_LIST,c->argv + 1,1,-1,timeout,c->argv[2],&pos,NULL,0); } } else { /* The list exists and has elements, so diff --git a/src/t_stream.c b/src/t_stream.c index b0ac0556a..5d06529d7 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -2404,7 +2404,7 @@ void xreadCommand(client *c) { goto cleanup; } blockForKeys(c, BLOCKED_STREAM, c->argv+streams_arg, streams_count, - -1, timeout, NULL, NULL, ids); + -1, timeout, NULL, NULL, ids, xreadgroup); /* If no COUNT is given and we block, set a relatively small count: * in case the ID provided is too low, we do not want the server to * block just to serve this client a huge stream of messages. */ diff --git a/src/t_zset.c b/src/t_zset.c index 4016fc925..e194860db 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -4044,7 +4044,7 @@ void blockingGenericZpopCommand(client *c, robj **keys, int numkeys, int where, /* If the keys do not exist we must block */ struct blockPos pos = {where}; - blockForKeys(c,BLOCKED_ZSET,keys,numkeys,count,timeout,NULL,&pos,NULL); + blockForKeys(c,BLOCKED_ZSET,keys,numkeys,count,timeout,NULL,&pos,NULL,0); } // BZPOPMIN key [key ...] timeout diff --git a/tests/modules/blockonkeys.c b/tests/modules/blockonkeys.c index 1aa576489..9b6c5e60b 100644 --- a/tests/modules/blockonkeys.c +++ b/tests/modules/blockonkeys.c @@ -5,6 +5,8 @@ #include #include +#define UNUSED(V) ((void) V) + #define LIST_SIZE 1024 typedef struct { @@ -174,7 +176,7 @@ int bpopgt_reply_callback(RedisModuleCtx *ctx, RedisModuleString **argv, int arg fsl_t *fsl; if (!get_fsl(ctx, keyname, REDISMODULE_READ, 0, &fsl, 0) || !fsl) - return REDISMODULE_ERR; + return RedisModule_ReplyWithError(ctx,"UNBLOCKED key no longer exists"); if (fsl->list[fsl->length-1] <= *pgt) return REDISMODULE_ERR; @@ -212,12 +214,17 @@ int fsl_bpopgt(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { if (!get_fsl(ctx, argv[1], REDISMODULE_READ, 0, &fsl, 1)) return REDISMODULE_OK; - if (!fsl || fsl->list[fsl->length-1] <= gt) { + if (!fsl) + return RedisModule_ReplyWithError(ctx,"ERR key must exist"); + + if (fsl->list[fsl->length-1] <= gt) { /* We use malloc so the tests in blockedonkeys.tcl can check for memory leaks */ long long *pgt = RedisModule_Alloc(sizeof(long long)); *pgt = gt; - RedisModule_BlockClientOnKeys(ctx, bpopgt_reply_callback, bpopgt_timeout_callback, - bpopgt_free_privdata, timeout, &argv[1], 1, pgt); + RedisModule_BlockClientOnKeysWithFlags( + ctx, bpopgt_reply_callback, bpopgt_timeout_callback, + bpopgt_free_privdata, timeout, &argv[1], 1, pgt, + REDISMODULE_BLOCK_UNBLOCK_DELETED); } else { RedisModule_ReplyWithLongLong(ctx, fsl->list[--fsl->length]); } @@ -469,7 +476,7 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) .aof_rewrite = fsl_aofrw, .mem_usage = NULL, .free = fsl_free, - .digest = NULL + .digest = NULL, }; fsltype = RedisModule_CreateDataType(ctx, "fsltype_t", 0, &tm); diff --git a/tests/unit/moduleapi/blockonkeys.tcl b/tests/unit/moduleapi/blockonkeys.tcl index 094bcc0c0..50b130ba2 100644 --- a/tests/unit/moduleapi/blockonkeys.tcl +++ b/tests/unit/moduleapi/blockonkeys.tcl @@ -106,6 +106,7 @@ start_server {tags {"modules"}} { test {Module client blocked on keys (with metadata): Blocked, case 2} { r del k + r fsl.push k 32 set rd [redis_deferring_client] $rd fsl.bpopgt k 35 0 ;# wait until clients are actually blocked @@ -121,8 +122,120 @@ start_server {tags {"modules"}} { assert_equal {36} [$rd read] } + test {Module client blocked on keys (with metadata): Blocked, DEL} { + r del k + r fsl.push k 32 + set rd [redis_deferring_client] + $rd fsl.bpopgt k 35 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + r del k + assert_error {*UNBLOCKED key no longer exists*} {$rd read} + } + + test {Module client blocked on keys (with metadata): Blocked, FLUSHALL} { + r del k + r fsl.push k 32 + set rd [redis_deferring_client] + $rd fsl.bpopgt k 35 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + r flushall + assert_error {*UNBLOCKED key no longer exists*} {$rd read} + } + + test {Module client blocked on keys (with metadata): Blocked, SWAPDB, no key} { + r select 9 + r del k + r fsl.push k 32 + set rd [redis_deferring_client] + $rd fsl.bpopgt k 35 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + r swapdb 0 9 + assert_error {*UNBLOCKED key no longer exists*} {$rd read} + } + + test {Module client blocked on keys (with metadata): Blocked, SWAPDB, key exists, case 1} { + ;# Key exists on other db, but wrong type + r flushall + r select 9 + r fsl.push k 32 + r select 0 + r lpush k 38 + r select 9 + set rd [redis_deferring_client] + $rd fsl.bpopgt k 35 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + r swapdb 0 9 + assert_error {*UNBLOCKED key no longer exists*} {$rd read} + r select 9 + } + + test {Module client blocked on keys (with metadata): Blocked, SWAPDB, key exists, case 2} { + ;# Key exists on other db, with the right type, but the value doesn't allow to unblock + r flushall + r select 9 + r fsl.push k 32 + r select 0 + r fsl.push k 34 + r select 9 + set rd [redis_deferring_client] + $rd fsl.bpopgt k 35 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + r swapdb 0 9 + assert_equal {1} [s 0 blocked_clients] + r fsl.push k 38 + assert_equal {38} [$rd read] + r select 9 + } + + test {Module client blocked on keys (with metadata): Blocked, SWAPDB, key exists, case 3} { + ;# Key exists on other db, with the right type, the value allows to unblock + r flushall + r select 9 + r fsl.push k 32 + r select 0 + r fsl.push k 38 + r select 9 + set rd [redis_deferring_client] + $rd fsl.bpopgt k 35 0 + ;# wait until clients are actually blocked + wait_for_condition 50 100 { + [s 0 blocked_clients] eq {1} + } else { + fail "Clients are not blocked" + } + r swapdb 0 9 + assert_equal {38} [$rd read] + r select 9 + } + test {Module client blocked on keys (with metadata): Blocked, CLIENT KILL} { r del k + r fsl.push k 32 set rd [redis_deferring_client] $rd client id set cid [$rd read] @@ -138,6 +251,7 @@ start_server {tags {"modules"}} { test {Module client blocked on keys (with metadata): Blocked, CLIENT UNBLOCK TIMEOUT} { r del k + r fsl.push k 32 set rd [redis_deferring_client] $rd client id set cid [$rd read] @@ -154,6 +268,7 @@ start_server {tags {"modules"}} { test {Module client blocked on keys (with metadata): Blocked, CLIENT UNBLOCK ERROR} { r del k + r fsl.push k 32 set rd [redis_deferring_client] $rd client id set cid [$rd read]