diff --git a/src/SnapshotPayloadParseState.cpp b/src/SnapshotPayloadParseState.cpp index 8ba4b109b..9da163653 100644 --- a/src/SnapshotPayloadParseState.cpp +++ b/src/SnapshotPayloadParseState.cpp @@ -140,7 +140,7 @@ void SnapshotPayloadParseState::flushQueuedKeys() { auto &insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous if (current_database < cserver.dbnum) { g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable { - g_pserver->db[idb]->bulkStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size()); + g_pserver->db[idb]->bulkDirectStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size()); --insertsInFlightTmp; delete pallocator; }); diff --git a/src/db.cpp b/src/db.cpp index b959f178e..0c6dc5e8a 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -3002,8 +3002,16 @@ void redisDbPersistentData::processChangesAsync(std::atomic &pendingJobs) }); } -void redisDbPersistentData::bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem) +/* This function is to bulk insert directly to storage provider bypassing in memory, assumes rgKeys and rgVals are not sds strings */ +void redisDbPersistentData::bulkDirectStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem) { + if (g_pserver->cluster_enabled) { + aeAcquireLock(); + for (size_t i = 0; i < celem; i++) { + slotToKeyUpdateKeyCore(rgKeys[i], rgcbKeys[i], 1); + } + aeReleaseLock(); + } m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, celem); } diff --git a/src/server.h b/src/server.h index ae3611fc2..eb54f608c 100644 --- a/src/server.h +++ b/src/server.h @@ -1209,7 +1209,7 @@ public: bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; } std::unique_ptr CloneStorageCache() { return std::unique_ptr(m_spstorage->clone()); } - void bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem); + void bulkDirectStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem); dict_iter find_cached_threadsafe(const char *key) const; @@ -1370,7 +1370,7 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::FRehashing; using redisDbPersistentData::FTrackingChanges; using redisDbPersistentData::CloneStorageCache; - using redisDbPersistentData::bulkStorageInsert; + using redisDbPersistentData::bulkDirectStorageInsert; public: const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional) {