Fix propagate in transaction crash (#200)

* ignore propagate_in_transaction in processCommand if command is async

* make propagate_in_transaction thread local

* forgot module.cpp

* move client_pause_in_transaction to thread local

* don't do async commands while in eval or exec

* don't do keys or scan async if in transaction
This commit is contained in:
Malavan Sotheeswaran 2023-06-21 16:40:35 -04:00
parent 46660b9f18
commit b258322f9e
6 changed files with 21 additions and 21 deletions

View File

@ -1059,7 +1059,7 @@ void keysCommand(client *c) {
sds pattern = szFromObj(c->argv[1]); sds pattern = szFromObj(c->argv[1]);
const redisDbPersistentDataSnapshot *snapshot = nullptr; const redisDbPersistentDataSnapshot *snapshot = nullptr;
if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED | CLIENT_DENY_BLOCKING))) if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED | CLIENT_DENY_BLOCKING)) && !(serverTL->in_eval || serverTL->in_exec))
snapshot = c->db->createSnapshot(c->mvccCheckpoint, true /* fOptional */); snapshot = c->db->createSnapshot(c->mvccCheckpoint, true /* fOptional */);
if (snapshot != nullptr) if (snapshot != nullptr)
{ {
@ -1224,7 +1224,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) {
} }
} }
if (o == nullptr && count >= 100) if (o == nullptr && count >= 100 && !(serverTL->in_eval || serverTL->in_exec))
{ {
// Do an async version // Do an async version
if (c->asyncCommand( if (c->asyncCommand(

View File

@ -656,7 +656,7 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) {
/* We don't need to do anything here if the server isn't inside /* We don't need to do anything here if the server isn't inside
* a transaction. */ * a transaction. */
if (!g_pserver->propagate_in_transaction) return; if (!serverTL->propagate_in_transaction) return;
/* If this command is executed from with Lua or MULTI/EXEC we do not /* If this command is executed from with Lua or MULTI/EXEC we do not
* need to propagate EXEC */ * need to propagate EXEC */
@ -1814,7 +1814,7 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) {
* the module command was called by a script. */ * the module command was called by a script. */
if (serverTL->in_eval || serverTL->in_exec) return; if (serverTL->in_eval || serverTL->in_exec) return;
/* If we already emitted MULTI return ASAP. */ /* If we already emitted MULTI return ASAP. */
if (g_pserver->propagate_in_transaction) return; if (serverTL->propagate_in_transaction) return;
/* If this is a thread safe context, we do not want to wrap commands /* If this is a thread safe context, we do not want to wrap commands
* executed into MULTI/EXEC, they are executed as single commands * executed into MULTI/EXEC, they are executed as single commands
* from an external client in essence. */ * from an external client in essence. */

View File

@ -118,14 +118,14 @@ void discardCommand(client *c) {
void beforePropagateMulti() { void beforePropagateMulti() {
/* Propagating MULTI */ /* Propagating MULTI */
serverAssert(!g_pserver->propagate_in_transaction); serverAssert(!serverTL->propagate_in_transaction);
g_pserver->propagate_in_transaction = 1; serverTL->propagate_in_transaction = 1;
} }
void afterPropagateExec() { void afterPropagateExec() {
/* Propagating EXEC */ /* Propagating EXEC */
serverAssert(g_pserver->propagate_in_transaction == 1); serverAssert(serverTL->propagate_in_transaction == 1);
g_pserver->propagate_in_transaction = 0; serverTL->propagate_in_transaction = 0;
} }
/* Send a MULTI command to all the slaves and AOF file. Check the execCommand /* Send a MULTI command to all the slaves and AOF file. Check the execCommand
@ -264,7 +264,7 @@ void execCommand(client *c) {
/* Make sure the EXEC command will be propagated as well if MULTI /* Make sure the EXEC command will be propagated as well if MULTI
* was already propagated. */ * was already propagated. */
if (g_pserver->propagate_in_transaction) { if (serverTL->propagate_in_transaction) {
int is_master = listLength(g_pserver->masters) == 0; int is_master = listLength(g_pserver->masters) == 0;
g_pserver->dirty++; g_pserver->dirty++;
/* If inside the MULTI/EXEC block this instance was suddenly /* If inside the MULTI/EXEC block this instance was suddenly

View File

@ -2753,7 +2753,7 @@ void readQueryFromClient(connection *conn) {
if (cserver.cthreads > 1 || g_pserver->m_pstorageFactory) { if (cserver.cthreads > 1 || g_pserver->m_pstorageFactory) {
parseClientCommandBuffer(c); parseClientCommandBuffer(c);
if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode)) { if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode) && !serverTL->in_eval && !serverTL->in_exec) {
// Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often // Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often
// so we exclude them unless the snapshot we need already exists. // so we exclude them unless the snapshot we need already exists.
// Note: In test mode we want to create snapshots as often as possibl to excercise them - we don't care about perf // Note: In test mode we want to create snapshots as often as possibl to excercise them - we don't care about perf
@ -3995,7 +3995,7 @@ void pauseClients(mstime_t end, pause_type type) {
* to track this state so that we don't assert * to track this state so that we don't assert
* in propagate(). */ * in propagate(). */
if (serverTL->in_exec) { if (serverTL->in_exec) {
g_pserver->client_pause_in_transaction = 1; serverTL->client_pause_in_transaction = 1;
} }
} }

View File

@ -3961,8 +3961,6 @@ void initServer(void) {
g_pserver->pubsub_channels = dictCreate(&keylistDictType,NULL); g_pserver->pubsub_channels = dictCreate(&keylistDictType,NULL);
g_pserver->pubsub_patterns = dictCreate(&keylistDictType,NULL); g_pserver->pubsub_patterns = dictCreate(&keylistDictType,NULL);
g_pserver->cronloops = 0; g_pserver->cronloops = 0;
g_pserver->propagate_in_transaction = 0;
g_pserver->client_pause_in_transaction = 0;
g_pserver->child_pid = -1; g_pserver->child_pid = -1;
g_pserver->child_type = CHILD_TYPE_NONE; g_pserver->child_type = CHILD_TYPE_NONE;
g_pserver->rdbThreadVars.fRdbThreadCancel = false; g_pserver->rdbThreadVars.fRdbThreadCancel = false;
@ -4335,12 +4333,12 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
* This way we'll deliver the MULTI/..../EXEC block as a whole and * This way we'll deliver the MULTI/..../EXEC block as a whole and
* both the AOF and the replication link will have the same consistency * both the AOF and the replication link will have the same consistency
* and atomicity guarantees. */ * and atomicity guarantees. */
if (serverTL->in_exec && !g_pserver->propagate_in_transaction) if (serverTL->in_exec && !serverTL->propagate_in_transaction)
execCommandPropagateMulti(dbid); execCommandPropagateMulti(dbid);
/* This needs to be unreachable since the dataset should be fixed during /* This needs to be unreachable since the dataset should be fixed during
* client pause, otherwise data may be lossed during a failover. */ * client pause, otherwise data may be lossed during a failover. */
serverAssert(!(areClientsPaused() && !g_pserver->client_pause_in_transaction)); serverAssert(!(areClientsPaused() && !serverTL->client_pause_in_transaction));
if (g_pserver->aof_state != AOF_OFF && flags & PROPAGATE_AOF) if (g_pserver->aof_state != AOF_OFF && flags & PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc); feedAppendOnlyFile(cmd,dbid,argv,argc);
@ -4662,8 +4660,8 @@ void call(client *c, int flags) {
/* Client pause takes effect after a transaction has finished. This needs /* Client pause takes effect after a transaction has finished. This needs
* to be located after everything is propagated. */ * to be located after everything is propagated. */
if (!serverTL->in_exec && g_pserver->client_pause_in_transaction) { if (!serverTL->in_exec && serverTL->client_pause_in_transaction) {
g_pserver->client_pause_in_transaction = 0; serverTL->client_pause_in_transaction = 0;
} }
/* If the client has keys tracking enabled for client side caching, /* If the client has keys tracking enabled for client side caching,
@ -4754,8 +4752,9 @@ int processCommand(client *c, int callFlags) {
/* Both EXEC and EVAL call call() directly so there should be /* Both EXEC and EVAL call call() directly so there should be
* no way in_exec or in_eval or propagate_in_transaction is 1. * no way in_exec or in_eval or propagate_in_transaction is 1.
* That is unless lua_timedout, in which case client may run * That is unless lua_timedout, in which case client may run
* some commands. */ * some commands. Also possible that some other thread set
serverAssert(!g_pserver->propagate_in_transaction); * propagate_in_transaction if this is an async command. */
serverAssert(!serverTL->propagate_in_transaction);
serverAssert(!serverTL->in_exec); serverAssert(!serverTL->in_exec);
serverAssert(!serverTL->in_eval); serverAssert(!serverTL->in_eval);
} }

View File

@ -2199,6 +2199,9 @@ struct redisServerThreadVars {
bool modulesEnabledThisAeLoop = false; /* In this loop of aeMain, were modules enabled before bool modulesEnabledThisAeLoop = false; /* In this loop of aeMain, were modules enabled before
the thread went to sleep? */ the thread went to sleep? */
bool disable_async_commands = false; /* this is only valid for one cycle of the AE loop and is reset in afterSleep */ bool disable_async_commands = false; /* this is only valid for one cycle of the AE loop and is reset in afterSleep */
int propagate_in_transaction = 0; /* Make sure we don't propagate nested MULTI/EXEC */
int client_pause_in_transaction = 0; /* Was a client pause executed during this Exec? */
std::vector<client*> vecclientsProcess; std::vector<client*> vecclientsProcess;
dictAsyncRehashCtl *rehashCtl = nullptr; dictAsyncRehashCtl *rehashCtl = nullptr;
@ -2304,9 +2307,7 @@ struct redisServer {
int sentinel_mode; /* True if this instance is a Sentinel. */ int sentinel_mode; /* True if this instance is a Sentinel. */
size_t initial_memory_usage; /* Bytes used after initialization. */ size_t initial_memory_usage; /* Bytes used after initialization. */
int always_show_logo; /* Show logo even for non-stdout logging. */ int always_show_logo; /* Show logo even for non-stdout logging. */
int propagate_in_transaction; /* Make sure we don't propagate nested MULTI/EXEC */
char *ignore_warnings; /* Config: warnings that should be ignored. */ char *ignore_warnings; /* Config: warnings that should be ignored. */
int client_pause_in_transaction; /* Was a client pause executed during this Exec? */
pause_type client_pause_type; /* True if clients are currently paused */ pause_type client_pause_type; /* True if clients are currently paused */
/* Modules */ /* Modules */
::dict *moduleapi; /* Exported core APIs dictionary for modules. */ ::dict *moduleapi; /* Exported core APIs dictionary for modules. */