Make sure we do not propagate nested MULTI/EXEC (#8097)
One way this was happening is when a module issued an RM_Call which would inject MULTI. If the module command that does that was itself issued by something else that already did added MULTI (e.g. another module, or a Lua script), it would have caused nested MULTI. In fact the MULTI state in the client or the MULTI_EMITTED flag in the context isn't the right indication that we need to propagate MULTI or not, because on a nested calls (possibly a module action called by a keyspace event of another module action), these flags aren't retained / reflected. instead there's now a global propagate_in_transaction flag for that. in addition to that, we now have a global in_eval and in_exec flags, to serve the flags of RM_GetContextFlags, since their dependence on the current client is wrong for the same reasons mentioned above.
This commit is contained in:
parent
5aa078afb0
commit
a84a6160ec
47
src/module.c
47
src/module.c
@ -158,14 +158,13 @@ struct RedisModuleCtx {
|
||||
typedef struct RedisModuleCtx RedisModuleCtx;
|
||||
|
||||
#define REDISMODULE_CTX_INIT {(void*)(unsigned long)&RM_GetApi, NULL, NULL, NULL, NULL, 0, 0, 0, NULL, 0, NULL, NULL, NULL, NULL, {0}}
|
||||
#define REDISMODULE_CTX_MULTI_EMITTED (1<<0)
|
||||
#define REDISMODULE_CTX_AUTO_MEMORY (1<<1)
|
||||
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<2)
|
||||
#define REDISMODULE_CTX_BLOCKED_REPLY (1<<3)
|
||||
#define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<4)
|
||||
#define REDISMODULE_CTX_THREAD_SAFE (1<<5)
|
||||
#define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<6)
|
||||
#define REDISMODULE_CTX_MODULE_COMMAND_CALL (1<<7)
|
||||
#define REDISMODULE_CTX_AUTO_MEMORY (1<<0)
|
||||
#define REDISMODULE_CTX_KEYS_POS_REQUEST (1<<1)
|
||||
#define REDISMODULE_CTX_BLOCKED_REPLY (1<<2)
|
||||
#define REDISMODULE_CTX_BLOCKED_TIMEOUT (1<<3)
|
||||
#define REDISMODULE_CTX_THREAD_SAFE (1<<4)
|
||||
#define REDISMODULE_CTX_BLOCKED_DISCONNECTED (1<<5)
|
||||
#define REDISMODULE_CTX_MODULE_COMMAND_CALL (1<<6)
|
||||
|
||||
/* This represents a Redis key opened with RM_OpenKey(). */
|
||||
struct RedisModuleKey {
|
||||
@ -578,12 +577,15 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
|
||||
|
||||
/* We don't need to do anything here if the context was never used
|
||||
* in order to propagate commands. */
|
||||
if (!(ctx->flags & REDISMODULE_CTX_MULTI_EMITTED)) return;
|
||||
if (!server.propagate_in_transaction) return;
|
||||
|
||||
if (c->flags & CLIENT_LUA) return;
|
||||
/* If this command is executed from with Lua or MULTI/EXEC we do noy
|
||||
* need to propagate EXEC */
|
||||
if (server.in_eval || server.in_exec) return;
|
||||
|
||||
/* Handle the replication of the final EXEC, since whatever a command
|
||||
* emits is always wrapped around MULTI/EXEC. */
|
||||
beforePropagateMultiOrExec(0);
|
||||
alsoPropagate(server.execCommand,c->db->id,&shared.exec,1,
|
||||
PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
|
||||
@ -1610,9 +1612,9 @@ int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) {
|
||||
void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
|
||||
/* Skip this if client explicitly wrap the command with MULTI, or if
|
||||
* the module command was called by a script. */
|
||||
if (ctx->client->flags & (CLIENT_MULTI|CLIENT_LUA)) return;
|
||||
if (server.lua_caller || server.in_exec) return;
|
||||
/* If we already emitted MULTI return ASAP. */
|
||||
if (ctx->flags & REDISMODULE_CTX_MULTI_EMITTED) return;
|
||||
if (server.propagate_in_transaction) return;
|
||||
/* If this is a thread safe context, we do not want to wrap commands
|
||||
* executed into MULTI/EXEC, they are executed as single commands
|
||||
* from an external client in essence. */
|
||||
@ -1625,7 +1627,6 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
|
||||
redisOpArrayInit(&server.also_propagate);
|
||||
}
|
||||
execCommandPropagateMulti(ctx->client);
|
||||
ctx->flags |= REDISMODULE_CTX_MULTI_EMITTED;
|
||||
}
|
||||
|
||||
/* Replicate the specified command and arguments to slaves and AOF, as effect
|
||||
@ -1938,20 +1939,16 @@ int RM_GetSelectedDb(RedisModuleCtx *ctx) {
|
||||
* background child process.
|
||||
*/
|
||||
int RM_GetContextFlags(RedisModuleCtx *ctx) {
|
||||
|
||||
int flags = 0;
|
||||
|
||||
/* Client specific flags */
|
||||
if (ctx) {
|
||||
if (ctx->client) {
|
||||
if (ctx->client->flags & CLIENT_LUA)
|
||||
flags |= REDISMODULE_CTX_FLAGS_LUA;
|
||||
if (ctx->client->flags & CLIENT_MULTI)
|
||||
flags |= REDISMODULE_CTX_FLAGS_MULTI;
|
||||
if (ctx->client->flags & CLIENT_DENY_BLOCKING)
|
||||
flags |= REDISMODULE_CTX_FLAGS_DENY_BLOCKING;
|
||||
flags |= REDISMODULE_CTX_FLAGS_DENY_BLOCKING;
|
||||
/* Module command received from MASTER, is replicated. */
|
||||
if (ctx->client->flags & CLIENT_MASTER)
|
||||
flags |= REDISMODULE_CTX_FLAGS_REPLICATED;
|
||||
flags |= REDISMODULE_CTX_FLAGS_REPLICATED;
|
||||
}
|
||||
|
||||
/* For DIRTY flags, we need the blocked client if used */
|
||||
@ -1961,6 +1958,12 @@ int RM_GetContextFlags(RedisModuleCtx *ctx) {
|
||||
}
|
||||
}
|
||||
|
||||
if (server.in_eval)
|
||||
flags |= REDISMODULE_CTX_FLAGS_LUA;
|
||||
|
||||
if (server.in_exec)
|
||||
flags |= REDISMODULE_CTX_FLAGS_MULTI;
|
||||
|
||||
if (server.cluster_enabled)
|
||||
flags |= REDISMODULE_CTX_FLAGS_CLUSTER;
|
||||
|
||||
@ -4497,8 +4500,8 @@ void unblockClientFromModule(client *c) {
|
||||
*/
|
||||
RedisModuleBlockedClient *moduleBlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(RedisModuleCtx*,void*), long long timeout_ms, RedisModuleString **keys, int numkeys, void *privdata) {
|
||||
client *c = ctx->client;
|
||||
int islua = c->flags & CLIENT_LUA;
|
||||
int ismulti = c->flags & CLIENT_MULTI;
|
||||
int islua = server.in_eval;
|
||||
int ismulti = server.in_exec;
|
||||
|
||||
c->bpop.module_blocked_handle = zmalloc(sizeof(RedisModuleBlockedClient));
|
||||
RedisModuleBlockedClient *bc = c->bpop.module_blocked_handle;
|
||||
|
20
src/multi.c
20
src/multi.c
@ -113,14 +113,28 @@ void discardCommand(client *c) {
|
||||
addReply(c,shared.ok);
|
||||
}
|
||||
|
||||
void beforePropagateMultiOrExec(int multi) {
|
||||
if (multi) {
|
||||
/* Propagating MULTI */
|
||||
serverAssert(!server.propagate_in_transaction);
|
||||
server.propagate_in_transaction = 1;
|
||||
} else {
|
||||
/* Propagating EXEC */
|
||||
serverAssert(server.propagate_in_transaction == 1);
|
||||
server.propagate_in_transaction = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* Send a MULTI command to all the slaves and AOF file. Check the execCommand
|
||||
* implementation for more information. */
|
||||
void execCommandPropagateMulti(client *c) {
|
||||
beforePropagateMultiOrExec(1);
|
||||
propagate(server.multiCommand,c->db->id,&shared.multi,1,
|
||||
PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
}
|
||||
|
||||
void execCommandPropagateExec(client *c) {
|
||||
beforePropagateMultiOrExec(0);
|
||||
propagate(server.execCommand,c->db->id,&shared.exec,1,
|
||||
PROPAGATE_AOF|PROPAGATE_REPL);
|
||||
}
|
||||
@ -176,6 +190,9 @@ void execCommand(client *c) {
|
||||
|
||||
/* Exec all the queued commands */
|
||||
unwatchAllKeys(c); /* Unwatch ASAP otherwise we'll waste CPU cycles */
|
||||
|
||||
server.in_exec = 1;
|
||||
|
||||
orig_argv = c->argv;
|
||||
orig_argc = c->argc;
|
||||
orig_cmd = c->cmd;
|
||||
@ -251,6 +268,7 @@ void execCommand(client *c) {
|
||||
if (must_propagate) {
|
||||
int is_master = server.masterhost == NULL;
|
||||
server.dirty++;
|
||||
beforePropagateMultiOrExec(0);
|
||||
/* If inside the MULTI/EXEC block this instance was suddenly
|
||||
* switched from master to slave (using the SLAVEOF command), the
|
||||
* initial MULTI was propagated into the replication backlog, but the
|
||||
@ -262,6 +280,8 @@ void execCommand(client *c) {
|
||||
}
|
||||
}
|
||||
|
||||
server.in_exec = 0;
|
||||
|
||||
handle_monitor:
|
||||
/* Send EXEC to clients waiting data from MONITOR. We do it here
|
||||
* since the natural order of commands execution is actually:
|
||||
|
@ -1498,6 +1498,7 @@ void evalGenericCommand(client *c, int evalsha) {
|
||||
server.lua_replicate_commands = server.lua_always_replicate_commands;
|
||||
server.lua_multi_emitted = 0;
|
||||
server.lua_repl = PROPAGATE_AOF|PROPAGATE_REPL;
|
||||
server.in_eval = 1;
|
||||
|
||||
/* Get the number of arguments that are keys */
|
||||
if (getLongLongFromObjectOrReply(c,c->argv[2],&numkeys,NULL) != C_OK)
|
||||
@ -1678,6 +1679,8 @@ void evalGenericCommand(client *c, int evalsha) {
|
||||
forceCommandPropagation(c,PROPAGATE_REPL|PROPAGATE_AOF);
|
||||
}
|
||||
}
|
||||
|
||||
server.in_eval = 0;
|
||||
}
|
||||
|
||||
void evalCommand(client *c) {
|
||||
|
@ -3049,6 +3049,9 @@ void initServer(void) {
|
||||
listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
|
||||
listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
|
||||
server.cronloops = 0;
|
||||
server.in_eval = 0;
|
||||
server.in_exec = 0;
|
||||
server.propagate_in_transaction = 0;
|
||||
server.rdb_child_pid = -1;
|
||||
server.aof_child_pid = -1;
|
||||
server.module_child_pid = -1;
|
||||
|
@ -1116,6 +1116,9 @@ struct redisServer {
|
||||
int sentinel_mode; /* True if this instance is a Sentinel. */
|
||||
size_t initial_memory_usage; /* Bytes used after initialization. */
|
||||
int always_show_logo; /* Show logo even for non-stdout logging. */
|
||||
int in_eval; /* Are we inside EVAL? */
|
||||
int in_exec; /* Are we inside EXEC? */
|
||||
int propagate_in_transaction; /* Make sure we don't propagate nested MULTI/EXEC */
|
||||
/* Modules */
|
||||
dict *moduleapi; /* Exported core APIs dictionary for modules. */
|
||||
dict *sharedapi; /* Like moduleapi but containing the APIs that
|
||||
@ -1834,6 +1837,7 @@ void flagTransaction(client *c);
|
||||
void execCommandAbort(client *c, sds error);
|
||||
void execCommandPropagateMulti(client *c);
|
||||
void execCommandPropagateExec(client *c);
|
||||
void beforePropagateMultiOrExec(int multi);
|
||||
|
||||
/* Redis object implementation */
|
||||
void decrRefCount(robj *o);
|
||||
|
@ -39,7 +39,7 @@
|
||||
/** strores all the keys on which we got 'loaded' keyspace notification **/
|
||||
RedisModuleDict *loaded_event_log = NULL;
|
||||
|
||||
static int KeySpace_Notification(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){
|
||||
static int KeySpace_NotificationLoaded(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key){
|
||||
REDISMODULE_NOT_USED(ctx);
|
||||
REDISMODULE_NOT_USED(type);
|
||||
|
||||
@ -55,6 +55,29 @@ static int KeySpace_Notification(RedisModuleCtx *ctx, int type, const char *even
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
static int KeySpace_NotificationGeneric(RedisModuleCtx *ctx, int type, const char *event, RedisModuleString *key) {
|
||||
REDISMODULE_NOT_USED(type);
|
||||
|
||||
if (strcmp(event, "del") == 0) {
|
||||
RedisModuleString *copykey = RedisModule_CreateStringPrintf(ctx, "%s_copy", RedisModule_StringPtrLen(key, NULL));
|
||||
RedisModuleCallReply* rep = RedisModule_Call(ctx, "DEL", "s!", copykey);
|
||||
RedisModule_FreeString(ctx, copykey);
|
||||
RedisModule_FreeCallReply(rep);
|
||||
|
||||
int ctx_flags = RedisModule_GetContextFlags(ctx);
|
||||
if (ctx_flags & REDISMODULE_CTX_FLAGS_LUA) {
|
||||
RedisModuleCallReply* rep = RedisModule_Call(ctx, "INCR", "c", "lua");
|
||||
RedisModule_FreeCallReply(rep);
|
||||
}
|
||||
if (ctx_flags & REDISMODULE_CTX_FLAGS_MULTI) {
|
||||
RedisModuleCallReply* rep = RedisModule_Call(ctx, "INCR", "c", "multi");
|
||||
RedisModule_FreeCallReply(rep);
|
||||
}
|
||||
}
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
static int cmdIsKeyLoaded(RedisModuleCtx *ctx, RedisModuleString **argv, int argc){
|
||||
if(argc != 2){
|
||||
return RedisModule_WrongArity(ctx);
|
||||
@ -75,6 +98,68 @@ static int cmdIsKeyLoaded(RedisModuleCtx *ctx, RedisModuleString **argv, int arg
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
static int cmdDelKeyCopy(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc != 2)
|
||||
return RedisModule_WrongArity(ctx);
|
||||
|
||||
RedisModuleCallReply* rep = RedisModule_Call(ctx, "DEL", "s!", argv[1]);
|
||||
if (!rep) {
|
||||
RedisModule_ReplyWithError(ctx, "NULL reply returned");
|
||||
} else {
|
||||
RedisModule_ReplyWithCallReply(ctx, rep);
|
||||
RedisModule_FreeCallReply(rep);
|
||||
}
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* Call INCR and propagate using RM_Call with `!`. */
|
||||
static int cmdIncrCase1(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc != 2)
|
||||
return RedisModule_WrongArity(ctx);
|
||||
|
||||
RedisModuleCallReply* rep = RedisModule_Call(ctx, "INCR", "s!", argv[1]);
|
||||
if (!rep) {
|
||||
RedisModule_ReplyWithError(ctx, "NULL reply returned");
|
||||
} else {
|
||||
RedisModule_ReplyWithCallReply(ctx, rep);
|
||||
RedisModule_FreeCallReply(rep);
|
||||
}
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* Call INCR and propagate using RM_Replicate. */
|
||||
static int cmdIncrCase2(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc != 2)
|
||||
return RedisModule_WrongArity(ctx);
|
||||
|
||||
RedisModuleCallReply* rep = RedisModule_Call(ctx, "INCR", "s", argv[1]);
|
||||
if (!rep) {
|
||||
RedisModule_ReplyWithError(ctx, "NULL reply returned");
|
||||
} else {
|
||||
RedisModule_ReplyWithCallReply(ctx, rep);
|
||||
RedisModule_FreeCallReply(rep);
|
||||
}
|
||||
RedisModule_Replicate(ctx, "INCR", "s", argv[1]);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
/* Call INCR and propagate using RM_ReplicateVerbatim. */
|
||||
static int cmdIncrCase3(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
if (argc != 2)
|
||||
return RedisModule_WrongArity(ctx);
|
||||
|
||||
RedisModuleCallReply* rep = RedisModule_Call(ctx, "INCR", "s", argv[1]);
|
||||
if (!rep) {
|
||||
RedisModule_ReplyWithError(ctx, "NULL reply returned");
|
||||
} else {
|
||||
RedisModule_ReplyWithCallReply(ctx, rep);
|
||||
RedisModule_FreeCallReply(rep);
|
||||
}
|
||||
RedisModule_ReplicateVerbatim(ctx);
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
|
||||
/* This function must be present on each Redis module. It is used in order to
|
||||
* register the commands into the Redis server. */
|
||||
int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) {
|
||||
@ -94,7 +179,11 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_LOADED, KeySpace_Notification) != REDISMODULE_OK){
|
||||
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_LOADED, KeySpace_NotificationLoaded) != REDISMODULE_OK){
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
if(RedisModule_SubscribeToKeyspaceEvents(ctx, REDISMODULE_NOTIFY_GENERIC, KeySpace_NotificationGeneric) != REDISMODULE_OK){
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
@ -102,6 +191,22 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc)
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"keyspace.del_key_copy", cmdDelKeyCopy,"",0,0,0) == REDISMODULE_ERR){
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"keyspace.incr_case1", cmdIncrCase1,"",0,0,0) == REDISMODULE_ERR){
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"keyspace.incr_case2", cmdIncrCase2,"",0,0,0) == REDISMODULE_ERR){
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
if (RedisModule_CreateCommand(ctx,"keyspace.incr_case3", cmdIncrCase3,"",0,0,0) == REDISMODULE_ERR){
|
||||
return REDISMODULE_ERR;
|
||||
}
|
||||
|
||||
return REDISMODULE_OK;
|
||||
}
|
||||
|
||||
|
@ -18,5 +18,54 @@ tags "modules" {
|
||||
assert_equal {1 t} [r keyspace.is_key_loaded t]
|
||||
assert_equal {1 s} [r keyspace.is_key_loaded s]
|
||||
}
|
||||
|
||||
test {Nested multi due to RM_Call} {
|
||||
r del multi
|
||||
r del lua
|
||||
|
||||
r set x 1
|
||||
r set x_copy 1
|
||||
r keyspace.del_key_copy x
|
||||
r keyspace.incr_case1 x
|
||||
r keyspace.incr_case2 x
|
||||
r keyspace.incr_case3 x
|
||||
assert_equal {} [r get multi]
|
||||
assert_equal {} [r get lua]
|
||||
r get x
|
||||
} {3}
|
||||
|
||||
test {Nested multi due to RM_Call, with client MULTI} {
|
||||
r del multi
|
||||
r del lua
|
||||
|
||||
r set x 1
|
||||
r set x_copy 1
|
||||
r multi
|
||||
r keyspace.del_key_copy x
|
||||
r keyspace.incr_case1 x
|
||||
r keyspace.incr_case2 x
|
||||
r keyspace.incr_case3 x
|
||||
r exec
|
||||
assert_equal {1} [r get multi]
|
||||
assert_equal {} [r get lua]
|
||||
r get x
|
||||
} {3}
|
||||
|
||||
test {Nested multi due to RM_Call, with EVAL} {
|
||||
r del multi
|
||||
r del lua
|
||||
|
||||
r set x 1
|
||||
r set x_copy 1
|
||||
r eval {
|
||||
redis.pcall('keyspace.del_key_copy', KEYS[1])
|
||||
redis.pcall('keyspace.incr_case1', KEYS[1])
|
||||
redis.pcall('keyspace.incr_case2', KEYS[1])
|
||||
redis.pcall('keyspace.incr_case3', KEYS[1])
|
||||
} 1 x
|
||||
assert_equal {} [r get multi]
|
||||
assert_equal {1} [r get lua]
|
||||
r get x
|
||||
} {3}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user