diff --git a/src/AsyncWorkQueue.cpp b/src/AsyncWorkQueue.cpp
index ece5518ba..91fc2d2d2 100644
--- a/src/AsyncWorkQueue.cpp
+++ b/src/AsyncWorkQueue.cpp
@@ -13,8 +13,7 @@ AsyncWorkQueue::AsyncWorkQueue(int nthreads)
 
 void AsyncWorkQueue::WorkerThreadMain()
 {
-    static redisServerThreadVars vars;
-    memset(&vars, 0, sizeof(redisServerThreadVars));
+    redisServerThreadVars vars;
     serverTL = &vars;
     vars.clients_pending_asyncwrite = listCreate();
 
@@ -27,20 +26,26 @@ void AsyncWorkQueue::WorkerThreadMain()
     {
         std::unique_lock<std::mutex> lock(m_mutex);
         m_cvWakeup.wait(lock);
+
         while (!m_workqueue.empty())
        {
             WorkItem task = std::move(m_workqueue.front());
-            m_workqueue.pop();
+            m_workqueue.pop_front();
 
             lock.unlock();
+            serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
             task.fnAsync();
+            g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
             lock.lock();
         }
 
         lock.unlock();
+        serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
         aeAcquireLock();
         ProcessPendingAsyncWrites();
         aeReleaseLock();
+        g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
+        serverTL->gcEpoch = 0;
     }
 
     listRelease(vars.clients_pending_asyncwrite);
@@ -91,9 +96,12 @@ AsyncWorkQueue::~AsyncWorkQueue()
     abandonThreads();
 }
 
-void AsyncWorkQueue::AddWorkFunction(std::function<void()> &&fnAsync)
+void AsyncWorkQueue::AddWorkFunction(std::function<void()> &&fnAsync, bool fHiPri)
 {
     std::unique_lock<std::mutex> lock(m_mutex);
-    m_workqueue.emplace(std::move(fnAsync));
+    if (fHiPri)
+        m_workqueue.emplace_front(std::move(fnAsync));
+    else
+        m_workqueue.emplace_back(std::move(fnAsync));
     m_cvWakeup.notify_one();
 }
\ No newline at end of file
diff --git a/src/AsyncWorkQueue.h b/src/AsyncWorkQueue.h
index b3144f585..2a413404d 100644
--- a/src/AsyncWorkQueue.h
+++ b/src/AsyncWorkQueue.h
@@ -1,7 +1,7 @@
 #pragma once
 #include "fastlock.h"
 #include <thread>
-#include <queue>
+#include <deque>
 #include <mutex>
 #include <condition_variable>
 #include <functional>
@@ -21,7 +21,7 @@ class AsyncWorkQueue
     };
     std::vector<std::thread> m_vecthreads;
     std::vector<redisServerThreadVars*> m_vecpthreadVars;
-    std::queue<WorkItem> m_workqueue;
+    std::deque<WorkItem> m_workqueue;
     std::mutex m_mutex;
     std::condition_variable m_cvWakeup;
     std::atomic<bool> m_fQuitting { false };
@@ -31,7 +31,7 @@ public:
     AsyncWorkQueue(int nthreads);
     ~AsyncWorkQueue();
 
-    void AddWorkFunction(std::function<void()> &&fnAsync);
+    void AddWorkFunction(std::function<void()> &&fnAsync, bool fHiPri = false);
 
     bool removeClientAsyncWrites(struct client *c);
     void abandonThreads();
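The std::queue to std::deque switch above is what lets AddWorkFunction push to either end: fHiPri work runs next, everything else stays FIFO. A minimal standalone sketch of that two-priority scheme (illustrative code, not part of this patch):

    #include <deque>
    #include <functional>
    #include <iostream>

    int main()
    {
        std::deque<std::function<void()>> q;
        auto add = [&](std::function<void()> fn, bool fHiPri = false) {
            if (fHiPri)
                q.emplace_front(std::move(fn));   // runs next, like consolidate_snapshot
            else
                q.emplace_back(std::move(fn));    // normal FIFO order
        };

        add([]{ std::cout << "background work\n"; });
        add([]{ std::cout << "urgent: snapshot consolidation\n"; }, true);

        while (!q.empty()) {   // drain in the order a worker thread would
            q.front()();
            q.pop_front();
        }
    }

serverCron (below, in server.cpp) uses the fHiPri path so snapshot consolidation is not starved behind already-queued work.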
diff --git a/src/db.cpp b/src/db.cpp
index 30ed1ea65..513356afd 100644
--- a/src/db.cpp
+++ b/src/db.cpp
@@ -64,27 +64,31 @@ void updateExpire(redisDb *db, sds key, robj *valOld, robj *valNew)
 }
 
+static void lookupKeyUpdateObj(robj *val, int flags)
+{
+    /* Update the access time for the ageing algorithm.
+     * Don't do it if we have a saving child, as this will trigger
+     * a copy on write madness. */
+    if (!g_pserver->FRdbSaveInProgress() &&
+        g_pserver->aof_child_pid == -1 &&
+        !(flags & LOOKUP_NOTOUCH))
+    {
+        if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
+            updateLFU(val);
+        } else {
+            val->lru = LRU_CLOCK();
+        }
+    }
+}
+
 /* Low level key lookup API, not actually called directly from commands
  * implementations that should instead rely on lookupKeyRead(),
  * lookupKeyWrite() and lookupKeyReadWithFlags(). */
-static robj *lookupKey(redisDb *db, robj *key, int flags) {
+static robj* lookupKey(redisDb *db, robj *key, int flags) {
     auto itr = db->find(key);
     if (itr) {
         robj *val = itr.val();
-        /* Update the access time for the ageing algorithm.
-         * Don't do it if we have a saving child, as this will trigger
-         * a copy on write madness. */
-        if (!g_pserver->FRdbSaveInProgress() &&
-            g_pserver->aof_child_pid == -1 &&
-            !(flags & LOOKUP_NOTOUCH))
-        {
-            if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
-                updateLFU(val);
-            } else {
-                val->lru = LRU_CLOCK();
-            }
-        }
-
+        lookupKeyUpdateObj(val, flags);
         if (flags & LOOKUP_UPDATEMVCC) {
             val->mvcc_tstamp = getMvccTstamp();
             db->trackkey(key);
@@ -94,6 +98,15 @@ static robj *lookupKey(redisDb *db, robj *key, int flags) {
         return NULL;
     }
 }
+static robj_roptr lookupKeyConst(redisDb *db, robj *key, int flags) {
+    serverAssert((flags & LOOKUP_UPDATEMVCC) == 0);
+    robj_roptr val = db->find_threadsafe(szFromObj(key));
+    if (val != nullptr) {
+        lookupKeyUpdateObj(val.unsafe_robjcast(), flags);
+        return val;
+    }
+    return nullptr;
+}
 
 /* Lookup a key for read operations, or return NULL if the key is not found
  * in the specified DB.
@@ -118,7 +131,7 @@ static robj *lookupKey(redisDb *db, robj *key, int flags) {
  * correctly report a key is expired on slaves even if the master is lagging
  * expiring our key via DELs in the replication link. */
 robj_roptr lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
-    robj *val;
+    robj_roptr val;
 
     serverAssert(GlobalLocksAcquired());
     if (expireIfNeeded(db,key) == 1) {
@@ -153,8 +166,8 @@ robj_roptr lookupKeyReadWithFlags(redisDb *db, robj *key, int flags) {
             return NULL;
         }
     }
-    val = lookupKey(db,key,flags);
-    if (val == NULL) {
+    val = lookupKeyConst(db,key,flags);
+    if (val == nullptr) {
         g_pserver->stat_keyspace_misses++;
         notifyKeyspaceEvent(NOTIFY_KEY_MISS, "keymiss", key, db->id);
     }
@@ -2122,9 +2135,9 @@ void redisDbPersistentData::processChanges()
 redisDbPersistentData::~redisDbPersistentData()
 {
     serverAssert(m_spdbSnapshotHOLDER == nullptr);
-    serverAssert(m_pdbSnapshot == nullptr);
+    //serverAssert(m_pdbSnapshot == nullptr);
     serverAssert(m_refCount == 0);
-    serverAssert(m_pdict->iterators == 0);
+    //serverAssert(m_pdict->iterators == 0);
     serverAssert(m_pdictTombstone == nullptr || m_pdictTombstone->iterators == 0);
     dictRelease(m_pdict);
     if (m_pdictTombstone)
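lookupKeyConst gives readers a path that never hands out a mutable robj: the LRU/LFU touch is factored into lookupKeyUpdateObj, and unsafe_robjcast() is the single explicit escape hatch for that metadata write. A sketch of the read-only-pointer idea (hypothetical wrapper; KeyDB's real robj_roptr differs):

    #include <cstddef>
    #include <cstdio>

    struct obj { int lru = 0; int payload = 42; };

    class obj_roptr {
        obj *m_ptr;
    public:
        obj_roptr(obj *ptr = nullptr) : m_ptr(ptr) {}
        const obj *operator->() const { return m_ptr; }           // reads are const by default
        bool operator!=(std::nullptr_t) const { return m_ptr != nullptr; }
        obj *unsafe_objcast() const { return m_ptr; }             // explicit opt-in to mutation
    };

    int main()
    {
        obj o;
        obj_roptr ro(&o);
        if (ro != nullptr) {
            ro.unsafe_objcast()->lru = 7;    // metadata touch, as lookupKeyConst does for LRU/LFU
            std::printf("payload=%d lru=%d\n", ro->payload, ro->lru);
        }
    }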
diff --git a/src/gc.h b/src/gc.h
new file mode 100644
index 000000000..b4d244b16
--- /dev/null
+++ b/src/gc.h
@@ -0,0 +1,101 @@
+#pragma once
+#include <vector>
+#include <set>
+
+template<typename T>
+class GarbageCollector
+{
+    struct EpochHolder
+    {
+        uint64_t tstamp;
+        std::vector<std::unique_ptr<T>> m_vecObjs;
+
+        bool operator<(uint64_t tstamp) const
+        {
+            return this->tstamp < tstamp;
+        }
+
+        bool operator==(uint64_t tstamp) const
+        {
+            return this->tstamp == tstamp;
+        }
+    };
+
+public:
+    uint64_t startEpoch()
+    {
+        std::unique_lock<fastlock> lock(m_lock);
+        ++m_epochNext;
+        m_setepochOutstanding.insert(m_epochNext);
+        return m_epochNext;
+    }
+
+    void endEpoch(uint64_t epoch, bool fNoFree = false)
+    {
+        std::unique_lock<fastlock> lock(m_lock);
+        assert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end());
+        bool fMinElement = *std::min_element(m_setepochOutstanding.begin(), m_setepochOutstanding.end()) == epoch;
+        m_setepochOutstanding.erase(epoch);
+        if (fNoFree)
+            return;
+        std::vector<EpochHolder> vecclean;
+
+        // No outstanding epochs?
+        if (m_setepochOutstanding.empty())
+        {
+            vecclean = std::move(m_vecepochs);  // Everything goes!
+        }
+        else
+        {
+            uint64_t minepoch = *std::min_element(m_setepochOutstanding.begin(), m_setepochOutstanding.end());
+            if (minepoch == 0)
+                return; // No available epochs to free
+
+            // Clean any epochs available (after the lock)
+            for (size_t iepoch = 0; iepoch < m_vecepochs.size(); ++iepoch)
+            {
+                auto &e = m_vecepochs[iepoch];
+                if (e < minepoch)
+                {
+                    vecclean.emplace_back(std::move(e));
+                    m_vecepochs.erase(m_vecepochs.begin() + iepoch);
+                    --iepoch;
+                }
+            }
+
+            if (!(vecclean.empty() || fMinElement))
+                printf("############################## ERROR CASE ############################\n");
+        }
+
+        lock.unlock();  // don't hold it for the potentially long delete of vecclean
+        if (vecclean.size())
+            printf("!!!!!!!!!!!!!!!! Deleted %lu snapshots !!!!!!!!!!!!!!!!!!!!!\n", vecclean.size());
+    }
+
+    void enqueue(uint64_t epoch, std::unique_ptr<T> &&sp)
+    {
+        std::unique_lock<fastlock> lock(m_lock);
+        assert(m_setepochOutstanding.find(epoch) != m_setepochOutstanding.end());
+        assert(sp->FWillFreeChildDebug() == false);
+
+        auto itr = std::find(m_vecepochs.begin(), m_vecepochs.end(), m_epochNext+1);
+        if (itr == m_vecepochs.end())
+        {
+            EpochHolder e;
+            e.tstamp = m_epochNext+1;
+            e.m_vecObjs.push_back(std::move(sp));
+            m_vecepochs.emplace_back(std::move(e));
+        }
+        else
+        {
+            itr->m_vecObjs.push_back(std::move(sp));
+        }
+    }
+
+private:
+    fastlock m_lock { "Garbage Collector"};
+
+    std::vector<EpochHolder> m_vecepochs;
+    std::set<uint64_t> m_setepochOutstanding;
+    uint64_t m_epochNext = 0;
+};
\ No newline at end of file
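GarbageCollector is an epoch-based reclamation scheme: each thread brackets its access to shared snapshots with startEpoch()/endEpoch(), retired objects are enqueued tagged with the next epoch, and an object is destroyed only once every epoch at or below its tag has ended. The same protocol in a self-contained sketch (simplified; single payload type, std::mutex instead of fastlock):

    #include <cstdint>
    #include <cstdio>
    #include <memory>
    #include <mutex>
    #include <set>
    #include <utility>
    #include <vector>

    struct Node { int v; ~Node() { std::printf("freed %d\n", v); } };

    class MiniGC {
        std::mutex m_lock;
        std::set<uint64_t> m_outstanding;                        // epochs still in use
        std::vector<std::pair<uint64_t, std::unique_ptr<Node>>> m_garbage;
        uint64_t m_next = 0;
    public:
        uint64_t startEpoch() {
            std::lock_guard<std::mutex> l(m_lock);
            m_outstanding.insert(++m_next);
            return m_next;
        }
        void enqueue(std::unique_ptr<Node> sp) {
            std::lock_guard<std::mutex> l(m_lock);
            m_garbage.emplace_back(m_next + 1, std::move(sp));   // tagged with the *next* epoch
        }
        void endEpoch(uint64_t e) {
            std::lock_guard<std::mutex> l(m_lock);
            m_outstanding.erase(e);
            uint64_t min = m_outstanding.empty() ? UINT64_MAX
                                                 : *m_outstanding.begin();  // std::set is sorted
            for (size_t i = 0; i < m_garbage.size(); )
                if (m_garbage[i].first < min)
                    m_garbage.erase(m_garbage.begin() + i);      // ~Node runs here
                else
                    ++i;
        }
    };

    int main() {
        MiniGC gc;
        uint64_t r1 = gc.startEpoch();                  // e.g. an rdb save thread
        gc.enqueue(std::unique_ptr<Node>(new Node{1})); // a snapshot retired meanwhile
        uint64_t r2 = gc.startEpoch();                  // e.g. a worker thread
        gc.endEpoch(r2);                                // Node 1 survives: r1 still outstanding
        gc.endEpoch(r1);                                // nothing older remains -> prints "freed 1"
    }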
diff --git a/src/rdb.cpp b/src/rdb.cpp
index f100c5843..6035f451b 100644
--- a/src/rdb.cpp
+++ b/src/rdb.cpp
@@ -1367,7 +1367,11 @@ void *rdbSaveThread(void *vargs)
 {
     rdbSaveThreadArgs *args = reinterpret_cast<rdbSaveThreadArgs*>(vargs);
     serverAssert(serverTL == nullptr);
-    int retval = rdbSave(args->rgpdb, &args->rsi);
+    redisServerThreadVars vars;
+    serverTL = &vars;
+    vars.gcEpoch = g_pserver->garbageCollector.startEpoch();
+
+    int retval = rdbSave(args->rgpdb, &args->rsi);
     if (retval == C_OK) {
         size_t private_dirty = zmalloc_get_private_dirty(-1);
 
@@ -1389,6 +1393,7 @@ void *rdbSaveThread(void *vargs)
     if (!g_pserver->rdbThreadVars.fRdbThreadCancel)
         aeReleaseLock();
     zfree(args);
+    g_pserver->garbageCollector.endEpoch(vars.gcEpoch);
     return (retval == C_OK) ? (void*)0 : (void*)1;
 }
 
diff --git a/src/server.cpp b/src/server.cpp
index d68e46fb6..a81a4a3d6 100644
--- a/src/server.cpp
+++ b/src/server.cpp
@@ -82,7 +82,7 @@ struct redisServer server; /* Server global state */
 }
 redisServer *g_pserver = &GlobalHidden::server;
 struct redisServerConst cserver;
-__thread struct redisServerThreadVars *serverTL = NULL; // thread local server vars
+thread_local struct redisServerThreadVars *serverTL = NULL; // thread local server vars
 volatile unsigned long lru_clock; /* Server global current LRU time. */
 
 /* Our command table.
@@ -2089,6 +2089,10 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
         g_pserver->rdb_bgsave_scheduled = 0;
     }
 
+    g_pserver->asyncworkqueue->AddWorkFunction([]{
+        g_pserver->db[0].consolidate_snapshot();
+    }, true /*HiPri*/);
+
     g_pserver->cronloops++;
     return 1000/g_pserver->hz;
 }
@@ -2173,6 +2177,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
     /* Handle writes with pending output buffers. */
     aeReleaseLock();
     handleClientsWithPendingWrites(IDX_EVENT_LOOP_MAIN);
+    if (serverTL->gcEpoch != 0)
+        g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/);
+    serverTL->gcEpoch = 0;
     aeAcquireLock();
 
     /* Close clients that need to be closed asynchronous */
@@ -2207,6 +2214,10 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
     freeClientsInAsyncFreeQueue(iel);
     aeReleaseLock();
 
+    if (serverTL->gcEpoch != 0)
+        g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/);
+    serverTL->gcEpoch = 0;
+
     /* Before we are going to sleep, let the threads access the dataset by
      * releasing the GIL. Redis main thread will not touch anything at this
      * time. */
@@ -2219,6 +2230,9 @@ void beforeSleepLite(struct aeEventLoop *eventLoop)
 void afterSleep(struct aeEventLoop *eventLoop) {
     UNUSED(eventLoop);
     if (moduleCount()) moduleAcquireGIL(TRUE /*fServerThread*/);
+
+    serverAssert(serverTL->gcEpoch == 0);
+    serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
 }
 
 /* =========================== Server initialization ======================== */
diff --git a/src/server.h b/src/server.h
index 947e03092..8685a778f 100644
--- a/src/server.h
+++ b/src/server.h
@@ -94,6 +94,7 @@ typedef long long mstime_t; /* millisecond time type. */
 #include "crc64.h"
 #include "IStorage.h"
 #include "AsyncWorkQueue.h"
+#include "gc.h"
 
 extern int g_fTestMode;
 
@@ -1278,6 +1279,8 @@ public:
     const redisDbPersistentDataSnapshot *createSnapshot(uint64_t mvccCheckpoint, bool fOptional);
     void endSnapshot(const redisDbPersistentDataSnapshot *psnapshot);
 
+    void consolidate_snapshot();
+
 private:
     void ensure(const char *key);
     void ensure(const char *key, dictEntry **de);
@@ -1308,7 +1311,15 @@ private:
 class redisDbPersistentDataSnapshot : protected redisDbPersistentData
 {
     friend class redisDbPersistentData;
+protected:
+    bool m_fConsolidated = false;
+    static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot);
+    int snapshot_depth() const;
+    void consolidate_children(redisDbPersistentData *pdbPrimary);
+
 public:
+    bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; }
+
     bool iterate_threadsafe(std::function<bool(const char*, robj_roptr)> fn) const;
     using redisDbPersistentData::createSnapshot;
     using redisDbPersistentData::endSnapshot;
@@ -1390,6 +1401,7 @@ typedef struct redisDb : public redisDbPersistentDataSnapshot
     using redisDbPersistentData::setexpire;
     using redisDbPersistentData::createSnapshot;
     using redisDbPersistentData::endSnapshot;
+    using redisDbPersistentData::consolidate_snapshot;
 
 public:
     expireset::setiter expireitr;
@@ -1790,6 +1802,7 @@ struct redisServerThreadVars {
     struct fastlock lockPendingWrite { "thread pending write" };
     char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
     long unsigned commandsExecuted = 0;
+    uint64_t gcEpoch = 0;
 };
 
 struct redisMaster {
@@ -2187,6 +2200,8 @@ struct redisServer {
     /* System hardware info */
     size_t system_memory_size; /* Total memory in system as reported by OS */
 
+    GarbageCollector<redisDbPersistentDataSnapshot> garbageCollector;
+
     bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; }
 };
 
@@ -2284,7 +2299,7 @@ typedef struct {
 //extern struct redisServer server;
 extern redisServer *g_pserver;
 extern struct redisServerConst cserver;
-extern __thread struct redisServerThreadVars *serverTL; // thread local server vars
+extern thread_local struct redisServerThreadVars *serverTL; // thread local server vars
 extern struct sharedObjectsStruct shared;
 extern dictType objectKeyPointerValueDictType;
 extern dictType objectKeyHeapPointerValueDictType;
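The __thread to thread_local change matters because __thread is a pre-C++11 GCC extension limited to constant initialization, while C++11 thread_local also permits dynamically initialized (and destructed) objects; serverTL itself is only a pointer, so this is presumably for portability and to allow richer per-thread state. A small standalone demonstration of the difference (not from this patch):

    #include <cstdint>
    #include <cstdio>
    #include <thread>

    struct ThreadVars {
        uint64_t gcEpoch = 0;
        ThreadVars()  { std::puts("thread vars constructed"); }  // dynamic init: legal for thread_local
        ~ThreadVars() { std::puts("thread vars destroyed"); }
    };

    // __thread ThreadVars tv;   // GCC rejects this: non-trivial construction/destruction
    thread_local ThreadVars tv;  // constructed on first use in each thread

    int main()
    {
        std::thread t([]{ tv.gcEpoch = 1; std::printf("worker epoch=%lu\n", (unsigned long)tv.gcEpoch); });
        t.join();
        tv.gcEpoch = 2;          // the main thread has its own independent copy
        std::printf("main epoch=%lu\n", (unsigned long)tv.gcEpoch);
    }

The afterSleep/beforeSleep hooks above use exactly this per-thread slot: each event-loop iteration opens a GC epoch after sleep and closes it (fNoFree) before the next sleep.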
diff --git a/src/snapshot.cpp b/src/snapshot.cpp
index 3a3643c5f..ee2092db6 100644
--- a/src/snapshot.cpp
+++ b/src/snapshot.cpp
@@ -1,4 +1,5 @@
 #include "server.h"
+#include "aelocker.h"
 
 const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional)
 {
@@ -13,15 +14,20 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
         ++levels;
         psnapshot = psnapshot->m_spdbSnapshotHOLDER.get();
     }
-    if (fOptional && (levels > 8))
-        return nullptr;
+    //if (fOptional && (levels > 8))
+    //    return nullptr;
 
     if (m_spdbSnapshotHOLDER != nullptr)
     {
+        // If possible reuse an existing snapshot (we want to minimize nesting)
         if (mvccCheckpoint <= m_spdbSnapshotHOLDER->mvccCheckpoint)
         {
-            m_spdbSnapshotHOLDER->m_refCount++;
-            return m_spdbSnapshotHOLDER.get();
+            if (((getMvccTstamp() - m_spdbSnapshotHOLDER->mvccCheckpoint) >> MVCC_MS_SHIFT) < 10*1000)
+            {
+                m_spdbSnapshotHOLDER->m_refCount++;
+                return m_spdbSnapshotHOLDER.get();
+            }
+            serverLog(LL_WARNING, "Existing snapshot too old, creating a new one");
         }
         serverLog(levels > 5 ? LL_NOTICE : LL_VERBOSE, "Nested snapshot created: %d levels", levels);
     }
@@ -37,6 +43,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
     spdb->m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER);
     spdb->m_pdbSnapshot = m_pdbSnapshot;
     spdb->m_refCount = 1;
+    spdb->mvccCheckpoint = getMvccTstamp();
     if (m_setexpire != nullptr)
         spdb->m_setexpire = m_setexpire;
 
@@ -77,6 +84,29 @@ void redisDbPersistentData::recursiveFreeSnapshots(redisDbPersistentDataSnapshot
     }
 }
 
+/* static */ void redisDbPersistentDataSnapshot::gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot)
+{
+    psnapshot->m_refCount--;
+    if (psnapshot->m_refCount <= 0)
+    {
+        serverAssert(psnapshot->m_refCount == 0);
+        // Remove our ref from any children and dispose them too
+        redisDbPersistentDataSnapshot *psnapshotChild = psnapshot;
+        std::vector<redisDbPersistentDataSnapshot*> vecClean;
+        while ((psnapshotChild = psnapshotChild->m_spdbSnapshotHOLDER.get()) != nullptr)
+            vecClean.push_back(psnapshotChild);
+
+        for (auto psnapshotChild : vecClean)
+            gcDisposeSnapshot(psnapshotChild);
+
+        //psnapshot->m_pdict->iterators--;
+        psnapshot->m_spdbSnapshotHOLDER.release();
+        //psnapshot->m_pdbSnapshot = nullptr;
+        g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::unique_ptr<redisDbPersistentDataSnapshot>(psnapshot));
+        serverLog(LL_WARNING, "Garbage collected snapshot");
+    }
+}
+
 void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot)
 {
     // Note: This function is dependent on GlobalLocksAcquried(), but rdb background saving has a weird case where
@@ -84,7 +114,12 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
 
     if (m_spdbSnapshotHOLDER.get() != psnapshot)
     {
-        serverAssert(m_spdbSnapshotHOLDER != nullptr);
+        if (m_spdbSnapshotHOLDER == nullptr)
+        {
+            // This is an orphaned snapshot
+            redisDbPersistentDataSnapshot::gcDisposeSnapshot(const_cast<redisDbPersistentDataSnapshot*>(psnapshot));
+            return;
+        }
         m_spdbSnapshotHOLDER->endSnapshot(psnapshot);
         return;
     }
@@ -162,7 +197,8 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
 
     // Stage 3 swap the databases with the snapshot
     std::swap(m_pdict, m_spdbSnapshotHOLDER->m_pdict);
-    std::swap(m_pdictTombstone, m_spdbSnapshotHOLDER->m_pdictTombstone);
+    if (m_spdbSnapshotHOLDER->m_pdbSnapshot != nullptr)
+        std::swap(m_pdictTombstone, m_spdbSnapshotHOLDER->m_pdictTombstone);
 
     // Stage 4 merge all expires
     // TODO
@@ -245,9 +281,11 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const
     }
     dictReleaseIterator(di);
 
-    if (fResult && m_pdbSnapshot != nullptr)
+    const redisDbPersistentDataSnapshot *psnapshot;
+    __atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE);
+    if (fResult && psnapshot != nullptr)
     {
-        fResult = m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o){
+        fResult = psnapshot->iterate_threadsafe([&](const char *key, robj_roptr o){
             // Before passing off to the user we need to make sure it's not already in the
             // the current set, and not deleted
             dictEntry *deCurrent = dictFind(m_pdict, key);
@@ -267,3 +305,97 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const
     return fResult;
 }
 
+int redisDbPersistentDataSnapshot::snapshot_depth() const
+{
+    if (m_pdbSnapshot)
+        return m_pdbSnapshot->snapshot_depth() + 1;
+    return 0;
+}
+
+
+void redisDbPersistentData::consolidate_snapshot()
+{
+    aeAcquireLock();
+    auto psnapshot = (m_pdbSnapshot != nullptr) ? m_spdbSnapshotHOLDER.get() : nullptr;
+    if (psnapshot == nullptr)
+    {
+        aeReleaseLock();
+        return;
+    }
+    psnapshot->m_refCount++;    // ensure it's not free'd
+    aeReleaseLock();
+    psnapshot->consolidate_children(this);
+    aeAcquireLock();
+    endSnapshot(psnapshot);
+    aeReleaseLock();
+}
+
+// only call this on the "real" database to consolidate the first child
+void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *pdbPrimary)
+{
+    static fastlock s_lock {"consolidate_children"};    // this lock ensures only one thread is consolidating at a time
+
+    std::unique_lock<fastlock> lock(s_lock, std::defer_lock);
+    if (!lock.try_lock())
+        return; // this is a best effort function
+
+    if (snapshot_depth() < 4)
+        return;
+
+    auto spdb = std::unique_ptr<redisDbPersistentDataSnapshot>(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot());
+    spdb->initialize();
+    dictExpand(spdb->m_pdict, m_pdbSnapshot->size());
+
+    m_pdbSnapshot->iterate_threadsafe([&](const char *key, robj_roptr o){
+        incrRefCount(o);
+        dictAdd(spdb->m_pdict, sdsdup(key), o.unsafe_robjcast());
+        return true;
+    });
+
+    spdb->m_pdict->iterators++;
+
+    serverAssert(spdb->size() == m_pdbSnapshot->size());
+
+    // Now wire us in (Acquire the LOCK)
+    AeLocker locker;
+    locker.arm(nullptr);
+
+    int depth = 0;
+    redisDbPersistentDataSnapshot *psnapshotT = pdbPrimary->m_spdbSnapshotHOLDER.get();
+    while (psnapshotT != nullptr)
+    {
+        ++depth;
+        if (psnapshotT == this)
+            break;
+        psnapshotT = psnapshotT->m_spdbSnapshotHOLDER.get();
+    }
+    if (psnapshotT != this)
+    {
+        locker.disarm();    // don't run spdb's dtor in the lock
+        return; // we were unlinked and this was a waste of time
+    }
+
+    serverLog(LL_WARNING, "cleaned %d snapshots", snapshot_depth()-1);
+    spdb->m_refCount = depth;
+    spdb->m_fConsolidated = true;
+    // Drop our refs from this snapshot and its children
+    psnapshotT = this;
+    std::vector<redisDbPersistentDataSnapshot*> vecT;
+    while ((psnapshotT = psnapshotT->m_spdbSnapshotHOLDER.get()) != nullptr)
+    {
+        vecT.push_back(psnapshotT);
+    }
+    for (auto itr = vecT.rbegin(); itr != vecT.rend(); ++itr)
+    {
+        psnapshotT = *itr;
+        psnapshotT->m_refCount -= (depth-1);    // -1 because dispose will sub another
+        gcDisposeSnapshot(psnapshotT);
+    }
+    std::atomic_thread_fence(std::memory_order_seq_cst);
+    m_spdbSnapshotHOLDER.release();    // GC has responsibility for it now
+    m_spdbSnapshotHOLDER = std::move(spdb);
+    auto ptrT = m_spdbSnapshotHOLDER.get();
+    __atomic_store(&m_pdbSnapshot, &ptrT, __ATOMIC_SEQ_CST);
+    locker.disarm();    // ensure we're not locked for any dtors
+}
\ No newline at end of file
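consolidate_children collapses a deep snapshot chain into one layer: it copies the oldest visible view into a fresh snapshot, splices that snapshot in under the global lock, and hands the displaced chain to the garbage collector. The merge semantics, where nearer layers shadow deeper ones, are what iterate_threadsafe's dictFind checks implement. A toy model of that collapse (illustrative only; KeyDB's real layers also track tombstones and refcounts):

    #include <cstdio>
    #include <functional>
    #include <map>
    #include <memory>
    #include <string>

    struct Layer {
        std::map<std::string, int> dict;   // stands in for m_pdict
        std::unique_ptr<Layer> child;      // stands in for m_spdbSnapshotHOLDER

        int depth() const { return child ? child->depth() + 1 : 0; }

        // Visit this layer, then whatever deeper layers hold that we don't shadow.
        void iterate(const std::function<void(const std::string&, int)> &fn) const {
            for (auto &kv : dict) fn(kv.first, kv.second);
            if (child)
                child->iterate([&](const std::string &k, int v){
                    if (!dict.count(k)) fn(k, v);   // keys in this layer shadow deeper ones
                });
        }
    };

    int main()
    {
        auto root = std::make_unique<Layer>();
        root->dict = {{"a", 1}};
        root->child = std::make_unique<Layer>();
        root->child->dict = {{"a", 0}, {"b", 2}};   // this "a" is shadowed by the root layer
        root->child->child = std::make_unique<Layer>();
        root->child->child->dict = {{"c", 3}};

        // Consolidate: one flat layer holding the merged view replaces the chain.
        auto flat = std::make_unique<Layer>();
        root->iterate([&](const std::string &k, int v){ flat->dict.emplace(k, v); });
        root = std::move(flat);   // old chain destroyed here; in KeyDB the GC takes it instead

        std::printf("depth=%d size=%zu a=%d\n", root->depth(), root->dict.size(), root->dict["a"]);
    }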