Modules: block on keys functions layout and mechanism.
This commit is contained in:
parent
cfcb475435
commit
3649568ff2
153
src/module.c
153
src/module.c
@ -245,6 +245,15 @@ typedef struct RedisModuleBlockedClient {
|
|||||||
client *reply_client; /* Fake client used to accumulate replies
|
client *reply_client; /* Fake client used to accumulate replies
|
||||||
in thread safe contexts. */
|
in thread safe contexts. */
|
||||||
int dbid; /* Database number selected by the original client. */
|
int dbid; /* Database number selected by the original client. */
|
||||||
|
int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
|
||||||
|
int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname,
|
||||||
|
void *privdata); /* When blocking on keys, even if the
|
||||||
|
key is signaled as ready, maybe it
|
||||||
|
was modified afterward before the
|
||||||
|
client unblocks. So we always
|
||||||
|
need a callback that tells us if
|
||||||
|
the key is ready in order to serve
|
||||||
|
the next blocked client. */
|
||||||
} RedisModuleBlockedClient;
|
} RedisModuleBlockedClient;
|
||||||
|
|
||||||
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
|
||||||
@ -3989,6 +3998,68 @@ void unblockClientFromModule(client *c) {
|
|||||||
resetClient(c);
|
resetClient(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Block a client in the context of a module: this function implements both
|
||||||
|
* RM_BlockClient() and RM_BlockClientOnKeys() depending on the fact the
|
||||||
|
* keys are passed or not.
|
||||||
|
*
|
||||||
|
* When not blocking for keys, the keys, numkeys, is_key_ready callback
|
||||||
|
* and privdata parameters are not needed. The privdata in that case must
|
||||||
|
* be NULL, since later is RM_UnblockClient() that will provide some private
|
||||||
|
* data that the reply callback will receive.
|
||||||
|
*
|
||||||
|
* Instead when blocking for keys, normally RM_UnblockClient() will not be
|
||||||
|
* called (because the client will unblock when the key is modified), so
|
||||||
|
* 'privdata' should be provided in that case, so that once the client is
|
||||||
|
* unlocked and the reply callback is called, it will receive its associated
|
||||||
|
* private data.
|
||||||
|
*
|
||||||
|
* Even when blocking on keys, RM_UnblockClient() can be called however, but
|
||||||
|
* in that case the privdata argument is disregarded, because we pass the
|
||||||
|
* reply callback the privdata that is set here while blocking.
|
||||||
|
*/
|
||||||
|
RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *key, void *privdata), void *privdata) {
|
||||||
|
client *c = ctx->client;
|
||||||
|
int islua = c->flags & CLIENT_LUA;
|
||||||
|
int ismulti = c->flags & CLIENT_MULTI;
|
||||||
|
|
||||||
|
c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
|
||||||
|
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
|
||||||
|
ctx->module->blocked_clients++;
|
||||||
|
|
||||||
|
/* We need to handle the invalid operation of calling modules blocking
|
||||||
|
* commands from Lua or MULTI. We actually create an already aborted
|
||||||
|
* (client set to NULL) blocked client handle, and actually reply with
|
||||||
|
* an error. */
|
||||||
|
mstime_t timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
|
||||||
|
bc->client = (islua || ismulti) ? NULL : c;
|
||||||
|
bc->module = ctx->module;
|
||||||
|
bc->reply_callback = reply_callback;
|
||||||
|
bc->timeout_callback = timeout_callback;
|
||||||
|
bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
|
||||||
|
bc->free_privdata = free_privdata;
|
||||||
|
bc->privdata = privdata;
|
||||||
|
bc->reply_client = createClient(NULL);
|
||||||
|
bc->reply_client->flags |= CLIENT_MODULE;
|
||||||
|
bc->dbid = c->db->id;
|
||||||
|
bc->is_key_ready = is_key_ready;
|
||||||
|
bc->blocked_on_keys = keys != NULL;
|
||||||
|
c->bpop.timeout = timeout;
|
||||||
|
|
||||||
|
if (islua || ismulti) {
|
||||||
|
c->bpop.module_blocked_handle = NULL;
|
||||||
|
addReplyError(c, islua ?
|
||||||
|
"Blocking module command called from Lua script" :
|
||||||
|
"Blocking module command called from transaction");
|
||||||
|
} else {
|
||||||
|
if (keys) {
|
||||||
|
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL);
|
||||||
|
} else {
|
||||||
|
blockClient(c,BLOCKED_MODULE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return bc;
|
||||||
|
}
|
||||||
|
|
||||||
/* Block a client in the context of a blocking command, returning an handle
|
/* Block a client in the context of a blocking command, returning an handle
|
||||||
* which will be used, later, in order to unblock the client with a call to
|
* which will be used, later, in order to unblock the client with a call to
|
||||||
* RedisModule_UnblockClient(). The arguments specify callback functions
|
* RedisModule_UnblockClient(). The arguments specify callback functions
|
||||||
@ -4006,39 +4077,55 @@ void unblockClientFromModule(client *c) {
|
|||||||
* by RedisModule_UnblockClient() call.
|
* by RedisModule_UnblockClient() call.
|
||||||
*/
|
*/
|
||||||
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
|
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
|
||||||
client *c = ctx->client;
|
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL,NULL);
|
||||||
int islua = c->flags & CLIENT_LUA;
|
}
|
||||||
int ismulti = c->flags & CLIENT_MULTI;
|
|
||||||
|
|
||||||
c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
|
/* This call is similar to RedisModule_BlockClient(), however in this case we
|
||||||
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
|
* don't just block the client, but also ask Redis to unblock it automatically
|
||||||
ctx->module->blocked_clients++;
|
* once certain keys become "ready", that is, contain more data.
|
||||||
|
*
|
||||||
/* We need to handle the invalid operation of calling modules blocking
|
* Basically this is similar to what a typical Redis command usually does,
|
||||||
* commands from Lua or MULTI. We actually create an already aborted
|
* like BLPOP or ZPOPMAX: the client blocks if it cannot be served ASAP,
|
||||||
* (client set to NULL) blocked client handle, and actually reply with
|
* and later when the key receives new data (a list push for instance), the
|
||||||
* an error. */
|
* client is unblocked and served.
|
||||||
bc->client = (islua || ismulti) ? NULL : c;
|
*
|
||||||
bc->module = ctx->module;
|
* However in the case of this module API, when the client is unblocked?
|
||||||
bc->reply_callback = reply_callback;
|
*
|
||||||
bc->timeout_callback = timeout_callback;
|
* 1. If you block ok a key of a type that has blocking operations associated,
|
||||||
bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
|
* like a list, a sorted set, a stream, and so forth, the client may be
|
||||||
bc->free_privdata = free_privdata;
|
* unblocked once the relevant key is targeted by an operation that normally
|
||||||
bc->privdata = NULL;
|
* unblocks the native blocking operations for that type. So if we block
|
||||||
bc->reply_client = createClient(NULL);
|
* on a list key, an RPUSH command may unblock our client and so forth.
|
||||||
bc->reply_client->flags |= CLIENT_MODULE;
|
* 2. If you are implementing your native data type, or if you want to add new
|
||||||
bc->dbid = c->db->id;
|
* unblocking conditions in addition to "1", you can call the modules API
|
||||||
c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
|
* RedisModule_SignalKeyAsReady().
|
||||||
|
*
|
||||||
if (islua || ismulti) {
|
* Anyway we can't be sure if the client should be unblocked just because the
|
||||||
c->bpop.module_blocked_handle = NULL;
|
* key is signaled as ready: for instance a successive operation may change the
|
||||||
addReplyError(c, islua ?
|
* key, or a client in queue before this one can be served, modifying the key
|
||||||
"Blocking module command called from Lua script" :
|
* as well and making it empty again. So when blocking for keys, we need to
|
||||||
"Blocking module command called from transaction");
|
* register a callback called is_key_ready. This callback gets called with
|
||||||
} else {
|
* a context, selected with the right database, and the key name: if it
|
||||||
blockClient(c,BLOCKED_MODULE);
|
* returns 1, then we proceed calling the reply callback, otherwise if the
|
||||||
}
|
* is_key_ready callback returns 0 the client is not unblocked, since the
|
||||||
return bc;
|
* key is yet not ready.
|
||||||
|
*
|
||||||
|
* Thanks to this system we can setup complex blocking scenarios, like
|
||||||
|
* unblocking a client only if a list contains at least 5 items or other
|
||||||
|
* more fancy logics.
|
||||||
|
*
|
||||||
|
* Note that another difference with RedisModule_BlockClient(), is that here
|
||||||
|
* we pass the private data directly when blocking the client: such private
|
||||||
|
* data will be later provided both to the is_key_ready callback, and to the
|
||||||
|
* reply callback. Normally in RedisModule_BlockClient() the private data
|
||||||
|
* to reply to the client is passed when calling RedisModule_UnblockClient()
|
||||||
|
* but here the unblocking is performed by Redis itself, so we need to have
|
||||||
|
* some private data before hand. The private data is used to store any
|
||||||
|
* information about the specific unblocking operation that you are
|
||||||
|
* implementing. Such information will be freed using the free_privdata
|
||||||
|
* callback provided by the user. */
|
||||||
|
RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, int (*is_key_ready)(RedisModuleCtx*, RedisModuleString *keyname, void *privdata), void *privdata) {
|
||||||
|
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,is_key_ready,privdata);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
|
/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
|
||||||
@ -4054,7 +4141,7 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
|
|||||||
* Note: this function can be called from threads spawned by the module. */
|
* Note: this function can be called from threads spawned by the module. */
|
||||||
int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
|
int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
|
||||||
pthread_mutex_lock(&moduleUnblockedClientsMutex);
|
pthread_mutex_lock(&moduleUnblockedClientsMutex);
|
||||||
bc->privdata = privdata;
|
if (!bc->blocked_on_keys) bc->privdata = privdata;
|
||||||
listAddNodeTail(moduleUnblockedClients,bc);
|
listAddNodeTail(moduleUnblockedClients,bc);
|
||||||
if (write(server.module_blocked_pipe[1],"A",1) != 1) {
|
if (write(server.module_blocked_pipe[1],"A",1) != 1) {
|
||||||
/* Ignore the error, this is best-effort. */
|
/* Ignore the error, this is best-effort. */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user