Fast cleanup of snapshots without leaving them forever
Former-commit-id: fdd83b2b49244ed2988b080892ee5cffe9fd2684

parent c0586b3aed
commit e8e7061a73
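Summary of the change: snapshot teardown previously parked freed dict entries, objects, and dicts in server-global vectors (vecobjLazyFree, vecdictLazyFree, vecvecde) that serverCron drained through freeSnapshotLazyFreesAsync(). This commit removes those vectors and the cron path, and instead bundles each batch into a LazyFree collectable whose destructor releases everything; the collectable is enqueued on the garbage collector and destroyed once its epoch retires, so snapshot memory no longer lingers indefinitely. Below is a minimal sketch of that deferred-destruction pattern; the ICollectable and GarbageCollector shapes are simplified stand-ins for KeyDB's actual gc types, not the real API.

// Sketch only: a pared-down version of the epoch-style deferred free this
// commit adopts. Names (ICollectable, GarbageCollector, LazyFree) mirror the
// patch, but the interfaces here are illustrative simplifications.
#include <memory>
#include <vector>

struct ICollectable {
    virtual ~ICollectable() = default;  // the real work happens in derived destructors
};

class GarbageCollector {
    std::vector<std::unique_ptr<ICollectable>> m_pending;
public:
    // The real collector keys items by epoch and frees them only once every
    // thread has advanced past that epoch; here we just queue and flush.
    void enqueue(std::unique_ptr<ICollectable> sp) { m_pending.push_back(std::move(sp)); }
    void endEpoch() { m_pending.clear(); }  // destructors run here, off the hot path
};

// Analog of the patch's LazyFree: owns raw allocations and releases them all
// in its destructor when the collector finally drops it.
struct LazyFree : ICollectable {
    std::vector<int*> vec;  // stand-in for the dictEntry*/robj*/dict* vectors
    ~LazyFree() override {
        for (int *p : vec) delete p;
    }
};

int main() {
    GarbageCollector gc;
    auto splazy = std::make_unique<LazyFree>();
    splazy->vec.push_back(new int(42));
    gc.enqueue(std::move(splazy));  // cheap: no freeing on the caller's thread
    gc.endEpoch();                  // the whole batch is destroyed here
}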
@@ -200,6 +200,8 @@ int dictMerge(dict *dst, dict *src)
         std::swap(dst->iterators, src->iterators);
     }
 
+    src->rehashidx = -1;
+
     if (!dictIsRehashing(dst) && !dictIsRehashing(src))
     {
         if (dst->ht[0].size >= src->ht[0].size)
@@ -2040,29 +2040,6 @@ void flushStorageWeak()
     }
 }
 
-void freeSnapshotLazyFreesAsync()
-{
-    aeAcquireLock();
-    std::vector<robj*> vecObjs = std::move(g_pserver->vecobjLazyFree);
-    std::vector<dict*> vecDicts = std::move(g_pserver->vecdictLazyFree);
-    std::vector<std::vector<dictEntry*>> vecvecde = std::move(g_pserver->vecvecde);
-    aeReleaseLock();
-
-    for (auto &vecdeFree : vecvecde)
-    {
-        for (auto *de : vecdeFree)
-        {
-            dbDictType.keyDestructor(nullptr, dictGetKey(de));
-            dbDictType.valDestructor(nullptr, dictGetVal(de));
-            zfree(de);
-        }
-    }
-    for (robj *o : vecObjs)
-        decrRefCount(o);
-    for (dict *d : vecDicts)
-        dictRelease(d);
-}
-
 /* This is our timer interrupt, called g_pserver->hz times per second.
  * Here is where we do a number of things that need to be done asynchronously.
  * For instance:
@@ -2243,11 +2220,22 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
             CONFIG_BGSAVE_RETRY_DELAY ||
             g_pserver->lastbgsave_status == C_OK))
        {
-            serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
-                sp->changes, (int)sp->seconds);
-            rdbSaveInfo rsi, *rsiptr;
-            rsiptr = rdbPopulateSaveInfo(&rsi);
-            rdbSaveBackground(rsiptr);
+            // Ensure rehashing is complete
+            bool fRehashInProgress = false;
+            if (g_pserver->activerehashing) {
+                for (int idb = 0; idb < cserver.dbnum && !fRehashInProgress; ++idb) {
+                    if (g_pserver->db[idb]->FRehashing())
+                        fRehashInProgress = true;
+                }
+            }
+
+            if (!fRehashInProgress) {
+                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
+                    sp->changes, (int)sp->seconds);
+                rdbSaveInfo rsi, *rsiptr;
+                rsiptr = rdbPopulateSaveInfo(&rsi);
+                rdbSaveBackground(rsiptr);
+            }
            break;
        }
    }
@@ -2337,15 +2325,13 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    }

    run_with_period(100) {
-        bool fAsyncFrees = g_pserver->vecobjLazyFree.size() || g_pserver->vecdictLazyFree.size() || g_pserver->vecvecde.size();
        bool fAnySnapshots = false;
-        for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots && !fAsyncFrees; ++idb)
+        for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots; ++idb)
            fAnySnapshots = fAnySnapshots || g_pserver->db[0]->FSnapshot();
-        if (fAnySnapshots || fAsyncFrees)
+        if (fAnySnapshots)
        {
-            g_pserver->asyncworkqueue->AddWorkFunction([fAsyncFrees]{
+            g_pserver->asyncworkqueue->AddWorkFunction([]{
                g_pserver->db[0]->consolidate_snapshot();
-                freeSnapshotLazyFreesAsync();
            }, true /*HiPri*/);
        }
    }
@@ -6084,7 +6070,6 @@ int main(int argc, char **argv) {
    serverAssert(fLockAcquired);

    g_pserver->garbageCollector.shutdown();
-    freeSnapshotLazyFreesAsync();
    delete g_pserver->m_pstorageFactory;

    return 0;
@@ -1420,6 +1420,9 @@ private:
 class redisDbPersistentDataSnapshot : protected redisDbPersistentData
 {
     friend class redisDbPersistentData;
+private:
+    bool iterate_threadsafe_core(std::function<bool(const char*, robj_roptr o)> &fn, bool fKeyOnly, bool fCacheOnly, bool fTop) const;
+
 protected:
     static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot);
     void consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce);
@@ -2503,10 +2506,6 @@ struct redisServer {
    char *aof_rewrite_cpulist; /* cpu affinity list of aof rewrite process. */
    char *bgsave_cpulist; /* cpu affinity list of bgsave process. */

-    std::vector<dict*> vecdictLazyFree;
-    std::vector<robj*> vecobjLazyFree;
-    std::vector<std::vector<dictEntry*>> vecvecde;
-
    bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; }
 };

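A note on the memory-ordering idiom in the src/snapshot.cpp changes below: freeTombstoneObjects now builds a replacement tombstone dict off to the side, then publishes it with a release store, while readers such as find_cached_threadsafe and iterate_threadsafe_core take an acquire load of m_pdictTombstone before using it. A minimal sketch of that publish/consume pairing, using the same GCC/Clang __atomic builtins but a stand-in Table type rather than KeyDB's dict:

// Sketch: release-store a fully built replacement, acquire-load before reading.
#include <cstdio>

struct Table { int entries; };  // stand-in for dict

static Table *g_table = new Table{1};  // plays the role of m_pdictTombstone

void publish(Table *rebuilt) {
    // Writer: finish constructing the replacement first, then release-store
    // the pointer so any reader that acquire-loads it sees a complete table.
    // (The old table leaks here; the patch hands it to LazyFree instead.)
    __atomic_store(&g_table, &rebuilt, __ATOMIC_RELEASE);
}

int read_entries() {
    Table *t;
    __atomic_load(&g_table, &t, __ATOMIC_ACQUIRE);  // pairs with the release store
    return t->entries;
}

int main() {
    publish(new Table{42});
    std::printf("%d\n", read_entries());  // prints 42
}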
src/snapshot.cpp (153 changed lines)
@@ -2,6 +2,29 @@
 #include "aelocker.h"
 
+static const size_t c_elementsSmallLimit = 500000;
+static fastlock s_lock {"consolidate_children"}; // this lock ensures only one thread is consolidating at a time
+
+class LazyFree : public ICollectable
+{
+public:
+    virtual ~LazyFree()
+    {
+        for (auto *de : vecde)
+        {
+            dbDictType.keyDestructor(nullptr, dictGetKey(de));
+            dbDictType.valDestructor(nullptr, dictGetVal(de));
+            zfree(de);
+        }
+        for (robj *o : vecobjLazyFree)
+            decrRefCount(o);
+        for (dict *d : vecdictLazyFree)
+            dictRelease(d);
+    }
+
+    std::vector<dict*> vecdictLazyFree;
+    std::vector<robj*> vecobjLazyFree;
+    std::vector<dictEntry*> vecde;
+};
+
 const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional)
 {
@@ -189,7 +212,18 @@ void redisDbPersistentData::restoreSnapshot(const redisDbPersistentDataSnapshot
 void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot)
 {
     mstime_t latency;
-    aeAcquireLock(); latencyStartMonitor(latency);
+
+    aeAcquireLock();
+    while (dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone)) {
+        dictRehashMilliseconds(m_pdict, 1);
+        dictRehashMilliseconds(m_pdictTombstone, 1);
+        // Give someone else a chance
+        aeReleaseLock();
+        usleep(300);
+        aeAcquireLock();
+    }
+
+    latencyStartMonitor(latency);
     if (m_pdbSnapshotASYNC && m_pdbSnapshotASYNC->m_mvccCheckpoint <= psnapshot->m_mvccCheckpoint)
     {
         // Free a stale async snapshot so consolidate_children can clean it up later
@@ -215,11 +249,22 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot
     auto psnapshotT = createSnapshot(LLONG_MAX, false);
     endSnapshot(psnapshot); // this will just dec the ref count since our new snapshot has a ref
     psnapshot = nullptr;
-    aeReleaseLock(); latencyEndMonitor(latency);
+
+    latencyEndMonitor(latency);
     latencyAddSampleIfNeeded("end-snapshot-async-phase-1", latency);
+    aeReleaseLock();
 
     // do the expensive work of merging snapshots outside the ref
-    const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->freeTombstoneObjects(1); // depth is one because we just created it
+    if (const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->freeTombstoneObjects(1)) // depth is one because we just created it
+    {
+        aeAcquireLock();
+        if (m_pdbSnapshotASYNC != nullptr)
+            endSnapshot(m_pdbSnapshotASYNC);
+        m_pdbSnapshotASYNC = nullptr;
+        endSnapshot(psnapshotT);
+        aeReleaseLock();
+        return;
+    }
     const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->consolidate_children(this, true);
 
     // Final Cleanup
@@ -228,9 +273,10 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot
         m_pdbSnapshotASYNC = psnapshotT;
     else
         endSnapshot(psnapshotT); // finally clean up our temp snapshot
-    aeReleaseLock(); latencyEndMonitor(latency);
+
+    latencyEndMonitor(latency);
     latencyAddSampleIfNeeded("end-snapshot-async-phase-2", latency);
+    aeReleaseLock();
 }
 
 bool redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth)
@@ -241,45 +287,66 @@ bool redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth)
         return true;
     }
 
-    bool fPrevResult = const_cast<redisDbPersistentDataSnapshot*>(m_pdbSnapshot)->freeTombstoneObjects(depth+1);
+    if (!const_cast<redisDbPersistentDataSnapshot*>(m_pdbSnapshot)->freeTombstoneObjects(depth+1))
+        return false;
+
+    {
+        AeLocker ae;
+        ae.arm(nullptr);
+        if (m_pdbSnapshot->m_refCount != depth && (m_pdbSnapshot->m_refCount != (m_refCount+1)))
+            return false;
+        ae.disarm();
+    }
+
+    std::unique_lock<fastlock> lock(s_lock, std::defer_lock);
+    if (!lock.try_lock())
+        return false; // this is a best effort function
+
+    std::unique_ptr<LazyFree> splazy = std::make_unique<LazyFree>();
+
+    dict *dictTombstoneNew = dictCreate(&dbTombstoneDictType, nullptr);
     dictIterator *di = dictGetIterator(m_pdictTombstone);
     dictEntry *de;
-    std::vector<dictEntry*> vecdeFree;
-    vecdeFree.reserve(dictSize(m_pdictTombstone));
-    bool fAllCovered = true;
+    unsigned rgcremoved[2] = {0};
     while ((de = dictNext(di)) != nullptr)
     {
         dictEntry **dePrev = nullptr;
         dictht *ht = nullptr;
         sds key = (sds)dictGetKey(de);
-        dictEntry *deObj = dictFindWithPrev(m_pdbSnapshot->m_pdict, key, (uint64_t)dictGetVal(de), &dePrev, &ht, !!sdsisshared(key));
+        // BUG BUG: Why can't we do a shallow search here?
+        dictEntry *deObj = dictFindWithPrev(m_pdbSnapshot->m_pdict, key, (uint64_t)dictGetVal(de), &dePrev, &ht, false);
 
         if (deObj != nullptr)
         {
             // Now unlink the DE
             __atomic_store(dePrev, &deObj->next, __ATOMIC_RELEASE);
-            ht->used--;
-            vecdeFree.push_back(deObj);
-        }
-        else
-        {
-            fAllCovered = fPrevResult;
+            if (ht == &m_pdbSnapshot->m_pdict->ht[0])
+                rgcremoved[0]++;
+            else
+                rgcremoved[1]++;
+            splazy->vecde.push_back(deObj);
+        } else {
+            serverAssert(dictFind(m_pdbSnapshot->m_pdict, key) == nullptr);
+            serverAssert(m_pdbSnapshot->find_cached_threadsafe(key) != nullptr);
+            dictAdd(dictTombstoneNew, sdsdupshared((sds)dictGetKey(de)), dictGetVal(de));
         }
     }
     dictReleaseIterator(di);
 
+    dictForceRehash(dictTombstoneNew);
     aeAcquireLock();
-    if (fAllCovered)
-    {
-        g_pserver->vecdictLazyFree.push_back(m_pdictTombstone);
-        m_pdictTombstone = dictCreate(&dbTombstoneDictType, nullptr);
-    }
-    g_pserver->vecvecde.emplace_back(std::move(vecdeFree));
+    dict *dT = m_pdbSnapshot->m_pdict;
+    splazy->vecdictLazyFree.push_back(m_pdictTombstone);
+    __atomic_store(&m_pdictTombstone, &dictTombstoneNew, __ATOMIC_RELEASE);
+    __atomic_fetch_sub(&dT->ht[0].used, rgcremoved[0], __ATOMIC_RELEASE);
+    __atomic_fetch_sub(&dT->ht[1].used, rgcremoved[1], __ATOMIC_RELEASE);
+    serverLog(LL_WARNING, "tombstones removed: %u, remain: %lu", rgcremoved[0]+rgcremoved[1], dictSize(m_pdictTombstone));
+    g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy));
    aeReleaseLock();

-    return fAllCovered;
+    return true;
 }
 
 void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot)
@@ -331,16 +398,19 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
     dictIterator *di = dictGetIterator(m_pdictTombstone);
     dictEntry *de;
     m_spdbSnapshotHOLDER->m_pdict->iterators++;
-    std::vector<dictEntry*> vecde;
+    auto splazy = std::make_unique<LazyFree>();
     while ((de = dictNext(di)) != NULL)
     {
         dictEntry **dePrev;
         dictht *ht;
-        dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), (uint64_t)dictGetVal(de), &dePrev, &ht, !!sdsisshared((sds)dictGetKey(de)));
+        // BUG BUG Why not a shallow search?
+        dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), (uint64_t)dictGetVal(de), &dePrev, &ht, false /*!!sdsisshared((sds)dictGetKey(de))*/);
         if (deSnapshot == nullptr && m_spdbSnapshotHOLDER->m_pdbSnapshot)
         {
             // The tombstone is for a grandchild, propagate it (or possibly in the storage provider - but an extra tombstone won't hurt)
+#ifdef CHECKED_BUILD
             serverAssert(m_spdbSnapshotHOLDER->m_pdbSnapshot->find_cached_threadsafe((const char*)dictGetKey(de)) != nullptr);
+#endif
             dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), nullptr);
             continue;
         }
@@ -351,14 +421,15 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
         }
 
         // Delete the object from the source dict, we don't use dictDelete to avoid a second search
-        vecde.push_back(deSnapshot);
+        splazy->vecde.push_back(deSnapshot);
         *dePrev = deSnapshot->next;
         ht->used--;
     }
-    g_pserver->vecvecde.emplace_back(std::move(vecde));
 
     m_spdbSnapshotHOLDER->m_pdict->iterators--;
     dictReleaseIterator(di);
-    g_pserver->vecdictLazyFree.push_back(m_pdictTombstone);
+    splazy->vecdictLazyFree.push_back(m_pdictTombstone);
     m_pdictTombstone = dictCreate(&dbTombstoneDictType, nullptr);
 
     // Stage 2 Move all new keys to the snapshot DB
@@ -388,8 +459,10 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
 
     auto spsnapshotFree = std::move(m_spdbSnapshotHOLDER);
     m_spdbSnapshotHOLDER = std::move(spsnapshotFree->m_spdbSnapshotHOLDER);
-    if (serverTL != nullptr)
+    if (serverTL != nullptr) {
         g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(spsnapshotFree));
+        g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy));
+    }
 
     // Sanity Checks
     serverAssert(m_spdbSnapshotHOLDER != nullptr || m_pdbSnapshot == nullptr);
@@ -426,8 +499,10 @@ dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe(bool fPrimaryOn
 
 dict_iter redisDbPersistentData::find_cached_threadsafe(const char *key) const
 {
+    dict *dictTombstone;
+    __atomic_load(&m_pdictTombstone, &dictTombstone, __ATOMIC_ACQUIRE);
     dictEntry *de = dictFind(m_pdict, key);
-    if (de == nullptr && m_pdbSnapshot != nullptr && dictFind(m_pdictTombstone, key) == nullptr)
+    if (de == nullptr && m_pdbSnapshot != nullptr && dictFind(dictTombstone, key) == nullptr)
     {
         auto itr = m_pdbSnapshot->find_cached_threadsafe(key);
         if (itr != nullptr)
@@ -493,11 +568,20 @@ unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long itera
 }
 
 bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly, bool fCacheOnly) const
 {
+    return iterate_threadsafe_core(fn, fKeyOnly, fCacheOnly, true);
+}
+
+bool redisDbPersistentDataSnapshot::iterate_threadsafe_core(std::function<bool(const char*, robj_roptr o)> &fn, bool fKeyOnly, bool fCacheOnly, bool fFirst) const
+{
     // Take the size so we can ensure we visited every element exactly once
     // use volatile to ensure it's not checked too late. This makes it more
     // likely we'll detect races (but it won't guarantee it)
+    aeAcquireLock();
+    dict *dictTombstone;
+    __atomic_load(&m_pdictTombstone, &dictTombstone, __ATOMIC_ACQUIRE);
     volatile ssize_t celem = (ssize_t)size();
+    aeReleaseLock();
 
     dictEntry *de = nullptr;
     bool fResult = true;
@@ -543,19 +627,22 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const
     __atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE);
     if (fResult && psnapshot != nullptr)
     {
-        fResult = psnapshot->iterate_threadsafe([this, &fn, &celem](const char *key, robj_roptr o) {
-            dictEntry *deTombstone = dictFind(m_pdictTombstone, key);
+        std::function<bool(const char*, robj_roptr o)> fnNew = [this, &fn, &celem, dictTombstone](const char *key, robj_roptr o) {
+            dictEntry *deTombstone = dictFind(dictTombstone, key);
             if (deTombstone != nullptr)
                 return true;
 
             // Alright it's a key in the used keyspace, let's ensure it and then pass it off
             --celem;
             return fn(key, o);
-        }, fKeyOnly, fCacheOnly);
+        };
+        fResult = psnapshot->iterate_threadsafe_core(fnNew, fKeyOnly, fCacheOnly, false);
     }
 
     // we should have hit all keys or had a good reason not to
-    serverAssert(!fResult || celem == 0 || (m_spstorage && fCacheOnly));
+    if (!(!fResult || celem == 0 || (m_spstorage && fCacheOnly)))
+        serverLog(LL_WARNING, "celem: %ld", celem);
+    serverAssert(!fResult || celem == 0 || (m_spstorage && fCacheOnly) || !fFirst);
     return fResult;
 }
 
@@ -588,8 +675,6 @@ void redisDbPersistentData::consolidate_snapshot()
 // only call this on the "real" database to consolidate the first child
 void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce)
 {
-    static fastlock s_lock {"consolidate_children"}; // this lock ensures only one thread is consolidating at a time
-
     std::unique_lock<fastlock> lock(s_lock, std::defer_lock);
     if (!lock.try_lock())
         return; // this is a best effort function