Fix FLASH eviction issue, the algo relies on correct key count
Former-commit-id: dc260a7f44fe197b9b0022d044d559be88bf91be
This commit is contained in:
parent
3a3ff9318a
commit
b43dda66e7
@ -22,7 +22,7 @@ public:
|
||||
virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const = 0;
|
||||
virtual size_t clear() = 0;
|
||||
virtual bool enumerate(callback fn) const = 0;
|
||||
virtual size_t count(bool fStrict) const = 0;
|
||||
virtual size_t count() const = 0;
|
||||
|
||||
virtual void beginWriteBatch() {} // NOP
|
||||
virtual void endWriteBatch() {} // NOP
|
||||
|
@ -2195,7 +2195,7 @@ dict_iter redisDbPersistentData::random()
|
||||
size_t redisDbPersistentData::size() const
|
||||
{
|
||||
if (m_spstorage != nullptr)
|
||||
return m_spstorage->count(false);
|
||||
return m_spstorage->count();
|
||||
|
||||
return dictSize(m_pdict)
|
||||
+ (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0);
|
||||
|
@ -492,14 +492,6 @@ int freeMemoryIfNeeded(void) {
|
||||
|
||||
mem_freed = 0;
|
||||
|
||||
size_t maxKeys = 0;
|
||||
size_t totalKeysFreed = 0;
|
||||
for (int i = 0; i < cserver.dbnum; ++i)
|
||||
maxKeys += g_pserver->db[i]->size();
|
||||
|
||||
// A random search is n log n, so if we run longer than this we know we're not going to terminate
|
||||
maxKeys = static_cast<size_t>(maxKeys * (log2(maxKeys)));
|
||||
|
||||
if (g_pserver->maxmemory_policy == MAXMEMORY_NO_EVICTION)
|
||||
goto cant_free; /* We need to free memory, but policy forbids. */
|
||||
|
||||
@ -640,7 +632,6 @@ int freeMemoryIfNeeded(void) {
|
||||
}
|
||||
|
||||
keys_freed++;
|
||||
totalKeysFreed++;
|
||||
|
||||
/* When the memory to free starts to be big enough, we may
|
||||
* start spending so much time here that is impossible to
|
||||
@ -663,16 +654,6 @@ int freeMemoryIfNeeded(void) {
|
||||
}
|
||||
}
|
||||
|
||||
// When using FLASH we're not actually evicting and we leave around the key
|
||||
// its possible we tried to evict the whole database and failed to free enough memory
|
||||
// quit if this happens
|
||||
if (totalKeysFreed >= maxKeys)
|
||||
{
|
||||
latencyEndMonitor(latency);
|
||||
latencyAddSampleIfNeeded("eviction-cycle",latency);
|
||||
goto cant_free;
|
||||
}
|
||||
|
||||
if (keys_freed <= 0) {
|
||||
latencyEndMonitor(latency);
|
||||
latencyAddSampleIfNeeded("eviction-cycle",latency);
|
||||
|
@ -2,8 +2,8 @@
|
||||
#include <string>
|
||||
#include <sstream>
|
||||
|
||||
RocksDBStorageProvider::RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot)
|
||||
: m_spdb(spdb), m_psnapshot(psnapshot), m_spcolfamily(spcolfam)
|
||||
RocksDBStorageProvider::RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count)
|
||||
: m_spdb(spdb), m_psnapshot(psnapshot), m_spcolfamily(spcolfam), m_count(count)
|
||||
{
|
||||
m_readOptionsTemplate = rocksdb::ReadOptions();
|
||||
m_readOptionsTemplate.snapshot = m_psnapshot;
|
||||
@ -18,6 +18,7 @@ void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data,
|
||||
status = m_spdb->Put(WriteOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb));
|
||||
if (!status.ok())
|
||||
throw status.ToString();
|
||||
++m_count;
|
||||
}
|
||||
|
||||
bool RocksDBStorageProvider::erase(const char *key, size_t cchKey)
|
||||
@ -30,10 +31,12 @@ bool RocksDBStorageProvider::erase(const char *key, size_t cchKey)
|
||||
else
|
||||
{
|
||||
std::string strT;
|
||||
if (!m_spdb->KeyMayExist(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey), &strT))
|
||||
if (!m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey), &strT).ok())
|
||||
return false;
|
||||
status = m_spdb->Delete(WriteOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey));
|
||||
}
|
||||
if (status.ok())
|
||||
-- m_count;
|
||||
return status.ok();
|
||||
}
|
||||
|
||||
@ -47,7 +50,7 @@ void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callbackSi
|
||||
|
||||
size_t RocksDBStorageProvider::clear()
|
||||
{
|
||||
size_t celem = count(false);
|
||||
size_t celem = count();
|
||||
auto status = m_spdb->DropColumnFamily(m_spcolfamily.get());
|
||||
auto strName = m_spcolfamily->GetName();
|
||||
|
||||
@ -57,40 +60,29 @@ size_t RocksDBStorageProvider::clear()
|
||||
|
||||
if (!status.ok())
|
||||
throw status.ToString();
|
||||
m_count = 0;
|
||||
return celem;
|
||||
}
|
||||
|
||||
size_t RocksDBStorageProvider::count(bool fStrict) const
|
||||
size_t RocksDBStorageProvider::count() const
|
||||
{
|
||||
size_t count = 0;
|
||||
|
||||
if (fStrict)
|
||||
{
|
||||
std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(m_spdb->NewIterator(ReadOptions(), m_spcolfamily.get()));
|
||||
for (it->SeekToFirst(); it->Valid(); it->Next()) {
|
||||
++count;
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
std::string strelem;
|
||||
if (!m_spdb->GetProperty(m_spcolfamily.get(), rocksdb::DB::Properties::kEstimateNumKeys, &strelem))
|
||||
throw "Failed to get database size";
|
||||
std::stringstream sstream(strelem);
|
||||
size_t count;
|
||||
sstream >> count;
|
||||
}
|
||||
return count;
|
||||
return m_count;
|
||||
}
|
||||
|
||||
bool RocksDBStorageProvider::enumerate(callback fn) const
|
||||
{
|
||||
std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(m_spdb->NewIterator(ReadOptions(), m_spcolfamily.get()));
|
||||
size_t count = 0;
|
||||
for (it->SeekToFirst(); it->Valid(); it->Next()) {
|
||||
++count;
|
||||
bool fContinue = fn(it->key().data(), it->key().size(), it->value().data(), it->value().size());
|
||||
if (!fContinue)
|
||||
break;
|
||||
}
|
||||
if (!it->Valid() && count != m_count)
|
||||
{
|
||||
const_cast<RocksDBStorageProvider*>(this)->m_count = count; // BUG!!! but be resilient
|
||||
}
|
||||
assert(it->status().ok()); // Check for any errors found during the scan
|
||||
return !it->Valid();
|
||||
}
|
||||
@ -98,7 +90,7 @@ bool RocksDBStorageProvider::enumerate(callback fn) const
|
||||
const IStorage *RocksDBStorageProvider::clone() const
|
||||
{
|
||||
const rocksdb::Snapshot *psnapshot = const_cast<RocksDBStorageProvider*>(this)->m_spdb->GetSnapshot();
|
||||
return new RocksDBStorageProvider(const_cast<RocksDBStorageProvider*>(this)->m_spdb, const_cast<RocksDBStorageProvider*>(this)->m_spcolfamily, psnapshot);
|
||||
return new RocksDBStorageProvider(const_cast<RocksDBStorageProvider*>(this)->m_spdb, const_cast<RocksDBStorageProvider*>(this)->m_spcolfamily, psnapshot, m_count);
|
||||
}
|
||||
|
||||
RocksDBStorageProvider::~RocksDBStorageProvider()
|
||||
|
@ -11,9 +11,10 @@ class RocksDBStorageProvider : public IStorage
|
||||
const rocksdb::Snapshot *m_psnapshot = nullptr;
|
||||
std::shared_ptr<rocksdb::ColumnFamilyHandle> m_spcolfamily;
|
||||
rocksdb::ReadOptions m_readOptionsTemplate;
|
||||
size_t m_count = 0;
|
||||
|
||||
public:
|
||||
RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot);
|
||||
RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count);
|
||||
~RocksDBStorageProvider();
|
||||
|
||||
virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) override;
|
||||
@ -29,7 +30,7 @@ public:
|
||||
|
||||
virtual void flush() override;
|
||||
|
||||
size_t count(bool fStrict) const;
|
||||
size_t count() const;
|
||||
|
||||
protected:
|
||||
|
||||
|
@ -60,7 +60,12 @@ IStorage *RocksDBStorageFactory::create(int db)
|
||||
{
|
||||
++db; // skip default col family
|
||||
std::shared_ptr<rocksdb::ColumnFamilyHandle> spcolfamily(m_vecspcols[db].release());
|
||||
return new RocksDBStorageProvider(m_spdb, spcolfamily, nullptr);
|
||||
size_t count = 0;
|
||||
std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(m_spdb->NewIterator(rocksdb::ReadOptions(), spcolfamily.get()));
|
||||
for (it->SeekToFirst(); it->Valid(); it->Next()) {
|
||||
++count;
|
||||
}
|
||||
return new RocksDBStorageProvider(m_spdb, spcolfamily, nullptr, count);
|
||||
}
|
||||
|
||||
const char *RocksDBStorageFactory::name() const
|
||||
|
Loading…
x
Reference in New Issue
Block a user