endSnapshotAsync implementation (fails tests w/ multithreading)
Former-commit-id: b401e9fcae40cf1d4e4c1584f57e760d9adf36a9
This commit is contained in:
parent
a223fb7bed
commit
d8dcb94e37
@ -1464,10 +1464,8 @@ void *rdbSaveThread(void *vargs)
|
||||
sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB");
|
||||
|
||||
// If we were told to cancel the requesting thread holds the lock for us
|
||||
aeAcquireLock();
|
||||
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||
g_pserver->db[idb]->endSnapshot(args->rgpdb[idb]);
|
||||
aeReleaseLock();
|
||||
g_pserver->db[idb]->endSnapshotAsync(args->rgpdb[idb]);
|
||||
zfree(args);
|
||||
g_pserver->garbageCollector.endEpoch(vars.gcEpoch);
|
||||
|
||||
@ -2674,10 +2672,8 @@ void *rdbSaveToSlavesSocketsThread(void *vargs)
|
||||
close(g_pserver->rdb_pipe_write); /* wake up the reader, tell it we're done. */
|
||||
|
||||
// If we were told to cancel the requesting thread is holding the lock for us
|
||||
aeAcquireLock();
|
||||
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||
g_pserver->db[idb]->endSnapshot(args->rgpdb[idb]);
|
||||
aeReleaseLock();
|
||||
g_pserver->db[idb]->endSnapshotAsync(args->rgpdb[idb]);
|
||||
zfree(args);
|
||||
|
||||
return (retval == C_OK) ? (void*)0 : (void*)1;
|
||||
|
@ -1224,6 +1224,7 @@ class redisDbPersistentData
|
||||
{
|
||||
friend void dictDbKeyDestructor(void *privdata, void *key);
|
||||
friend class redisDbPersistentDataSnapshot;
|
||||
|
||||
public:
|
||||
~redisDbPersistentData();
|
||||
|
||||
@ -1311,6 +1312,7 @@ public:
|
||||
|
||||
const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional);
|
||||
void endSnapshot(const redisDbPersistentDataSnapshot *psnapshot);
|
||||
void endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot);
|
||||
void restoreSnapshot(const redisDbPersistentDataSnapshot *psnapshot);
|
||||
|
||||
void consolidate_snapshot();
|
||||
@ -1354,6 +1356,7 @@ private:
|
||||
// in a snapshot
|
||||
const redisDbPersistentDataSnapshot *m_pdbSnapshot = nullptr;
|
||||
std::unique_ptr<redisDbPersistentDataSnapshot> m_spdbSnapshotHOLDER;
|
||||
const redisDbPersistentDataSnapshot *m_pdbSnapshotASYNC = nullptr;
|
||||
int m_refCount = 0;
|
||||
fastlock m_lockStorage { "storage" };
|
||||
};
|
||||
@ -1365,7 +1368,7 @@ protected:
|
||||
bool m_fConsolidated = false;
|
||||
static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot);
|
||||
int snapshot_depth() const;
|
||||
void consolidate_children(redisDbPersistentData *pdbPrimary);
|
||||
void consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce);
|
||||
|
||||
public:
|
||||
bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; }
|
||||
@ -1373,6 +1376,7 @@ public:
|
||||
bool iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly = false) const;
|
||||
using redisDbPersistentData::createSnapshot;
|
||||
using redisDbPersistentData::endSnapshot;
|
||||
using redisDbPersistentData::endSnapshotAsync;
|
||||
using redisDbPersistentData::end;
|
||||
|
||||
dict_iter random_cache_threadsafe() const;
|
||||
@ -2443,6 +2447,7 @@ extern dictType zsetDictType;
|
||||
extern dictType clusterNodesDictType;
|
||||
extern dictType clusterNodesBlackListDictType;
|
||||
extern dictType dbDictType;
|
||||
extern dictType dbDictTypeTombstone;
|
||||
extern dictType dbSnapshotDictType;
|
||||
extern dictType shaScriptObjectDictType;
|
||||
extern double R_Zero, R_PosInf, R_NegInf, R_Nan;
|
||||
|
@ -54,7 +54,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
|
||||
}
|
||||
|
||||
m_pdict = dictCreate(&dbDictType,this);
|
||||
m_pdictTombstone = dictCreate(&dbDictType, this);
|
||||
m_pdictTombstone = dictCreate(&dbDictTypeTombstone, this);
|
||||
|
||||
serverAssert(spdb->m_pdict->iterators == 1);
|
||||
|
||||
@ -69,6 +69,13 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
|
||||
pdbSnapshotNext = pdbSnapshotNext->m_spdbSnapshotHOLDER.get();
|
||||
}
|
||||
|
||||
if (m_pdbSnapshotASYNC != nullptr)
|
||||
{
|
||||
// free the async snapshot, it's done its job
|
||||
endSnapshot(m_pdbSnapshotASYNC); // should be just a dec ref (FAST)
|
||||
m_pdbSnapshotASYNC = nullptr;
|
||||
}
|
||||
|
||||
return m_pdbSnapshot;
|
||||
}
|
||||
|
||||
@ -127,10 +134,54 @@ void redisDbPersistentData::restoreSnapshot(const redisDbPersistentDataSnapshot
|
||||
serverAssert(size() == expectedSize);
|
||||
}
|
||||
|
||||
// This function is all about minimizing the amount of work done under global lock
|
||||
// when there has been lots of changes since snapshot creation a naive endSnapshot()
|
||||
// will block for a very long time and will cause latency spikes.
|
||||
//
|
||||
// Note that this function uses a lot more CPU time than a simple endSnapshot(), we
|
||||
// have some internal heuristics to do a synchronous endSnapshot if it makes sense
|
||||
void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot)
|
||||
{
|
||||
aeAcquireLock();
|
||||
if (m_pdbSnapshotASYNC && m_pdbSnapshotASYNC->m_mvccCheckpoint <= psnapshot->m_mvccCheckpoint)
|
||||
{
|
||||
// Free a stale async snapshot so consolidate_children can clean it up later
|
||||
endSnapshot(m_pdbSnapshotASYNC); // FAST: just a ref decrement
|
||||
m_pdbSnapshotASYNC = nullptr;
|
||||
}
|
||||
|
||||
size_t elements = dictSize(m_pdictTombstone);
|
||||
// if neither dict is rehashing then the merge is O(1) so don't count the size
|
||||
if (dictIsRehashing(psnapshot->m_pdict) || dictIsRehashing(m_pdict))
|
||||
elements += dictSize(m_pdict);
|
||||
if (elements < 1000000 || psnapshot != m_spdbSnapshotHOLDER.get()) // heuristic
|
||||
{
|
||||
// For small snapshots it makes more sense just to merge it directly
|
||||
endSnapshot(psnapshot);
|
||||
aeReleaseLock();
|
||||
return;
|
||||
}
|
||||
|
||||
// OK this is a big snapshot so lets do the merge work outside the lock
|
||||
auto psnapshotT = createSnapshot(LLONG_MAX, false);
|
||||
endSnapshot(psnapshot); // this will just dec the ref count since our new snapshot has a ref
|
||||
aeReleaseLock();
|
||||
|
||||
// do the expensive work of merging snapshots outside the ref
|
||||
reinterpret_cast<redisDbPersistentDataSnapshot*>(this)->consolidate_children(this, true /* fForce */);
|
||||
|
||||
// Final Cleanup
|
||||
aeAcquireLock();
|
||||
if (m_pdbSnapshotASYNC == nullptr)
|
||||
m_pdbSnapshotASYNC = psnapshotT;
|
||||
else
|
||||
endSnapshot(psnapshotT); // finally clena up our temp snapshot
|
||||
aeReleaseLock();
|
||||
}
|
||||
|
||||
void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot)
|
||||
{
|
||||
// Note: This function is dependent on GlobalLocksAcquried(), but rdb background saving has a weird case where
|
||||
// a seperate thread holds the lock for it. Yes that's pretty crazy and should be fixed somehow...
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
|
||||
if (m_spdbSnapshotHOLDER.get() != psnapshot)
|
||||
{
|
||||
@ -215,7 +266,12 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
|
||||
m_spdbSnapshotHOLDER->m_pdict->iterators--;
|
||||
}
|
||||
|
||||
m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER->m_spdbSnapshotHOLDER);
|
||||
auto spsnapshotFree = std::move(m_spdbSnapshotHOLDER);
|
||||
m_spdbSnapshotHOLDER = std::move(spsnapshotFree->m_spdbSnapshotHOLDER);
|
||||
if (serverTL != nullptr)
|
||||
g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(spsnapshotFree));
|
||||
|
||||
// Sanity Checks
|
||||
serverAssert(m_spdbSnapshotHOLDER != nullptr || m_pdbSnapshot == nullptr);
|
||||
serverAssert(m_pdbSnapshot == m_spdbSnapshotHOLDER.get() || m_pdbSnapshot == nullptr);
|
||||
serverAssert((m_refCount == 0 && m_pdict->iterators == 0) || (m_refCount != 0 && m_pdict->iterators == 1));
|
||||
@ -340,14 +396,14 @@ void redisDbPersistentData::consolidate_snapshot()
|
||||
}
|
||||
psnapshot->m_refCount++; // ensure it's not free'd
|
||||
aeReleaseLock();
|
||||
psnapshot->consolidate_children(this);
|
||||
psnapshot->consolidate_children(this, false /* fForce */);
|
||||
aeAcquireLock();
|
||||
endSnapshot(psnapshot);
|
||||
aeReleaseLock();
|
||||
}
|
||||
|
||||
// only call this on the "real" database to consolidate the first child
|
||||
void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *pdbPrimary)
|
||||
void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce)
|
||||
{
|
||||
static fastlock s_lock {"consolidate_children"}; // this lock ensures only one thread is consolidating at a time
|
||||
|
||||
@ -355,14 +411,15 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *
|
||||
if (!lock.try_lock())
|
||||
return; // this is a best effort function
|
||||
|
||||
if (snapshot_depth() < 4)
|
||||
if (!fForce && snapshot_depth() < 4)
|
||||
return;
|
||||
|
||||
serverAssert(snapshot_depth() > 1);
|
||||
auto spdb = std::unique_ptr<redisDbPersistentDataSnapshot>(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot());
|
||||
spdb->initialize();
|
||||
dictExpand(spdb->m_pdict, m_pdbSnapshot->size());
|
||||
|
||||
m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o){
|
||||
m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o) {
|
||||
if (o != nullptr) {
|
||||
dictAdd(spdb->m_pdict, sdsdupshared(key), o.unsafe_robjcast());
|
||||
incrRefCount(o);
|
||||
|
Loading…
x
Reference in New Issue
Block a user