Modules: block on keys: use a better interface.

Using the is_key_ready() callback plus the reply callback later, creates
different issues AFAIK:

1. More complex API.
2. We need to call the reply callback() ASAP if the is_key_ready()
interface returned success, however the internals do not work in that
way, so when the reply callback is called the setup could be different.
To fix that, there is to break the current design that handles the
unblocked clients asyncrhonously, and run the list ASAP.
This commit is contained in:
antirez 2019-10-31 11:35:05 +01:00
parent c9ad11d797
commit 4af1df6b9f
5 changed files with 65 additions and 45 deletions

View File

@ -174,6 +174,7 @@ void unblockClient(client *c) {
} else if (c->btype == BLOCKED_WAIT) { } else if (c->btype == BLOCKED_WAIT) {
unblockClientWaitingReplicas(c); unblockClientWaitingReplicas(c);
} else if (c->btype == BLOCKED_MODULE) { } else if (c->btype == BLOCKED_MODULE) {
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
unblockClientFromModule(c); unblockClientFromModule(c);
} else { } else {
serverPanic("Unknown btype in unblockClient()."); serverPanic("Unknown btype in unblockClient().");
@ -449,7 +450,7 @@ void serveClientsBlockedOnKeyByModule(readyList *rl) {
listNode *clientnode = listFirst(clients); listNode *clientnode = listFirst(clients);
client *receiver = clientnode->value; client *receiver = clientnode->value;
if (!moduleIsKeyReady(receiver, rl->key)) continue; if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
moduleUnblockClient(receiver); moduleUnblockClient(receiver);
} }

View File

@ -140,6 +140,9 @@ struct RedisModuleCtx {
void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */ void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */
int postponed_arrays_count; /* Number of entries in postponed_arrays. */ int postponed_arrays_count; /* Number of entries in postponed_arrays. */
void *blocked_privdata; /* Privdata set when unblocking a client. */ void *blocked_privdata; /* Privdata set when unblocking a client. */
RedisModuleString *blocked_ready_key; /* Key ready when the reply callback
gets called for clients blocked
on keys. */
/* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */ /* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
int *keys_pos; int *keys_pos;
@ -153,7 +156,7 @@ struct RedisModuleCtx {
}; };
typedef struct RedisModuleCtx RedisModuleCtx; typedef struct RedisModuleCtx RedisModuleCtx;
#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL, {0}} #define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, 0, NULL, {0}}
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0) #define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1) #define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2) #define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
@ -246,14 +249,6 @@ typedef struct RedisModuleBlockedClient {
in thread safe contexts. */ in thread safe contexts. */
int dbid; /* Database number selected by the original client. */ int dbid; /* Database number selected by the original client. */
int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */ int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname,
void *privdata); /* When blocking on keys, even if the
key is signaled as ready, maybe it
was modified afterward before the
client unblocks. So we always
need a callback that tells us if
the key is ready in order to serve
the next blocked client. */
} RedisModuleBlockedClient; } RedisModuleBlockedClient;
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER; static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
@ -4002,10 +3997,10 @@ void unblockClientFromModule(client *c) {
* RM_BlockClient() and RM_BlockClientOnKeys() depending on the fact the * RM_BlockClient() and RM_BlockClientOnKeys() depending on the fact the
* keys are passed or not. * keys are passed or not.
* *
* When not blocking for keys, the keys, numkeys, is_key_ready callback * When not blocking for keys, the keys, numkeys, and privdata parameters are
* and privdata parameters are not needed. The privdata in that case must * not needed. The privdata in that case must be NULL, since later is
* be NULL, since later is RM_UnblockClient() that will provide some private * RM_UnblockClient() that will provide some private data that the reply
* data that the reply callback will receive. * callback will receive.
* *
* Instead when blocking for keys, normally RM_UnblockClient() will not be * Instead when blocking for keys, normally RM_UnblockClient() will not be
* called (because the client will unblock when the key is modified), so * called (because the client will unblock when the key is modified), so
@ -4017,7 +4012,7 @@ void unblockClientFromModule(client *c) {
* in that case the privdata argument is disregarded, because we pass the * in that case the privdata argument is disregarded, because we pass the
* reply callback the privdata that is set here while blocking. * reply callback the privdata that is set here while blocking.
*/ */
RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *key, void *privdata), void *privdata) { RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
client *c = ctx->client; client *c = ctx->client;
int islua = c->flags & CLIENT_LUA; int islua = c->flags & CLIENT_LUA;
int ismulti = c->flags & CLIENT_MULTI; int ismulti = c->flags & CLIENT_MULTI;
@ -4041,7 +4036,6 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
bc->reply_client = createClient(NULL); bc->reply_client = createClient(NULL);
bc->reply_client->flags |= CLIENT_MODULE; bc->reply_client->flags |= CLIENT_MODULE;
bc->dbid = c->db->id; bc->dbid = c->db->id;
bc->is_key_ready = is_key_ready;
bc->blocked_on_keys = keys != NULL; bc->blocked_on_keys = keys != NULL;
c->bpop.timeout = timeout; c->bpop.timeout = timeout;
@ -4062,20 +4056,24 @@ RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdF
/* This function is called from module.c in order to check if a module /* This function is called from module.c in order to check if a module
* blocked for BLOCKED_MODULE and subtype 'on keys' (bc->blocked_on_keys true) * blocked for BLOCKED_MODULE and subtype 'on keys' (bc->blocked_on_keys true)
* can really be unblocked since the key is ready. */ * can really be unblocked, since the module was able to serve the client.
int moduleIsKeyReady(client *c, robj *key) { * If the callback returns REDISMODULE_OK, then the client can be unblocked,
* otherwise the client remains blocked and we'll retry again when one of
* the keys it blocked for becomes "ready" again. */
int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
int served = 0;
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle; RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
RedisModuleCtx ctx = REDISMODULE_CTX_INIT; RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
ctx.blocked_ready_key = key;
ctx.blocked_privdata = bc->privdata;
ctx.module = bc->module; ctx.module = bc->module;
ctx.client = bc->client; ctx.client = bc->client;
ctx.blocked_privdata = bc->privdata; /* In case the callback uses the ctx.blocked_client = bc;
API to get the pointer to the if (bc->reply_callback(&ctx,(void**)c->argv,c->argc) == REDISMODULE_OK)
privdata, even if we provide it served = 1;
as argument. */
selectDb(ctx.client, bc->dbid);
int ready = bc->is_key_ready(&ctx, key, bc->privdata);
moduleFreeContext(&ctx); moduleFreeContext(&ctx);
return ready; return served;
} }
/* Block a client in the context of a blocking command, returning an handle /* Block a client in the context of a blocking command, returning an handle
@ -4095,7 +4093,7 @@ int moduleIsKeyReady(client *c, robj *key) {
* by RedisModule_UnblockClient() call. * by RedisModule_UnblockClient() call.
*/ */
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) { RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL,NULL); return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL);
} }
/* This call is similar to RedisModule_BlockClient(), however in this case we /* This call is similar to RedisModule_BlockClient(), however in this case we
@ -4142,8 +4140,8 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
* information about the specific unblocking operation that you are * information about the specific unblocking operation that you are
* implementing. Such information will be freed using the free_privdata * implementing. Such information will be freed using the free_privdata
* callback provided by the user. */ * callback provided by the user. */
RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname, void *privdata), void *privdata) { RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,is_key_ready,privdata); return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata);
} }
/* This function is used in order to potentially unblock a client blocked /* This function is used in order to potentially unblock a client blocked
@ -4174,6 +4172,13 @@ void moduleUnblockClient(client *c) {
moduleUnblockClientByHandle(bc,NULL); moduleUnblockClientByHandle(bc,NULL);
} }
/* Return true if the client 'c' was blocked by a module using
* RM_BlockClientOnKeys(). */
int moduleClientIsBlockedOnKeys(client *c) {
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
return bc->blocked_on_keys;
}
/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger /* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
* the reply callbacks to be called in order to reply to the client. * the reply callbacks to be called in order to reply to the client.
* The 'privdata' argument will be accessible by the reply callback, so * The 'privdata' argument will be accessible by the reply callback, so
@ -4253,11 +4258,15 @@ void moduleHandleBlockedClients(void) {
* touch the shared list. */ * touch the shared list. */
/* Call the reply callback if the client is valid and we have /* Call the reply callback if the client is valid and we have
* any callback. */ * any callback. However the callback is not called if the client
if (c && bc->reply_callback) { * was blocked on keys (RM_BlockClientOnKeys()), because we already
* called such callback in moduleTryServeClientBlockedOnKey() when
* the key was signaled as ready. */
if (c && !bc->blocked_on_keys && bc->reply_callback) {
RedisModuleCtx ctx = REDISMODULE_CTX_INIT; RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY; ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
ctx.blocked_privdata = bc->privdata; ctx.blocked_privdata = bc->privdata;
ctx.blocked_ready_key = NULL;
ctx.module = bc->module; ctx.module = bc->module;
ctx.client = bc->client; ctx.client = bc->client;
ctx.blocked_client = bc; ctx.blocked_client = bc;
@ -4349,6 +4358,12 @@ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
return ctx->blocked_privdata; return ctx->blocked_privdata;
} }
/* Get the key that is ready when the reply callback is called in the context
* of a client blocked by RedisModule_BlockClientOnKeys(). */
RedisModuleString *RM_GetBlockedClientReadyKey(RedisModuleCtx *ctx) {
return ctx->blocked_ready_key;
}
/* Get the blocked client associated with a given context. /* Get the blocked client associated with a given context.
* This is useful in the reply and timeout callbacks of blocked clients, * This is useful in the reply and timeout callbacks of blocked clients,
* before sometimes the module has the blocked client handle references * before sometimes the module has the blocked client handle references
@ -6682,4 +6697,5 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(SubscribeToServerEvent); REGISTER_API(SubscribeToServerEvent);
REGISTER_API(BlockClientOnKeys); REGISTER_API(BlockClientOnKeys);
REGISTER_API(SignalKeyAsReady); REGISTER_API(SignalKeyAsReady);
REGISTER_API(GetBlockedClientReadyKey);
} }

View File

@ -193,26 +193,26 @@ int HelloTypeLen_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
/* ====================== Example of a blocking command ==================== */ /* ====================== Example of a blocking command ==================== */
/* Is_key_ready callback for blocking command HELLOTYPE.BRANGE */ /* Reply callback for blocking command HELLOTYPE.BRANGE, this will get
int HelloBlock_IsKeyReady(RedisModuleCtx *ctx, RedisModuleString *keyname, void *privdata) { * called when the key we blocked for is ready: we need to check if we
REDISMODULE_NOT_USED(privdata); * can really serve the client, and reply OK or ERR accordingly. */
int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
REDISMODULE_NOT_USED(argv);
REDISMODULE_NOT_USED(argc);
RedisModule_AutoMemory(ctx); /* Use automatic memory management. */ RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
RedisModuleKey *key = RedisModule_OpenKey(ctx,keyname,REDISMODULE_READ); RedisModuleKey *key = RedisModule_OpenKey(ctx,keyname,REDISMODULE_READ);
int type = RedisModule_KeyType(key); int type = RedisModule_KeyType(key);
if (type != REDISMODULE_KEYTYPE_MODULE || if (type != REDISMODULE_KEYTYPE_MODULE ||
RedisModule_ModuleTypeGetType(key) != HelloType) RedisModule_ModuleTypeGetType(key) != HelloType)
{ {
return 0; RedisModule_CloseKey(key);
} else { return REDISMODULE_ERR;
return 1;
} }
}
/* Reply callback for blocking command HELLOTYPE.BRANGE */ /* In case the key is able to serve our blocked client, let's directly
int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { * use our original command implementation to make this example simpler. */
REDISMODULE_NOT_USED(argv); RedisModule_CloseKey(key);
REDISMODULE_NOT_USED(argc);
return HelloTypeRange_RedisCommand(ctx,argv,argc-1); return HelloTypeRange_RedisCommand(ctx,argv,argc-1);
} }
@ -251,7 +251,7 @@ int HelloTypeBRange_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
} }
void *privdata = RedisModule_Alloc(100); void *privdata = RedisModule_Alloc(100);
RedisModule_BlockClientOnKeys(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout,argv+1,1,HelloBlock_IsKeyReady,privdata); RedisModule_BlockClientOnKeys(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout,argv+1,1,privdata);
return REDISMODULE_OK; return REDISMODULE_OK;
} }

View File

@ -492,8 +492,9 @@ int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldDouble)(RedisModuleInfoCtx *ctx
int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldLongLong)(RedisModuleInfoCtx *ctx, char *field, long long value); int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldLongLong)(RedisModuleInfoCtx *ctx, char *field, long long value);
int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldULongLong)(RedisModuleInfoCtx *ctx, char *field, unsigned long long value); int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldULongLong)(RedisModuleInfoCtx *ctx, char *field, unsigned long long value);
int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback); int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback);
RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname, void *privdata), void *privdata); RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata);
void REDISMODULE_API_FUNC(RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key); void REDISMODULE_API_FUNC(RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key);
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx);
/* Experimental APIs */ /* Experimental APIs */
#ifdef REDISMODULE_EXPERIMENTAL_API #ifdef REDISMODULE_EXPERIMENTAL_API
@ -692,6 +693,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(SubscribeToServerEvent); REDISMODULE_GET_API(SubscribeToServerEvent);
REDISMODULE_GET_API(BlockClientOnKeys); REDISMODULE_GET_API(BlockClientOnKeys);
REDISMODULE_GET_API(SignalKeyAsReady); REDISMODULE_GET_API(SignalKeyAsReady);
REDISMODULE_GET_API(GetBlockedClientReadyKey);
#ifdef REDISMODULE_EXPERIMENTAL_API #ifdef REDISMODULE_EXPERIMENTAL_API
REDISMODULE_GET_API(GetThreadSafeContext); REDISMODULE_GET_API(GetThreadSafeContext);

View File

@ -1602,8 +1602,9 @@ ssize_t rdbSaveModulesAux(rio *rdb, int when);
int moduleAllDatatypesHandleErrors(); int moduleAllDatatypesHandleErrors();
sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections); sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections);
void moduleFireServerEvent(uint64_t eid, int subid, void *data); void moduleFireServerEvent(uint64_t eid, int subid, void *data);
int moduleIsKeyReady(client *c, robj *key); int moduleTryServeClientBlockedOnKey(client *c, robj *key);
void moduleUnblockClient(client *c); void moduleUnblockClient(client *c);
int moduleClientIsBlockedOnKeys(client *c);
/* Utils */ /* Utils */
long long ustime(void); long long ustime(void);