Fix race when we free an item before we finish writing to disk making it temporarily unavailable
Former-commit-id: 6139cfee295261d0210f2af6217c2a23eeaf52b8
This commit is contained in:
parent
9851fde2fa
commit
d2a29bf343
@ -29,6 +29,9 @@ public:
|
|||||||
virtual void beginWriteBatch() {} // NOP
|
virtual void beginWriteBatch() {} // NOP
|
||||||
virtual void endWriteBatch() {} // NOP
|
virtual void endWriteBatch() {} // NOP
|
||||||
|
|
||||||
|
virtual void batch_lock() {} // NOP
|
||||||
|
virtual void batch_unlock() {} // NOP
|
||||||
|
|
||||||
virtual void flush() = 0;
|
virtual void flush() = 0;
|
||||||
|
|
||||||
/* This is permitted to be a shallow clone */
|
/* This is permitted to be a shallow clone */
|
||||||
|
@ -2514,12 +2514,19 @@ bool redisDbPersistentData::removeCachedValue(const char *key)
|
|||||||
{
|
{
|
||||||
serverAssert(m_spstorage != nullptr);
|
serverAssert(m_spstorage != nullptr);
|
||||||
// First ensure its not a pending key
|
// First ensure its not a pending key
|
||||||
|
if (m_spstorage != nullptr)
|
||||||
|
m_spstorage->batch_lock();
|
||||||
|
|
||||||
auto itr = m_setchanged.find(key);
|
auto itr = m_setchanged.find(key);
|
||||||
if (itr != m_setchanged.end())
|
if (itr != m_setchanged.end())
|
||||||
return false; // can't evict
|
return false; // can't evict
|
||||||
|
|
||||||
// since we write ASAP the database already has a valid copy so safe to delete
|
// since we write ASAP the database already has a valid copy so safe to delete
|
||||||
dictDelete(m_pdict, key);
|
dictDelete(m_pdict, key);
|
||||||
|
|
||||||
|
if (m_spstorage != nullptr)
|
||||||
|
m_spstorage->batch_unlock();
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -58,10 +58,10 @@ bool RocksDBStorageProvider::erase(const char *key, size_t cchKey)
|
|||||||
|
|
||||||
void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callbackSingle fn) const
|
void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callbackSingle fn) const
|
||||||
{
|
{
|
||||||
std::string value;
|
rocksdb::PinnableSlice slice;
|
||||||
auto status = m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey), &value);
|
auto status = m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey), &slice);
|
||||||
if (status.ok())
|
if (status.ok())
|
||||||
fn(key, cchKey, value.data(), value.size());
|
fn(key, cchKey, slice.data(), slice.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t RocksDBStorageProvider::clear()
|
size_t RocksDBStorageProvider::clear()
|
||||||
@ -99,7 +99,9 @@ bool RocksDBStorageProvider::enumerate(callback fn) const
|
|||||||
}
|
}
|
||||||
if (!it->Valid() && count != m_count)
|
if (!it->Valid() && count != m_count)
|
||||||
{
|
{
|
||||||
const_cast<RocksDBStorageProvider*>(this)->m_count = count; // BUG!!! but be resilient
|
if (const_cast<RocksDBStorageProvider*>(this)->m_count != count)
|
||||||
|
printf("WARNING: rocksdb count mismatch");
|
||||||
|
const_cast<RocksDBStorageProvider*>(this)->m_count = count;
|
||||||
}
|
}
|
||||||
assert(it->status().ok()); // Check for any errors found during the scan
|
assert(it->status().ok()); // Check for any errors found during the scan
|
||||||
return !it->Valid();
|
return !it->Valid();
|
||||||
@ -148,6 +150,16 @@ void RocksDBStorageProvider::endWriteBatch()
|
|||||||
m_lock.unlock();
|
m_lock.unlock();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void RocksDBStorageProvider::batch_lock()
|
||||||
|
{
|
||||||
|
m_lock.lock();
|
||||||
|
}
|
||||||
|
|
||||||
|
void RocksDBStorageProvider::batch_unlock()
|
||||||
|
{
|
||||||
|
m_lock.unlock();
|
||||||
|
}
|
||||||
|
|
||||||
void RocksDBStorageProvider::flush()
|
void RocksDBStorageProvider::flush()
|
||||||
{
|
{
|
||||||
m_spdb->SyncWAL();
|
m_spdb->SyncWAL();
|
||||||
@ -155,6 +167,6 @@ void RocksDBStorageProvider::flush()
|
|||||||
|
|
||||||
bool RocksDBStorageProvider::FKeyExists(const char *key, size_t cch) const
|
bool RocksDBStorageProvider::FKeyExists(const char *key, size_t cch) const
|
||||||
{
|
{
|
||||||
std::string strT;
|
rocksdb::PinnableSlice slice;
|
||||||
return m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cch), &strT).ok();
|
return m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cch), &slice).ok();
|
||||||
}
|
}
|
@ -34,6 +34,9 @@ public:
|
|||||||
virtual void beginWriteBatch() override;
|
virtual void beginWriteBatch() override;
|
||||||
virtual void endWriteBatch() override;
|
virtual void endWriteBatch() override;
|
||||||
|
|
||||||
|
virtual void batch_lock() override;
|
||||||
|
virtual void batch_unlock() override;
|
||||||
|
|
||||||
virtual void flush() override;
|
virtual void flush() override;
|
||||||
|
|
||||||
size_t count() const;
|
size_t count() const;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user