Initial bulkInsert implementation

Former-commit-id: 2a721d4dff3c9f78ec4ba1c7b490ea1f5d216a89
This commit is contained in:
John Sully 2021-06-01 19:15:01 +00:00
parent 4c88843720
commit ceb98b062c
3 changed files with 27 additions and 2 deletions

View File

@ -1,5 +1,6 @@
#pragma once #pragma once
#include <functional> #include <functional>
#include "sds.h"
class IStorageFactory class IStorageFactory
{ {
@ -28,6 +29,14 @@ public:
virtual bool enumerate(callback fn) const = 0; virtual bool enumerate(callback fn) const = 0;
virtual size_t count() const = 0; virtual size_t count() const = 0;
virtual void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem) {
beginWriteBatch();
for (size_t ielem = 0; ielem < celem; ++ielem) {
insert(rgkeys[ielem], sdslen(rgkeys[ielem]), rgvals[ielem], sdslen(rgvals[ielem]), false);
}
endWriteBatch();
}
virtual void beginWriteBatch() {} // NOP virtual void beginWriteBatch() {} // NOP
virtual void endWriteBatch() {} // NOP virtual void endWriteBatch() {} // NOP

View File

@ -91,6 +91,20 @@ void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwr
m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite); m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite);
} }
void StorageCache::bulkInsert(sds *rgkeys, sds *rgvals, size_t celem)
{
std::unique_lock<fastlock> ul(m_lock);
bulkInsertsInProgress++;
if (m_pdict != nullptr) {
for (size_t ielem = 0; ielem < celem; ++ielem) {
cacheKey(rgkeys[ielem]);
}
}
ul.unlock();
m_spstorage->bulkInsert(rgkeys, rgvals, celem);
bulkInsertsInProgress--;
}
const StorageCache *StorageCache::clone() const StorageCache *StorageCache::clone()
{ {
std::unique_lock<fastlock> ul(m_lock); std::unique_lock<fastlock> ul(m_lock);
@ -119,7 +133,7 @@ size_t StorageCache::count() const
std::unique_lock<fastlock> ul(m_lock); std::unique_lock<fastlock> ul(m_lock);
size_t count = m_spstorage->count(); size_t count = m_spstorage->count();
if (m_pdict != nullptr) { if (m_pdict != nullptr) {
serverAssert(count == (dictSize(m_pdict) + m_collisionCount)); serverAssert(bulkInsertsInProgress.load(std::memory_order_seq_cst) || count == (dictSize(m_pdict) + m_collisionCount));
} }
return count; return count;
} }

View File

@ -7,6 +7,7 @@ class StorageCache
dict *m_pdict = nullptr; dict *m_pdict = nullptr;
int m_collisionCount = 0; int m_collisionCount = 0;
mutable fastlock m_lock {"StorageCache"}; mutable fastlock m_lock {"StorageCache"};
std::atomic<int> bulkInsertsInProgress;
StorageCache(IStorage *storage, bool fNoCache); StorageCache(IStorage *storage, bool fNoCache);
@ -37,13 +38,14 @@ public:
void clear(); void clear();
void insert(sds key, const void *data, size_t cbdata, bool fOverwrite); void insert(sds key, const void *data, size_t cbdata, bool fOverwrite);
void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem);
void retrieve(sds key, IStorage::callbackSingle fn) const; void retrieve(sds key, IStorage::callbackSingle fn) const;
bool erase(sds key); bool erase(sds key);
bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); } bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); }
void beginWriteBatch(); void beginWriteBatch();
void endWriteBatch() { m_spstorage->endWriteBatch(); } void endWriteBatch() { m_spstorage->endWriteBatch(); m_lock.unlock(); }
void batch_lock() { return m_spstorage->batch_lock(); } void batch_lock() { return m_spstorage->batch_lock(); }
void batch_unlock() { return m_spstorage->batch_unlock(); } void batch_unlock() { return m_spstorage->batch_unlock(); }