Evicted flash no longer keeps the key around

Former-commit-id: 908b303d1a8dd032c092f0bc035361a09c0291a5
This commit is contained in:
John Sully 2019-12-20 17:45:07 -05:00
parent ac8450b332
commit 6a77ceaa4a
8 changed files with 101 additions and 83 deletions

View File

@ -12,15 +12,17 @@ public:
class IStorage
{
public:
typedef std::function<void(const char *, size_t, const void *, size_t)> callback;
typedef std::function<bool(const char *, size_t, const void *, size_t)> callback;
typedef std::function<void(const char *, size_t, const void *, size_t)> callbackSingle;
virtual ~IStorage();
virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) = 0;
virtual void erase(const char *key, size_t cchKey) = 0;
virtual void retrieve(const char *key, size_t cchKey, callback fn) const = 0;
virtual bool erase(const char *key, size_t cchKey) = 0;
virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const = 0;
virtual size_t clear() = 0;
virtual void enumerate(callback fn) const = 0;
virtual bool enumerate(callback fn) const = 0;
virtual size_t count() const = 0;
virtual void beginWriteBatch() {} // NOP
virtual void endWriteBatch() {} // NOP

View File

@ -100,7 +100,7 @@ static robj* lookupKey(redisDb *db, robj *key, int flags) {
}
static robj_roptr lookupKeyConst(redisDb *db, robj *key, int flags) {
serverAssert((flags & LOOKUP_UPDATEMVCC) == 0);
robj_roptr val = db->find_threadsafe(szFromObj(key));
robj_roptr val = db->find(szFromObj(key));
if (val != nullptr) {
lookupKeyUpdateObj(val.unsafe_robjcast(), flags);
return val;
@ -315,7 +315,7 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
*
* All the new keys in the database should be created via this interface. */
void setKey(redisDb *db, robj *key, robj *val) {
auto itr = db->find(key);
auto itr = db->find_cached_threadsafe(szFromObj(key));
if (itr == NULL) {
dbAdd(db,key,val);
} else {
@ -380,13 +380,18 @@ bool redisDbPersistentData::syncDelete(robj *key)
* the key, because it is shared with the main dictionary. */
auto itr = find(szFromObj(key));
trackkey(szFromObj(key));
if (itr != nullptr && itr.val()->FExpires())
removeExpire(key, itr);
if (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) {
bool fDeleted = false;
if (m_spstorage != nullptr)
fDeleted = m_spstorage->erase(szFromObj(key), sdslen(szFromObj(key)));
fDeleted = (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) || fDeleted;
if (fDeleted) {
if (m_pdbSnapshot != nullptr)
{
auto itr = m_pdbSnapshot->find_threadsafe(szFromObj(key));
auto itr = m_pdbSnapshot->find_cached_threadsafe(szFromObj(key));
if (itr != nullptr)
{
dictAdd(m_pdictTombstone, sdsdup(szFromObj(key)), nullptr);
@ -649,27 +654,32 @@ void randomkeyCommand(client *c) {
decrRefCount(key);
}
static bool FEvictedDE(dictEntry *de)
{
return (de != nullptr) && dictGetVal(de) == nullptr;
}
bool redisDbPersistentData::iterate(std::function<bool(const char*, robj*)> fn)
{
if (m_spstorage != nullptr)
{
bool fSawAll = m_spstorage->enumerate([&](const char *key, size_t cchKey, const void *, size_t )->bool{
sds sdsKey = sdsnewlen(key, cchKey);
dictEntry *de = dictFind(m_pdict, sdsKey);
bool fEvict = (de == nullptr);
ensure(sdsKey, &de);
bool fContinue = fn((const char*)dictGetKey(de), (robj*)dictGetVal(de));
if (fEvict)
removeCachedValue(sdsKey);
sdsfree(sdsKey);
return fContinue;
});
return fSawAll;
}
dictIterator *di = dictGetSafeIterator(m_pdict);
dictEntry *de = nullptr;
bool fResult = true;
while(fResult && ((de = dictNext(di)) != nullptr))
{
bool fEvicted = FEvictedDE(de);
ensure((const char*)dictGetKey(de), &de);
if (!fn((const char*)dictGetKey(de), (robj*)dictGetVal(de)))
fResult = false;
// re-evict the key so we don't OOM
if (fEvicted)
removeCachedValue((const char*)dictGetKey(de));
}
dictReleaseIterator(di);
@ -686,13 +696,9 @@ bool redisDbPersistentData::iterate(std::function<bool(const char*, robj*)> fn)
return true;
// Alright it's a key in the use keyspace, lets ensure it and then pass it off
bool fEvicted = FEvictedDE(de);
ensure(key);
deCurrent = dictFind(m_pdict, key);
bool fResult = fn(key, (robj*)dictGetVal(deCurrent));
if (fEvicted)
removeCachedValue(key);
return fResult;
return fn(key, (robj*)dictGetVal(deCurrent));
}, true /*fKeyOnly*/);
}
@ -1419,13 +1425,9 @@ expireEntry *redisDbPersistentDataSnapshot::getExpire(const char *key) {
if (expireSize() == 0)
return nullptr;
auto itr = find_threadsafe(key);
if (itr == nullptr)
auto itrExpire = m_setexpire->find(key);
if (itrExpire == m_setexpire->end())
return nullptr;
if (!itr.val()->FExpires())
return nullptr;
auto itrExpire = findExpire(itr.key());
return itrExpire.operator->();
}
@ -1933,7 +1935,7 @@ bool redisDbPersistentData::insert(char *key, robj *o)
int res = dictAdd(m_pdict, key, o);
if (res == DICT_OK)
{
if (m_pdbSnapshot != nullptr && m_pdbSnapshot->find_threadsafe(key) != nullptr)
if (m_pdbSnapshot != nullptr && m_pdbSnapshot->find_cached_threadsafe(key) != nullptr)
{
serverAssert(dictFind(m_pdictTombstone, key) != nullptr);
}
@ -2036,7 +2038,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
dictEntry *deTombstone = dictFind(m_pdictTombstone, sdsKey);
if (deTombstone == nullptr)
{
auto itr = m_pdbSnapshot->find_threadsafe(sdsKey);
auto itr = m_pdbSnapshot->find_cached_threadsafe(sdsKey);
if (itr == m_pdbSnapshot->end())
return; // not found
@ -2128,16 +2130,12 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges()
{
for (unique_sds_ptr &key : m_vecchanged)
{
robj *o = find(key.get());
if (o != nullptr)
{
sds temp = serializeStoredObject(o);
vecRet.emplace_back(std::move(key), unique_sds_ptr(temp));
}
else
{
m_spstorage->erase(key.get(), sdslen(key.get()));
}
dictEntry *de = dictFind(m_pdict, key.get());
if (de == nullptr)
continue;
robj *o = (robj*)dictGetVal(de);
sds temp = serializeStoredObject(o);
vecRet.emplace_back(std::move(key), unique_sds_ptr(temp));
}
}
m_vecchanged.clear();
@ -2182,7 +2180,7 @@ dict_iter redisDbPersistentData::random()
double randval = (double)rand()/RAND_MAX;
if (randval <= pctInSnapshot)
{
iter = m_pdbSnapshot->random_threadsafe();
iter = m_pdbSnapshot->random_cache_threadsafe(); // BUG: RANDOM doesn't consider keys not in RAM
ensure(iter.key());
dictEntry *de = dictFind(m_pdict, iter.key());
return dict_iter(de);
@ -2196,7 +2194,11 @@ dict_iter redisDbPersistentData::random()
size_t redisDbPersistentData::size() const
{
return dictSize(m_pdict) + (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0);
if (m_spstorage != nullptr)
return m_spstorage->count();
return dictSize(m_pdict)
+ (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0);
}
bool redisDbPersistentData::removeCachedValue(const char *key)

View File

@ -248,7 +248,7 @@ struct visitFunctor
bool operator()(const expireEntry &e)
{
dictEntry *de = dictFind(dbdict, e.key());
if (dictGetVal(de) != nullptr)
if (de != nullptr)
{
processEvictionCandidate(dbid, (sds)dictGetKey(de), (robj*)dictGetVal(de), &e, pool);
++count;
@ -580,7 +580,7 @@ int freeMemoryIfNeeded(void) {
if (g_pserver->maxmemory_policy == MAXMEMORY_ALLKEYS_RANDOM)
{
if (db->size() != 0) {
auto itr = db->random_threadsafe();
auto itr = db->random_cache_threadsafe();
bestkey = itr.key();
bestdbid = j;
break;

View File

@ -55,6 +55,10 @@ bool redisDbPersistentData::asyncDelete(robj *key) {
/* If the value is composed of a few allocations, to free in a lazy way
* is actually just slower... So under a certain limit we just free
* the object synchronously. */
if (m_spstorage != nullptr)
return syncDelete(key); // async delte never makes sense with a storage provider
dictEntry *de = dictUnlink(m_pdict,ptrFromObj(key));
if (m_pdbSnapshot != nullptr)
dictAdd(m_pdictTombstone, sdsdup((sds)dictGetKey(de)), nullptr);

View File

@ -1271,7 +1271,7 @@ public:
void trackkey(const char *key)
{
if (m_fTrackingChanges && !m_fAllChanged && m_spstorage)
m_vecchanged.push_back(unique_sds_ptr(sdsdup(key)));
m_vecchanged.push_back(unique_sds_ptr(sdsdupshared(key)));
}
dict_iter find(const char *key)
@ -1293,11 +1293,6 @@ public:
return m_setexpire->random_value();
}
auto findExpire(const char *key)
{
return m_setexpire->find(key);
}
dict_iter end() { return dict_iter(nullptr); }
dict_const_iter end() const { return dict_const_iter(nullptr); }
@ -1393,8 +1388,8 @@ public:
using redisDbPersistentData::endSnapshot;
using redisDbPersistentData::end;
dict_iter random_threadsafe() const;
dict_iter find_threadsafe(const char *key) const;
dict_iter random_cache_threadsafe() const;
dict_iter find_cached_threadsafe(const char *key) const;
expireEntry *getExpire(robj_roptr key) { return getExpire(szFromObj(key)); }
expireEntry *getExpire(const char *key);
@ -1449,7 +1444,6 @@ typedef struct redisDb : public redisDbPersistentDataSnapshot
using redisDbPersistentData::find;
using redisDbPersistentData::random;
using redisDbPersistentData::random_expire;
using redisDbPersistentData::findExpire;
using redisDbPersistentData::end;
using redisDbPersistentData::getStats;
using redisDbPersistentData::getExpireStats;
@ -1475,16 +1469,6 @@ typedef struct redisDb : public redisDbPersistentDataSnapshot
using redisDbPersistentData::endSnapshot;
using redisDbPersistentData::consolidate_snapshot;
dict_iter find_threadsafe(const char *key) const
{
dict_iter itr = redisDbPersistentDataSnapshot::find_threadsafe(key);
if (itr.key() != nullptr && itr.val() == nullptr)
{
return const_cast<redisDb*>(this)->redisDbPersistentData::find(key);
}
return itr;
}
public:
expireset::setiter expireitr;
dict *blocking_keys; /* Keys with clients waiting for data (BLPOP)*/

View File

@ -161,7 +161,7 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
if (deSnapshot == nullptr)
{
// The tombstone is for a grand child, propogate it
serverAssert(m_spdbSnapshotHOLDER->m_pdbSnapshot->find_threadsafe((const char*)dictGetKey(de)) != nullptr);
serverAssert(m_spdbSnapshotHOLDER->m_pdbSnapshot->find_cached_threadsafe((const char*)dictGetKey(de)) != nullptr);
dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), nullptr);
continue;
}
@ -222,7 +222,7 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
serverAssert(m_spdbSnapshotHOLDER != nullptr || dictSize(m_pdictTombstone) == 0);
}
dict_iter redisDbPersistentDataSnapshot::random_threadsafe() const
dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe() const
{
if (size() == 0)
return dict_iter(nullptr);
@ -233,20 +233,21 @@ dict_iter redisDbPersistentDataSnapshot::random_threadsafe() const
double randval = (double)rand()/RAND_MAX;
if (randval <= pctInSnapshot)
{
return m_pdbSnapshot->random_threadsafe();
return m_pdbSnapshot->random_cache_threadsafe();
}
}
serverAssert(dictSize(m_pdict) > 0);
if (dictSize(m_pdict) == 0)
return dict_iter(nullptr);
dictEntry *de = dictGetRandomKey(m_pdict);
return dict_iter(de);
}
dict_iter redisDbPersistentDataSnapshot::find_threadsafe(const char *key) const
dict_iter redisDbPersistentDataSnapshot::find_cached_threadsafe(const char *key) const
{
dictEntry *de = dictFind(m_pdict, key);
if (de == nullptr && m_pdbSnapshot != nullptr)
{
auto itr = m_pdbSnapshot->find_threadsafe(key);
auto itr = m_pdbSnapshot->find_cached_threadsafe(key);
if (itr != nullptr && dictFind(m_pdictTombstone, itr.key()) == nullptr)
return itr;
}
@ -255,6 +256,30 @@ dict_iter redisDbPersistentDataSnapshot::find_threadsafe(const char *key) const
bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly) const
{
if (m_spstorage != nullptr)
{
bool fSawAll = m_spstorage->enumerate([&](const char *key, size_t cchKey, const void *data, size_t cbData){
sds sdsKey = sdsnewlen(key, cchKey);
dictEntry *de = dictFind(m_pdict, sdsKey);
bool fContinue = false;
if (de != nullptr)
{
fContinue = fn((const char*)dictGetKey(de), (robj*)dictGetVal(de));
}
else
{
robj *o = fKeyOnly ? nullptr : deserializeStoredObject(this, sdsKey, data, cbData);
fContinue = fn(sdsKey, o);
if (o != nullptr)
decrRefCount(o);
}
sdsfree(sdsKey);
return fContinue;
});
return fSawAll;
}
dictEntry *de = nullptr;
bool fResult = true;
@ -351,8 +376,7 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *
m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o){
if (o != nullptr)
incrRefCount(o);
dictAdd(spdb->m_pdict, sdsdupshared(key), o.unsafe_robjcast());
dictAdd(spdb->m_pdict, sdsdupshared(key), o.unsafe_robjcast());
return true;
}, true /*fKeyOnly*/);
spdb->m_spstorage = m_pdbSnapshot->m_spstorage;

View File

@ -20,18 +20,17 @@ void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data,
throw status.ToString();
}
void RocksDBStorageProvider::erase(const char *key, size_t cchKey)
bool RocksDBStorageProvider::erase(const char *key, size_t cchKey)
{
rocksdb::Status status;
if (m_spbatch != nullptr)
status = m_spbatch->Delete(m_spcolfamily.get(), rocksdb::Slice(key, cchKey));
else
status = m_spdb->Delete(WriteOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey));
if (!status.ok())
throw status.ToString();
return status.ok();
}
void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callback fn) const
void RocksDBStorageProvider::retrieve(const char *key, size_t cchKey, callbackSingle fn) const
{
std::string value;
auto status = m_spdb->Get(ReadOptions(), m_spcolfamily.get(), rocksdb::Slice(key, cchKey), &value);
@ -65,13 +64,16 @@ size_t RocksDBStorageProvider::count() const
return count;
}
void RocksDBStorageProvider::enumerate(callback fn) const
bool RocksDBStorageProvider::enumerate(callback fn) const
{
std::unique_ptr<rocksdb::Iterator> it = std::unique_ptr<rocksdb::Iterator>(m_spdb->NewIterator(ReadOptions(), m_spcolfamily.get()));
for (it->SeekToFirst(); it->Valid(); it->Next()) {
fn(it->key().data(), it->key().size(), it->value().data(), it->value().size());
bool fContinue = fn(it->key().data(), it->key().size(), it->value().data(), it->value().size());
if (!fContinue)
break;
}
assert(it->status().ok()); // Check for any errors found during the scan
return !it->Valid();
}
const IStorage *RocksDBStorageProvider::clone() const

View File

@ -17,10 +17,10 @@ public:
~RocksDBStorageProvider();
virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) override;
virtual void erase(const char *key, size_t cchKey) override;
virtual void retrieve(const char *key, size_t cchKey, callback fn) const override;
virtual bool erase(const char *key, size_t cchKey) override;
virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const override;
virtual size_t clear() override;
virtual void enumerate(callback fn) const override;
virtual bool enumerate(callback fn) const override;
virtual const IStorage *clone() const override;