Modules: handle propagation when ctx is freed. Flag modules commands ctx.
This commit is contained in:
parent
758b39be99
commit
c549513acd
42
src/module.c
42
src/module.c
@ -158,6 +158,7 @@ typedef struct RedisModuleCtx RedisModuleCtx;
|
|||||||
#define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4)
|
#define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4)
|
||||||
#define REDISMODULE_CTX_THREAD_SAFE (1<<5)
|
#define REDISMODULE_CTX_THREAD_SAFE (1<<5)
|
||||||
#define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<6)
|
#define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<6)
|
||||||
|
#define REDISMODULE_CTX_MODULE_COMMAND_CALL (1<<7)
|
||||||
|
|
||||||
/* This represents a Redis key opened with RM_OpenKey(). */
|
/* This represents a Redis key opened with RM_OpenKey(). */
|
||||||
struct RedisModuleKey {
|
struct RedisModuleKey {
|
||||||
@ -519,8 +520,29 @@ int RM_GetApi(const char *funcname, void **targetPtrPtr) {
|
|||||||
return REDISMODULE_OK;
|
return REDISMODULE_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Helper function for when a command callback is called, in order to handle
|
||||||
|
* details needed to correctly replicate commands. */
|
||||||
|
void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
|
||||||
|
client *c = ctx->client;
|
||||||
|
|
||||||
|
/* We don't need to do anything here if the context was never used
|
||||||
|
* in order to propagate commands. */
|
||||||
|
if (!(ctx->flags & REDISMODULE_CTX_MULTI_EMITTED)) return;
|
||||||
|
|
||||||
|
if (c->flags & CLIENT_LUA) return;
|
||||||
|
|
||||||
|
/* Handle the replication of the final EXEC, since whatever a command
|
||||||
|
* emits is always wrapped around MULTI/EXEC. */
|
||||||
|
robj *propargv[1];
|
||||||
|
propargv[0] = createStringObject("EXEC",4);
|
||||||
|
alsoPropagate(server.execCommand,c->db->id,propargv,1,
|
||||||
|
PROPAGATE_AOF|PROPAGATE_REPL);
|
||||||
|
decrRefCount(propargv[0]);
|
||||||
|
}
|
||||||
|
|
||||||
/* Free the context after the user function was called. */
|
/* Free the context after the user function was called. */
|
||||||
void moduleFreeContext(RedisModuleCtx *ctx) {
|
void moduleFreeContext(RedisModuleCtx *ctx) {
|
||||||
|
moduleHandlePropagationAfterCommandCallback(ctx);
|
||||||
autoMemoryCollect(ctx);
|
autoMemoryCollect(ctx);
|
||||||
poolAllocRelease(ctx);
|
poolAllocRelease(ctx);
|
||||||
if (ctx->postponed_arrays) {
|
if (ctx->postponed_arrays) {
|
||||||
@ -536,34 +558,16 @@ void moduleFreeContext(RedisModuleCtx *ctx) {
|
|||||||
if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) freeClient(ctx->client);
|
if (ctx->flags & REDISMODULE_CTX_THREAD_SAFE) freeClient(ctx->client);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Helper function for when a command callback is called, in order to handle
|
|
||||||
* details needed to correctly replicate commands. */
|
|
||||||
void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
|
|
||||||
client *c = ctx->client;
|
|
||||||
|
|
||||||
if (c->flags & CLIENT_LUA) return;
|
|
||||||
|
|
||||||
/* Handle the replication of the final EXEC, since whatever a command
|
|
||||||
* emits is always wrapped around MULTI/EXEC. */
|
|
||||||
if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) {
|
|
||||||
robj *propargv[1];
|
|
||||||
propargv[0] = createStringObject("EXEC",4);
|
|
||||||
alsoPropagate(server.execCommand,c->db->id,propargv,1,
|
|
||||||
PROPAGATE_AOF|PROPAGATE_REPL);
|
|
||||||
decrRefCount(propargv[0]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* This Redis command binds the normal Redis command invocation with commands
|
/* This Redis command binds the normal Redis command invocation with commands
|
||||||
* exported by modules. */
|
* exported by modules. */
|
||||||
void RedisModuleCommandDispatcher(client *c) {
|
void RedisModuleCommandDispatcher(client *c) {
|
||||||
RedisModuleCommandProxy *cp = (void*)(unsigned long)c->cmd->getkeys_proc;
|
RedisModuleCommandProxy *cp = (void*)(unsigned long)c->cmd->getkeys_proc;
|
||||||
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
|
RedisModuleCtx ctx = REDISMODULE_CTX_INIT;
|
||||||
|
|
||||||
|
ctx.flags |= REDISMODULE_CTX_MODULE_COMMAND_CALL;
|
||||||
ctx.module = cp->module;
|
ctx.module = cp->module;
|
||||||
ctx.client = c;
|
ctx.client = c;
|
||||||
cp->func(&ctx,(void**)c->argv,c->argc);
|
cp->func(&ctx,(void**)c->argv,c->argc);
|
||||||
moduleHandlePropagationAfterCommandCallback(&ctx);
|
|
||||||
moduleFreeContext(&ctx);
|
moduleFreeContext(&ctx);
|
||||||
|
|
||||||
/* In some cases processMultibulkBuffer uses sdsMakeRoomFor to
|
/* In some cases processMultibulkBuffer uses sdsMakeRoomFor to
|
||||||
|
Loading…
x
Reference in New Issue
Block a user