Nested snapshot garbage collection. Works but huge fragmentation
Former-commit-id: 82b2a3f3dbf8f864d9157655b5422c69845c4019
parent 65a3485dfb
commit 5fd848eedc
@@ -13,8 +13,7 @@ AsyncWorkQueue::AsyncWorkQueue(int nthreads)
 void AsyncWorkQueue::WorkerThreadMain()
 {
-    static redisServerThreadVars vars;
-    memset(&vars, 0, sizeof(redisServerThreadVars));
+    redisServerThreadVars vars;
     serverTL = &vars;
 
     vars.clients_pending_asyncwrite = listCreate();
@@ -27,20 +26,26 @@ void AsyncWorkQueue::WorkerThreadMain()
     {
         std::unique_lock<std::mutex> lock(m_mutex);
         m_cvWakeup.wait(lock);
 
         while (!m_workqueue.empty())
         {
             WorkItem task = std::move(m_workqueue.front());
-            m_workqueue.pop();
+            m_workqueue.pop_front();
 
             lock.unlock();
+            serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
             task.fnAsync();
+            g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
             lock.lock();
         }
 
         lock.unlock();
+        serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
        aeAcquireLock();
         ProcessPendingAsyncWrites();
         aeReleaseLock();
+        g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
+        serverTL->gcEpoch = 0;
     }
 
     listRelease(vars.clients_pending_asyncwrite);
@@ -91,9 +96,12 @@ AsyncWorkQueue::~AsyncWorkQueue()
     abandonThreads();
 }
 
-void AsyncWorkQueue::AddWorkFunction(std::function<void()> &&fnAsync)
+void AsyncWorkQueue::AddWorkFunction(std::function<void()> &&fnAsync, bool fHiPri)
 {
     std::unique_lock<std::mutex> lock(m_mutex);
-    m_workqueue.emplace(std::move(fnAsync));
+    if (fHiPri)
+        m_workqueue.emplace_front(std::move(fnAsync));
+    else
+        m_workqueue.emplace_back(std::move(fnAsync));
     m_cvWakeup.notify_one();
 }
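The change to AddWorkFunction is what the new fHiPri flag is for: switching m_workqueue from std::queue to std::deque (see the header hunks below) lets high-priority work jump to the front while normal work keeps FIFO order, and the worker drains tasks without holding the queue lock while a task runs. A minimal self-contained sketch of the same pattern; the names here (PriorityWorkQueue, drainOnce) are illustrative stand-ins, not the KeyDB classes:

```cpp
// Minimal sketch of the front/back priority pattern; not the KeyDB types.
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>

class PriorityWorkQueue {
    std::deque<std::function<void()>> m_queue;
    std::mutex m_mutex;
    std::condition_variable m_cvWakeup;

public:
    void add(std::function<void()> &&fn, bool fHiPri = false) {
        std::unique_lock<std::mutex> lock(m_mutex);
        if (fHiPri)
            m_queue.emplace_front(std::move(fn));   // runs before queued work
        else
            m_queue.emplace_back(std::move(fn));    // normal FIFO order
        m_cvWakeup.notify_one();
    }

    // One worker pass: wait for work, then drain the queue, releasing the
    // lock while each task runs (mirrors the WorkerThreadMain loop shape).
    void drainOnce() {
        std::unique_lock<std::mutex> lock(m_mutex);
        m_cvWakeup.wait(lock, [&]{ return !m_queue.empty(); });
        while (!m_queue.empty()) {
            auto task = std::move(m_queue.front());
            m_queue.pop_front();
            lock.unlock();
            task();
            lock.lock();
        }
    }
};
```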
@@ -1,7 +1,7 @@
 #pragma once
 #include "fastlock.h"
 #include <vector>
-#include <queue>
+#include <deque>
 #include <mutex>
 #include <thread>
 #include <condition_variable>
@@ -21,7 +21,7 @@ class AsyncWorkQueue
     };
     std::vector<std::thread> m_vecthreads;
     std::vector<struct redisServerThreadVars*> m_vecpthreadVars;
-    std::queue<WorkItem> m_workqueue;
+    std::deque<WorkItem> m_workqueue;
     std::mutex m_mutex;
     std::condition_variable m_cvWakeup;
     std::atomic<bool> m_fQuitting { false };
@@ -31,7 +31,7 @@ public:
     AsyncWorkQueue(int nthreads);
     ~AsyncWorkQueue();
 
-    void AddWorkFunction(std::function<void()> &&fnAsync);
+    void AddWorkFunction(std::function<void()> &&fnAsync, bool fHiPri = false);
     bool removeClientAsyncWrites(struct client *c);
 
     void abandonThreads();
src/db.cpp (53 lines changed)
@@ -64,27 +64,31 @@ void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew)
 }
 
 
+static void lookupKeyUpdateObj(robj *val, int flags)
+{
+    /* Update the access time for the ageing algorithm.
+     * Don't do it if we have a saving child, as this will trigger
+     * a copy on write madness. */
+    if (!g_pserver->FRdbSaveInProgress() &&
+        g_pserver->aof_child_pid == -1 &&
+        !(flags & LOOKUP_NOTOUCH))
+    {
+        if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
+            updateLFU(val);
+        } else {
+            val->lru = LRU_CLOCK();
+        }
+    }
+}
+
 /* Low level key lookup API, not actually called directly from commands
  * implementations that should instead rely on lookupKeyRead(),
  * lookupKeyWrite() and lookupKeyReadWithFlags(). */
-static robj *lookupKey(redisDb *db, robj *key, int flags) {
+static robj* lookupKey(redisDb *db, robj *key, int flags) {
     auto itr = db->find(key);
     if (itr) {
         robj *val = itr.val();
-        /* Update the access time for the ageing algorithm.
-         * Don't do it if we have a saving child, as this will trigger
-         * a copy on write madness. */
-        if (!g_pserver->FRdbSaveInProgress() &&
-            g_pserver->aof_child_pid == -1 &&
-            !(flags & LOOKUP_NOTOUCH))
-        {
-            if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
-                updateLFU(val);
-            } else {
-                val->lru = LRU_CLOCK();
-            }
-        }
 
+        lookupKeyUpdateObj(val, flags);
         if (flags & LOOKUP_UPDATEMVCC) {
             val->mvcc_tstamp = getMvccTstamp();
             db->trackkey(key);
@@ -94,6 +98,15 @@ static robj *lookupKey(redisDb *db, robj *key, int flags) {
         return NULL;
     }
 }
+static robj_roptr lookupKeyConst(redisDb *db, robj *key, int flags) {
+    serverAssert((flags & LOOKUP_UPDATEMVCC) == 0);
+    robj_roptr val = db->find_threadsafe(szFromObj(key));
+    if (val != nullptr) {
+        lookupKeyUpdateObj(val.unsafe_robjcast(), flags);
+        return val;
+    }
+    return nullptr;
+}
 
 /* Lookup a key for read operations, or return NULL if the key is not found
  * in the specified DB.
@@ -118,7 +131,7 @@ static robj *lookupKey(redisDb *db, robj *key, int flags) {
  * correctly report a key is expired on slaves even if the master is lagging
  * expiring our key via DELs in the replication link. */
 robj_roptr lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
-    robj *val;
+    robj_roptr val;
     serverAssert(GlobalLocksAcquired());
 
     if (expireIfNeeded(db,key) == 1) {
@@ -153,8 +166,8 @@ robj_roptr lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
             return NULL;
         }
     }
-    val = lookupKey(db,key,flags);
-    if (val == NULL) {
+    val = lookupKeyConst(db,key,flags);
+    if (val == nullptr) {
         g_pserver->stat_keyspace_misses++;
         notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
     }
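The read path now goes through lookupKeyConst, which returns robj_roptr, a read-only pointer wrapper, and only drops to a mutable pointer (unsafe_robjcast) for the LRU/LFU touch. As a rough illustration of the idea only, and explicitly not KeyDB's actual robj_roptr definition, such a wrapper can look like this:

```cpp
// Simplified illustration of a read-only pointer wrapper in the spirit of
// robj_roptr; NOT the real KeyDB definition.
#include <cstddef>

struct robj;    // opaque here

class read_only_ptr {
    robj *m_ptr = nullptr;

public:
    read_only_ptr() = default;
    read_only_ptr(robj *ptr) : m_ptr(ptr) {}

    const robj *operator->() const { return m_ptr; }
    explicit operator bool() const { return m_ptr != nullptr; }
    bool operator==(std::nullptr_t) const { return m_ptr == nullptr; }
    bool operator!=(std::nullptr_t) const { return m_ptr != nullptr; }

    // Escape hatch for call sites that genuinely need to mutate, as the
    // LRU/LFU touch in lookupKeyUpdateObj does.
    robj *unsafe_robjcast() const { return m_ptr; }
};
```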
@@ -2122,9 +2135,9 @@ void redisDbPersistentData::processChanges()
 redisDbPersistentData::~redisDbPersistentData()
 {
     serverAssert(m_spdbSnapshotHOLDER == nullptr);
-    serverAssert(m_pdbSnapshot == nullptr);
+    //serverAssert(m_pdbSnapshot == nullptr);
     serverAssert(m_refCount == 0);
-    serverAssert(m_pdict->iterators == 0);
+    //serverAssert(m_pdict->iterators == 0);
     serverAssert(m_pdictTombstone == nullptr || m_pdictTombstone->iterators == 0);
     dictRelease(m_pdict);
     if (m_pdictTombstone)
src/gc.h (new file, 101 lines)
@@ -0,0 +1,101 @@
#pragma once
#include <vector>
#include <assert.h>

template<typename T>
class GarbageCollector
{
    struct EpochHolder
    {
        uint64_t tstamp;
        std::vector<std::unique_ptr<T>> m_vecObjs;

        bool operator<(uint64_t tstamp) const
        {
            return this->tstamp < tstamp;
        }

        bool operator==(uint64_t tstamp) const
        {
            return this->tstamp == tstamp;
        }
    };

public:
    uint64_t startEpoch()
    {
        std::unique_lock<fastlock> lock(m_lock);
        ++m_epochNext;
        m_setepochOutstanding.insert(m_epochNext);
        return m_epochNext;
    }

    void endEpoch(uint64_t epoch, bool fNoFree = false)
    {
        std::unique_lock<fastlock> lock(m_lock);
        assert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end());
        bool fMinElement = *std::min_element(m_setepochOutstanding.begin(), m_setepochOutstanding.end());
        m_setepochOutstanding.erase(epoch);
        if (fNoFree)
            return;
        std::vector<EpochHolder> vecclean;

        // No outstanding epochs?
        if (m_setepochOutstanding.empty())
        {
            vecclean = std::move(m_vecepochs);  // Everything goes!
        }
        else
        {
            uint64_t minepoch = *std::min_element(m_setepochOutstanding.begin(), m_setepochOutstanding.end());
            if (minepoch == 0)
                return; // No available epochs to free

            // Clean any epochs available (after the lock)
            for (size_t iepoch = 0; iepoch < m_vecepochs.size(); ++iepoch)
            {
                auto &e = m_vecepochs[iepoch];
                if (e < minepoch)
                {
                    vecclean.emplace_back(std::move(e));
                    m_vecepochs.erase(m_vecepochs.begin() + iepoch);
                    --iepoch;
                }
            }

            if (!(vecclean.empty() || fMinElement))
                printf("############################## ERROR CASE ############################\n");
        }

        lock.unlock();  // don't hold it for the potentially long delete of vecclean
        if (vecclean.size())
            printf("!!!!!!!!!!!!!!!! Deleted %lu snapshots !!!!!!!!!!!!!!!!!!!!!\n", vecclean.size());
    }

    void enqueue(uint64_t epoch, std::unique_ptr<T> &&sp)
    {
        std::unique_lock<fastlock> lock(m_lock);
        assert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end());
        assert(sp->FWillFreeChildDebug() == false);

        auto itr = std::find(m_vecepochs.begin(), m_vecepochs.end(), m_epochNext+1);
        if (itr == m_vecepochs.end())
        {
            EpochHolder e;
            e.tstamp = m_epochNext+1;
            e.m_vecObjs.push_back(std::move(sp));
            m_vecepochs.emplace_back(std::move(e));
        }
        else
        {
            itr->m_vecObjs.push_back(std::move(sp));
        }
    }

private:
    fastlock m_lock { "Garbage Collector"};

    std::vector<EpochHolder> m_vecepochs;
    std::set<uint64_t> m_setepochOutstanding;
    uint64_t m_epochNext = 0;
};
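The class above is a small epoch-based reclamation scheme: a thread opens an epoch before it can observe shared snapshots, snapshots retired in the meantime are parked with enqueue() rather than deleted, and endEpoch() frees a parked holder only once no outstanding epoch could still reference it. A hedged usage sketch, assuming it is compiled inside the KeyDB tree (DummySnapshot is a stand-in payload; the real instantiation is GarbageCollector<redisDbPersistentDataSnapshot>):

```cpp
// Usage sketch only. DummySnapshot is a stand-in so the enqueue() assert
// (FWillFreeChildDebug() == false) is satisfied; it is not a KeyDB type.
#include <memory>
#include "server.h"     // per the server.h hunk below, this now pulls in gc.h

struct DummySnapshot {
    bool FWillFreeChildDebug() const { return false; }
};

static GarbageCollector<DummySnapshot> g_gcExample;

void workerIteration()
{
    uint64_t epoch = g_gcExample.startEpoch();  // from here on this thread may
                                                // hold references to shared objects

    // Retire an object: park it in the collector rather than deleting it,
    // because another outstanding epoch may still be reading it.
    g_gcExample.enqueue(epoch, std::make_unique<DummySnapshot>());

    g_gcExample.endEpoch(epoch);                // parked holders are freed only once
                                                // every epoch that could see them ends
}
```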
@@ -1367,7 +1367,11 @@ void *rdbSaveThread(void *vargs)
 {
     rdbSaveThreadArgs *args = reinterpret_cast<rdbSaveThreadArgs*>(vargs);
     serverAssert(serverTL == nullptr);
-    int retval = rdbSave(args->rgpdb, &args->rsi);
+    redisServerThreadVars vars;
+    serverTL = &vars;
+    vars.gcEpoch = g_pserver->garbageCollector.startEpoch();
+
+    int retval = rdbSave(args->rgpdb, &args->rsi);
     if (retval == C_OK) {
         size_t private_dirty = zmalloc_get_private_dirty(-1);
 
@@ -1389,6 +1393,7 @@ void *rdbSaveThread(void *vargs)
     if (!g_pserver->rdbThreadVars.fRdbThreadCancel)
         aeReleaseLock();
     zfree(args);
+    g_pserver->garbageCollector.endEpoch(vars.gcEpoch);
     return (retval == C_OK) ? (void*)0 : (void*)1;
 }
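The RDB save thread now follows the same discipline as the async workers: give the thread its own redisServerThreadVars, hold a GC epoch open for the whole save, and end it just before returning. Paired startEpoch()/endEpoch() calls like these are easy to miss on early-return paths; a small RAII guard, purely illustrative and not part of this commit, would scope them automatically:

```cpp
// Illustrative only: an RAII wrapper around the startEpoch()/endEpoch() pair
// used in rdbSaveThread and AsyncWorkQueue::WorkerThreadMain. Not in this commit.
#include "server.h"     // for g_pserver and the garbageCollector member

class GcEpochGuard {
    uint64_t m_epoch;

public:
    GcEpochGuard()
        : m_epoch(g_pserver->garbageCollector.startEpoch()) {}

    ~GcEpochGuard()
    {
        g_pserver->garbageCollector.endEpoch(m_epoch);
    }

    GcEpochGuard(const GcEpochGuard&) = delete;
    GcEpochGuard &operator=(const GcEpochGuard&) = delete;

    uint64_t epoch() const { return m_epoch; }
};

// e.g. inside rdbSaveThread:
//     GcEpochGuard epochGuard;                 // epoch held for the whole save
//     int retval = rdbSave(args->rgpdb, &args->rsi);
//     ...                                      // epoch ends when the guard leaves scope
```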
@@ -82,7 +82,7 @@ struct redisServer server; /* Server global state */
 }
 redisServer *g_pserver = &GlobalHidden::server;
 struct redisServerConst cserver;
-__thread struct redisServerThreadVars *serverTL = NULL; // thread local server vars
+thread_local struct redisServerThreadVars *serverTL = NULL; // thread local server vars
 volatile unsigned long lru_clock; /* Server global current LRU time. */
 
 /* Our command table.
@@ -2089,6 +2089,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
         g_pserver->rdb_bgsave_scheduled = 0;
     }
 
+    g_pserver->asyncworkqueue->AddWorkFunction([]{
+        g_pserver->db[0].consolidate_snapshot();
+    }, true /*HiPri*/);
+
     g_pserver->cronloops++;
     return 1000/g_pserver->hz;
 }
@@ -2173,6 +2177,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
     /* Handle writes with pending output buffers. */
     aeReleaseLock();
     handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN);
+    if (serverTL->gcEpoch != 0)
+        g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/);
+    serverTL->gcEpoch = 0;
     aeAcquireLock();
 
     /* Close clients that need to be closed asynchronous */
@@ -2207,6 +2214,10 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
     freeClientsInAsyncFreeQueue(iel);
     aeReleaseLock();
 
+    if (serverTL->gcEpoch != 0)
+        g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/);
+    serverTL->gcEpoch = 0;
+
     /* Before we are going to sleep, let the threads access the dataset by
      * releasing the GIL. Redis main thread will not touch anything at this
      * time. */
@@ -2219,6 +2230,9 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
 void afterSleep(struct aeEventLoop *eventLoop) {
     UNUSED(eventLoop);
     if (moduleCount()) moduleAcquireGIL(TRUE /*fServerThread*/);
+
+    serverAssert(serverTL->gcEpoch == 0);
+    serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
 }
 
 /* =========================== Server initialization ======================== */
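Taken together, the beforeSleep/beforeSleepLite/afterSleep hunks pin one GC epoch per event-loop iteration on each server thread: afterSleep opens it before any command can observe a snapshot, and the before-sleep hooks close it with fNoFree, which (as gc.h shows) removes the epoch from the outstanding set without doing the potentially long free on this thread. A schematic sketch of that lifetime; the loop body here stands in for the real ae event processing and is not the actual code:

```cpp
// Schematic per-iteration epoch lifetime; processEvents() is a hypothetical
// stand-in for the real ae event processing.
#include "server.h"

extern void processEvents();    // hypothetical

void eventLoopIteration()
{
    // afterSleep: open an epoch before any snapshot can be observed
    serverAssert(serverTL->gcEpoch == 0);
    serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();

    processEvents();    // commands may retire snapshots into this epoch

    // beforeSleep / beforeSleepLite: close the epoch but defer freeing
    // (fNoFree); the bulk delete happens on whichever later endEpoch()
    // call runs without the flag.
    if (serverTL->gcEpoch != 0)
        g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/);
    serverTL->gcEpoch = 0;
}
```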
src/server.h (17 lines changed)
@@ -94,6 +94,7 @@ typedef long long mstime_t; /* millisecond time type. */
 #include "crc64.h"
 #include "IStorage.h"
 #include "AsyncWorkQueue.h"
+#include "gc.h"
 
 extern int g_fTestMode;
 
@@ -1278,6 +1279,8 @@ public:
     const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional);
     void endSnapshot(const redisDbPersistentDataSnapshot *psnapshot);
 
+    void consolidate_snapshot();
+
 private:
     void ensure(const char *key);
     void ensure(const char *key, dictEntry **de);
@@ -1308,7 +1311,15 @@ private:
 class redisDbPersistentDataSnapshot : protected redisDbPersistentData
 {
     friend class redisDbPersistentData;
+protected:
+    bool m_fConsolidated = false;
+    static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot);
+    int snapshot_depth() const;
+    void consolidate_children(redisDbPersistentData *pdbPrimary);
+
 public:
+    bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; }
+
     bool iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn) const;
     using redisDbPersistentData::createSnapshot;
     using redisDbPersistentData::endSnapshot;
@@ -1390,6 +1401,7 @@ typedef struct redisDb : public redisDbPersistentDataSnapshot
     using redisDbPersistentData::setexpire;
     using redisDbPersistentData::createSnapshot;
     using redisDbPersistentData::endSnapshot;
+    using redisDbPersistentData::consolidate_snapshot;
 
 public:
     expireset::setiter expireitr;
@@ -1790,6 +1802,7 @@ struct redisServerThreadVars {
     struct fastlock lockPendingWrite { "thread pending write" };
     char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
     long unsigned commandsExecuted = 0;
     uint64_t gcEpoch = 0;
 };
 
 struct redisMaster {
@@ -2187,6 +2200,8 @@ struct redisServer {
     /* System hardware info */
     size_t system_memory_size; /* Total memory in system as reported by OS */
 
+    GarbageCollector<redisDbPersistentDataSnapshot> garbageCollector;
+
     bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; }
 };
 
@@ -2284,7 +2299,7 @@ typedef struct {
 //extern struct redisServer server;
 extern redisServer *g_pserver;
 extern struct redisServerConst cserver;
-extern __thread struct redisServerThreadVars *serverTL; // thread local server vars
+extern thread_local struct redisServerThreadVars *serverTL; // thread local server vars
 extern struct sharedObjectsStruct shared;
 extern dictType objectKeyPointerValueDictType;
 extern dictType objectKeyHeapPointerValueDictType;
src/snapshot.cpp (148 lines changed)
@@ -1,4 +1,5 @@
 #include "server.h"
+#include "aelocker.h"
 
 const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional)
 {
@@ -13,15 +14,20 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
         ++levels;
         psnapshot = psnapshot->m_spdbSnapshotHOLDER.get();
     }
-    if (fOptional && (levels > 8))
-        return nullptr;
+    //if (fOptional && (levels > 8))
+    //    return nullptr;
 
     if (m_spdbSnapshotHOLDER != nullptr)
     {
         // If possible reuse an existing snapshot (we want to minimize nesting)
         if (mvccCheckpoint <= m_spdbSnapshotHOLDER->mvccCheckpoint)
         {
-            m_spdbSnapshotHOLDER->m_refCount++;
-            return m_spdbSnapshotHOLDER.get();
+            if (((getMvccTstamp() - m_spdbSnapshotHOLDER->mvccCheckpoint) >> MVCC_MS_SHIFT) < 10*1000)
+            {
+                m_spdbSnapshotHOLDER->m_refCount++;
+                return m_spdbSnapshotHOLDER.get();
+            }
+            serverLog(LL_WARNING, "Existing snapshot too old, creating a new one");
         }
+        serverLog(levels > 5 ? LL_NOTICE : LL_VERBOSE, "Nested snapshot created: %d levels", levels);
     }
@@ -37,6 +43,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
     spdb->m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER);
     spdb->m_pdbSnapshot = m_pdbSnapshot;
     spdb->m_refCount = 1;
+    spdb->mvccCheckpoint = getMvccTstamp();
     if (m_setexpire != nullptr)
         spdb->m_setexpire = m_setexpire;
 
@@ -77,6 +84,29 @@ void redisDbPersistentData::recursiveFreeSnapshots(redisDbPersistentDataSnapshot
     }
 }
 
+/* static */ void redisDbPersistentDataSnapshot::gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot)
+{
+    psnapshot->m_refCount--;
+    if (psnapshot->m_refCount <= 0)
+    {
+        serverAssert(psnapshot->m_refCount == 0);
+        // Remove our ref from any children and dispose them too
+        redisDbPersistentDataSnapshot *psnapshotChild = psnapshot;
+        std::vector<redisDbPersistentDataSnapshot*> vecClean;
+        while ((psnapshotChild = psnapshotChild->m_spdbSnapshotHOLDER.get()) != nullptr)
+            vecClean.push_back(psnapshotChild);
+
+        for (auto psnapshotChild : vecClean)
+            gcDisposeSnapshot(psnapshotChild);
+
+        //psnapshot->m_pdict->iterators--;
+        psnapshot->m_spdbSnapshotHOLDER.release();
+        //psnapshot->m_pdbSnapshot = nullptr;
+        g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::unique_ptr<redisDbPersistentDataSnapshot>(psnapshot));
+        serverLog(LL_WARNING, "Garbage collected snapshot");
+    }
+}
+
 void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot)
 {
     // Note: This function is dependent on GlobalLocksAcquried(), but rdb background saving has a weird case where
@@ -84,7 +114,12 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
 
     if (m_spdbSnapshotHOLDER.get() != psnapshot)
     {
-        serverAssert(m_spdbSnapshotHOLDER != nullptr);
+        if (m_spdbSnapshotHOLDER == nullptr)
+        {
+            // This is an orphaned snapshot
+            redisDbPersistentDataSnapshot::gcDisposeSnapshot(const_cast<redisDbPersistentDataSnapshot*>(psnapshot));
+            return;
+        }
         m_spdbSnapshotHOLDER->endSnapshot(psnapshot);
         return;
     }
@@ -162,7 +197,8 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
 
     // Stage 3 swap the databases with the snapshot
     std::swap(m_pdict, m_spdbSnapshotHOLDER->m_pdict);
-    std::swap(m_pdictTombstone, m_spdbSnapshotHOLDER->m_pdictTombstone);
+    if (m_spdbSnapshotHOLDER->m_pdbSnapshot != nullptr)
+        std::swap(m_pdictTombstone, m_spdbSnapshotHOLDER->m_pdictTombstone);
 
     // Stage 4 merge all expires
     // TODO
@@ -245,9 +281,11 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const
     }
     dictReleaseIterator(di);
 
-    if (fResult && m_pdbSnapshot != nullptr)
+    redisDbPersistentDataSnapshot *psnapshot;
+    __atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE);
+    if (fResult && psnapshot != nullptr)
     {
-        fResult = m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o){
+        fResult = psnapshot->iterate_threadsafe([&](const char *key, robj_roptr o){
             // Before passing off to the user we need to make sure it's not already in the
             // the current set, and not deleted
             dictEntry *deCurrent = dictFind(m_pdict, key);
@@ -267,3 +305,97 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const
     serverAssert(!fResult || celem == 0);
     return fResult;
 }
+
+int redisDbPersistentDataSnapshot::snapshot_depth() const
+{
+    if (m_pdbSnapshot)
+        return m_pdbSnapshot->snapshot_depth() + 1;
+    return 0;
+}
+
+
+void redisDbPersistentData::consolidate_snapshot()
+{
+    aeAcquireLock();
+    auto psnapshot = (m_pdbSnapshot != nullptr) ? m_spdbSnapshotHOLDER.get() : nullptr;
+    if (psnapshot == nullptr)
+    {
+        aeReleaseLock();
+        return;
+    }
+    psnapshot->m_refCount++;    // ensure it's not free'd
+    aeReleaseLock();
+    psnapshot->consolidate_children(this);
+    aeAcquireLock();
+    endSnapshot(psnapshot);
+    aeReleaseLock();
+}
+
+// only call this on the "real" database to consolidate the first child
+void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *pdbPrimary)
+{
+    static fastlock s_lock {"consolidate_children"};    // this lock ensures only one thread is consolidating at a time
+
+    std::unique_lock<fastlock> lock(s_lock, std::defer_lock);
+    if (!lock.try_lock())
+        return; // this is a best effort function
+
+    if (snapshot_depth() < 4)
+        return;
+
+    auto spdb = std::unique_ptr<redisDbPersistentDataSnapshot>(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot());
+    spdb->initialize();
+    dictExpand(spdb->m_pdict, m_pdbSnapshot->size());
+
+    m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o){
+        incrRefCount(o);
+        dictAdd(spdb->m_pdict, sdsdup(key), o.unsafe_robjcast());
+        return true;
+    });
+
+    spdb->m_pdict->iterators++;
+
+    serverAssert(spdb->size() == m_pdbSnapshot->size());
+
+    // Now wire us in (Acquire the LOCK)
+    AeLocker locker;
+    locker.arm(nullptr);
+
+    int depth = 0;
+    redisDbPersistentDataSnapshot *psnapshotT = pdbPrimary->m_spdbSnapshotHOLDER.get();
+    while (psnapshotT != nullptr)
+    {
+        ++depth;
+        if (psnapshotT == this)
+            break;
+        psnapshotT = psnapshotT->m_spdbSnapshotHOLDER.get();
+    }
+    if (psnapshotT != this)
+    {
+        locker.disarm();    // don't run spdb's dtor in the lock
+        return; // we were unlinked and this was a waste of time
+    }
+
+    serverLog(LL_WARNING, "cleaned %d snapshots", snapshot_depth()-1);
+
+    spdb->m_refCount = depth;
+    spdb->m_fConsolidated = true;
+    // Drop our refs from this snapshot and its children
+    psnapshotT = this;
+    std::vector<redisDbPersistentDataSnapshot*> vecT;
+    while ((psnapshotT = psnapshotT->m_spdbSnapshotHOLDER.get()) != nullptr)
+    {
+        vecT.push_back(psnapshotT);
+    }
+    for (auto itr = vecT.rbegin(); itr != vecT.rend(); ++itr)
+    {
+        psnapshotT = *itr;
+        psnapshotT->m_refCount -= (depth-1);    // -1 because dispose will sub another
+        gcDisposeSnapshot(psnapshotT);
+    }
+    std::atomic_thread_fence(std::memory_order_seq_cst);
+    m_spdbSnapshotHOLDER.release(); // GC has responsibility for it now
+    m_spdbSnapshotHOLDER = std::move(spdb);
+    auto ptrT = m_spdbSnapshotHOLDER.get();
+    __atomic_store(&m_pdbSnapshot, &ptrT, __ATOMIC_SEQ_CST);
+    locker.disarm();    // ensure we're not locked for any dtors
+}
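A detail worth noting in the hunks above: m_pdbSnapshot is read in iterate_threadsafe through an acquire load and republished by consolidate_children with a seq-cst store once the consolidated snapshot is fully built, using the GCC/Clang generic __atomic builtins, presumably because the member is a plain pointer rather than a std::atomic. A minimal, self-contained sketch of that publish/read pairing (Node and g_published are illustrative stand-ins, not KeyDB names):

```cpp
// Stand-alone sketch of the pointer publish/consume pairing used for
// m_pdbSnapshot; Node and g_published are illustrative names only.
struct Node { int payload; };

static Node *g_published = nullptr;

void publish(Node *pNew)
{
    // Writer: everything written to *pNew before this store is visible to
    // readers that observe the new pointer value.
    __atomic_store(&g_published, &pNew, __ATOMIC_SEQ_CST);
}

int readPayload()
{
    Node *p;
    __atomic_load(&g_published, &p, __ATOMIC_ACQUIRE);  // pairs with the store above
    return p ? p->payload : 0;
}
```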