Remove snapshot consolidation because its not properly thread safe
Former-commit-id: dcb86d2b92e3ecdb9f914b6f5de9e184cdd23036
This commit is contained in:
parent
15e1ee620f
commit
61d548079c
@ -2424,18 +2424,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
run_with_period(100) {
|
|
||||||
bool fAnySnapshots = false;
|
|
||||||
for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots; ++idb)
|
|
||||||
fAnySnapshots = fAnySnapshots || g_pserver->db[0]->FSnapshot();
|
|
||||||
if (fAnySnapshots)
|
|
||||||
{
|
|
||||||
g_pserver->asyncworkqueue->AddWorkFunction([]{
|
|
||||||
g_pserver->db[0]->consolidate_snapshot();
|
|
||||||
}, true /*HiPri*/);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Fire the cron loop modules event. */
|
/* Fire the cron loop modules event. */
|
||||||
RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,g_pserver->hz};
|
RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,g_pserver->hz};
|
||||||
moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP,
|
moduleFireServerEvent(REDISMODULE_EVENT_CRON_LOOP,
|
||||||
|
@ -1130,8 +1130,6 @@ public:
|
|||||||
void endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot);
|
void endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot);
|
||||||
void restoreSnapshot(const redisDbPersistentDataSnapshot *psnapshot);
|
void restoreSnapshot(const redisDbPersistentDataSnapshot *psnapshot);
|
||||||
|
|
||||||
void consolidate_snapshot();
|
|
||||||
|
|
||||||
bool FStorageProvider() { return m_spstorage != nullptr; }
|
bool FStorageProvider() { return m_spstorage != nullptr; }
|
||||||
bool removeCachedValue(const char *key);
|
bool removeCachedValue(const char *key);
|
||||||
void removeAllCachedValues();
|
void removeAllCachedValues();
|
||||||
@ -1187,7 +1185,6 @@ private:
|
|||||||
|
|
||||||
protected:
|
protected:
|
||||||
static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot);
|
static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot);
|
||||||
void consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce);
|
|
||||||
bool freeTombstoneObjects(int depth);
|
bool freeTombstoneObjects(int depth);
|
||||||
|
|
||||||
public:
|
public:
|
||||||
@ -1289,7 +1286,6 @@ struct redisDb : public redisDbPersistentDataSnapshot
|
|||||||
using redisDbPersistentData::createSnapshot;
|
using redisDbPersistentData::createSnapshot;
|
||||||
using redisDbPersistentData::endSnapshot;
|
using redisDbPersistentData::endSnapshot;
|
||||||
using redisDbPersistentData::restoreSnapshot;
|
using redisDbPersistentData::restoreSnapshot;
|
||||||
using redisDbPersistentData::consolidate_snapshot;
|
|
||||||
using redisDbPersistentData::removeAllCachedValues;
|
using redisDbPersistentData::removeAllCachedValues;
|
||||||
using redisDbPersistentData::dictUnsafeKeyOnly;
|
using redisDbPersistentData::dictUnsafeKeyOnly;
|
||||||
using redisDbPersistentData::resortExpire;
|
using redisDbPersistentData::resortExpire;
|
||||||
|
103
src/snapshot.cpp
103
src/snapshot.cpp
@ -247,7 +247,6 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot
|
|||||||
aeReleaseLock();
|
aeReleaseLock();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->consolidate_children(this, true);
|
|
||||||
|
|
||||||
// Final Cleanup
|
// Final Cleanup
|
||||||
aeAcquireLock(); latencyStartMonitor(latency);
|
aeAcquireLock(); latencyStartMonitor(latency);
|
||||||
@ -636,108 +635,6 @@ int redisDbPersistentDataSnapshot::snapshot_depth() const
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void redisDbPersistentData::consolidate_snapshot()
|
|
||||||
{
|
|
||||||
aeAcquireLock();
|
|
||||||
auto psnapshot = (m_pdbSnapshot != nullptr) ? m_spdbSnapshotHOLDER.get() : nullptr;
|
|
||||||
if (psnapshot == nullptr || psnapshot->snapshot_depth() == 0)
|
|
||||||
{
|
|
||||||
aeReleaseLock();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
psnapshot->m_refCount++; // ensure it's not free'd
|
|
||||||
aeReleaseLock();
|
|
||||||
psnapshot->consolidate_children(this, false /* fForce */);
|
|
||||||
aeAcquireLock();
|
|
||||||
endSnapshot(psnapshot);
|
|
||||||
aeReleaseLock();
|
|
||||||
}
|
|
||||||
|
|
||||||
// only call this on the "real" database to consolidate the first child
|
|
||||||
void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce)
|
|
||||||
{
|
|
||||||
std::unique_lock<fastlock> lock(s_lock, std::defer_lock);
|
|
||||||
if (!lock.try_lock())
|
|
||||||
return; // this is a best effort function
|
|
||||||
|
|
||||||
if (!fForce && snapshot_depth() < 2)
|
|
||||||
return;
|
|
||||||
|
|
||||||
auto spdb = std::unique_ptr<redisDbPersistentDataSnapshot>(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot());
|
|
||||||
spdb->initialize();
|
|
||||||
dictExpand(spdb->m_pdict, m_pdbSnapshot->size());
|
|
||||||
|
|
||||||
volatile size_t skipped = 0;
|
|
||||||
m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o) {
|
|
||||||
if (o != nullptr || !m_spstorage) {
|
|
||||||
dictAdd(spdb->m_pdict, sdsdupshared(key), o.unsafe_robjcast());
|
|
||||||
if (o != nullptr) {
|
|
||||||
incrRefCount(o);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
++skipped;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}, true /*fKeyOnly*/, true /*fCacheOnly*/);
|
|
||||||
spdb->m_spstorage = m_pdbSnapshot->m_spstorage;
|
|
||||||
{
|
|
||||||
std::unique_lock<fastlock> ul(g_expireLock);
|
|
||||||
delete spdb->m_setexpire;
|
|
||||||
spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_pdbSnapshot->m_setexpire);
|
|
||||||
}
|
|
||||||
|
|
||||||
spdb->m_pdict->iterators++;
|
|
||||||
|
|
||||||
if (m_spstorage) {
|
|
||||||
serverAssert(spdb->size() == m_pdbSnapshot->size());
|
|
||||||
} else {
|
|
||||||
serverAssert((spdb->size()+skipped) == m_pdbSnapshot->size());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Now wire us in (Acquire the LOCK)
|
|
||||||
AeLocker locker;
|
|
||||||
locker.arm(nullptr);
|
|
||||||
|
|
||||||
int depth = 0;
|
|
||||||
redisDbPersistentDataSnapshot *psnapshotT = pdbPrimary->m_spdbSnapshotHOLDER.get();
|
|
||||||
while (psnapshotT != nullptr)
|
|
||||||
{
|
|
||||||
++depth;
|
|
||||||
if (psnapshotT == this)
|
|
||||||
break;
|
|
||||||
psnapshotT = psnapshotT->m_spdbSnapshotHOLDER.get();
|
|
||||||
}
|
|
||||||
if (psnapshotT != this)
|
|
||||||
{
|
|
||||||
locker.disarm(); // don't run spdb's dtor in the lock
|
|
||||||
return; // we were unlinked and this was a waste of time
|
|
||||||
}
|
|
||||||
|
|
||||||
serverLog(LL_VERBOSE, "cleaned %d snapshots", snapshot_depth()-1);
|
|
||||||
spdb->m_refCount = depth;
|
|
||||||
// Drop our refs from this snapshot and its children
|
|
||||||
psnapshotT = this;
|
|
||||||
std::vector<redisDbPersistentDataSnapshot*> vecT;
|
|
||||||
while ((psnapshotT = psnapshotT->m_spdbSnapshotHOLDER.get()) != nullptr)
|
|
||||||
{
|
|
||||||
vecT.push_back(psnapshotT);
|
|
||||||
}
|
|
||||||
for (auto itr = vecT.rbegin(); itr != vecT.rend(); ++itr)
|
|
||||||
{
|
|
||||||
psnapshotT = *itr;
|
|
||||||
psnapshotT->m_refCount -= (depth-1); // -1 because dispose will sub another
|
|
||||||
gcDisposeSnapshot(psnapshotT);
|
|
||||||
}
|
|
||||||
std::atomic_thread_fence(std::memory_order_seq_cst);
|
|
||||||
m_spdbSnapshotHOLDER.release(); // GC has responsibility for it now
|
|
||||||
m_spdbSnapshotHOLDER = std::move(spdb);
|
|
||||||
const redisDbPersistentDataSnapshot *ptrT = m_spdbSnapshotHOLDER.get();
|
|
||||||
__atomic_store(&m_pdbSnapshot, &ptrT, __ATOMIC_SEQ_CST);
|
|
||||||
locker.disarm(); // ensure we're not locked for any dtors
|
|
||||||
}
|
|
||||||
|
|
||||||
bool redisDbPersistentDataSnapshot::FStale() const
|
bool redisDbPersistentDataSnapshot::FStale() const
|
||||||
{
|
{
|
||||||
// 0.5 seconds considered stale;
|
// 0.5 seconds considered stale;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user