Optimize freeMemory by lazy freeing objects
Former-commit-id: cca31ed5ee2f42975f0051cfabf1e88720b3d678
This commit is contained in:
parent
0adcb82652
commit
0c7589580d
@ -2838,7 +2838,7 @@ size_t redisDbPersistentData::size() const
|
|||||||
+ (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0);
|
+ (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool redisDbPersistentData::removeCachedValue(const char *key)
|
bool redisDbPersistentData::removeCachedValue(const char *key, dictEntry **ppde)
|
||||||
{
|
{
|
||||||
serverAssert(m_spstorage != nullptr);
|
serverAssert(m_spstorage != nullptr);
|
||||||
// First ensure its not a pending key
|
// First ensure its not a pending key
|
||||||
@ -2854,7 +2854,11 @@ bool redisDbPersistentData::removeCachedValue(const char *key)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// since we write ASAP the database already has a valid copy so safe to delete
|
// since we write ASAP the database already has a valid copy so safe to delete
|
||||||
dictDelete(m_pdict, key);
|
if (ppde != nullptr) {
|
||||||
|
*ppde = dictUnlink(m_pdict, key);
|
||||||
|
} else {
|
||||||
|
dictDelete(m_pdict, key);
|
||||||
|
}
|
||||||
|
|
||||||
if (m_spstorage != nullptr)
|
if (m_spstorage != nullptr)
|
||||||
m_spstorage->batch_unlock();
|
m_spstorage->batch_unlock();
|
||||||
|
@ -34,6 +34,7 @@
|
|||||||
#include "bio.h"
|
#include "bio.h"
|
||||||
#include "atomicvar.h"
|
#include "atomicvar.h"
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
|
#include <map>
|
||||||
|
|
||||||
/* ----------------------------------------------------------------------------
|
/* ----------------------------------------------------------------------------
|
||||||
* Data structures
|
* Data structures
|
||||||
@ -475,6 +476,64 @@ int getMaxmemoryState(size_t *total, size_t *logical, size_t *tofree, float *lev
|
|||||||
return C_ERR;
|
return C_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class FreeMemoryLazyFree : public ICollectable
|
||||||
|
{
|
||||||
|
ssize_t m_cb = 0;
|
||||||
|
std::vector<std::pair<dict*, std::vector<dictEntry*>>> vecdictvecde;
|
||||||
|
|
||||||
|
public:
|
||||||
|
static std::atomic<int> s_clazyFreesInProgress;
|
||||||
|
|
||||||
|
FreeMemoryLazyFree() {
|
||||||
|
s_clazyFreesInProgress++;
|
||||||
|
}
|
||||||
|
|
||||||
|
~FreeMemoryLazyFree() {
|
||||||
|
--s_clazyFreesInProgress;
|
||||||
|
for (auto &pair : vecdictvecde) {
|
||||||
|
for (auto de : pair.second) {
|
||||||
|
dictFreeUnlinkedEntry(pair.first, de);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ssize_t addEntry(dict *d, dictEntry *de) {
|
||||||
|
ssize_t cbFreedNow = 0;
|
||||||
|
ssize_t cb = sizeof(dictEntry);
|
||||||
|
cb += sdsAllocSize((sds)dictGetKey(de));
|
||||||
|
robj *o = (robj*)dictGetVal(de);
|
||||||
|
switch (o->type) {
|
||||||
|
case OBJ_STRING:
|
||||||
|
cb += getStringObjectSdsUsedMemory(o)+sizeof(robj);
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
// If we don't know about it we can't accurately track the memory so free now
|
||||||
|
cbFreedNow = zmalloc_used_memory();
|
||||||
|
decrRefCount(o);
|
||||||
|
cbFreedNow -= zmalloc_used_memory();
|
||||||
|
de->v.val = nullptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto itr = std::lower_bound(vecdictvecde.begin(), vecdictvecde.end(), d,
|
||||||
|
[](const std::pair<dict*, std::vector<dictEntry*>> &a, dict *d) -> bool {
|
||||||
|
return a.first < d;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
if (itr == vecdictvecde.end() || itr->first != d) {
|
||||||
|
itr = vecdictvecde.insert(itr, std::make_pair(d, std::vector<dictEntry*>()));
|
||||||
|
}
|
||||||
|
serverAssert(itr->first == d);
|
||||||
|
itr->second.push_back(de);
|
||||||
|
m_cb += cb;
|
||||||
|
return cb + cbFreedNow;
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t memory_queued() { return m_cb; }
|
||||||
|
};
|
||||||
|
|
||||||
|
std::atomic<int> FreeMemoryLazyFree::s_clazyFreesInProgress {0};
|
||||||
|
|
||||||
/* This function is periodically called to see if there is memory to free
|
/* This function is periodically called to see if there is memory to free
|
||||||
* according to the current "maxmemory" settings. In case we are over the
|
* according to the current "maxmemory" settings. In case we are over the
|
||||||
* memory limit, the function will try to free some memory to return back
|
* memory limit, the function will try to free some memory to return back
|
||||||
@ -491,6 +550,9 @@ int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot) {
|
|||||||
* and just be masters exact copies. */
|
* and just be masters exact copies. */
|
||||||
if (g_pserver->m_pstorageFactory == nullptr && listLength(g_pserver->masters) && g_pserver->repl_slave_ignore_maxmemory && !g_pserver->fActiveReplica) return C_OK;
|
if (g_pserver->m_pstorageFactory == nullptr && listLength(g_pserver->masters) && g_pserver->repl_slave_ignore_maxmemory && !g_pserver->fActiveReplica) return C_OK;
|
||||||
|
|
||||||
|
/* If we have a lazy free obj pending, our amounts will be off, wait for it to go away */
|
||||||
|
if (FreeMemoryLazyFree::s_clazyFreesInProgress > 0) return C_OK;
|
||||||
|
|
||||||
size_t mem_reported, mem_tofree, mem_freed;
|
size_t mem_reported, mem_tofree, mem_freed;
|
||||||
mstime_t latency, eviction_latency, lazyfree_latency;
|
mstime_t latency, eviction_latency, lazyfree_latency;
|
||||||
long long delta;
|
long long delta;
|
||||||
@ -500,6 +562,8 @@ int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot) {
|
|||||||
int ckeysFailed = 0;
|
int ckeysFailed = 0;
|
||||||
int keys_freed = 0;
|
int keys_freed = 0;
|
||||||
|
|
||||||
|
std::unique_ptr<FreeMemoryLazyFree> splazy = std::make_unique<FreeMemoryLazyFree>();
|
||||||
|
|
||||||
if (g_pserver->maxstorage && g_pserver->m_pstorageFactory != nullptr && g_pserver->m_pstorageFactory->totalDiskspaceUsed() >= g_pserver->maxstorage)
|
if (g_pserver->maxstorage && g_pserver->m_pstorageFactory != nullptr && g_pserver->m_pstorageFactory->totalDiskspaceUsed() >= g_pserver->maxstorage)
|
||||||
goto cant_free_storage;
|
goto cant_free_storage;
|
||||||
|
|
||||||
@ -623,9 +687,9 @@ int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot) {
|
|||||||
if (fEvictToStorage)
|
if (fEvictToStorage)
|
||||||
{
|
{
|
||||||
// This key is in the storage so we only need to free the object
|
// This key is in the storage so we only need to free the object
|
||||||
delta = (long long) zmalloc_used_memory();
|
dictEntry *deT;
|
||||||
if (db->removeCachedValue(bestkey)) {
|
if (db->removeCachedValue(bestkey, &deT)) {
|
||||||
delta -= (long long) zmalloc_used_memory();
|
mem_freed += splazy->addEntry(db->dictUnsafeKeyOnly(), deT);
|
||||||
ckeysFailed = 0;
|
ckeysFailed = 0;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
@ -634,7 +698,6 @@ int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot) {
|
|||||||
if (ckeysFailed > 1024)
|
if (ckeysFailed > 1024)
|
||||||
goto cant_free;
|
goto cant_free;
|
||||||
}
|
}
|
||||||
mem_freed += delta;
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -691,6 +754,10 @@ int freeMemoryIfNeeded(bool fQuickCycle, bool fPreSnapshot) {
|
|||||||
}
|
}
|
||||||
result = C_OK;
|
result = C_OK;
|
||||||
|
|
||||||
|
if (splazy != nullptr && splazy->memory_queued() > 0 && !serverTL->gcEpoch.isReset()) {
|
||||||
|
g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy));
|
||||||
|
}
|
||||||
|
|
||||||
cant_free:
|
cant_free:
|
||||||
if (g_pserver->m_pstorageFactory)
|
if (g_pserver->m_pstorageFactory)
|
||||||
{
|
{
|
||||||
|
@ -1131,7 +1131,7 @@ public:
|
|||||||
void restoreSnapshot(const redisDbPersistentDataSnapshot *psnapshot);
|
void restoreSnapshot(const redisDbPersistentDataSnapshot *psnapshot);
|
||||||
|
|
||||||
bool FStorageProvider() { return m_spstorage != nullptr; }
|
bool FStorageProvider() { return m_spstorage != nullptr; }
|
||||||
bool removeCachedValue(const char *key);
|
bool removeCachedValue(const char *key, dictEntry **ppde = nullptr);
|
||||||
void removeAllCachedValues();
|
void removeAllCachedValues();
|
||||||
|
|
||||||
bool prefetchKeysAsync(client *c, struct parsed_command &command, bool fExecOK);
|
bool prefetchKeysAsync(client *c, struct parsed_command &command, bool fExecOK);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user