Modules: block on keys functions layout and mechanism.
This commit is contained in:
parent
ac5f73bfbf
commit
5f2a2b974e
153
src/module.c
153
src/module.c
@ -245,6 +245,15 @@ typedef struct RedisModuleBlockedClient {
|
||||
client *reply_client; /* Fake client used to accumulate replies
|
||||
in thread safe contexts. */
|
||||
int dbid; /* Database number selected by the original client. */
|
||||
int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
|
||||
int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname,
|
||||
void *privdata); /* When blocking on keys, even if the
|
||||
key is signaled as ready, maybe it
|
||||
was modified afterward before the
|
||||
client unblocks. So we always
|
||||
need a callback that tells us if
|
||||
the key is ready in order to serve
|
||||
the next blocked client. */
|
||||
} RedisModuleBlockedClient;
|
||||
|
||||
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
|
||||
@ -3989,6 +3998,68 @@ void unblockClientFromModule(client *c) {
|
||||
resetClient(c);
|
||||
}
|
||||
|
||||
/* Block a client in the context of a module: this function implements both
|
||||
* RM_BlockClient() and RM_BlockClientOnKeys() depending on the fact the
|
||||
* keys are passed or not.
|
||||
*
|
||||
* When not blocking for keys, the keys, numkeys, is_key_ready callback
|
||||
* and privdata parameters are not needed. The privdata in that case must
|
||||
* be NULL, since later is RM_UnblockClient() that will provide some private
|
||||
* data that the reply callback will receive.
|
||||
*
|
||||
* Instead when blocking for keys, normally RM_UnblockClient() will not be
|
||||
* called (because the client will unblock when the key is modified), so
|
||||
* 'privdata' should be provided in that case, so that once the client is
|
||||
* unlocked and the reply callback is called, it will receive its associated
|
||||
* private data.
|
||||
*
|
||||
* Even when blocking on keys, RM_UnblockClient() can be called however, but
|
||||
* in that case the privdata argument is disregarded, because we pass the
|
||||
* reply callback the privdata that is set here while blocking.
|
||||
*/
|
||||
RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *key, void *privdata), void *privdata) {
|
||||
client *c = ctx->client;
|
||||
int islua = c->flags & CLIENT_LUA;
|
||||
int ismulti = c->flags & CLIENT_MULTI;
|
||||
|
||||
c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
|
||||
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
|
||||
ctx->module->blocked_clients++;
|
||||
|
||||
/* We need to handle the invalid operation of calling modules blocking
|
||||
* commands from Lua or MULTI. We actually create an already aborted
|
||||
* (client set to NULL) blocked client handle, and actually reply with
|
||||
* an error. */
|
||||
mstime_t timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
|
||||
bc->client = (islua || ismulti) ? NULL : c;
|
||||
bc->module = ctx->module;
|
||||
bc->reply_callback = reply_callback;
|
||||
bc->timeout_callback = timeout_callback;
|
||||
bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
|
||||
bc->free_privdata = free_privdata;
|
||||
bc->privdata = privdata;
|
||||
bc->reply_client = createClient(NULL);
|
||||
bc->reply_client->flags |= CLIENT_MODULE;
|
||||
bc->dbid = c->db->id;
|
||||
bc->is_key_ready = is_key_ready;
|
||||
bc->blocked_on_keys = keys != NULL;
|
||||
c->bpop.timeout = timeout;
|
||||
|
||||
if (islua || ismulti) {
|
||||
c->bpop.module_blocked_handle = NULL;
|
||||
addReplyError(c, islua ?
|
||||
"Blocking module command called from Lua script" :
|
||||
"Blocking module command called from transaction");
|
||||
} else {
|
||||
if (keys) {
|
||||
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL);
|
||||
} else {
|
||||
blockClient(c,BLOCKED_MODULE);
|
||||
}
|
||||
}
|
||||
return bc;
|
||||
}
|
||||
|
||||
/* Block a client in the context of a blocking command, returning an handle
|
||||
* which will be used, later, in order to unblock the client with a call to
|
||||
* RedisModule_UnblockClient(). The arguments specify callback functions
|
||||
@ -4006,39 +4077,55 @@ void unblockClientFromModule(client *c) {
|
||||
* by RedisModule_UnblockClient() call.
|
||||
*/
|
||||
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
|
||||
client *c = ctx->client;
|
||||
int islua = c->flags & CLIENT_LUA;
|
||||
int ismulti = c->flags & CLIENT_MULTI;
|
||||
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL,NULL);
|
||||
}
|
||||
|
||||
c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
|
||||
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
|
||||
ctx->module->blocked_clients++;
|
||||
|
||||
/* We need to handle the invalid operation of calling modules blocking
|
||||
* commands from Lua or MULTI. We actually create an already aborted
|
||||
* (client set to NULL) blocked client handle, and actually reply with
|
||||
* an error. */
|
||||
bc->client = (islua || ismulti) ? NULL : c;
|
||||
bc->module = ctx->module;
|
||||
bc->reply_callback = reply_callback;
|
||||
bc->timeout_callback = timeout_callback;
|
||||
bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
|
||||
bc->free_privdata = free_privdata;
|
||||
bc->privdata = NULL;
|
||||
bc->reply_client = createClient(NULL);
|
||||
bc->reply_client->flags |= CLIENT_MODULE;
|
||||
bc->dbid = c->db->id;
|
||||
c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
|
||||
|
||||
if (islua || ismulti) {
|
||||
c->bpop.module_blocked_handle = NULL;
|
||||
addReplyError(c, islua ?
|
||||
"Blocking module command called from Lua script" :
|
||||
"Blocking module command called from transaction");
|
||||
} else {
|
||||
blockClient(c,BLOCKED_MODULE);
|
||||
}
|
||||
return bc;
|
||||
/* This call is similar to RedisModule_BlockClient(), however in this case we
|
||||
* don't just block the client, but also ask Redis to unblock it automatically
|
||||
* once certain keys become "ready", that is, contain more data.
|
||||
*
|
||||
* Basically this is similar to what a typical Redis command usually does,
|
||||
* like BLPOP or ZPOPMAX: the client blocks if it cannot be served ASAP,
|
||||
* and later when the key receives new data (a list push for instance), the
|
||||
* client is unblocked and served.
|
||||
*
|
||||
* However in the case of this module API, when the client is unblocked?
|
||||
*
|
||||
* 1. If you block ok a key of a type that has blocking operations associated,
|
||||
* like a list, a sorted set, a stream, and so forth, the client may be
|
||||
* unblocked once the relevant key is targeted by an operation that normally
|
||||
* unblocks the native blocking operations for that type. So if we block
|
||||
* on a list key, an RPUSH command may unblock our client and so forth.
|
||||
* 2. If you are implementing your native data type, or if you want to add new
|
||||
* unblocking conditions in addition to "1", you can call the modules API
|
||||
* RedisModule_SignalKeyAsReady().
|
||||
*
|
||||
* Anyway we can't be sure if the client should be unblocked just because the
|
||||
* key is signaled as ready: for instance a successive operation may change the
|
||||
* key, or a client in queue before this one can be served, modifying the key
|
||||
* as well and making it empty again. So when blocking for keys, we need to
|
||||
* register a callback called is_key_ready. This callback gets called with
|
||||
* a context, selected with the right database, and the key name: if it
|
||||
* returns 1, then we proceed calling the reply callback, otherwise if the
|
||||
* is_key_ready callback returns 0 the client is not unblocked, since the
|
||||
* key is yet not ready.
|
||||
*
|
||||
* Thanks to this system we can setup complex blocking scenarios, like
|
||||
* unblocking a client only if a list contains at least 5 items or other
|
||||
* more fancy logics.
|
||||
*
|
||||
* Note that another difference with RedisModule_BlockClient(), is that here
|
||||
* we pass the private data directly when blocking the client: such private
|
||||
* data will be later provided both to the is_key_ready callback, and to the
|
||||
* reply callback. Normally in RedisModule_BlockClient() the private data
|
||||
* to reply to the client is passed when calling RedisModule_UnblockClient()
|
||||
* but here the unblocking is performed by Redis itself, so we need to have
|
||||
* some private data before hand. The private data is used to store any
|
||||
* information about the specific unblocking operation that you are
|
||||
* implementing. Such information will be freed using the free_privdata
|
||||
* callback provided by the user. */
|
||||
RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname, void *privdata), void *privdata) {
|
||||
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,is_key_ready,privdata);
|
||||
}
|
||||
|
||||
/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
|
||||
@ -4054,7 +4141,7 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
|
||||
* Note: this function can be called from threads spawned by the module. */
|
||||
int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
|
||||
pthread_mutex_lock(&moduleUnblockedClientsMutex);
|
||||
bc->privdata = privdata;
|
||||
if (!bc->blocked_on_keys) bc->privdata = privdata;
|
||||
listAddNodeTail(moduleUnblockedClients,bc);
|
||||
if (write(server.module_blocked_pipe[1],"A",1) != 1) {
|
||||
/* Ignore the error, this is best-effort. */
|
||||
|
Loading…
x
Reference in New Issue
Block a user