Merge branch 'unstable' into module_hooks
This commit is contained in:
commit
da44c54897
@ -174,6 +174,7 @@ void unblockClient(client *c) {
|
|||||||
} else if (c->btype == BLOCKED_WAIT) {
|
} else if (c->btype == BLOCKED_WAIT) {
|
||||||
unblockClientWaitingReplicas(c);
|
unblockClientWaitingReplicas(c);
|
||||||
} else if (c->btype == BLOCKED_MODULE) {
|
} else if (c->btype == BLOCKED_MODULE) {
|
||||||
|
if (moduleClientIsBlockedOnKeys(c)) unblockClientWaitingData(c);
|
||||||
unblockClientFromModule(c);
|
unblockClientFromModule(c);
|
||||||
} else {
|
} else {
|
||||||
serverPanic("Unknown btype in unblockClient().");
|
serverPanic("Unknown btype in unblockClient().");
|
||||||
@ -430,6 +431,49 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Helper function for handleClientsBlockedOnKeys(). This function is called
|
||||||
|
* in order to check if we can serve clients blocked by modules using
|
||||||
|
* RM_BlockClientOnKeys(), when the corresponding key was signaled as ready:
|
||||||
|
* our goal here is to call the RedisModuleBlockedClient reply() callback to
|
||||||
|
* see if the key is really able to serve the client, and in that case,
|
||||||
|
* unblock it. */
|
||||||
|
void serveClientsBlockedOnKeyByModule(readyList *rl) {
|
||||||
|
dictEntry *de;
|
||||||
|
|
||||||
|
/* We serve clients in the same order they blocked for
|
||||||
|
* this key, from the first blocked to the last. */
|
||||||
|
de = dictFind(rl->db->blocking_keys,rl->key);
|
||||||
|
if (de) {
|
||||||
|
list *clients = dictGetVal(de);
|
||||||
|
int numclients = listLength(clients);
|
||||||
|
|
||||||
|
while(numclients--) {
|
||||||
|
listNode *clientnode = listFirst(clients);
|
||||||
|
client *receiver = clientnode->value;
|
||||||
|
|
||||||
|
/* Put at the tail, so that at the next call
|
||||||
|
* we'll not run into it again: clients here may not be
|
||||||
|
* ready to be served, so they'll remain in the list
|
||||||
|
* sometimes. We want also be able to skip clients that are
|
||||||
|
* not blocked for the MODULE type safely. */
|
||||||
|
listDelNode(clients,clientnode);
|
||||||
|
listAddNodeTail(clients,receiver);
|
||||||
|
|
||||||
|
if (receiver->btype != BLOCKED_MODULE) continue;
|
||||||
|
|
||||||
|
/* Note that if *this* client cannot be served by this key,
|
||||||
|
* it does not mean that another client that is next into the
|
||||||
|
* list cannot be served as well: they may be blocked by
|
||||||
|
* different modules with different triggers to consider if a key
|
||||||
|
* is ready or not. This means we can't exit the loop but need
|
||||||
|
* to continue after the first failure. */
|
||||||
|
if (!moduleTryServeClientBlockedOnKey(receiver, rl->key)) continue;
|
||||||
|
|
||||||
|
moduleUnblockClient(receiver);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* This function should be called by Redis every time a single command,
|
/* This function should be called by Redis every time a single command,
|
||||||
* a MULTI/EXEC block, or a Lua script, terminated its execution after
|
* a MULTI/EXEC block, or a Lua script, terminated its execution after
|
||||||
* being called by a client. It handles serving clients blocked in
|
* being called by a client. It handles serving clients blocked in
|
||||||
@ -480,6 +524,10 @@ void handleClientsBlockedOnKeys(void) {
|
|||||||
serveClientsBlockedOnSortedSetKey(o,rl);
|
serveClientsBlockedOnSortedSetKey(o,rl);
|
||||||
else if (o->type == OBJ_STREAM)
|
else if (o->type == OBJ_STREAM)
|
||||||
serveClientsBlockedOnStreamKey(o,rl);
|
serveClientsBlockedOnStreamKey(o,rl);
|
||||||
|
/* We want to serve clients blocked on module keys
|
||||||
|
* regardless of the object type: we don't know what the
|
||||||
|
* module is trying to accomplish right now. */
|
||||||
|
serveClientsBlockedOnKeyByModule(rl);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Free this item. */
|
/* Free this item. */
|
||||||
|
254
src/module.c
254
src/module.c
@ -140,6 +140,9 @@ struct RedisModuleCtx {
|
|||||||
void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */
|
void **postponed_arrays; /* To set with RM_ReplySetArrayLength(). */
|
||||||
int postponed_arrays_count; /* Number of entries in postponed_arrays. */
|
int postponed_arrays_count; /* Number of entries in postponed_arrays. */
|
||||||
void *blocked_privdata; /* Privdata set when unblocking a client. */
|
void *blocked_privdata; /* Privdata set when unblocking a client. */
|
||||||
|
RedisModuleString *blocked_ready_key; /* Key ready when the reply callback
|
||||||
|
gets called for clients blocked
|
||||||
|
on keys. */
|
||||||
|
|
||||||
/* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
|
/* Used if there is the REDISMODULE_CTX_KEYS_POS_REQUEST flag set. */
|
||||||
int *keys_pos;
|
int *keys_pos;
|
||||||
@ -153,7 +156,7 @@ struct RedisModuleCtx {
|
|||||||
};
|
};
|
||||||
typedef struct RedisModuleCtx RedisModuleCtx;
|
typedef struct RedisModuleCtx RedisModuleCtx;
|
||||||
|
|
||||||
#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, 0, NULL, {0}}
|
#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, 0, NULL, {0}}
|
||||||
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
|
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
|
||||||
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
|
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
|
||||||
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
|
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
|
||||||
@ -245,6 +248,8 @@ typedef struct RedisModuleBlockedClient {
|
|||||||
client *reply_client; /* Fake client used to accumulate replies
|
client *reply_client; /* Fake client used to accumulate replies
|
||||||
in thread safe contexts. */
|
in thread safe contexts. */
|
||||||
int dbid; /* Database number selected by the original client. */
|
int dbid; /* Database number selected by the original client. */
|
||||||
|
int blocked_on_keys; /* If blocked via RM_BlockClientOnKeys(). */
|
||||||
|
int unblocked; /* Already on the moduleUnblocked list. */
|
||||||
} RedisModuleBlockedClient;
|
} RedisModuleBlockedClient;
|
||||||
|
|
||||||
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
|
static pthread_mutex_t moduleUnblockedClientsMutex = PTHREAD_MUTEX_INITIALIZER;
|
||||||
@ -4034,6 +4039,94 @@ void unblockClientFromModule(client *c) {
|
|||||||
resetClient(c);
|
resetClient(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Block a client in the context of a module: this function implements both
|
||||||
|
* RM_BlockClient() and RM_BlockClientOnKeys() depending on the fact the
|
||||||
|
* keys are passed or not.
|
||||||
|
*
|
||||||
|
* When not blocking for keys, the keys, numkeys, and privdata parameters are
|
||||||
|
* not needed. The privdata in that case must be NULL, since later is
|
||||||
|
* RM_UnblockClient() that will provide some private data that the reply
|
||||||
|
* callback will receive.
|
||||||
|
*
|
||||||
|
* Instead when blocking for keys, normally RM_UnblockClient() will not be
|
||||||
|
* called (because the client will unblock when the key is modified), so
|
||||||
|
* 'privdata' should be provided in that case, so that once the client is
|
||||||
|
* unlocked and the reply callback is called, it will receive its associated
|
||||||
|
* private data.
|
||||||
|
*
|
||||||
|
* Even when blocking on keys, RM_UnblockClient() can be called however, but
|
||||||
|
* in that case the privdata argument is disregarded, because we pass the
|
||||||
|
* reply callback the privdata that is set here while blocking.
|
||||||
|
*/
|
||||||
|
RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
|
||||||
|
client *c = ctx->client;
|
||||||
|
int islua = c->flags & CLIENT_LUA;
|
||||||
|
int ismulti = c->flags & CLIENT_MULTI;
|
||||||
|
|
||||||
|
c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
|
||||||
|
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
|
||||||
|
ctx->module->blocked_clients++;
|
||||||
|
|
||||||
|
/* We need to handle the invalid operation of calling modules blocking
|
||||||
|
* commands from Lua or MULTI. We actually create an already aborted
|
||||||
|
* (client set to NULL) blocked client handle, and actually reply with
|
||||||
|
* an error. */
|
||||||
|
mstime_t timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
|
||||||
|
bc->client = (islua || ismulti) ? NULL : c;
|
||||||
|
bc->module = ctx->module;
|
||||||
|
bc->reply_callback = reply_callback;
|
||||||
|
bc->timeout_callback = timeout_callback;
|
||||||
|
bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
|
||||||
|
bc->free_privdata = free_privdata;
|
||||||
|
bc->privdata = privdata;
|
||||||
|
bc->reply_client = createClient(NULL);
|
||||||
|
bc->reply_client->flags |= CLIENT_MODULE;
|
||||||
|
bc->dbid = c->db->id;
|
||||||
|
bc->blocked_on_keys = keys != NULL;
|
||||||
|
bc->unblocked = 0;
|
||||||
|
c->bpop.timeout = timeout;
|
||||||
|
|
||||||
|
if (islua || ismulti) {
|
||||||
|
c->bpop.module_blocked_handle = NULL;
|
||||||
|
addReplyError(c, islua ?
|
||||||
|
"Blocking module command called from Lua script" :
|
||||||
|
"Blocking module command called from transaction");
|
||||||
|
} else {
|
||||||
|
if (keys) {
|
||||||
|
blockForKeys(c,BLOCKED_MODULE,keys,numkeys,timeout,NULL,NULL);
|
||||||
|
} else {
|
||||||
|
blockClient(c,BLOCKED_MODULE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return bc;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* This function is called from module.c in order to check if a module
|
||||||
|
* blocked for BLOCKED_MODULE and subtype 'on keys' (bc->blocked_on_keys true)
|
||||||
|
* can really be unblocked, since the module was able to serve the client.
|
||||||
|
* If the callback returns REDISMODULE_OK, then the client can be unblocked,
|
||||||
|
* otherwise the client remains blocked and we'll retry again when one of
|
||||||
|
* the keys it blocked for becomes "ready" again. */
|
||||||
|
int moduleTryServeClientBlockedOnKey(client *c, robj *key) {
|
||||||
|
int served = 0;
|
||||||
|
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
|
||||||
|
/* Protect against re-processing: don't serve clients that are already
|
||||||
|
* in the unblocking list for any reason (including RM_UnblockClient()
|
||||||
|
* explicit call). */
|
||||||
|
if (bc->unblocked) return REDISMODULE_ERR;
|
||||||
|
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
|
||||||
|
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
|
||||||
|
ctx.blocked_ready_key = key;
|
||||||
|
ctx.blocked_privdata = bc->privdata;
|
||||||
|
ctx.module = bc->module;
|
||||||
|
ctx.client = bc->client;
|
||||||
|
ctx.blocked_client = bc;
|
||||||
|
if (bc->reply_callback(&ctx,(void**)c->argv,c->argc) == REDISMODULE_OK)
|
||||||
|
served = 1;
|
||||||
|
moduleFreeContext(&ctx);
|
||||||
|
return served;
|
||||||
|
}
|
||||||
|
|
||||||
/* Block a client in the context of a blocking command, returning an handle
|
/* Block a client in the context of a blocking command, returning an handle
|
||||||
* which will be used, later, in order to unblock the client with a call to
|
* which will be used, later, in order to unblock the client with a call to
|
||||||
* RedisModule_UnblockClient(). The arguments specify callback functions
|
* RedisModule_UnblockClient(). The arguments specify callback functions
|
||||||
@ -4051,39 +4144,96 @@ void unblockClientFromModule(client *c) {
|
|||||||
* by RedisModule_UnblockClient() call.
|
* by RedisModule_UnblockClient() call.
|
||||||
*/
|
*/
|
||||||
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
|
RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms) {
|
||||||
client *c = ctx->client;
|
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, NULL,0,NULL);
|
||||||
int islua = c->flags & CLIENT_LUA;
|
}
|
||||||
int ismulti = c->flags & CLIENT_MULTI;
|
|
||||||
|
|
||||||
c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
|
/* This call is similar to RedisModule_BlockClient(), however in this case we
|
||||||
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
|
* don't just block the client, but also ask Redis to unblock it automatically
|
||||||
ctx->module->blocked_clients++;
|
* once certain keys become "ready", that is, contain more data.
|
||||||
|
*
|
||||||
|
* Basically this is similar to what a typical Redis command usually does,
|
||||||
|
* like BLPOP or ZPOPMAX: the client blocks if it cannot be served ASAP,
|
||||||
|
* and later when the key receives new data (a list push for instance), the
|
||||||
|
* client is unblocked and served.
|
||||||
|
*
|
||||||
|
* However in the case of this module API, when the client is unblocked?
|
||||||
|
*
|
||||||
|
* 1. If you block ok a key of a type that has blocking operations associated,
|
||||||
|
* like a list, a sorted set, a stream, and so forth, the client may be
|
||||||
|
* unblocked once the relevant key is targeted by an operation that normally
|
||||||
|
* unblocks the native blocking operations for that type. So if we block
|
||||||
|
* on a list key, an RPUSH command may unblock our client and so forth.
|
||||||
|
* 2. If you are implementing your native data type, or if you want to add new
|
||||||
|
* unblocking conditions in addition to "1", you can call the modules API
|
||||||
|
* RedisModule_SignalKeyAsReady().
|
||||||
|
*
|
||||||
|
* Anyway we can't be sure if the client should be unblocked just because the
|
||||||
|
* key is signaled as ready: for instance a successive operation may change the
|
||||||
|
* key, or a client in queue before this one can be served, modifying the key
|
||||||
|
* as well and making it empty again. So when a client is blocked with
|
||||||
|
* RedisModule_BlockClientOnKeys() the reply callback is not called after
|
||||||
|
* RM_UnblockCLient() is called, but every time a key is signaled as ready:
|
||||||
|
* if the reply callback can serve the client, it returns REDISMODULE_OK
|
||||||
|
* and the client is unblocked, otherwise it will return REDISMODULE_ERR
|
||||||
|
* and we'll try again later.
|
||||||
|
*
|
||||||
|
* The reply callback can access the key that was signaled as ready by
|
||||||
|
* calling the API RedisModule_GetBlockedClientReadyKey(), that returns
|
||||||
|
* just the string name of the key as a RedisModuleString object.
|
||||||
|
*
|
||||||
|
* Thanks to this system we can setup complex blocking scenarios, like
|
||||||
|
* unblocking a client only if a list contains at least 5 items or other
|
||||||
|
* more fancy logics.
|
||||||
|
*
|
||||||
|
* Note that another difference with RedisModule_BlockClient(), is that here
|
||||||
|
* we pass the private data directly when blocking the client: it will
|
||||||
|
* be accessible later in the reply callback. Normally when blocking with
|
||||||
|
* RedisModule_BlockClient() the private data to reply to the client is
|
||||||
|
* passed when calling RedisModule_UnblockClient() but here the unblocking
|
||||||
|
* is performed by Redis itself, so we need to have some private data before
|
||||||
|
* hand. The private data is used to store any information about the specific
|
||||||
|
* unblocking operation that you are implementing. Such information will be
|
||||||
|
* freed using the free_privdata callback provided by the user.
|
||||||
|
*
|
||||||
|
* However the reply callback will be able to access the argument vector of
|
||||||
|
* the command, so the private data is often not needed. */
|
||||||
|
RedisModuleBlockedClient *RM_BlockClientOnKeys(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
|
||||||
|
return moduleBlockClient(ctx,reply_callback,timeout_callback,free_privdata,timeout_ms, keys,numkeys,privdata);
|
||||||
|
}
|
||||||
|
|
||||||
/* We need to handle the invalid operation of calling modules blocking
|
/* This function is used in order to potentially unblock a client blocked
|
||||||
* commands from Lua or MULTI. We actually create an already aborted
|
* on keys with RedisModule_BlockClientOnKeys(). When this function is called,
|
||||||
* (client set to NULL) blocked client handle, and actually reply with
|
* all the clients blocked for this key will get their reply callback called,
|
||||||
* an error. */
|
* and if the callback returns REDISMODULE_OK the client will be unblocked. */
|
||||||
bc->client = (islua || ismulti) ? NULL : c;
|
void RM_SignalKeyAsReady(RedisModuleCtx *ctx, RedisModuleString *key) {
|
||||||
bc->module = ctx->module;
|
signalKeyAsReady(ctx->client->db, key);
|
||||||
bc->reply_callback = reply_callback;
|
}
|
||||||
bc->timeout_callback = timeout_callback;
|
|
||||||
bc->disconnect_callback = NULL; /* Set by RM_SetDisconnectCallback() */
|
|
||||||
bc->free_privdata = free_privdata;
|
|
||||||
bc->privdata = NULL;
|
|
||||||
bc->reply_client = createClient(NULL);
|
|
||||||
bc->reply_client->flags |= CLIENT_MODULE;
|
|
||||||
bc->dbid = c->db->id;
|
|
||||||
c->bpop.timeout = timeout_ms ? (mstime()+timeout_ms) : 0;
|
|
||||||
|
|
||||||
if (islua || ismulti) {
|
/* Implements RM_UnblockClient() and moduleUnblockClient(). */
|
||||||
c->bpop.module_blocked_handle = NULL;
|
int moduleUnblockClientByHandle(RedisModuleBlockedClient *bc, void *privdata) {
|
||||||
addReplyError(c, islua ?
|
pthread_mutex_lock(&moduleUnblockedClientsMutex);
|
||||||
"Blocking module command called from Lua script" :
|
if (!bc->blocked_on_keys) bc->privdata = privdata;
|
||||||
"Blocking module command called from transaction");
|
bc->unblocked = 1;
|
||||||
} else {
|
listAddNodeTail(moduleUnblockedClients,bc);
|
||||||
blockClient(c,BLOCKED_MODULE);
|
if (write(server.module_blocked_pipe[1],"A",1) != 1) {
|
||||||
|
/* Ignore the error, this is best-effort. */
|
||||||
}
|
}
|
||||||
return bc;
|
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* This API is used by the Redis core to unblock a client that was blocked
|
||||||
|
* by a module. */
|
||||||
|
void moduleUnblockClient(client *c) {
|
||||||
|
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
|
||||||
|
moduleUnblockClientByHandle(bc,NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Return true if the client 'c' was blocked by a module using
|
||||||
|
* RM_BlockClientOnKeys(). */
|
||||||
|
int moduleClientIsBlockedOnKeys(client *c) {
|
||||||
|
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
|
||||||
|
return bc->blocked_on_keys;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
|
/* Unblock a client blocked by `RedisModule_BlockedClient`. This will trigger
|
||||||
@ -4096,15 +4246,25 @@ RedisModuleBlockedClient *RM_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc
|
|||||||
* needs to be passed to the client, included but not limited some slow
|
* needs to be passed to the client, included but not limited some slow
|
||||||
* to compute reply or some reply obtained via networking.
|
* to compute reply or some reply obtained via networking.
|
||||||
*
|
*
|
||||||
* Note: this function can be called from threads spawned by the module. */
|
* Note 1: this function can be called from threads spawned by the module.
|
||||||
|
*
|
||||||
|
* Note 2: when we unblock a client that is blocked for keys using
|
||||||
|
* the API RedisModule_BlockClientOnKeys(), the privdata argument here is
|
||||||
|
* not used, and the reply callback is called with the privdata pointer that
|
||||||
|
* was passed when blocking the client.
|
||||||
|
*
|
||||||
|
* Unblocking a client that was blocked for keys using this API will still
|
||||||
|
* require the client to get some reply, so the function will use the
|
||||||
|
* "timeout" handler in order to do so. */
|
||||||
int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
|
int RM_UnblockClient(RedisModuleBlockedClient *bc, void *privdata) {
|
||||||
pthread_mutex_lock(&moduleUnblockedClientsMutex);
|
if (bc->blocked_on_keys) {
|
||||||
bc->privdata = privdata;
|
/* In theory the user should always pass the timeout handler as an
|
||||||
listAddNodeTail(moduleUnblockedClients,bc);
|
* argument, but better to be safe than sorry. */
|
||||||
if (write(server.module_blocked_pipe[1],"A",1) != 1) {
|
if (bc->timeout_callback == NULL) return REDISMODULE_ERR;
|
||||||
/* Ignore the error, this is best-effort. */
|
if (bc->unblocked) return REDISMODULE_OK;
|
||||||
|
if (bc->client) moduleBlockedClientTimedOut(bc->client);
|
||||||
}
|
}
|
||||||
pthread_mutex_unlock(&moduleUnblockedClientsMutex);
|
moduleUnblockClientByHandle(bc,privdata);
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -4164,16 +4324,19 @@ void moduleHandleBlockedClients(void) {
|
|||||||
* touch the shared list. */
|
* touch the shared list. */
|
||||||
|
|
||||||
/* Call the reply callback if the client is valid and we have
|
/* Call the reply callback if the client is valid and we have
|
||||||
* any callback. */
|
* any callback. However the callback is not called if the client
|
||||||
if (c && bc->reply_callback) {
|
* was blocked on keys (RM_BlockClientOnKeys()), because we already
|
||||||
|
* called such callback in moduleTryServeClientBlockedOnKey() when
|
||||||
|
* the key was signaled as ready. */
|
||||||
|
if (c && !bc->blocked_on_keys && bc->reply_callback) {
|
||||||
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
|
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
|
||||||
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
|
ctx.flags |= REDISMODULE_CTX_BLOCKED_REPLY;
|
||||||
ctx.blocked_privdata = bc->privdata;
|
ctx.blocked_privdata = bc->privdata;
|
||||||
|
ctx.blocked_ready_key = NULL;
|
||||||
ctx.module = bc->module;
|
ctx.module = bc->module;
|
||||||
ctx.client = bc->client;
|
ctx.client = bc->client;
|
||||||
ctx.blocked_client = bc;
|
ctx.blocked_client = bc;
|
||||||
bc->reply_callback(&ctx,(void**)c->argv,c->argc);
|
bc->reply_callback(&ctx,(void**)c->argv,c->argc);
|
||||||
moduleHandlePropagationAfterCommandCallback(&ctx);
|
|
||||||
moduleFreeContext(&ctx);
|
moduleFreeContext(&ctx);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -4261,6 +4424,12 @@ void *RM_GetBlockedClientPrivateData(RedisModuleCtx *ctx) {
|
|||||||
return ctx->blocked_privdata;
|
return ctx->blocked_privdata;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Get the key that is ready when the reply callback is called in the context
|
||||||
|
* of a client blocked by RedisModule_BlockClientOnKeys(). */
|
||||||
|
RedisModuleString *RM_GetBlockedClientReadyKey(RedisModuleCtx *ctx) {
|
||||||
|
return ctx->blocked_ready_key;
|
||||||
|
}
|
||||||
|
|
||||||
/* Get the blocked client associated with a given context.
|
/* Get the blocked client associated with a given context.
|
||||||
* This is useful in the reply and timeout callbacks of blocked clients,
|
* This is useful in the reply and timeout callbacks of blocked clients,
|
||||||
* before sometimes the module has the blocked client handle references
|
* before sometimes the module has the blocked client handle references
|
||||||
@ -4315,7 +4484,7 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) {
|
|||||||
ctx->client = createClient(NULL);
|
ctx->client = createClient(NULL);
|
||||||
if (bc) {
|
if (bc) {
|
||||||
selectDb(ctx->client,bc->dbid);
|
selectDb(ctx->client,bc->dbid);
|
||||||
ctx->client->id = bc->client->id;
|
if (bc->client) ctx->client->id = bc->client->id;
|
||||||
}
|
}
|
||||||
return ctx;
|
return ctx;
|
||||||
}
|
}
|
||||||
@ -6702,4 +6871,7 @@ void moduleRegisterCoreAPI(void) {
|
|||||||
REGISTER_API(InfoAddFieldULongLong);
|
REGISTER_API(InfoAddFieldULongLong);
|
||||||
REGISTER_API(GetClientInfoById);
|
REGISTER_API(GetClientInfoById);
|
||||||
REGISTER_API(SubscribeToServerEvent);
|
REGISTER_API(SubscribeToServerEvent);
|
||||||
|
REGISTER_API(BlockClientOnKeys);
|
||||||
|
REGISTER_API(SignalKeyAsReady);
|
||||||
|
REGISTER_API(GetBlockedClientReadyKey);
|
||||||
}
|
}
|
||||||
|
@ -129,6 +129,7 @@ int HelloTypeInsert_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
|
|||||||
|
|
||||||
/* Insert the new element. */
|
/* Insert the new element. */
|
||||||
HelloTypeInsert(hto,value);
|
HelloTypeInsert(hto,value);
|
||||||
|
RedisModule_SignalKeyAsReady(ctx,argv[1]);
|
||||||
|
|
||||||
RedisModule_ReplyWithLongLong(ctx,hto->len);
|
RedisModule_ReplyWithLongLong(ctx,hto->len);
|
||||||
RedisModule_ReplicateVerbatim(ctx);
|
RedisModule_ReplicateVerbatim(ctx);
|
||||||
@ -190,6 +191,77 @@ int HelloTypeLen_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int
|
|||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* ====================== Example of a blocking command ==================== */
|
||||||
|
|
||||||
|
/* Reply callback for blocking command HELLOTYPE.BRANGE, this will get
|
||||||
|
* called when the key we blocked for is ready: we need to check if we
|
||||||
|
* can really serve the client, and reply OK or ERR accordingly. */
|
||||||
|
int HelloBlock_Reply(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
REDISMODULE_NOT_USED(argv);
|
||||||
|
REDISMODULE_NOT_USED(argc);
|
||||||
|
|
||||||
|
RedisModuleString *keyname = RedisModule_GetBlockedClientReadyKey(ctx);
|
||||||
|
RedisModuleKey *key = RedisModule_OpenKey(ctx,keyname,REDISMODULE_READ);
|
||||||
|
int type = RedisModule_KeyType(key);
|
||||||
|
if (type != REDISMODULE_KEYTYPE_MODULE ||
|
||||||
|
RedisModule_ModuleTypeGetType(key) != HelloType)
|
||||||
|
{
|
||||||
|
RedisModule_CloseKey(key);
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* In case the key is able to serve our blocked client, let's directly
|
||||||
|
* use our original command implementation to make this example simpler. */
|
||||||
|
RedisModule_CloseKey(key);
|
||||||
|
return HelloTypeRange_RedisCommand(ctx,argv,argc-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Timeout callback for blocking command HELLOTYPE.BRANGE */
|
||||||
|
int HelloBlock_Timeout(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
REDISMODULE_NOT_USED(argv);
|
||||||
|
REDISMODULE_NOT_USED(argc);
|
||||||
|
return RedisModule_ReplyWithSimpleString(ctx,"Request timedout");
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Private data freeing callback for HELLOTYPE.BRANGE command. */
|
||||||
|
void HelloBlock_FreeData(RedisModuleCtx *ctx, void *privdata) {
|
||||||
|
REDISMODULE_NOT_USED(ctx);
|
||||||
|
RedisModule_Free(privdata);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* HELLOTYPE.BRANGE key first count timeout -- This is a blocking verison of
|
||||||
|
* the RANGE operation, in order to show how to use the API
|
||||||
|
* RedisModule_BlockClientOnKeys(). */
|
||||||
|
int HelloTypeBRange_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||||
|
if (argc != 5) return RedisModule_WrongArity(ctx);
|
||||||
|
RedisModule_AutoMemory(ctx); /* Use automatic memory management. */
|
||||||
|
RedisModuleKey *key = RedisModule_OpenKey(ctx,argv[1],
|
||||||
|
REDISMODULE_READ|REDISMODULE_WRITE);
|
||||||
|
int type = RedisModule_KeyType(key);
|
||||||
|
if (type != REDISMODULE_KEYTYPE_EMPTY &&
|
||||||
|
RedisModule_ModuleTypeGetType(key) != HelloType)
|
||||||
|
{
|
||||||
|
return RedisModule_ReplyWithError(ctx,REDISMODULE_ERRORMSG_WRONGTYPE);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Parse the timeout before even trying to serve the client synchronously,
|
||||||
|
* so that we always fail ASAP on syntax errors. */
|
||||||
|
long long timeout;
|
||||||
|
if (RedisModule_StringToLongLong(argv[4],&timeout) != REDISMODULE_OK) {
|
||||||
|
return RedisModule_ReplyWithError(ctx,
|
||||||
|
"ERR invalid timeout parameter");
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Can we serve the reply synchronously? */
|
||||||
|
if (type != REDISMODULE_KEYTYPE_EMPTY) {
|
||||||
|
return HelloTypeRange_RedisCommand(ctx,argv,argc-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Otherwise let's block on the key. */
|
||||||
|
void *privdata = RedisModule_Alloc(100);
|
||||||
|
RedisModule_BlockClientOnKeys(ctx,HelloBlock_Reply,HelloBlock_Timeout,HelloBlock_FreeData,timeout,argv+1,1,privdata);
|
||||||
|
return REDISMODULE_OK;
|
||||||
|
}
|
||||||
|
|
||||||
/* ========================== "hellotype" type methods ======================= */
|
/* ========================== "hellotype" type methods ======================= */
|
||||||
|
|
||||||
@ -282,5 +354,9 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
|||||||
HelloTypeLen_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
|
HelloTypeLen_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
|
||||||
return REDISMODULE_ERR;
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
|
if (RedisModule_CreateCommand(ctx,"hellotype.brange",
|
||||||
|
HelloTypeBRange_RedisCommand,"readonly",1,1,1) == REDISMODULE_ERR)
|
||||||
|
return REDISMODULE_ERR;
|
||||||
|
|
||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
@ -573,6 +573,9 @@ int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldDouble)(RedisModuleInfoCtx *ctx
|
|||||||
int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldLongLong)(RedisModuleInfoCtx *ctx, char *field, long long value);
|
int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldLongLong)(RedisModuleInfoCtx *ctx, char *field, long long value);
|
||||||
int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldULongLong)(RedisModuleInfoCtx *ctx, char *field, unsigned long long value);
|
int REDISMODULE_API_FUNC(RedisModule_InfoAddFieldULongLong)(RedisModuleInfoCtx *ctx, char *field, unsigned long long value);
|
||||||
int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback);
|
int REDISMODULE_API_FUNC(RedisModule_SubscribeToServerEvent)(RedisModuleCtx *ctx, RedisModuleEvent event, RedisModuleEventCallback callback);
|
||||||
|
RedisModuleBlockedClient *REDISMODULE_API_FUNC(RedisModule_BlockClientOnKeys)(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata);
|
||||||
|
void REDISMODULE_API_FUNC(RedisModule_SignalKeyAsReady)(RedisModuleCtx *ctx, RedisModuleString *key);
|
||||||
|
RedisModuleString *REDISMODULE_API_FUNC(RedisModule_GetBlockedClientReadyKey)(RedisModuleCtx *ctx);
|
||||||
|
|
||||||
/* Experimental APIs */
|
/* Experimental APIs */
|
||||||
#ifdef REDISMODULE_EXPERIMENTAL_API
|
#ifdef REDISMODULE_EXPERIMENTAL_API
|
||||||
@ -773,6 +776,9 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
|
|||||||
REDISMODULE_GET_API(InfoAddFieldULongLong);
|
REDISMODULE_GET_API(InfoAddFieldULongLong);
|
||||||
REDISMODULE_GET_API(GetClientInfoById);
|
REDISMODULE_GET_API(GetClientInfoById);
|
||||||
REDISMODULE_GET_API(SubscribeToServerEvent);
|
REDISMODULE_GET_API(SubscribeToServerEvent);
|
||||||
|
REDISMODULE_GET_API(BlockClientOnKeys);
|
||||||
|
REDISMODULE_GET_API(SignalKeyAsReady);
|
||||||
|
REDISMODULE_GET_API(GetBlockedClientReadyKey);
|
||||||
|
|
||||||
#ifdef REDISMODULE_EXPERIMENTAL_API
|
#ifdef REDISMODULE_EXPERIMENTAL_API
|
||||||
REDISMODULE_GET_API(GetThreadSafeContext);
|
REDISMODULE_GET_API(GetThreadSafeContext);
|
||||||
|
@ -1603,6 +1603,9 @@ int moduleAllDatatypesHandleErrors();
|
|||||||
sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections);
|
sds modulesCollectInfo(sds info, sds section, int for_crash_report, int sections);
|
||||||
void moduleFireServerEvent(uint64_t eid, int subid, void *data);
|
void moduleFireServerEvent(uint64_t eid, int subid, void *data);
|
||||||
void processModuleLoadingProgressEvent(int is_aof);
|
void processModuleLoadingProgressEvent(int is_aof);
|
||||||
|
int moduleTryServeClientBlockedOnKey(client *c, robj *key);
|
||||||
|
void moduleUnblockClient(client *c);
|
||||||
|
int moduleClientIsBlockedOnKeys(client *c);
|
||||||
|
|
||||||
/* Utils */
|
/* Utils */
|
||||||
long long ustime(void);
|
long long ustime(void);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user