From b398021b2c90b815ad99372d0c857bbb7d7c487d Mon Sep 17 00:00:00 2001 From: Malavan Sotheeswaran <105669860+msotheeswaran@users.noreply.github.com> Date: Wed, 8 Mar 2023 15:54:50 -0500 Subject: [PATCH] update slotToKey during fast load (#589) --- src/SnapshotPayloadParseState.cpp | 2 +- src/db.cpp | 10 +++++++++- src/server.h | 4 ++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/SnapshotPayloadParseState.cpp b/src/SnapshotPayloadParseState.cpp index 8ba4b109b..9da163653 100644 --- a/src/SnapshotPayloadParseState.cpp +++ b/src/SnapshotPayloadParseState.cpp @@ -140,7 +140,7 @@ void SnapshotPayloadParseState::flushQueuedKeys() { auto &insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous if (current_database < cserver.dbnum) { g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable { - g_pserver->db[idb]->bulkStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size()); + g_pserver->db[idb]->bulkDirectStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size()); --insertsInFlightTmp; delete pallocator; }); diff --git a/src/db.cpp b/src/db.cpp index b959f178e..0c6dc5e8a 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -3002,8 +3002,16 @@ void redisDbPersistentData::processChangesAsync(std::atomic &pendingJobs) }); } -void redisDbPersistentData::bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem) +/* This function is to bulk insert directly to storage provider bypassing in memory, assumes rgKeys and rgVals are not sds strings */ +void redisDbPersistentData::bulkDirectStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem) { + if (g_pserver->cluster_enabled) { + aeAcquireLock(); + for (size_t i = 0; i < celem; i++) { + slotToKeyUpdateKeyCore(rgKeys[i], rgcbKeys[i], 1); + } + aeReleaseLock(); + } m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, celem); } diff --git a/src/server.h b/src/server.h index ae3611fc2..eb54f608c 100644 --- a/src/server.h +++ b/src/server.h @@ -1209,7 +1209,7 @@ public: bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; } std::unique_ptr CloneStorageCache() { return std::unique_ptr(m_spstorage->clone()); } - void bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem); + void bulkDirectStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem); dict_iter find_cached_threadsafe(const char *key) const; @@ -1370,7 +1370,7 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::FRehashing; using redisDbPersistentData::FTrackingChanges; using redisDbPersistentData::CloneStorageCache; - using redisDbPersistentData::bulkStorageInsert; + using redisDbPersistentData::bulkDirectStorageInsert; public: const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional) {