From 52b8c153f830a8d63a8fcdfd26c478a6fd004076 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 7 Apr 2022 13:20:38 -0400 Subject: [PATCH] Make multithread load configurable and disabled by default (#57) Co-authored-by: John Sully --- src/config.cpp | 1 + src/rdb.cpp | 33 +++++++++++++++++++++++---------- src/server.h | 1 + 3 files changed, 25 insertions(+), 10 deletions(-) diff --git a/src/config.cpp b/src/config.cpp index e7cf6cc24..efd7928bd 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2778,6 +2778,7 @@ standardConfig configs[] = { createBoolConfig("cluster-allow-replica-migration", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_replica_migration, 1, NULL, NULL), createBoolConfig("replica-announced", NULL, MODIFIABLE_CONFIG, g_pserver->replica_announced, 1, NULL, NULL), createBoolConfig("enable-async-commands", NULL, MODIFIABLE_CONFIG, g_pserver->enable_async_commands, 1, NULL, NULL), + createBoolConfig("multithread-load-enabled", NULL, MODIFIABLE_CONFIG, g_pserver->multithread_load_enabled, 0, NULL, NULL), /* String Configs */ createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL), diff --git a/src/rdb.cpp b/src/rdb.cpp index 4d6cde7d3..2666e0e8c 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2791,12 +2791,17 @@ public: } } - void enqueue(std::unique_ptr &spjob) { - vecbatch.push_back(spjob.release()); - if (vecbatch.size() >= 64) { - queueJobs.enqueue_bulk(vecbatch.data(), vecbatch.size()); - vecbatch.clear(); - throttle(); + void enqueue(std::unique_ptr &spjob) { + if (!fLaunched) { + processJob(*spjob); + spjob = nullptr; + } else { + vecbatch.push_back(spjob.release()); + if (vecbatch.size() >= 64) { + queueJobs.enqueue_bulk(vecbatch.data(), vecbatch.size()); + vecbatch.clear(); + throttle(); + } } } @@ -2809,9 +2814,13 @@ public: } void enqueue(std::function &&fn) { - std::unique_ptr spjob = std::make_unique(std::move(fn)); - queueJobs.enqueue(spjob.release()); - throttle(); + if (!fLaunched) { + fn(); + } else { + std::unique_ptr spjob = std::make_unique(std::move(fn)); + queueJobs.enqueue(spjob.release()); + throttle(); + } } void ProcessWhileBlocked() { @@ -2834,6 +2843,9 @@ public: size_t ckeys() { return ckeysLoaded; } size_t endWork() { + if (!fLaunched) { + return ckeysLoaded; + } if (!vecbatch.empty()) { queueJobs.enqueue_bulk(vecbatch.data(), vecbatch.size()); vecbatch.clear(); @@ -3098,7 +3110,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } lru_clock = LRU_CLOCK(); - wqueue.start(); + if (g_pserver->multithread_load_enabled) + wqueue.start(); while(1) { robj *val; diff --git a/src/server.h b/src/server.h index b4098cf01..974661ad8 100644 --- a/src/server.h +++ b/src/server.h @@ -2634,6 +2634,7 @@ struct redisServer { int failover_state; /* Failover state */ int enable_async_commands; + int multithread_load_enabled = 0; long long repl_batch_offStart = -1; long long repl_batch_idxStart = -1;