Merge branch 'keydbpro_collab' into multithread_load
Former-commit-id: 31ddf772cd89344017042a18547ae83a60ce02b9
This commit is contained in:
commit
fa9f1b4f95
@ -1,5 +1,6 @@
|
||||
#pragma once
|
||||
#include <functional>
|
||||
#include "sds.h"
|
||||
|
||||
class IStorageFactory
|
||||
{
|
||||
@ -28,6 +29,14 @@ public:
|
||||
virtual bool enumerate(callback fn) const = 0;
|
||||
virtual size_t count() const = 0;
|
||||
|
||||
virtual void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem) {
|
||||
beginWriteBatch();
|
||||
for (size_t ielem = 0; ielem < celem; ++ielem) {
|
||||
insert(rgkeys[ielem], sdslen(rgkeys[ielem]), rgvals[ielem], sdslen(rgvals[ielem]), false);
|
||||
}
|
||||
endWriteBatch();
|
||||
}
|
||||
|
||||
virtual void beginWriteBatch() {} // NOP
|
||||
virtual void endWriteBatch() {} // NOP
|
||||
|
||||
|
@ -91,6 +91,20 @@ void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwr
|
||||
m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite);
|
||||
}
|
||||
|
||||
void StorageCache::bulkInsert(sds *rgkeys, sds *rgvals, size_t celem)
|
||||
{
|
||||
std::unique_lock<fastlock> ul(m_lock);
|
||||
bulkInsertsInProgress++;
|
||||
if (m_pdict != nullptr) {
|
||||
for (size_t ielem = 0; ielem < celem; ++ielem) {
|
||||
cacheKey(rgkeys[ielem]);
|
||||
}
|
||||
}
|
||||
ul.unlock();
|
||||
m_spstorage->bulkInsert(rgkeys, rgvals, celem);
|
||||
bulkInsertsInProgress--;
|
||||
}
|
||||
|
||||
const StorageCache *StorageCache::clone()
|
||||
{
|
||||
std::unique_lock<fastlock> ul(m_lock);
|
||||
@ -119,7 +133,7 @@ size_t StorageCache::count() const
|
||||
std::unique_lock<fastlock> ul(m_lock);
|
||||
size_t count = m_spstorage->count();
|
||||
if (m_pdict != nullptr) {
|
||||
serverAssert(count == (dictSize(m_pdict) + m_collisionCount));
|
||||
serverAssert(bulkInsertsInProgress.load(std::memory_order_seq_cst) || count == (dictSize(m_pdict) + m_collisionCount));
|
||||
}
|
||||
return count;
|
||||
}
|
||||
|
@ -7,6 +7,7 @@ class StorageCache
|
||||
dict *m_pdict = nullptr;
|
||||
int m_collisionCount = 0;
|
||||
mutable fastlock m_lock {"StorageCache"};
|
||||
std::atomic<int> bulkInsertsInProgress;
|
||||
|
||||
StorageCache(IStorage *storage, bool fNoCache);
|
||||
|
||||
@ -37,13 +38,14 @@ public:
|
||||
|
||||
void clear();
|
||||
void insert(sds key, const void *data, size_t cbdata, bool fOverwrite);
|
||||
void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem);
|
||||
void retrieve(sds key, IStorage::callbackSingle fn) const;
|
||||
bool erase(sds key);
|
||||
|
||||
bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); }
|
||||
|
||||
void beginWriteBatch();
|
||||
void endWriteBatch() { m_spstorage->endWriteBatch(); }
|
||||
void endWriteBatch() { m_spstorage->endWriteBatch(); m_lock.unlock(); }
|
||||
void batch_lock() { return m_spstorage->batch_lock(); }
|
||||
void batch_unlock() { return m_spstorage->batch_unlock(); }
|
||||
|
||||
|
@ -38,6 +38,17 @@ void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data,
|
||||
++m_count;
|
||||
}
|
||||
|
||||
void RocksDBStorageProvider::bulkInsert(sds *rgkeys, sds *rgvals, size_t celem)
|
||||
{
|
||||
auto spbatch = std::make_unique<rocksdb::WriteBatch>();
|
||||
for (size_t ielem = 0; ielem < celem; ++ielem) {
|
||||
spbatch->Put(m_spcolfamily.get(), rocksdb::Slice(rgkeys[ielem], sdslen(rgkeys[ielem])), rocksdb::Slice(rgvals[ielem], sdslen(rgvals[ielem])));
|
||||
}
|
||||
m_spdb->Write(WriteOptions(), spbatch.get());
|
||||
std::unique_lock<fastlock> l(m_lock);
|
||||
m_count += celem;
|
||||
}
|
||||
|
||||
bool RocksDBStorageProvider::erase(const char *key, size_t cchKey)
|
||||
{
|
||||
rocksdb::Status status;
|
||||
|
@ -34,6 +34,8 @@ public:
|
||||
virtual void beginWriteBatch() override;
|
||||
virtual void endWriteBatch() override;
|
||||
|
||||
virtual void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem) override;
|
||||
|
||||
virtual void batch_lock() override;
|
||||
virtual void batch_unlock() override;
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user