diff --git a/src/db.cpp b/src/db.cpp index 13734aed5..e4cc6b8f7 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2765,6 +2765,34 @@ bool redisDbPersistentData::processChanges(bool fSnapshot) return (m_spstorage != nullptr); } +void redisDbPersistentData::processChangesAsync(std::atomic &pendingJobs) +{ + ++pendingJobs; + dictEmpty(m_dictChanged, nullptr); + dict *dictNew = dictCreate(&dbDictType, nullptr); + std::swap(dictNew, m_pdict); + m_cnewKeysPending = 0; + g_pserver->asyncworkqueue->AddWorkFunction([dictNew, this, &pendingJobs]{ + dictIterator *di = dictGetIterator(dictNew); + dictEntry *de; + std::vector veckeys; + std::vector vecvals; + while ((de = dictNext(di)) != nullptr) + { + robj *o = (robj*)dictGetVal(de); + sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o); + veckeys.push_back((sds)dictGetKey(de)); + vecvals.push_back(temp); + } + m_spstorage->bulkInsert(veckeys.data(), vecvals.data(), veckeys.size()); + for (auto val : vecvals) + sdsfree(val); + dictReleaseIterator(di); + dictRelease(dictNew); + --pendingJobs; + }); +} + void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree) { if (m_pdbSnapshotStorageFlush) diff --git a/src/rdb.cpp b/src/rdb.cpp index 7198124c3..c940a11cc 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2365,6 +2365,8 @@ class rdbAsyncWorkThread bool fLaunched = false; bool fExit = false; std::atomic ckeysLoaded; + std::atomic cstorageWritesInFlight; + std::atomic workerThreadDone; std::thread m_thread; long long now; @@ -2378,6 +2380,7 @@ public: : rsi(rsi), rdbflags(rdbflags), now(now) { ckeysLoaded = 0; + cstorageWritesInFlight = 0; listJobs = listCreate(); listSetFreeMethod(listJobs, listFreeMethod); } @@ -2397,6 +2400,14 @@ public: void throttle(std::unique_lock &l) { if (listLength(listJobs) > 0 && (listLength(listJobs) % 1024 == 0) && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) { cvThrottle.wait(l); + while (cstorageWritesInFlight.load(std::memory_order_relaxed) && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) { + l.unlock(); + usleep(100); + pauseExecution(); + processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); + resumeExecution(); + l.lock(); + } } } @@ -2404,9 +2415,8 @@ public: rdbInsertJob *pjob = new rdbInsertJob(job); std::unique_lock l(mutex); throttle(l); - bool fNotify = listLength(listJobs) == 0; listAddNodeTail(listJobs, pjob); - if (fNotify) + if (listLength(listJobs) == 1) cv.notify_one(); } @@ -2434,7 +2444,15 @@ public: fExit = true; cv.notify_one(); l.unlock(); + while (!workerThreadDone) { + usleep(100); + processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); + } m_thread.join(); + while (cstorageWritesInFlight.load(std::memory_order_seq_cst)) { + usleep(100); + processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); + } fLaunched = false; fExit = false; serverAssert(listLength(listJobs) == 0); @@ -2538,13 +2556,10 @@ public: { for (int idb = 0; idb < cserver.dbnum; ++idb) { - if (g_pserver->db[idb]->processChanges(false)) - g_pserver->db[idb]->commitChanges(); - if (fHighMemory && !(queue.rsi && queue.rsi->fForceSetKey)) { - g_pserver->db[idb]->removeAllCachedValues(); // During load we don't go through the normal eviction unless we're merging (i.e. an active replica) - fHighMemory = false; // we took care of it + if (g_pserver->m_pstorageFactory) { + g_pserver->db[idb]->processChangesAsync(queue.cstorageWritesInFlight); + fHighMemory = false; } - g_pserver->db[idb]->trackChanges(false, 1024); } if (fHighMemory) freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/); @@ -2563,6 +2578,13 @@ public: g_pserver->garbageCollector.endEpoch(vars.gcEpoch); } + + if (g_pserver->m_pstorageFactory) { + for (int idb = 0; idb < cserver.dbnum; ++idb) + g_pserver->db[idb]->processChangesAsync(queue.cstorageWritesInFlight); + } + + queue.workerThreadDone = true; std::unique_lock lock(queue.mutex); serverAssert(queue.queuefn.empty()); serverAssert(listLength(queue.listJobs) == 0); @@ -2584,8 +2606,6 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { (r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys))) { rdbAsyncWorkThread *pwthread = reinterpret_cast(r->chksum_arg); - if (pwthread) - pwthread->pauseExecution(); // We can't have the work queue modifying the database while processEventsWhileBlocked does its thing listIter li; listNode *ln; listRewind(g_pserver->masters, &li); @@ -2596,7 +2616,14 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { replicationSendNewlineToMaster(mi); } loadingProgress(r->processed_bytes); + + if (pwthread) + pwthread->pauseExecution(); processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); + if (pwthread) + pwthread->resumeExecution(); + + processModuleLoadingProgressEvent(0); robj *ping_argv[1]; @@ -2604,8 +2631,6 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { ping_argv[0] = createStringObject("PING",4); replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb, ping_argv, 1); decrRefCount(ping_argv[0]); - if (pwthread) - pwthread->resumeExecution(); r->keys_since_last_callback = 0; } @@ -2629,11 +2654,6 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { sds key = nullptr; bool fLastKeyExpired = false; - for (int idb = 0; idb < cserver.dbnum; ++idb) - { - g_pserver->db[idb]->trackChanges(true, 1024); - } - rdb->update_cksum = rdbLoadProgressCallback; rdb->chksum_arg = &wqueue; rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes; @@ -2946,11 +2966,6 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { wqueue.endWork(); - for (int idb = 0; idb < cserver.dbnum; ++idb) - { - if (g_pserver->db[idb]->processChanges(false)) - g_pserver->db[idb]->commitChanges(); - } return C_OK; /* Unexpected end of file is handled here calling rdbReportReadError(): diff --git a/src/server.h b/src/server.h index c0b34defb..bd411aba8 100644 --- a/src/server.h +++ b/src/server.h @@ -1114,6 +1114,7 @@ public: // either release the global lock or keep the same global lock between the two functions as // a second look is kept to ensure writes to secondary storage are ordered bool processChanges(bool fSnapshot); + void processChangesAsync(std::atomic &pendingJobs); void commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree = nullptr); // This should only be used if you look at the key, we do not fixup @@ -1278,6 +1279,7 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::setExpire; using redisDbPersistentData::trackChanges; using redisDbPersistentData::processChanges; + using redisDbPersistentData::processChangesAsync; using redisDbPersistentData::commitChanges; using redisDbPersistentData::setexpireUnsafe; using redisDbPersistentData::setexpire;