diff --git a/src/IStorage.h b/src/IStorage.h index b12b98260..fa870933c 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -6,9 +6,14 @@ class IStorage public: typedef std::function callback; + virtual ~IStorage() {} + virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) = 0; virtual void erase(const char *key, size_t cchKey) = 0; - virtual void retrieve(const char *key, size_t cchKey, bool fDelete, callback fn) = 0; + virtual void retrieve(const char *key, size_t cchKey, bool fDelete, callback fn) const = 0; virtual size_t clear() = 0; - virtual void enumerate(callback fn) = 0; + virtual void enumerate(callback fn) const = 0; + + /* This is permitted to be a shallow clone */ + virtual const IStorage *clone() const = 0; }; diff --git a/src/db.cpp b/src/db.cpp index 513356afd..c7f75bcab 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1956,8 +1956,8 @@ void redisDbPersistentData::clear(void(callback)(void*)) m_fAllChanged = true; delete m_setexpire; m_setexpire = new (MALLOC_LOCAL) expireset(); - if (m_pstorage != nullptr) - m_pstorage->clear(); + if (m_spstorage != nullptr) + m_spstorage->clear(); m_pdbSnapshot = nullptr; } @@ -1968,7 +1968,7 @@ void redisDbPersistentData::clear(void(callback)(void*)) db1->m_fTrackingChanges = db2->m_fTrackingChanges; db1->m_fAllChanged = db2->m_fAllChanged; db1->m_setexpire = db2->m_setexpire; - db1->m_pstorage = db2->m_pstorage; + db1->m_spstorage = std::move(db2->m_spstorage); db1->m_pdbSnapshot = db2->m_pdbSnapshot; db1->m_spdbSnapshotHOLDER = std::move(db2->m_spdbSnapshotHOLDER); @@ -1976,7 +1976,7 @@ void redisDbPersistentData::clear(void(callback)(void*)) db2->m_fTrackingChanges = aux.m_fTrackingChanges; db2->m_fAllChanged = aux.m_fAllChanged; db2->m_setexpire = aux.m_setexpire; - db2->m_pstorage = aux.m_pstorage; + db2->m_spstorage = std::move(aux.m_spstorage); db2->m_pdbSnapshot = aux.m_pdbSnapshot; db2->m_spdbSnapshotHOLDER = std::move(aux.m_spdbSnapshotHOLDER); @@ -2067,9 +2067,8 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) } else if (*pde != nullptr && dictGetVal(*pde) == nullptr) { - serverAssert(m_pstorage != nullptr); - sds key = (sds)dictGetKey(*pde); - m_pstorage->retrieve(key, sdslen(key), true, [&](const char *, size_t, const void *data, size_t cb){ + serverAssert(m_spstorage != nullptr); + m_spstorage->retrieve(sdsKey, sdslen(sdsKey), true, [&](const char *, size_t, const void *data, size_t cb){ robj *o = deserializeStoredObject(data, cb); serverAssert(o != nullptr); dictSetVal(m_pdict, *pde, o); @@ -2080,7 +2079,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o) { sds temp = serializeStoredObject(o); - m_pstorage->insert(szKey, cchKey, temp, sdslen(temp)); + m_spstorage->insert(szKey, cchKey, temp, sdslen(temp)); sdsfree(temp); } @@ -2101,13 +2100,13 @@ void redisDbPersistentData::processChanges() --m_fTrackingChanges; serverAssert(m_fTrackingChanges >= 0); - if (m_pstorage != nullptr) + if (m_spstorage != nullptr) { if (m_fTrackingChanges == 0) { if (m_fAllChanged) { - m_pstorage->clear(); + m_spstorage->clear(); storeDatabase(); } else @@ -2122,7 +2121,7 @@ void redisDbPersistentData::processChanges() } else { - m_pstorage->erase(str.data(), str.size()); + m_spstorage->erase(str.data(), str.size()); } sdsfree(sdsKey); } @@ -2171,4 +2170,13 @@ dict_iter redisDbPersistentData::random() size_t redisDbPersistentData::size() const { return dictSize(m_pdict) + (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0); +} + +void redisDbPersistentData::removeCachedValue(const char *key) +{ + serverAssert(m_spstorage != nullptr); + dictEntry *de = dictFind(m_pdict, key); + serverAssert(de != nullptr); + decrRefCount((robj*)dictGetVal(de)); + dictSetVal(m_pdict, de, nullptr); } \ No newline at end of file diff --git a/src/evict.cpp b/src/evict.cpp index 164eb8c2e..c0d6d1737 100644 --- a/src/evict.cpp +++ b/src/evict.cpp @@ -583,31 +583,42 @@ int freeMemoryIfNeeded(void) { if (bestkey) { db = g_pserver->db+bestdbid; - robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); - propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_eviction); - /* We compute the amount of memory freed by db*Delete() alone. - * It is possible that actually the memory needed to propagate - * the DEL in AOF and replication link is greater than the one - * we are freeing removing the key, but we can't account for - * that otherwise we would never exit the loop. - * - * AOF and Output buffer memory will be freed eventually so - * we only care about memory used by the key space. */ - delta = (long long) zmalloc_used_memory(); - latencyStartMonitor(eviction_latency); - if (g_pserver->lazyfree_lazy_eviction) - dbAsyncDelete(db,keyobj); + if (db->FStorageProvider()) + { + // This key is in the storage so we only need to free the object + delta = (long long) zmalloc_used_memory(); + db->removeCachedValue(bestkey); + delta -= (long long) zmalloc_used_memory(); + mem_freed += delta; + } else - dbSyncDelete(db,keyobj); - latencyEndMonitor(eviction_latency); - latencyAddSampleIfNeeded("eviction-del",eviction_latency); - latencyRemoveNestedEvent(latency,eviction_latency); - delta -= (long long) zmalloc_used_memory(); - mem_freed += delta; - g_pserver->stat_evictedkeys++; - notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", - keyobj, db->id); - decrRefCount(keyobj); + { + robj *keyobj = createStringObject(bestkey,sdslen(bestkey)); + propagateExpire(db,keyobj,g_pserver->lazyfree_lazy_eviction); + /* We compute the amount of memory freed by db*Delete() alone. + * It is possible that actually the memory needed to propagate + * the DEL in AOF and replication link is greater than the one + * we are freeing removing the key, but we can't account for + * that otherwise we would never exit the loop. + * + * AOF and Output buffer memory will be freed eventually so + * we only care about memory used by the key space. */ + delta = (long long) zmalloc_used_memory(); + latencyStartMonitor(eviction_latency); + if (g_pserver->lazyfree_lazy_eviction) + dbAsyncDelete(db,keyobj); + else + dbSyncDelete(db,keyobj); + latencyEndMonitor(eviction_latency); + latencyAddSampleIfNeeded("eviction-del",eviction_latency); + latencyRemoveNestedEvent(latency,eviction_latency); + delta -= (long long) zmalloc_used_memory(); + mem_freed += delta; + g_pserver->stat_evictedkeys++; + notifyKeyspaceEvent(NOTIFY_EVICTED, "evicted", + keyobj, db->id); + decrRefCount(keyobj); + } keys_freed++; /* When the memory to free starts to be big enough, we may diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index 1e2981105..b08eec542 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -118,8 +118,8 @@ void redisDbPersistentData::emptyDbAsync() { auto *set = m_setexpire; m_setexpire = new (MALLOC_LOCAL) expireset(); m_pdict = dictCreate(&dbDictType,this); - if (m_pstorage != nullptr) - m_pstorage->clear(); + if (m_spstorage != nullptr) + m_spstorage->clear(); if (m_fTrackingChanges) m_fAllChanged = true; atomicIncr(lazyfree_objects,dictSize(oldht1)); diff --git a/src/server.h b/src/server.h index 8685a778f..79c841dc0 100644 --- a/src/server.h +++ b/src/server.h @@ -1281,6 +1281,9 @@ public: void consolidate_snapshot(); + bool FStorageProvider() { return m_spstorage != nullptr; } + void removeCachedValue(const char *key); + private: void ensure(const char *key); void ensure(const char *key, dictEntry **de); @@ -1294,7 +1297,7 @@ private: int m_fTrackingChanges = 0; // Note: Stack based bool m_fAllChanged = false; std::set m_setchanged; - IStorage *m_pstorage = nullptr; + std::shared_ptr m_spstorage = nullptr; uint64_t mvccCheckpoint = 0; // Expire diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 55d62a951..f5499bbea 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -14,8 +14,8 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 ++levels; psnapshot = psnapshot->m_spdbSnapshotHOLDER.get(); } - //if (fOptional && (levels > 8)) - // return nullptr; + if (fOptional && (levels > 8)) + return nullptr; if (m_spdbSnapshotHOLDER != nullptr) { @@ -41,6 +41,8 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 spdb->m_pdict->iterators++; dictForceRehash(spdb->m_pdictTombstone); // prevent rehashing by finishing the rehash now spdb->m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER); + if (m_spstorage != nullptr) + spdb->m_spstorage = std::shared_ptr(const_cast(m_spstorage->clone())); spdb->m_pdbSnapshot = m_pdbSnapshot; spdb->m_refCount = 1; spdb->mvccCheckpoint = getMvccTstamp(); @@ -50,7 +52,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 m_pdict = dictCreate(&dbDictType,this); m_pdictTombstone = dictCreate(&dbDictType, this); m_setexpire = new (MALLOC_LOCAL) expireset(); - + serverAssert(spdb->m_pdict->iterators == 1); m_spdbSnapshotHOLDER = std::move(spdb); @@ -348,10 +350,12 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData * dictExpand(spdb->m_pdict, m_pdbSnapshot->size()); m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o){ - incrRefCount(o); + if (o != nullptr) + incrRefCount(o); dictAdd(spdb->m_pdict, sdsdup(key), o.unsafe_robjcast()); return true; }); + spdb->m_spstorage = m_pdbSnapshot->m_spstorage; spdb->m_pdict->iterators++;