Properly handle case where we delete a key that doesn't exist while a write batch is active (#550)

This commit is contained in:
Malavan Sotheeswaran 2023-01-23 16:21:27 -05:00 committed by GitHub
parent c3343ddbd7
commit d7c34a8ced
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 8 additions and 5 deletions

View File

@ -100,14 +100,14 @@ bool RocksDBStorageProvider::erase(const char *key, size_t cchKey)
{ {
rocksdb::Status status; rocksdb::Status status;
std::unique_lock<fastlock> l(m_lock); std::unique_lock<fastlock> l(m_lock);
if (!FKeyExists(key, cchKey))
return false;
if (m_spbatch != nullptr) if (m_spbatch != nullptr)
{ {
status = m_spbatch->Delete(m_spcolfamily.get(), rocksdb::Slice(key, cchKey)); status = m_spbatch->Delete(m_spcolfamily.get(), rocksdb::Slice(key, cchKey));
} }
else else
{ {
if (!FKeyExists(key, cchKey))
return false;
status = m_spdb->Delete(WriteOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey)); status = m_spdb->Delete(WriteOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey));
} }
if (status.ok()) if (status.ok())
@ -202,12 +202,12 @@ rocksdb::WriteOptions RocksDBStorageProvider::WriteOptions() const
void RocksDBStorageProvider::beginWriteBatch() void RocksDBStorageProvider::beginWriteBatch()
{ {
m_lock.lock(); m_lock.lock();
m_spbatch = std::make_unique<rocksdb::WriteBatch>(); m_spbatch = std::make_unique<rocksdb::WriteBatchWithIndex>();
} }
void RocksDBStorageProvider::endWriteBatch() void RocksDBStorageProvider::endWriteBatch()
{ {
m_spdb->Write(WriteOptions(), m_spbatch.get()); m_spdb->Write(WriteOptions(), m_spbatch.get()->GetWriteBatch());
m_spbatch = nullptr; m_spbatch = nullptr;
m_lock.unlock(); m_lock.unlock();
} }
@ -230,5 +230,7 @@ void RocksDBStorageProvider::flush()
bool RocksDBStorageProvider::FKeyExists(const char *key, size_t cch) const bool RocksDBStorageProvider::FKeyExists(const char *key, size_t cch) const
{ {
rocksdb::PinnableSlice slice; rocksdb::PinnableSlice slice;
if (m_spbatch)
return m_spbatch->GetFromBatchAndDB(m_spdb.get(), ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cch), &slice).ok();
return m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cch), &slice).ok(); return m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cch), &slice).ok();
} }

View File

@ -3,6 +3,7 @@
#include <memory> #include <memory>
#include "../IStorage.h" #include "../IStorage.h"
#include <rocksdb/db.h> #include <rocksdb/db.h>
#include <rocksdb/utilities/write_batch_with_index.h>
#include "../fastlock.h" #include "../fastlock.h"
#define INTERNAL_KEY_PREFIX "\x00\x04\x03\x00\x05\x02\x04" #define INTERNAL_KEY_PREFIX "\x00\x04\x03\x00\x05\x02\x04"
@ -15,7 +16,7 @@ class RocksDBStorageProvider : public IStorage
{ {
RocksDBStorageFactory *m_pfactory; RocksDBStorageFactory *m_pfactory;
std::shared_ptr<rocksdb::DB> m_spdb; // Note: This must be first so it is deleted last std::shared_ptr<rocksdb::DB> m_spdb; // Note: This must be first so it is deleted last
std::unique_ptr<rocksdb::WriteBatch> m_spbatch; std::unique_ptr<rocksdb::WriteBatchWithIndex> m_spbatch;
const rocksdb::Snapshot *m_psnapshot = nullptr; const rocksdb::Snapshot *m_psnapshot = nullptr;
std::shared_ptr<rocksdb::ColumnFamilyHandle> m_spcolfamily; std::shared_ptr<rocksdb::ColumnFamilyHandle> m_spcolfamily;
rocksdb::ReadOptions m_readOptionsTemplate; rocksdb::ReadOptions m_readOptionsTemplate;