From 8242ba5b47bdf56b6455385f051809c7df39e4c2 Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Wed, 21 Jun 2023 16:40:35 -0400 Subject: [PATCH 1/4] Fix propagate in transaction crash (#200) * ignore propagate_in_transaction in processCommand if command is async * make propogate_in_transaction thread local * forgot module.cpp * move client_pause_in_transaction to thread local * don't do async commands while in eval or exec * don't do keys or scan async if in transaction --- src/db.cpp | 4 ++-- src/module.cpp | 4 ++-- src/multi.cpp | 10 +++++----- src/networking.cpp | 4 ++-- src/server.cpp | 15 +++++++-------- src/server.h | 5 +++-- 6 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index dcf617c10..c6b7b099b 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1059,7 +1059,7 @@ void keysCommand(client *c) { sds pattern = szFromObj(c->argv[1]); const redisDbPersistentDataSnapshot *snapshot = nullptr; - if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED | CLIENT_DENY_BLOCKING))) + if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED | CLIENT_DENY_BLOCKING)) && !(serverTL->in_eval || serverTL->in_exec)) snapshot = c->db->createSnapshot(c->mvccCheckpoint, true /* fOptional */); if (snapshot != nullptr) { @@ -1224,7 +1224,7 @@ void scanGenericCommand(client *c, robj_roptr o, unsigned long cursor) { } } - if (o == nullptr && count >= 100) + if (o == nullptr && count >= 100 && !(serverTL->in_eval || serverTL->in_exec)) { // Do an async version if (c->asyncCommand( diff --git a/src/module.cpp b/src/module.cpp index cf3c81c1a..7bd32aebb 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -656,7 +656,7 @@ void moduleHandlePropagationAfterCommandCallback(RedisModuleCtx *ctx) { /* We don't need to do anything here if the server isn't inside * a transaction. */ - if (!g_pserver->propagate_in_transaction) return; + if (!serverTL->propagate_in_transaction) return; /* If this command is executed from with Lua or MULTI/EXEC we do not * need to propagate EXEC */ @@ -1814,7 +1814,7 @@ void moduleReplicateMultiIfNeeded(RedisModuleCtx *ctx) { * the module command was called by a script. */ if (serverTL->in_eval || serverTL->in_exec) return; /* If we already emitted MULTI return ASAP. */ - if (g_pserver->propagate_in_transaction) return; + if (serverTL->propagate_in_transaction) return; /* If this is a thread safe context, we do not want to wrap commands * executed into MULTI/EXEC, they are executed as single commands * from an external client in essence. */ diff --git a/src/multi.cpp b/src/multi.cpp index 207ce9c40..b2f1ccf22 100644 --- a/src/multi.cpp +++ b/src/multi.cpp @@ -118,14 +118,14 @@ void discardCommand(client *c) { void beforePropagateMulti() { /* Propagating MULTI */ - serverAssert(!g_pserver->propagate_in_transaction); - g_pserver->propagate_in_transaction = 1; + serverAssert(!serverTL->propagate_in_transaction); + serverTL->propagate_in_transaction = 1; } void afterPropagateExec() { /* Propagating EXEC */ - serverAssert(g_pserver->propagate_in_transaction == 1); - g_pserver->propagate_in_transaction = 0; + serverAssert(serverTL->propagate_in_transaction == 1); + serverTL->propagate_in_transaction = 0; } /* Send a MULTI command to all the slaves and AOF file. Check the execCommand @@ -264,7 +264,7 @@ void execCommand(client *c) { /* Make sure the EXEC command will be propagated as well if MULTI * was already propagated. */ - if (g_pserver->propagate_in_transaction) { + if (serverTL->propagate_in_transaction) { int is_master = listLength(g_pserver->masters) == 0; g_pserver->dirty++; /* If inside the MULTI/EXEC block this instance was suddenly diff --git a/src/networking.cpp b/src/networking.cpp index c9223c8ae..637ba0592 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -2753,7 +2753,7 @@ void readQueryFromClient(connection *conn) { if (cserver.cthreads > 1 || g_pserver->m_pstorageFactory) { parseClientCommandBuffer(c); - if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode)) { + if (g_pserver->enable_async_commands && !serverTL->disable_async_commands && listLength(g_pserver->monitors) == 0 && (aeLockContention() || serverTL->rgdbSnapshot[c->db->id] || g_fTestMode) && !serverTL->in_eval && !serverTL->in_exec) { // Frequent writers aren't good candidates for this optimization, they cause us to renew the snapshot too often // so we exclude them unless the snapshot we need already exists. // Note: In test mode we want to create snapshots as often as possibl to excercise them - we don't care about perf @@ -3995,7 +3995,7 @@ void pauseClients(mstime_t end, pause_type type) { * to track this state so that we don't assert * in propagate(). */ if (serverTL->in_exec) { - g_pserver->client_pause_in_transaction = 1; + serverTL->client_pause_in_transaction = 1; } } diff --git a/src/server.cpp b/src/server.cpp index 70b8b4374..09535e36d 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3945,8 +3945,6 @@ void initServer(void) { g_pserver->pubsub_channels = dictCreate(&keylistDictType,NULL); g_pserver->pubsub_patterns = dictCreate(&keylistDictType,NULL); g_pserver->cronloops = 0; - g_pserver->propagate_in_transaction = 0; - g_pserver->client_pause_in_transaction = 0; g_pserver->child_pid = -1; g_pserver->child_type = CHILD_TYPE_NONE; g_pserver->rdbThreadVars.fRdbThreadCancel = false; @@ -4320,12 +4318,12 @@ void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, * This way we'll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency * and atomicity guarantees. */ - if (serverTL->in_exec && !g_pserver->propagate_in_transaction) + if (serverTL->in_exec && !serverTL->propagate_in_transaction) execCommandPropagateMulti(dbid); /* This needs to be unreachable since the dataset should be fixed during * client pause, otherwise data may be lossed during a failover. */ - serverAssert(!(areClientsPaused() && !g_pserver->client_pause_in_transaction)); + serverAssert(!(areClientsPaused() && !serverTL->client_pause_in_transaction)); if (g_pserver->aof_state != AOF_OFF && flags & PROPAGATE_AOF) feedAppendOnlyFile(cmd,dbid,argv,argc); @@ -4647,8 +4645,8 @@ void call(client *c, int flags) { /* Client pause takes effect after a transaction has finished. This needs * to be located after everything is propagated. */ - if (!serverTL->in_exec && g_pserver->client_pause_in_transaction) { - g_pserver->client_pause_in_transaction = 0; + if (!serverTL->in_exec && serverTL->client_pause_in_transaction) { + serverTL->client_pause_in_transaction = 0; } /* If the client has keys tracking enabled for client side caching, @@ -4739,8 +4737,9 @@ int processCommand(client *c, int callFlags) { /* Both EXEC and EVAL call call() directly so there should be * no way in_exec or in_eval or propagate_in_transaction is 1. * That is unless lua_timedout, in which case client may run - * some commands. */ - serverAssert(!g_pserver->propagate_in_transaction); + * some commands. Also possible that some other thread set + * propagate_in_transaction if this is an async command. */ + serverAssert(!serverTL->propagate_in_transaction); serverAssert(!serverTL->in_exec); serverAssert(!serverTL->in_eval); } diff --git a/src/server.h b/src/server.h index ac927d07a..022cca2dd 100644 --- a/src/server.h +++ b/src/server.h @@ -2199,6 +2199,9 @@ struct redisServerThreadVars { bool modulesEnabledThisAeLoop = false; /* In this loop of aeMain, were modules enabled before the thread went to sleep? */ bool disable_async_commands = false; /* this is only valid for one cycle of the AE loop and is reset in afterSleep */ + + int propagate_in_transaction = 0; /* Make sure we don't propagate nested MULTI/EXEC */ + int client_pause_in_transaction = 0; /* Was a client pause executed during this Exec? */ std::vector vecclientsProcess; dictAsyncRehashCtl *rehashCtl = nullptr; @@ -2304,9 +2307,7 @@ struct redisServer { int sentinel_mode; /* True if this instance is a Sentinel. */ size_t initial_memory_usage; /* Bytes used after initialization. */ int always_show_logo; /* Show logo even for non-stdout logging. */ - int propagate_in_transaction; /* Make sure we don't propagate nested MULTI/EXEC */ char *ignore_warnings; /* Config: warnings that should be ignored. */ - int client_pause_in_transaction; /* Was a client pause executed during this Exec? */ pause_type client_pause_type; /* True if clients are currently paused */ /* Modules */ ::dict *moduleapi; /* Exported core APIs dictionary for modules. */ From 2adf93ba0e28e918b38e0a627730051700709a4d Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Wed, 21 Jun 2023 16:47:12 -0400 Subject: [PATCH 2/4] have FreeMemoryLazyFree increment dict refcount so it doesn't try to use a dict that has already been released (#199) --- src/evict.cpp | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/evict.cpp b/src/evict.cpp index 954a850ac..94bc132c1 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -483,6 +483,7 @@ public: for (auto de : pair.second) { dictFreeUnlinkedEntry(pair.first, de); } + dictRelease(pair.first); } aeReleaseLock(); --s_clazyFreesInProgress; @@ -513,6 +514,7 @@ public: ); if (itr == vecdictvecde.end() || itr->first != d) { itr = vecdictvecde.insert(itr, std::make_pair(d, std::vector())); + __atomic_fetch_add(&d->refcount, 1, __ATOMIC_ACQ_REL); } serverAssert(itr->first == d); itr->second.push_back(de); From 9e310b1bea7da3834dca11e23cda86c4793e07b0 Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran Date: Wed, 21 Jun 2023 16:47:21 -0400 Subject: [PATCH 3/4] Make inserts in flight a shared_ptr to avoid double free (#198) * remove keyproxy test from machamp * Update build.yaml * make insertsinflight shared --- src/SnapshotPayloadParseState.cpp | 12 ++++++------ src/SnapshotPayloadParseState.h | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/SnapshotPayloadParseState.cpp b/src/SnapshotPayloadParseState.cpp index 9da163653..ef28e54f1 100644 --- a/src/SnapshotPayloadParseState.cpp +++ b/src/SnapshotPayloadParseState.cpp @@ -136,12 +136,12 @@ void SnapshotPayloadParseState::flushQueuedKeys() { int idb = current_database; serverAssert(vecqueuedKeys.size() == vecqueuedVals.size()); auto sizePrev = vecqueuedKeys.size(); - ++insertsInFlight; - auto &insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous + (*insertsInFlight)++; + std::weak_ptr> insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous if (current_database < cserver.dbnum) { - g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable { + g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable { g_pserver->db[idb]->bulkDirectStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size()); - --insertsInFlightTmp; + (*(insertsInFlightTmp.lock()))--; delete pallocator; }); } else { @@ -172,7 +172,7 @@ SnapshotPayloadParseState::SnapshotPayloadParseState() { dictLongLongMetaData = dictCreate(&metadataLongLongDictType, nullptr); dictMetaData = dictCreate(&metadataDictType, nullptr); - insertsInFlight = 0; + insertsInFlight = std::make_shared>(); m_spallocator = std::make_unique(); } @@ -214,7 +214,7 @@ void SnapshotPayloadParseState::trimState() { if (stackParse.empty()) { flushQueuedKeys(); - while (insertsInFlight > 0) { + while (*insertsInFlight > 0) { // TODO: ProcessEventsWhileBlocked aeReleaseLock(); aeAcquireLock(); diff --git a/src/SnapshotPayloadParseState.h b/src/SnapshotPayloadParseState.h index cb1e0420a..29ac28fb6 100644 --- a/src/SnapshotPayloadParseState.h +++ b/src/SnapshotPayloadParseState.h @@ -36,7 +36,7 @@ class SnapshotPayloadParseState { std::vector vecqueuedValsCb; - std::atomic insertsInFlight; + std::shared_ptr> insertsInFlight; std::unique_ptr m_spallocator; dict *dictLongLongMetaData = nullptr; dict *dictMetaData = nullptr; @@ -62,5 +62,5 @@ public: void pushArray(long long size); void pushValue(const char *rgch, long long cch); void pushValue(long long value); - bool shouldThrottle() const { return insertsInFlight > (cserver.cthreads*4); } + bool shouldThrottle() const { return *insertsInFlight > (cserver.cthreads*4); } }; \ No newline at end of file From 533c1f70b5b5483a611811baf4c14b4d59b9e6f8 Mon Sep 17 00:00:00 2001 From: Karthick Ariyaratnam Date: Wed, 21 Jun 2023 16:26:14 -0400 Subject: [PATCH 4/4] Fix a bug where swapdb result was not recovered after keydb restarts in FLASH mode. (#668) Co-authored-by: k00809413 --- src/db.cpp | 23 ++++++++++++++++++----- src/server.cpp | 23 +++++++++++++++++++---- 2 files changed, 37 insertions(+), 9 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index c6b7b099b..297910b9e 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1769,7 +1769,7 @@ int dbSwapDatabases(int id1, int id2) { /* SWAPDB db1 db2 */ void swapdbCommand(client *c) { - int id1, id2, oriId; + int id1, id2, oriIdx; /* Not allowed in cluster mode: we have just DB 0 there. */ if (g_pserver->cluster_enabled) { @@ -1786,8 +1786,13 @@ void swapdbCommand(client *c) { "invalid second DB index") != C_OK) return; - //get client's original db's id - oriId=c->db->id; + // get client's original db's index + for (int idb=0; idb < cserver.dbnum; ++idb) { + if (g_pserver->db[idb]->id == c->db->id) { + oriIdx = idb; + break; + } + } /* Swap... */ if (dbSwapDatabases(id1,id2) == C_ERR) { @@ -1798,9 +1803,17 @@ void swapdbCommand(client *c) { moduleFireServerEvent(REDISMODULE_EVENT_SWAPDB,0,&si); g_pserver->dirty++; - //set client's db to original db - c->db=g_pserver->db[oriId]; + // set client's db to original db + c->db=g_pserver->db[oriIdx]; + // Persist the databse index to dbid mapping into FLASH for later recovery. + if (g_pserver->m_pstorageFactory != nullptr && g_pserver->metadataDb != nullptr) { + std::string dbid_key = "db-" + std::to_string(id1); + g_pserver->metadataDb->insert(dbid_key.c_str(), dbid_key.length(), &g_pserver->db[id1]->id, sizeof(g_pserver->db[id1]->id), true); + + dbid_key = "db-" + std::to_string(id2); + g_pserver->metadataDb->insert(dbid_key.c_str(), dbid_key.length(), &g_pserver->db[id2]->id, sizeof(g_pserver->db[id2]->id), true); + } addReply(c,shared.ok); } } diff --git a/src/server.cpp b/src/server.cpp index 09535e36d..95d19459b 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3881,9 +3881,25 @@ void initServer(void) { g_pserver->db = (redisDb**)zmalloc(sizeof(redisDb*)*cserver.dbnum, MALLOC_LOCAL); /* Create the Redis databases, and initialize other internal state. */ - for (int j = 0; j < cserver.dbnum; j++) { - g_pserver->db[j] = new (MALLOC_LOCAL) redisDb(); - g_pserver->db[j]->initialize(j); + if (g_pserver->m_pstorageFactory == nullptr) { + for (int j = 0; j < cserver.dbnum; j++) { + g_pserver->db[j] = new (MALLOC_LOCAL) redisDb(); + g_pserver->db[j]->initialize(j); + } + } else { + // Read FLASH metadata and load the appropriate dbid into each databse index, as each DB index can have different dbid mapped due to the swapdb command. + g_pserver->metadataDb = g_pserver->m_pstorageFactory->createMetadataDb(); + for (int idb = 0; idb < cserver.dbnum; ++idb) + { + int dbid = idb; + std::string dbid_key = "db-" + std::to_string(idb); + g_pserver->metadataDb->retrieve(dbid_key.c_str(), dbid_key.length(), [&](const char *, size_t, const void *data, size_t){ + dbid = *(int*)data; + }); + + g_pserver->db[idb] = new (MALLOC_LOCAL) redisDb(); + g_pserver->db[idb]->initialize(dbid); + } } for (int i = 0; i < MAX_EVENT_LOOPS; ++i) @@ -4031,7 +4047,6 @@ void initServer(void) { latencyMonitorInit(); if (g_pserver->m_pstorageFactory) { - g_pserver->metadataDb = g_pserver->m_pstorageFactory->createMetadataDb(); if (g_pserver->metadataDb) { g_pserver->metadataDb->retrieve("repl-id", 7, [&](const char *, size_t, const void *data, size_t cb){ if (cb == sizeof(g_pserver->replid)) {