From b258322f9e90496b829e9389a2c0927d07b6da96 Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Wed, 21 Jun 2023 16:40:35 -0400 Subject: [PATCH] Fix propagate in transaction crash (#200) * ignore propagate_in_transaction in processCommand if command is async * make propogate_in_transaction thread local * forgot module.cpp * move client_pause_in_transaction to thread local * don't do async commands while in eval or exec * don't do keys or scan async if in transaction --- src/db.cpp | 4 ++-- src/module.cpp | 4 ++-- src/multi.cpp | 10 +++++----- src/networking.cpp | 4 ++-- src/server.cpp | 15 +++++++-------- src/server.h | 5 +++-- 6 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 476908f5e..297910b9e 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1059,7 +1059,7 @@ void keysCommand(client *c) { sds pattern = szFromObj(c->argv[1]); const redisDbPersistentDataSnapshot *snapshot = nullptr; - if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED | CLIENT_DENY_BLOCKING))) + if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED | CLIENT_DENY_BLOCKING)) && !(serverTL->in_eval || serverTL->in_exec)) snapshot = c->db->createSnapshot(c->mvccCheckpoint, true /* fOptional */); if (snapshot != nullptr) { @@ -1224,7 +1224,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { } } - if (o == nullptr && count >= 100) + if (o == nullptr && count >= 100 && !(serverTL->in_eval || serverTL->in_exec)) { // Do an async version if (c->asyncCommand( diff --git a/src/module.cpp b/src/module.cpp index cf3c81c1a..7bd32aebb 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -656,7 +656,7 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) { /* We don't need to do anything here if the server isn't inside * a transaction. */ - if (!g_pserver->propagate_in_transaction) return; + if (!serverTL->propagate_in_transaction) return; /* If this command is executed from with Lua or MULTI/EXEC we do not * need to propagate EXEC */ @@ -1814,7 +1814,7 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) { * the module command was called by a script. */ if (serverTL->in_eval || serverTL->in_exec) return; /* If we already emitted MULTI return ASAP. */ - if (g_pserver->propagate_in_transaction) return; + if (serverTL->propagate_in_transaction) return; /* If this is a thread safe context, we do not want to wrap commands * executed into MULTI/EXEC, they are executed as single commands * from an external client in essence. */ diff --git a/src/multi.cpp b/src/multi.cpp index 207ce9c40..b2f1ccf22 100644 --- a/src/multi.cpp +++ b/src/multi.cpp @@ -118,14 +118,14 @@ void discardCommand(client *c) { void beforePropagateMulti() { /* Propagating MULTI */ - serverAssert(!g_pserver->propagate_in_transaction); - g_pserver->propagate_in_transaction = 1; + serverAssert(!serverTL->propagate_in_transaction); + serverTL->propagate_in_transaction = 1; } void afterPropagateExec() { /* Propagating EXEC */ - serverAssert(g_pserver->propagate_in_transaction == 1); - g_pserver->propagate_in_transaction = 0; + serverAssert(serverTL->propagate_in_transaction == 1); + serverTL->propagate_in_transaction = 0; } /* Send a MULTI command to all the slaves and AOF file. Check the execCommand @@ -264,7 +264,7 @@ void execCommand(client *c) { /* Make sure the EXEC command will be propagated as well if MULTI * was already propagated. */ - if (g_pserver->propagate_in_transaction) { + if (serverTL->propagate_in_transaction) { int is_master = listLength(g_pserver->masters) == 0; g_pserver->dirty++; /* If inside the MULTI/EXEC block this instance was suddenly diff --git a/src/networking.cpp b/src/networking.cpp index c9223c8ae..637ba0592 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2753,7 +2753,7 @@ void readQueryFromClient(connection *conn) { if (cserver.cthreads > 1 || g_pserver->m_pstorageFactory) { parseClientCommandBuffer(c); - if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode)) { + if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode) && !serverTL->in_eval && !serverTL->in_exec) { // Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often // so we exclude them unless the snapshot we need already exists. // Note: In test mode we want to create snapshots as often as possibl to excercise them - we don't care about perf @@ -3995,7 +3995,7 @@ void pauseClients(mstime_t end, pause_type type) { * to track this state so that we don't assert * in propagate(). */ if (serverTL->in_exec) { - g_pserver->client_pause_in_transaction = 1; + serverTL->client_pause_in_transaction = 1; } } diff --git a/src/server.cpp b/src/server.cpp index 15afd2cde..95d19459b 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3961,8 +3961,6 @@ void initServer(void) { g_pserver->pubsub_channels = dictCreate(&keylistDictType,NULL); g_pserver->pubsub_patterns = dictCreate(&keylistDictType,NULL); g_pserver->cronloops = 0; - g_pserver->propagate_in_transaction = 0; - g_pserver->client_pause_in_transaction = 0; g_pserver->child_pid = -1; g_pserver->child_type = CHILD_TYPE_NONE; g_pserver->rdbThreadVars.fRdbThreadCancel = false; @@ -4335,12 +4333,12 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, * This way we'll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency * and atomicity guarantees. */ - if (serverTL->in_exec && !g_pserver->propagate_in_transaction) + if (serverTL->in_exec && !serverTL->propagate_in_transaction) execCommandPropagateMulti(dbid); /* This needs to be unreachable since the dataset should be fixed during * client pause, otherwise data may be lossed during a failover. */ - serverAssert(!(areClientsPaused() && !g_pserver->client_pause_in_transaction)); + serverAssert(!(areClientsPaused() && !serverTL->client_pause_in_transaction)); if (g_pserver->aof_state != AOF_OFF && flags & PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); @@ -4662,8 +4660,8 @@ void call(client *c, int flags) { /* Client pause takes effect after a transaction has finished. This needs * to be located after everything is propagated. */ - if (!serverTL->in_exec && g_pserver->client_pause_in_transaction) { - g_pserver->client_pause_in_transaction = 0; + if (!serverTL->in_exec && serverTL->client_pause_in_transaction) { + serverTL->client_pause_in_transaction = 0; } /* If the client has keys tracking enabled for client side caching, @@ -4754,8 +4752,9 @@ int processCommand(client *c, int callFlags) { /* Both EXEC and EVAL call call() directly so there should be * no way in_exec or in_eval or propagate_in_transaction is 1. * That is unless lua_timedout, in which case client may run - * some commands. */ - serverAssert(!g_pserver->propagate_in_transaction); + * some commands. Also possible that some other thread set + * propagate_in_transaction if this is an async command. */ + serverAssert(!serverTL->propagate_in_transaction); serverAssert(!serverTL->in_exec); serverAssert(!serverTL->in_eval); } diff --git a/src/server.h b/src/server.h index ac927d07a..022cca2dd 100644 --- a/src/server.h +++ b/src/server.h @@ -2199,6 +2199,9 @@ struct redisServerThreadVars { bool modulesEnabledThisAeLoop = false; /* In this loop of aeMain, were modules enabled before the thread went to sleep? */ bool disable_async_commands = false; /* this is only valid for one cycle of the AE loop and is reset in afterSleep */ + + int propagate_in_transaction = 0; /* Make sure we don't propagate nested MULTI/EXEC */ + int client_pause_in_transaction = 0; /* Was a client pause executed during this Exec? */ std::vector vecclientsProcess; dictAsyncRehashCtl *rehashCtl = nullptr; @@ -2304,9 +2307,7 @@ struct redisServer { int sentinel_mode; /* True if this instance is a Sentinel. */ size_t initial_memory_usage; /* Bytes used after initialization. */ int always_show_logo; /* Show logo even for non-stdout logging. */ - int propagate_in_transaction; /* Make sure we don't propagate nested MULTI/EXEC */ char *ignore_warnings; /* Config: warnings that should be ignored. */ - int client_pause_in_transaction; /* Was a client pause executed during this Exec? */ pause_type client_pause_type; /* True if clients are currently paused */ /* Modules */ ::dict *moduleapi; /* Exported core APIs dictionary for modules. */