update slotToKey during fast load (#589)
This commit is contained in:
parent
c2077a2fbd
commit
b398021b2c
@ -140,7 +140,7 @@ void SnapshotPayloadParseState::flushQueuedKeys() {
|
|||||||
auto &insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous
|
auto &insertsInFlightTmp = insertsInFlight; // C++ GRRRRRRRRRRRRRRRR, we don't want to capute "this" because that's dangerous
|
||||||
if (current_database < cserver.dbnum) {
|
if (current_database < cserver.dbnum) {
|
||||||
g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable {
|
g_pserver->asyncworkqueue->AddWorkFunction([idb, vecqueuedKeys = std::move(this->vecqueuedKeys), vecqueuedKeysCb = std::move(this->vecqueuedKeysCb), vecqueuedVals = std::move(this->vecqueuedVals), vecqueuedValsCb = std::move(this->vecqueuedValsCb), &insertsInFlightTmp, pallocator = m_spallocator.release()]() mutable {
|
||||||
g_pserver->db[idb]->bulkStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size());
|
g_pserver->db[idb]->bulkDirectStorageInsert(vecqueuedKeys.data(), vecqueuedKeysCb.data(), vecqueuedVals.data(), vecqueuedValsCb.data(), vecqueuedKeys.size());
|
||||||
--insertsInFlightTmp;
|
--insertsInFlightTmp;
|
||||||
delete pallocator;
|
delete pallocator;
|
||||||
});
|
});
|
||||||
|
10
src/db.cpp
10
src/db.cpp
@ -3002,8 +3002,16 @@ void redisDbPersistentData::processChangesAsync(std::atomic<int> &pendingJobs)
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void redisDbPersistentData::bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem)
|
/* This function is to bulk insert directly to storage provider bypassing in memory, assumes rgKeys and rgVals are not sds strings */
|
||||||
|
void redisDbPersistentData::bulkDirectStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem)
|
||||||
{
|
{
|
||||||
|
if (g_pserver->cluster_enabled) {
|
||||||
|
aeAcquireLock();
|
||||||
|
for (size_t i = 0; i < celem; i++) {
|
||||||
|
slotToKeyUpdateKeyCore(rgKeys[i], rgcbKeys[i], 1);
|
||||||
|
}
|
||||||
|
aeReleaseLock();
|
||||||
|
}
|
||||||
m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, celem);
|
m_spstorage->bulkInsert(rgKeys, rgcbKeys, rgVals, rgcbVals, celem);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1209,7 +1209,7 @@ public:
|
|||||||
bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }
|
bool FSnapshot() const { return m_spdbSnapshotHOLDER != nullptr; }
|
||||||
|
|
||||||
std::unique_ptr<const StorageCache> CloneStorageCache() { return std::unique_ptr<const StorageCache>(m_spstorage->clone()); }
|
std::unique_ptr<const StorageCache> CloneStorageCache() { return std::unique_ptr<const StorageCache>(m_spstorage->clone()); }
|
||||||
void bulkStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem);
|
void bulkDirectStorageInsert(char **rgKeys, size_t *rgcbKeys, char **rgVals, size_t *rgcbVals, size_t celem);
|
||||||
|
|
||||||
dict_iter find_cached_threadsafe(const char *key) const;
|
dict_iter find_cached_threadsafe(const char *key) const;
|
||||||
|
|
||||||
@ -1370,7 +1370,7 @@ struct redisDb : public redisDbPersistentDataSnapshot
|
|||||||
using redisDbPersistentData::FRehashing;
|
using redisDbPersistentData::FRehashing;
|
||||||
using redisDbPersistentData::FTrackingChanges;
|
using redisDbPersistentData::FTrackingChanges;
|
||||||
using redisDbPersistentData::CloneStorageCache;
|
using redisDbPersistentData::CloneStorageCache;
|
||||||
using redisDbPersistentData::bulkStorageInsert;
|
using redisDbPersistentData::bulkDirectStorageInsert;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional) {
|
const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user