Fixup IStorage uses

Former-commit-id: 5ea0ce143a79365fb3903c6fc7caeb1c9760b0cc
This commit is contained in:
John Sully 2019-12-06 17:43:28 -05:00
parent e61aa0261b
commit 11f710c5fc
6 changed files with 75 additions and 44 deletions

View File

@ -6,9 +6,14 @@ class IStorage
public:
typedef std::function<void(const char *, size_t, const void *, size_t)> callback;
virtual ~IStorage() {}
virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) = 0;
virtual void erase(const char *key, size_t cchKey) = 0;
virtual void retrieve(const char *key, size_t cchKey, bool fDelete, callback fn) = 0;
virtual void retrieve(const char *key, size_t cchKey, bool fDelete, callback fn) const = 0;
virtual size_t clear() = 0;
virtual void enumerate(callback fn) = 0;
virtual void enumerate(callback fn) const = 0;
/* This is permitted to be a shallow clone */
virtual const IStorage *clone() const = 0;
};

View File

@ -1956,8 +1956,8 @@ void redisDbPersistentData::clear(void(callback)(void*))
m_fAllChanged = true;
delete m_setexpire;
m_setexpire = new (MALLOC_LOCAL) expireset();
if (m_pstorage != nullptr)
m_pstorage->clear();
if (m_spstorage != nullptr)
m_spstorage->clear();
m_pdbSnapshot = nullptr;
}
@ -1968,7 +1968,7 @@ void redisDbPersistentData::clear(void(callback)(void*))
db1->m_fTrackingChanges = db2->m_fTrackingChanges;
db1->m_fAllChanged = db2->m_fAllChanged;
db1->m_setexpire = db2->m_setexpire;
db1->m_pstorage = db2->m_pstorage;
db1->m_spstorage = std::move(db2->m_spstorage);
db1->m_pdbSnapshot = db2->m_pdbSnapshot;
db1->m_spdbSnapshotHOLDER = std::move(db2->m_spdbSnapshotHOLDER);
@ -1976,7 +1976,7 @@ void redisDbPersistentData::clear(void(callback)(void*))
db2->m_fTrackingChanges = aux.m_fTrackingChanges;
db2->m_fAllChanged = aux.m_fAllChanged;
db2->m_setexpire = aux.m_setexpire;
db2->m_pstorage = aux.m_pstorage;
db2->m_spstorage = std::move(aux.m_spstorage);
db2->m_pdbSnapshot = aux.m_pdbSnapshot;
db2->m_spdbSnapshotHOLDER = std::move(aux.m_spdbSnapshotHOLDER);
@ -2067,9 +2067,8 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
}
else if (*pde != nullptr && dictGetVal(*pde) == nullptr)
{
serverAssert(m_pstorage != nullptr);
sds key = (sds)dictGetKey(*pde);
m_pstorage->retrieve(key, sdslen(key), true, [&](const char *, size_t, const void *data, size_t cb){
serverAssert(m_spstorage != nullptr);
m_spstorage->retrieve(sdsKey, sdslen(sdsKey), true, [&](const char *, size_t, const void *data, size_t cb){
robj *o = deserializeStoredObject(data, cb);
serverAssert(o != nullptr);
dictSetVal(m_pdict, *pde, o);
@ -2080,7 +2079,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o)
{
sds temp = serializeStoredObject(o);
m_pstorage->insert(szKey, cchKey, temp, sdslen(temp));
m_spstorage->insert(szKey, cchKey, temp, sdslen(temp));
sdsfree(temp);
}
@ -2101,13 +2100,13 @@ void redisDbPersistentData::processChanges()
--m_fTrackingChanges;
serverAssert(m_fTrackingChanges >= 0);
if (m_pstorage != nullptr)
if (m_spstorage != nullptr)
{
if (m_fTrackingChanges == 0)
{
if (m_fAllChanged)
{
m_pstorage->clear();
m_spstorage->clear();
storeDatabase();
}
else
@ -2122,7 +2121,7 @@ void redisDbPersistentData::processChanges()
}
else
{
m_pstorage->erase(str.data(), str.size());
m_spstorage->erase(str.data(), str.size());
}
sdsfree(sdsKey);
}
@ -2171,4 +2170,13 @@ dict_iter redisDbPersistentData::random()
size_t redisDbPersistentData::size() const
{
return dictSize(m_pdict) + (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0);
}
void redisDbPersistentData::removeCachedValue(const char *key)
{
serverAssert(m_spstorage != nullptr);
dictEntry *de = dictFind(m_pdict, key);
serverAssert(de != nullptr);
decrRefCount((robj*)dictGetVal(de));
dictSetVal(m_pdict, de, nullptr);
}

View File

@ -583,31 +583,42 @@ int freeMemoryIfNeeded(void) {
if (bestkey) {
db = g_pserver->db+bestdbid;
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_eviction);
/* We compute the amount of memory freed by db*Delete() alone.
* It is possible that actually the memory needed to propagate
* the DEL in AOF and replication link is greater than the one
* we are freeing removing the key, but we can't account for
* that otherwise we would never exit the loop.
*
* AOF and Output buffer memory will be freed eventually so
* we only care about memory used by the key space. */
delta = (long long) zmalloc_used_memory();
latencyStartMonitor(eviction_latency);
if (g_pserver->lazyfree_lazy_eviction)
dbAsyncDelete(db,keyobj);
if (db->FStorageProvider())
{
// This key is in the storage so we only need to free the object
delta = (long long) zmalloc_used_memory();
db->removeCachedValue(bestkey);
delta -= (long long) zmalloc_used_memory();
mem_freed += delta;
}
else
dbSyncDelete(db,keyobj);
latencyEndMonitor(eviction_latency);
latencyAddSampleIfNeeded("eviction-del",eviction_latency);
latencyRemoveNestedEvent(latency,eviction_latency);
delta -= (long long) zmalloc_used_memory();
mem_freed += delta;
g_pserver->stat_evictedkeys++;
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
keyobj, db->id);
decrRefCount(keyobj);
{
robj *keyobj = createStringObject(bestkey,sdslen(bestkey));
propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_eviction);
/* We compute the amount of memory freed by db*Delete() alone.
* It is possible that actually the memory needed to propagate
* the DEL in AOF and replication link is greater than the one
* we are freeing removing the key, but we can't account for
* that otherwise we would never exit the loop.
*
* AOF and Output buffer memory will be freed eventually so
* we only care about memory used by the key space. */
delta = (long long) zmalloc_used_memory();
latencyStartMonitor(eviction_latency);
if (g_pserver->lazyfree_lazy_eviction)
dbAsyncDelete(db,keyobj);
else
dbSyncDelete(db,keyobj);
latencyEndMonitor(eviction_latency);
latencyAddSampleIfNeeded("eviction-del",eviction_latency);
latencyRemoveNestedEvent(latency,eviction_latency);
delta -= (long long) zmalloc_used_memory();
mem_freed += delta;
g_pserver->stat_evictedkeys++;
notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted",
keyobj, db->id);
decrRefCount(keyobj);
}
keys_freed++;
/* When the memory to free starts to be big enough, we may

View File

@ -118,8 +118,8 @@ void redisDbPersistentData::emptyDbAsync() {
auto *set = m_setexpire;
m_setexpire = new (MALLOC_LOCAL) expireset();
m_pdict = dictCreate(&dbDictType,this);
if (m_pstorage != nullptr)
m_pstorage->clear();
if (m_spstorage != nullptr)
m_spstorage->clear();
if (m_fTrackingChanges)
m_fAllChanged = true;
atomicIncr(lazyfree_objects,dictSize(oldht1));

View File

@ -1281,6 +1281,9 @@ public:
void consolidate_snapshot();
bool FStorageProvider() { return m_spstorage != nullptr; }
void removeCachedValue(const char *key);
private:
void ensure(const char *key);
void ensure(const char *key, dictEntry **de);
@ -1294,7 +1297,7 @@ private:
int m_fTrackingChanges = 0; // Note: Stack based
bool m_fAllChanged = false;
std::set<std::string> m_setchanged;
IStorage *m_pstorage = nullptr;
std::shared_ptr<IStorage> m_spstorage = nullptr;
uint64_t mvccCheckpoint = 0;
// Expire

View File

@ -14,8 +14,8 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
++levels;
psnapshot = psnapshot->m_spdbSnapshotHOLDER.get();
}
//if (fOptional && (levels > 8))
// return nullptr;
if (fOptional && (levels > 8))
return nullptr;
if (m_spdbSnapshotHOLDER != nullptr)
{
@ -41,6 +41,8 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
spdb->m_pdict->iterators++;
dictForceRehash(spdb->m_pdictTombstone); // prevent rehashing by finishing the rehash now
spdb->m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER);
if (m_spstorage != nullptr)
spdb->m_spstorage = std::shared_ptr<IStorage>(const_cast<IStorage*>(m_spstorage->clone()));
spdb->m_pdbSnapshot = m_pdbSnapshot;
spdb->m_refCount = 1;
spdb->mvccCheckpoint = getMvccTstamp();
@ -50,7 +52,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
m_pdict = dictCreate(&dbDictType,this);
m_pdictTombstone = dictCreate(&dbDictType, this);
m_setexpire = new (MALLOC_LOCAL) expireset();
serverAssert(spdb->m_pdict->iterators == 1);
m_spdbSnapshotHOLDER = std::move(spdb);
@ -348,10 +350,12 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *
dictExpand(spdb->m_pdict, m_pdbSnapshot->size());
m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o){
incrRefCount(o);
if (o != nullptr)
incrRefCount(o);
dictAdd(spdb->m_pdict, sdsdup(key), o.unsafe_robjcast());
return true;
});
spdb->m_spstorage = m_pdbSnapshot->m_spstorage;
spdb->m_pdict->iterators++;