Ensure multithread load works with FLASH storage

Former-commit-id: 24e2991c7aa2cef90a89b1640f7095235c5d34ed
This commit is contained in:
John Sully 2021-05-03 20:18:45 +00:00
parent eaaff16cca
commit 0c5585e5de
2 changed files with 65 additions and 37 deletions

View File

@ -2356,27 +2356,35 @@ class rdbAsyncWorkThread
{
rdbSaveInfo *rsi;
int rdbflags;
std::vector<rdbInsertJob> queuejobs;
list *listJobs;
std::vector<std::function<void()>> queuefn; // for custom jobs
std::mutex mutex;
std::condition_variable cv;
std::condition_variable cvThrottle;
bool fLaunched = false;
bool fExit = false;
std::atomic<size_t> ckeysLoaded;
std::thread m_thread;
long long now;
static void listFreeMethod(const void *v) {
delete reinterpret_cast<const rdbInsertJob*>(v);
}
public:
rdbAsyncWorkThread(rdbSaveInfo *rsi, int rdbflags, long long now)
: rsi(rsi), rdbflags(rdbflags), now(now)
{
ckeysLoaded = 0;
listJobs = listCreate();
listSetFreeMethod(listJobs, listFreeMethod);
}
~rdbAsyncWorkThread() {
if (m_thread.joinable())
endWork();
endWork();
listRelease(listJobs);
}
void start() {
@ -2385,10 +2393,18 @@ public:
fLaunched = true;
}
void throttle(std::unique_lock<std::mutex> &l) {
if (listLength(listJobs) > 0 && (listLength(listJobs) % 1024 == 0) && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) {
cvThrottle.wait(l);
}
}
void enqueue(rdbInsertJob &job) {
rdbInsertJob *pjob = new rdbInsertJob(job);
std::unique_lock<std::mutex> l(mutex);
bool fNotify = queuejobs.empty();
queuejobs.push_back(job);
throttle(l);
bool fNotify = listLength(listJobs) == 0;
listAddNodeTail(listJobs, pjob);
if (fNotify)
cv.notify_one();
}
@ -2412,7 +2428,7 @@ public:
m_thread.join();
fLaunched = false;
fExit = false;
serverAssert(queuejobs.empty());
serverAssert(listLength(listJobs) == 0);
serverAssert(queuefn.empty());
return ckeysLoaded;
}
@ -2425,24 +2441,30 @@ public:
aeSetThreadOwnsLockOverride(true);
for (;;) {
std::unique_lock<std::mutex> lock(queue.mutex);
if (queue.queuejobs.empty() && queue.queuefn.empty()) {
if (listLength(queue.listJobs) == 0 && queue.queuefn.empty()) {
if (queue.fExit)
break;
queue.cv.wait(lock);
if (queue.queuejobs.empty() && queue.queuefn.empty() && queue.fExit)
if (listLength(queue.listJobs) == 0 && queue.queuefn.empty() && queue.fExit)
break;
}
pqueue->cvThrottle.notify_one();
auto queuejobs = std::move(queue.queuejobs);
queue.queuejobs.reserve(1024);
list *listJobs = queue.listJobs;
queue.listJobs = listCreate();
listSetFreeMethod(queue.listJobs, listFreeMethod);
auto queuefn = std::move(queue.queuefn);
lock.unlock();
bool f1024thKey = false;
for (auto &job : queuejobs) {
vars.gcEpoch = g_pserver->garbageCollector.startEpoch();
while (listLength(listJobs)) {
rdbInsertJob &job = *((rdbInsertJob*)listNodeValue(listFirst(listJobs)));
redisObjectStack keyobj;
initStaticStringObject(keyobj,job.key);
bool f1024thKey = false;
bool fStaleMvccKey = (pqueue->rsi) ? mvccFromObj(job.val) < pqueue->rsi->mvccMinThreshold : false;
/* Check if the key already expired. This function is used when loading
@ -2469,7 +2491,7 @@ public:
if (fInserted)
{
auto ckeys = queue.ckeysLoaded.fetch_add(1, std::memory_order_relaxed);
f1024thKey = f1024thKey || (ckeys % 1024) == 0;
f1024thKey = (ckeys % 1024) == 0;
/* Set the expire time if needed */
if (job.expiretime != -1)
@ -2496,37 +2518,43 @@ public:
{
sdsfree(job.key);
}
/* If we have a storage provider check if we need to evict some keys to stay under our memory limit,
do this every 16 keys to limit the perf impact */
if (g_pserver->m_pstorageFactory && f1024thKey)
{
bool fHighMemory = (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK);
if (fHighMemory || f1024thKey)
{
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
if (g_pserver->db[idb]->processChanges(false))
g_pserver->db[idb]->commitChanges();
if (fHighMemory && !(queue.rsi && queue.rsi->fForceSetKey)) {
g_pserver->db[idb]->removeAllCachedValues(); // During load we don't go through the normal eviction unless we're merging (i.e. an active replica)
fHighMemory = false; // we took care of it
}
g_pserver->db[idb]->trackChanges(false, 1024);
}
if (fHighMemory)
freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/);
}
}
// Pop from the list
listDelNode(listJobs, listFirst(listJobs));
}
listRelease(listJobs);
for (auto &fn : queuefn) {
fn();
}
/* If we have a storage provider check if we need to evict some keys to stay under our memory limit,
do this every 16 keys to limit the perf impact */
if (g_pserver->m_pstorageFactory && f1024thKey)
{
bool fHighMemory = (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK);
if (fHighMemory || f1024thKey)
{
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
if (g_pserver->db[idb]->processChanges(false))
g_pserver->db[idb]->commitChanges();
if (fHighMemory && !(queue.rsi && queue.rsi->fForceSetKey)) {
g_pserver->db[idb]->removeAllCachedValues(); // During load we don't go through the normal eviction unless we're merging (i.e. an active replica)
fHighMemory = false; // we took care of it
}
g_pserver->db[idb]->trackChanges(false, 1024);
}
if (fHighMemory)
freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/);
}
}
g_pserver->garbageCollector.endEpoch(vars.gcEpoch);
}
std::unique_lock<std::mutex> lock(queue.mutex);
serverAssert(queue.queuefn.empty());
serverAssert(queue.queuejobs.empty());
serverAssert(listLength(queue.listJobs) == 0);
ProcessPendingAsyncWrites();
listRelease(vars.clients_pending_asyncwrite);
aeSetThreadOwnsLockOverride(false);

View File

@ -2424,7 +2424,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
g_pserver->rdb_bgsave_scheduled = 0;
}
if (cserver.storage_memory_model == STORAGE_WRITEBACK && g_pserver->m_pstorageFactory) {
if (cserver.storage_memory_model == STORAGE_WRITEBACK && g_pserver->m_pstorageFactory && !g_pserver->loading) {
run_with_period(g_pserver->storage_flush_period) {
flushStorageWeak();
}
@ -2611,7 +2611,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
static thread_local bool fFirstRun = true;
// note: we also copy the DB pointer in case a DB swap is done while the lock is released
std::vector<redisDb*> vecdb; // note we cache the database pointer in case a dbswap is done while the lock is released
if (cserver.storage_memory_model == STORAGE_WRITETHROUGH && g_pserver->m_pstorageFactory != nullptr)
if (cserver.storage_memory_model == STORAGE_WRITETHROUGH && g_pserver->m_pstorageFactory != nullptr && !g_pserver->loading)
{
if (!fFirstRun) {
mstime_t storage_process_latency;