Make multithread load configurable and disabled by default (#57)

Co-authored-by: John Sully <john@csquare.ca>
This commit is contained in:
John Sully 2022-04-07 13:20:38 -04:00 committed by GitHub Enterprise
parent 6a483cc7ae
commit 52b8c153f8
3 changed files with 25 additions and 10 deletions

View File

@ -2778,6 +2778,7 @@ standardConfig configs[] = {
createBoolConfig("cluster-allow-replica-migration", NULL, MODIFIABLE_CONFIG, g_pserver->cluster_allow_replica_migration, 1, NULL, NULL),
createBoolConfig("replica-announced", NULL, MODIFIABLE_CONFIG, g_pserver->replica_announced, 1, NULL, NULL),
createBoolConfig("enable-async-commands", NULL, MODIFIABLE_CONFIG, g_pserver->enable_async_commands, 1, NULL, NULL),
createBoolConfig("multithread-load-enabled", NULL, MODIFIABLE_CONFIG, g_pserver->multithread_load_enabled, 0, NULL, NULL),
/* String Configs */
createStringConfig("aclfile", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, g_pserver->acl_filename, "", NULL, NULL),

View File

@ -2791,12 +2791,17 @@ public:
}
}
void enqueue(std::unique_ptr<rdbInsertJob> &spjob) {
vecbatch.push_back(spjob.release());
if (vecbatch.size() >= 64) {
queueJobs.enqueue_bulk(vecbatch.data(), vecbatch.size());
vecbatch.clear();
throttle();
void enqueue(std::unique_ptr<rdbInsertJob> &spjob) {
if (!fLaunched) {
processJob(*spjob);
spjob = nullptr;
} else {
vecbatch.push_back(spjob.release());
if (vecbatch.size() >= 64) {
queueJobs.enqueue_bulk(vecbatch.data(), vecbatch.size());
vecbatch.clear();
throttle();
}
}
}
@ -2809,9 +2814,13 @@ public:
}
void enqueue(std::function<void()> &&fn) {
std::unique_ptr<JobBase> spjob = std::make_unique<rdbFunctionJob>(std::move(fn));
queueJobs.enqueue(spjob.release());
throttle();
if (!fLaunched) {
fn();
} else {
std::unique_ptr<JobBase> spjob = std::make_unique<rdbFunctionJob>(std::move(fn));
queueJobs.enqueue(spjob.release());
throttle();
}
}
void ProcessWhileBlocked() {
@ -2834,6 +2843,9 @@ public:
size_t ckeys() { return ckeysLoaded; }
size_t endWork() {
if (!fLaunched) {
return ckeysLoaded;
}
if (!vecbatch.empty()) {
queueJobs.enqueue_bulk(vecbatch.data(), vecbatch.size());
vecbatch.clear();
@ -3098,7 +3110,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
}
lru_clock = LRU_CLOCK();
wqueue.start();
if (g_pserver->multithread_load_enabled)
wqueue.start();
while(1) {
robj *val;

View File

@ -2634,6 +2634,7 @@ struct redisServer {
int failover_state; /* Failover state */
int enable_async_commands;
int multithread_load_enabled = 0;
long long repl_batch_offStart = -1;
long long repl_batch_idxStart = -1;