Don't free snapshot objects in a critical path (under the AE lock)
Former-commit-id: d0da3d3cb74334cc8a2d14f4bdaef7935181700a
This commit is contained in:
parent
311b286d41
commit
cf4e74006f
@ -2040,6 +2040,29 @@ void flushStorageWeak()
|
||||
}
|
||||
}
|
||||
|
||||
void freeSnapshotLazyFreesAsync()
|
||||
{
|
||||
aeAcquireLock();
|
||||
std::vector<robj*> vecObjs = std::move(g_pserver->vecobjLazyFree);
|
||||
std::vector<dict*> vecDicts = std::move(g_pserver->vecdictLazyFree);
|
||||
std::vector<std::vector<dictEntry*>> vecvecde = std::move(g_pserver->vecvecde);
|
||||
aeReleaseLock();
|
||||
|
||||
for (auto &vecdeFree : vecvecde)
|
||||
{
|
||||
for (auto *de : vecdeFree)
|
||||
{
|
||||
dbDictType.keyDestructor(nullptr, dictGetKey(de));
|
||||
dbDictType.valDestructor(nullptr, dictGetVal(de));
|
||||
zfree(de);
|
||||
}
|
||||
}
|
||||
for (robj *o : vecObjs)
|
||||
decrRefCount(o);
|
||||
for (dict *d : vecDicts)
|
||||
dictRelease(d);
|
||||
}
|
||||
|
||||
/* This is our timer interrupt, called g_pserver->hz times per second.
|
||||
* Here is where we do a number of things that need to be done asynchronously.
|
||||
* For instance:
|
||||
@ -2313,14 +2336,18 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
}
|
||||
}
|
||||
|
||||
bool fAnySnapshots = false;
|
||||
for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots; ++idb)
|
||||
fAnySnapshots = fAnySnapshots || g_pserver->db[0]->FSnapshot();
|
||||
if (fAnySnapshots)
|
||||
{
|
||||
g_pserver->asyncworkqueue->AddWorkFunction([]{
|
||||
g_pserver->db[0]->consolidate_snapshot();
|
||||
}, true /*HiPri*/);
|
||||
run_with_period(100) {
|
||||
bool fAsyncFrees = g_pserver->vecobjLazyFree.size() || g_pserver->vecdictLazyFree.size() || g_pserver->vecvecde.size();
|
||||
bool fAnySnapshots = false;
|
||||
for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots && !fAsyncFrees; ++idb)
|
||||
fAnySnapshots = fAnySnapshots || g_pserver->db[0]->FSnapshot();
|
||||
if (fAnySnapshots || fAsyncFrees)
|
||||
{
|
||||
g_pserver->asyncworkqueue->AddWorkFunction([fAsyncFrees]{
|
||||
g_pserver->db[0]->consolidate_snapshot();
|
||||
freeSnapshotLazyFreesAsync();
|
||||
}, true /*HiPri*/);
|
||||
}
|
||||
}
|
||||
|
||||
/* Fire the cron loop modules event. */
|
||||
@ -6049,6 +6076,7 @@ int main(int argc, char **argv) {
|
||||
serverAssert(fLockAcquired);
|
||||
|
||||
g_pserver->garbageCollector.shutdown();
|
||||
freeSnapshotLazyFreesAsync();
|
||||
delete g_pserver->m_pstorageFactory;
|
||||
|
||||
return 0;
|
||||
|
@ -1425,7 +1425,7 @@ protected:
|
||||
static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot);
|
||||
int snapshot_depth() const;
|
||||
void consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce);
|
||||
void freeTombstoneObjects(int depth);
|
||||
bool freeTombstoneObjects(int depth);
|
||||
|
||||
public:
|
||||
bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; }
|
||||
@ -2452,6 +2452,10 @@ struct redisServer {
|
||||
char *aof_rewrite_cpulist; /* cpu affinity list of aof rewrite process. */
|
||||
char *bgsave_cpulist; /* cpu affinity list of bgsave process. */
|
||||
|
||||
std::vector<dict*> vecdictLazyFree;
|
||||
std::vector<robj*> vecobjLazyFree;
|
||||
std::vector<std::vector<dictEntry*>> vecvecde;
|
||||
|
||||
bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; }
|
||||
};
|
||||
|
||||
|
@ -233,17 +233,23 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot
|
||||
latencyAddSampleIfNeeded("end-snapshot-async-phase-2", latency);
|
||||
}
|
||||
|
||||
void redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth)
|
||||
bool redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth)
|
||||
{
|
||||
if (m_pdbSnapshot == nullptr)
|
||||
return;
|
||||
{
|
||||
serverAssert(dictSize(m_pdictTombstone) == 0);
|
||||
return true;
|
||||
}
|
||||
|
||||
const_cast<redisDbPersistentDataSnapshot*>(m_pdbSnapshot)->freeTombstoneObjects(depth+1);
|
||||
bool fPrevResult = const_cast<redisDbPersistentDataSnapshot*>(m_pdbSnapshot)->freeTombstoneObjects(depth+1);
|
||||
if (m_pdbSnapshot->m_refCount != depth && (m_pdbSnapshot->m_refCount != (m_refCount+1)))
|
||||
return;
|
||||
return false;
|
||||
|
||||
dictIterator *di = dictGetIterator(m_pdictTombstone);
|
||||
dictEntry *de;
|
||||
std::vector<dictEntry*> vecdeFree;
|
||||
vecdeFree.reserve(dictSize(m_pdictTombstone));
|
||||
bool fAllCovered = true;
|
||||
while ((de = dictNext(di)) != nullptr)
|
||||
{
|
||||
dictEntry **dePrev = nullptr;
|
||||
@ -252,12 +258,28 @@ void redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth)
|
||||
dictEntry *deObj = dictFindWithPrev(m_pdbSnapshot->m_pdict, key, (uint64_t)dictGetVal(de), &dePrev, &ht, !!sdsisshared(key));
|
||||
if (deObj != nullptr)
|
||||
{
|
||||
decrRefCount((robj*)dictGetVal(deObj));
|
||||
void *ptrSet = nullptr;
|
||||
__atomic_store(&deObj->v.val, &ptrSet, __ATOMIC_RELAXED);
|
||||
// Now unlink the DE
|
||||
__atomic_store(dePrev, &deObj->next, __ATOMIC_RELEASE);
|
||||
ht->used--;
|
||||
vecdeFree.push_back(deObj);
|
||||
}
|
||||
else
|
||||
{
|
||||
fAllCovered = fPrevResult;
|
||||
}
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
|
||||
aeAcquireLock();
|
||||
if (fAllCovered)
|
||||
{
|
||||
g_pserver->vecdictLazyFree.push_back(m_pdictTombstone);
|
||||
m_pdictTombstone = dictCreate(&dbTombstoneDictType, nullptr);
|
||||
}
|
||||
g_pserver->vecvecde.emplace_back(std::move(vecdeFree));
|
||||
aeReleaseLock();
|
||||
|
||||
return fAllCovered;
|
||||
}
|
||||
|
||||
void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot)
|
||||
@ -308,6 +330,8 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
|
||||
// Stage 1 Loop through all the tracked deletes and remove them from the snapshot DB
|
||||
dictIterator *di = dictGetIterator(m_pdictTombstone);
|
||||
dictEntry *de;
|
||||
m_spdbSnapshotHOLDER->m_pdict->iterators++;
|
||||
std::vector<dictEntry*> vecde;
|
||||
while ((de = dictNext(di)) != NULL)
|
||||
{
|
||||
dictEntry **dePrev;
|
||||
@ -327,15 +351,15 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
|
||||
}
|
||||
|
||||
// Delete the object from the source dict, we don't use dictDelete to avoid a second search
|
||||
dictFreeKey(m_spdbSnapshotHOLDER->m_pdict, deSnapshot);
|
||||
dictFreeVal(m_spdbSnapshotHOLDER->m_pdict, deSnapshot);
|
||||
serverAssert(*dePrev == deSnapshot);
|
||||
vecde.push_back(deSnapshot);
|
||||
*dePrev = deSnapshot->next;
|
||||
zfree(deSnapshot);
|
||||
ht->used--;
|
||||
}
|
||||
g_pserver->vecvecde.emplace_back(std::move(vecde));
|
||||
m_spdbSnapshotHOLDER->m_pdict->iterators--;
|
||||
dictReleaseIterator(di);
|
||||
dictEmpty(m_pdictTombstone, nullptr);
|
||||
g_pserver->vecdictLazyFree.push_back(m_pdictTombstone);
|
||||
m_pdictTombstone = dictCreate(&dbTombstoneDictType, nullptr);
|
||||
|
||||
// Stage 2 Move all new keys to the snapshot DB
|
||||
dictMerge(m_spdbSnapshotHOLDER->m_pdict, m_pdict);
|
||||
|
Loading…
x
Reference in New Issue
Block a user