From d2a29bf3434ea4847c6b94ec5097bea83e3beb5a Mon Sep 17 00:00:00 2001
From: John Sully
Date: Thu, 4 Jun 2020 22:18:03 -0400
Subject: [PATCH] Fix race when we free an item before we finish writing to
 disk making it temporarily unavailable

Former-commit-id: 6139cfee295261d0210f2af6217c2a23eeaf52b8
---
Reviewer note: removeCachedValue() now also releases the batch lock on the
early "can't evict" return. Previously that path returned with the lock still
held. The post-image blob id on the src/db.cpp index line predates this change.

 src/IStorage.h          |  3 +++
 src/db.cpp              | 15 ++++++++++++++-
 src/storage/rocksdb.cpp | 24 ++++++++++++++++++------
 src/storage/rocksdb.h   |  3 +++
 4 files changed, 38 insertions(+), 7 deletions(-)

diff --git a/src/IStorage.h b/src/IStorage.h
index 3ac6d9c59..16526e80d 100644
--- a/src/IStorage.h
+++ b/src/IStorage.h
@@ -29,6 +29,9 @@ public:
     virtual void beginWriteBatch() {} // NOP
     virtual void endWriteBatch() {} // NOP
 
+    virtual void batch_lock() {} // NOP
+    virtual void batch_unlock() {} // NOP
+
     virtual void flush() = 0;
 
     /* This is permitted to be a shallow clone */
diff --git a/src/db.cpp b/src/db.cpp
index b866948fa..4c7867bd7 100644
--- a/src/db.cpp
+++ b/src/db.cpp
@@ -2514,12 +2514,25 @@ bool redisDbPersistentData::removeCachedValue(const char *key)
 {
     serverAssert(m_spstorage != nullptr);
     // First ensure its not a pending key
+    // Take the storage batch lock so we don't free the item before it has finished being written to disk
+    if (m_spstorage != nullptr)
+        m_spstorage->batch_lock();
+
     auto itr = m_setchanged.find(key);
     if (itr != m_setchanged.end())
-        return false; // can't evict
+    {
+        // Every exit path must release the batch lock, otherwise it stays held after a refused eviction
+        if (m_spstorage != nullptr)
+            m_spstorage->batch_unlock();
+        return false; // can't evict
+    }
 
     // since we write ASAP the database already has a valid copy so safe to delete
     dictDelete(m_pdict, key);
+
+    if (m_spstorage != nullptr)
+        m_spstorage->batch_unlock();
+
     return true;
 }
 
diff --git a/src/storage/rocksdb.cpp b/src/storage/rocksdb.cpp
index 647340370..0d3bbc4f3 100644
--- a/src/storage/rocksdb.cpp
+++ b/src/storage/rocksdb.cpp
@@ -58,10 +58,10 @@ bool RocksDBStorageProvider::erase(const char *key, size_t cchKey)
 
 void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callbackSingle fn) const
 {
-    std::string value;
-    auto status = m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey), &value);
+    rocksdb::PinnableSlice slice;
+    auto status = m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey), &slice);
     if (status.ok())
-        fn(key, cchKey, value.data(), value.size());
+        fn(key, cchKey, slice.data(), slice.size());
 }
 
 size_t RocksDBStorageProvider::clear()
@@ -99,7 +99,9 @@ bool RocksDBStorageProvider::enumerate(callback fn) const
     }
     if (!it->Valid() && count != m_count)
     {
-        const_cast<RocksDBStorageProvider*>(this)->m_count = count; // BUG!!! but be resilient
+        if (const_cast<RocksDBStorageProvider*>(this)->m_count != count)
+            printf("WARNING: rocksdb count mismatch");
+        const_cast<RocksDBStorageProvider*>(this)->m_count = count;
     }
     assert(it->status().ok()); // Check for any errors found during the scan
     return !it->Valid();
@@ -148,6 +150,16 @@ void RocksDBStorageProvider::endWriteBatch()
     m_lock.unlock();
 }
 
+void RocksDBStorageProvider::batch_lock()
+{
+    m_lock.lock();
+}
+
+void RocksDBStorageProvider::batch_unlock()
+{
+    m_lock.unlock();
+}
+
 void RocksDBStorageProvider::flush()
 {
     m_spdb->SyncWAL();
@@ -155,6 +167,6 @@ void RocksDBStorageProvider::flush()
 
 bool RocksDBStorageProvider::FKeyExists(const char *key, size_t cch) const
 {
-    std::string strT;
-    return m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cch), &strT).ok();
+    rocksdb::PinnableSlice slice;
+    return m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cch), &slice).ok();
 }
\ No newline at end of file
diff --git a/src/storage/rocksdb.h b/src/storage/rocksdb.h
index 5f9261288..8e03a3471 100644
--- a/src/storage/rocksdb.h
+++ b/src/storage/rocksdb.h
@@ -34,6 +34,9 @@ public:
     virtual void beginWriteBatch() override;
     virtual void endWriteBatch() override;
 
+    virtual void batch_lock() override;
+    virtual void batch_unlock() override;
+
     virtual void flush() override;
 
     size_t count() const;