diff --git a/src/blocked.c b/src/blocked.c index 2b91c1b44..fb58f850b 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -174,6 +174,7 @@ void unblockClient(client *c) { } else if (c->btype == BLOCKED_WAIT) { unblockClientWaitingReplicas(c); } else if (c->btype == BLOCKED_MODULE) { + if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c); unblockClientFromModule(c); } else { serverPanic("Unknown btype in unblockClient()."); @@ -449,7 +450,7 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) { listNode *clientnode = listFirst(clients); client *receiver = clientnode->value; - if (!moduleIsKeyReady(receiver, rl->key)) continue; + if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue; moduleUnblockClient(receiver); } diff --git a/src/module.c b/src/module.c index a11d3a306..8837ae017 100644 --- a/src/module.c +++ b/src/module.c @@ -140,6 +140,9 @@ struct RedisModuleCtx { void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */ int postponed_arrays_count; /* Number of entries in postponed_arrays. */ void *blocked_privdata; /* Privdata set when unblocking a client. */ + RedisModuleString *blocked_ready_key; /* Key ready when the reply callback + gets called for clients blocked + on keys. */ /* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */ int *keys_pos; @@ -153,7 +156,7 @@ struct RedisModuleCtx { }; typedef struct RedisModuleCtx RedisModuleCtx; -#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL, {0}} +#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, 0, NULL, {0}} #define REDISMODULE_CTX_MULTI_EMITTED (1<<0) #define REDISMODULE_CTX_AUTO_MEMORY (1<<1) #define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2) @@ -246,14 +249,6 @@ typedef struct RedisModuleBlockedClient { in thread safe contexts. */ int dbid; /* Database number selected by the original client. */ int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */ - int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname, - void *privdata); /* When blocking on keys, even if the - key is signaled as ready, maybe it - was modified afterward before the - client unblocks. So we always - need a callback that tells us if - the key is ready in order to serve - the next blocked client. */ } RedisModuleBlockedClient; static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER; @@ -4002,10 +3997,10 @@ void unblockClientFromModule(client *c) { * RM_BlockClient() and RM_BlockClientOnKeys() depending on the fact the * keys are passed or not. * - * When not blocking for keys, the keys, numkeys, is_key_ready callback - * and privdata parameters are not needed. The privdata in that case must - * be NULL, since later is RM_UnblockClient() that will provide some private - * data that the reply callback will receive. + * When not blocking for keys, the keys, numkeys, and privdata parameters are + * not needed. The privdata in that case must be NULL, since later is + * RM_UnblockClient() that will provide some private data that the reply + * callback will receive. * * Instead when blocking for keys, normally RM_UnblockClient() will not be * called (because the client will unblock when the key is modified), so @@ -4017,7 +4012,7 @@ void unblockClientFromModule(client *c) { * in that case the privdata argument is disregarded, because we pass the * reply callback the privdata that is set here while blocking. */ -RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *key, void *privdata), void *privdata) { +RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) { client *c = ctx->client; int islua = c->flags & CLIENT_LUA; int ismulti = c->flags & CLIENT_MULTI; @@ -4041,7 +4036,6 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF bc->reply_client = createClient(NULL); bc->reply_client->flags |= CLIENT_MODULE; bc->dbid = c->db->id; - bc->is_key_ready = is_key_ready; bc->blocked_on_keys = keys != NULL; c->bpop.timeout = timeout; @@ -4062,20 +4056,24 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF /* This function is called from module.c in order to check if a module * blocked for BLOCKED_MODULE and subtype 'on keys' (bc->blocked_on_keys true) - * can really be unblocked since the key is ready. */ -int moduleIsKeyReady(client *c, robj *key) { + * can really be unblocked, since the module was able to serve the client. + * If the callback returns REDISMODULE_OK, then the client can be unblocked, + * otherwise the client remains blocked and we'll retry again when one of + * the keys it blocked for becomes "ready" again. */ +int moduleTryServeClientBlockedOnKey(client *c, robj *key) { + int served = 0; RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; + ctx.blocked_ready_key = key; + ctx.blocked_privdata = bc->privdata; ctx.module = bc->module; ctx.client = bc->client; - ctx.blocked_privdata = bc->privdata; /* In case the callback uses the - API to get the pointer to the - privdata, even if we provide it - as argument. */ - selectDb(ctx.client, bc->dbid); - int ready = bc->is_key_ready(&ctx, key, bc->privdata); + ctx.blocked_client = bc; + if (bc->reply_callback(&ctx,(void**)c->argv,c->argc) == REDISMODULE_OK) + served = 1; moduleFreeContext(&ctx); - return ready; + return served; } /* Block a client in the context of a blocking command, returning an handle @@ -4095,7 +4093,7 @@ int moduleIsKeyReady(client *c, robj *key) { * by RedisModule_UnblockClient() call. */ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) { - return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL,NULL); + return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL); } /* This call is similar to RedisModule_BlockClient(), however in this case we @@ -4142,8 +4140,8 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc * information about the specific unblocking operation that you are * implementing. Such information will be freed using the free_privdata * callback provided by the user. */ -RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname, void *privdata), void *privdata) { - return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,is_key_ready,privdata); +RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) { + return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata); } /* This function is used in order to potentially unblock a client blocked @@ -4174,6 +4172,13 @@ void moduleUnblockClient(client *c) { moduleUnblockClientByHandle(bc,NULL); } +/* Return true if the client 'c' was blocked by a module using + * RM_BlockClientOnKeys(). */ +int moduleClientIsBlockedOnKeys(client *c) { + RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; + return bc->blocked_on_keys; +} + /* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger * the reply callbacks to be called in order to reply to the client. * The 'privdata' argument will be accessible by the reply callback, so @@ -4253,11 +4258,15 @@ void moduleHandleBlockedClients(void) { * touch the shared list. */ /* Call the reply callback if the client is valid and we have - * any callback. */ - if (c && bc->reply_callback) { + * any callback. However the callback is not called if the client + * was blocked on keys (RM_BlockClientOnKeys()), because we already + * called such callback in moduleTryServeClientBlockedOnKey() when + * the key was signaled as ready. */ + if (c && !bc->blocked_on_keys && bc->reply_callback) { RedisModuleCtx ctx = REDISMODULE_CTX_INIT; ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; ctx.blocked_privdata = bc->privdata; + ctx.blocked_ready_key = NULL; ctx.module = bc->module; ctx.client = bc->client; ctx.blocked_client = bc; @@ -4349,6 +4358,12 @@ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) { return ctx->blocked_privdata; } +/* Get the key that is ready when the reply callback is called in the context + * of a client blocked by RedisModule_BlockClientOnKeys(). */ +RedisModuleString *RM_GetBlockedClientReadyKey(RedisModuleCtx *ctx) { + return ctx->blocked_ready_key; +} + /* Get the blocked client associated with a given context. * This is useful in the reply and timeout callbacks of blocked clients, * before sometimes the module has the blocked client handle references @@ -6682,4 +6697,5 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(SubscribeToServerEvent); REGISTER_API(BlockClientOnKeys); REGISTER_API(SignalKeyAsReady); + REGISTER_API(GetBlockedClientReadyKey); } diff --git a/src/modules/hellotype.c b/src/modules/hellotype.c index 084408798..dafbadbe5 100644 --- a/src/modules/hellotype.c +++ b/src/modules/hellotype.c @@ -193,26 +193,26 @@ int HelloTypeLen_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int /* ====================== Example of a blocking command ==================== */ -/* Is_key_ready callback for blocking command HELLOTYPE.BRANGE */ -int HelloBlock_IsKeyReady(RedisModuleCtx *ctx, RedisModuleString *keyname, void *privdata) { - REDISMODULE_NOT_USED(privdata); +/* Reply callback for blocking command HELLOTYPE.BRANGE, this will get + * called when the key we blocked for is ready: we need to check if we + * can really serve the client, and reply OK or ERR accordingly. */ +int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { + REDISMODULE_NOT_USED(argv); + REDISMODULE_NOT_USED(argc); - RedisModule_AutoMemory(ctx); /* Use automatic memory management. */ + RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx); RedisModuleKey *key = RedisModule_OpenKey(ctx,keyname,REDISMODULE_READ); int type = RedisModule_KeyType(key); if (type != REDISMODULE_KEYTYPE_MODULE || RedisModule_ModuleTypeGetType(key) != HelloType) { - return 0; - } else { - return 1; + RedisModule_CloseKey(key); + return REDISMODULE_ERR; } -} -/* Reply callback for blocking command HELLOTYPE.BRANGE */ -int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - REDISMODULE_NOT_USED(argv); - REDISMODULE_NOT_USED(argc); + /* In case the key is able to serve our blocked client, let's directly + * use our original command implementation to make this example simpler. */ + RedisModule_CloseKey(key); return HelloTypeRange_RedisCommand(ctx,argv,argc-1); } @@ -251,7 +251,7 @@ int HelloTypeBRange_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, } void *privdata = RedisModule_Alloc(100); - RedisModule_BlockClientOnKeys(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout,argv+1,1,HelloBlock_IsKeyReady,privdata); + RedisModule_BlockClientOnKeys(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout,argv+1,1,privdata); return REDISMODULE_OK; } diff --git a/src/redismodule.h b/src/redismodule.h index 5b4c31b19..1b284770b 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -492,8 +492,9 @@ int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldDouble)(RedisModuleInfoCtx *ctx int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldLongLong)(RedisModuleInfoCtx *ctx, char *field, long long value); int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldULongLong)(RedisModuleInfoCtx *ctx, char *field, unsigned long long value); int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback); -RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname, void *privdata), void *privdata); +RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata); void REDISMODULE_API_FUNC(RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key); +RedisModuleString *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx); /* Experimental APIs */ #ifdef REDISMODULE_EXPERIMENTAL_API @@ -692,6 +693,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(SubscribeToServerEvent); REDISMODULE_GET_API(BlockClientOnKeys); REDISMODULE_GET_API(SignalKeyAsReady); + REDISMODULE_GET_API(GetBlockedClientReadyKey); #ifdef REDISMODULE_EXPERIMENTAL_API REDISMODULE_GET_API(GetThreadSafeContext); diff --git a/src/server.h b/src/server.h index 3a2bb1c7b..f724f7d64 100644 --- a/src/server.h +++ b/src/server.h @@ -1602,8 +1602,9 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when); int moduleAllDatatypesHandleErrors(); sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections); void moduleFireServerEvent(uint64_t eid, int subid, void *data); -int moduleIsKeyReady(client *c, robj *key); +int moduleTryServeClientBlockedOnKey(client *c, robj *key); void moduleUnblockClient(client *c); +int moduleClientIsBlockedOnKeys(client *c); /* Utils */ long long ustime(void);