From 0c5585e5ded16582e62f19eb86de0f5aa19ff7f6 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 3 May 2021 20:18:45 +0000 Subject: [PATCH] Ensure multithread load works with FLASH storage Former-commit-id: 24e2991c7aa2cef90a89b1640f7095235c5d34ed --- src/rdb.cpp | 98 ++++++++++++++++++++++++++++++++------------------ src/server.cpp | 4 +-- 2 files changed, 65 insertions(+), 37 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index 2191f7bd8..ac35c219c 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2356,27 +2356,35 @@ class rdbAsyncWorkThread { rdbSaveInfo *rsi; int rdbflags; - std::vector queuejobs; + list *listJobs; std::vector> queuefn; // for custom jobs std::mutex mutex; std::condition_variable cv; + std::condition_variable cvThrottle; bool fLaunched = false; bool fExit = false; std::atomic ckeysLoaded; std::thread m_thread; long long now; + static void listFreeMethod(const void *v) { + delete reinterpret_cast(v); + } + public: rdbAsyncWorkThread(rdbSaveInfo *rsi, int rdbflags, long long now) : rsi(rsi), rdbflags(rdbflags), now(now) { ckeysLoaded = 0; + listJobs = listCreate(); + listSetFreeMethod(listJobs, listFreeMethod); } ~rdbAsyncWorkThread() { if (m_thread.joinable()) - endWork(); + endWork(); + listRelease(listJobs); } void start() { @@ -2385,10 +2393,18 @@ public: fLaunched = true; } + void throttle(std::unique_lock &l) { + if (listLength(listJobs) > 0 && (listLength(listJobs) % 1024 == 0) && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) { + cvThrottle.wait(l); + } + } + void enqueue(rdbInsertJob &job) { + rdbInsertJob *pjob = new rdbInsertJob(job); std::unique_lock l(mutex); - bool fNotify = queuejobs.empty(); - queuejobs.push_back(job); + throttle(l); + bool fNotify = listLength(listJobs) == 0; + listAddNodeTail(listJobs, pjob); if (fNotify) cv.notify_one(); } @@ -2412,7 +2428,7 @@ public: m_thread.join(); fLaunched = false; fExit = false; - serverAssert(queuejobs.empty()); + serverAssert(listLength(listJobs) == 0); 
serverAssert(queuefn.empty()); return ckeysLoaded; } @@ -2425,24 +2441,30 @@ public: aeSetThreadOwnsLockOverride(true); for (;;) { std::unique_lock lock(queue.mutex); - if (queue.queuejobs.empty() && queue.queuefn.empty()) { + if (listLength(queue.listJobs) == 0 && queue.queuefn.empty()) { if (queue.fExit) break; queue.cv.wait(lock); - if (queue.queuejobs.empty() && queue.queuefn.empty() && queue.fExit) + if (listLength(queue.listJobs) == 0 && queue.queuefn.empty() && queue.fExit) break; } + pqueue->cvThrottle.notify_one(); - auto queuejobs = std::move(queue.queuejobs); - queue.queuejobs.reserve(1024); + list *listJobs = queue.listJobs; + queue.listJobs = listCreate(); + listSetFreeMethod(queue.listJobs, listFreeMethod); + auto queuefn = std::move(queue.queuefn); lock.unlock(); - bool f1024thKey = false; - for (auto &job : queuejobs) { + vars.gcEpoch = g_pserver->garbageCollector.startEpoch(); + while (listLength(listJobs)) { + rdbInsertJob &job = *((rdbInsertJob*)listNodeValue(listFirst(listJobs))); + redisObjectStack keyobj; initStaticStringObject(keyobj,job.key); + bool f1024thKey = false; bool fStaleMvccKey = (pqueue->rsi) ? mvccFromObj(job.val) < pqueue->rsi->mvccMinThreshold : false; /* Check if the key already expired. 
This function is used when loading @@ -2469,7 +2491,7 @@ public: if (fInserted) { auto ckeys = queue.ckeysLoaded.fetch_add(1, std::memory_order_relaxed); - f1024thKey = f1024thKey || (ckeys % 1024) == 0; + f1024thKey = (ckeys % 1024) == 0; /* Set the expire time if needed */ if (job.expiretime != -1) @@ -2496,37 +2518,43 @@ public: { sdsfree(job.key); } + + /* If we have a storage provider check if we need to evict some keys to stay under our memory limit, + do this every 1024 keys to limit the perf impact */ + if (g_pserver->m_pstorageFactory && f1024thKey) + { + bool fHighMemory = (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK); + if (fHighMemory || f1024thKey) + { + for (int idb = 0; idb < cserver.dbnum; ++idb) + { + if (g_pserver->db[idb]->processChanges(false)) + g_pserver->db[idb]->commitChanges(); + if (fHighMemory && !(queue.rsi && queue.rsi->fForceSetKey)) { + g_pserver->db[idb]->removeAllCachedValues(); // During load we don't go through the normal eviction unless we're merging (i.e. 
an active replica) + fHighMemory = false; // we took care of it + } + g_pserver->db[idb]->trackChanges(false, 1024); + } + if (fHighMemory) + freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/); + } + } + + // Pop from the list + listDelNode(listJobs, listFirst(listJobs)); } - + listRelease(listJobs); + for (auto &fn : queuefn) { fn(); } - /* If we have a storage provider check if we need to evict some keys to stay under our memory limit, - do this every 16 keys to limit the perf impact */ - if (g_pserver->m_pstorageFactory && f1024thKey) - { - bool fHighMemory = (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK); - if (fHighMemory || f1024thKey) - { - for (int idb = 0; idb < cserver.dbnum; ++idb) - { - if (g_pserver->db[idb]->processChanges(false)) - g_pserver->db[idb]->commitChanges(); - if (fHighMemory && !(queue.rsi && queue.rsi->fForceSetKey)) { - g_pserver->db[idb]->removeAllCachedValues(); // During load we don't go through the normal eviction unless we're merging (i.e. 
an active replica) - fHighMemory = false; // we took care of it - } - g_pserver->db[idb]->trackChanges(false, 1024); - } - if (fHighMemory) - freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/); - } - } + g_pserver->garbageCollector.endEpoch(vars.gcEpoch); } std::unique_lock lock(queue.mutex); serverAssert(queue.queuefn.empty()); - serverAssert(queue.queuejobs.empty()); + serverAssert(listLength(queue.listJobs) == 0); ProcessPendingAsyncWrites(); listRelease(vars.clients_pending_asyncwrite); aeSetThreadOwnsLockOverride(false); diff --git a/src/server.cpp b/src/server.cpp index fbcd0cc43..d4ace1aae 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2424,7 +2424,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { g_pserver->rdb_bgsave_scheduled = 0; } - if (cserver.storage_memory_model == STORAGE_WRITEBACK && g_pserver->m_pstorageFactory) { + if (cserver.storage_memory_model == STORAGE_WRITEBACK && g_pserver->m_pstorageFactory && !g_pserver->loading) { run_with_period(g_pserver->storage_flush_period) { flushStorageWeak(); } @@ -2611,7 +2611,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { static thread_local bool fFirstRun = true; // note: we also copy the DB pointer in case a DB swap is done while the lock is released std::vector vecdb; // note we cache the database pointer in case a dbswap is done while the lock is released - if (cserver.storage_memory_model == STORAGE_WRITETHROUGH && g_pserver->m_pstorageFactory != nullptr) + if (cserver.storage_memory_model == STORAGE_WRITETHROUGH && g_pserver->m_pstorageFactory != nullptr && !g_pserver->loading) { if (!fFirstRun) { mstime_t storage_process_latency;