From ceb98b062c5c1cc6e5e02e5a868d9bef39b248ad Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 1 Jun 2021 19:15:01 +0000 Subject: [PATCH] Initial bulkInsert implementation Former-commit-id: 2a721d4dff3c9f78ec4ba1c7b490ea1f5d216a89 --- src/IStorage.h | 9 +++++++++ src/StorageCache.cpp | 16 +++++++++++++++- src/StorageCache.h | 4 +++- 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/src/IStorage.h b/src/IStorage.h index 69611de13..d5808c4c1 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -1,5 +1,6 @@ #pragma once #include +#include "sds.h" class IStorageFactory { @@ -28,6 +29,14 @@ public: virtual bool enumerate(callback fn) const = 0; virtual size_t count() const = 0; + virtual void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem) { + beginWriteBatch(); + for (size_t ielem = 0; ielem < celem; ++ielem) { + insert(rgkeys[ielem], sdslen(rgkeys[ielem]), rgvals[ielem], sdslen(rgvals[ielem]), false); + } + endWriteBatch(); + } + virtual void beginWriteBatch() {} // NOP virtual void endWriteBatch() {} // NOP diff --git a/src/StorageCache.cpp b/src/StorageCache.cpp index 7f86b595e..29908c7f5 100644 --- a/src/StorageCache.cpp +++ b/src/StorageCache.cpp @@ -91,6 +91,20 @@ void StorageCache::insert(sds key, const void *data, size_t cbdata, bool fOverwr m_spstorage->insert(key, sdslen(key), (void*)data, cbdata, fOverwrite); } +void StorageCache::bulkInsert(sds *rgkeys, sds *rgvals, size_t celem) +{ + std::unique_lock ul(m_lock); + bulkInsertsInProgress++; + if (m_pdict != nullptr) { + for (size_t ielem = 0; ielem < celem; ++ielem) { + cacheKey(rgkeys[ielem]); + } + } + ul.unlock(); + m_spstorage->bulkInsert(rgkeys, rgvals, celem); + bulkInsertsInProgress--; +} + const StorageCache *StorageCache::clone() { std::unique_lock ul(m_lock); @@ -119,7 +133,7 @@ size_t StorageCache::count() const std::unique_lock ul(m_lock); size_t count = m_spstorage->count(); if (m_pdict != nullptr) { - serverAssert(count == (dictSize(m_pdict) + m_collisionCount)); + serverAssert(bulkInsertsInProgress.load(std::memory_order_seq_cst) || count == (dictSize(m_pdict) + m_collisionCount)); } return count; } diff --git a/src/StorageCache.h b/src/StorageCache.h index c2170b7d0..2536cf2ec 100644 --- a/src/StorageCache.h +++ b/src/StorageCache.h @@ -7,6 +7,7 @@ class StorageCache dict *m_pdict = nullptr; int m_collisionCount = 0; mutable fastlock m_lock {"StorageCache"}; + std::atomic bulkInsertsInProgress; StorageCache(IStorage *storage, bool fNoCache); @@ -37,13 +38,14 @@ public: void clear(); void insert(sds key, const void *data, size_t cbdata, bool fOverwrite); + void bulkInsert(sds *rgkeys, sds *rgvals, size_t celem); void retrieve(sds key, IStorage::callbackSingle fn) const; bool erase(sds key); bool enumerate(IStorage::callback fn) const { return m_spstorage->enumerate(fn); } void beginWriteBatch(); - void endWriteBatch() { m_spstorage->endWriteBatch(); } + void endWriteBatch() { m_spstorage->endWriteBatch(); m_lock.unlock(); } void batch_lock() { return m_spstorage->batch_lock(); } void batch_unlock() { return m_spstorage->batch_unlock(); }