Pause execution during rdbLoadProgressCallback as its too risky to let it run

Former-commit-id: e70c01cb3e756d1e02ed190b76c73b7b7010c0d3
This commit is contained in:
John Sully 2021-05-06 00:42:49 +00:00
parent 10383a8234
commit a8d177e758
2 changed files with 26 additions and 15 deletions

View File

@ -2361,6 +2361,7 @@ class rdbAsyncWorkThread
std::mutex mutex; std::mutex mutex;
std::condition_variable cv; std::condition_variable cv;
std::condition_variable cvThrottle; std::condition_variable cvThrottle;
fastlock m_lockPause { "rdbAsyncWork-Pause"};
bool fLaunched = false; bool fLaunched = false;
bool fExit = false; bool fExit = false;
std::atomic<size_t> ckeysLoaded; std::atomic<size_t> ckeysLoaded;
@ -2409,6 +2410,14 @@ public:
cv.notify_one(); cv.notify_one();
} }
void pauseExecution() {
m_lockPause.lock();
}
void resumeExecution() {
m_lockPause.unlock();
}
void enqueue(std::function<void()> &&fn) { void enqueue(std::function<void()> &&fn) {
std::unique_lock<std::mutex> l(mutex); std::unique_lock<std::mutex> l(mutex);
bool fNotify = queuefn.empty(); bool fNotify = queuefn.empty();
@ -2459,6 +2468,7 @@ public:
vars.gcEpoch = g_pserver->garbageCollector.startEpoch(); vars.gcEpoch = g_pserver->garbageCollector.startEpoch();
while (listLength(listJobs)) { while (listLength(listJobs)) {
std::unique_lock<fastlock> ulPause(pqueue->m_lockPause);
rdbInsertJob &job = *((rdbInsertJob*)listNodeValue(listFirst(listJobs))); rdbInsertJob &job = *((rdbInsertJob*)listNodeValue(listFirst(listJobs)));
redisObjectStack keyobj; redisObjectStack keyobj;
@ -2547,6 +2557,7 @@ public:
listRelease(listJobs); listRelease(listJobs);
for (auto &fn : queuefn) { for (auto &fn : queuefn) {
std::unique_lock<fastlock> ulPause(pqueue->m_lockPause);
fn(); fn();
} }
@ -2573,8 +2584,8 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
(r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys))) (r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys)))
{ {
rdbAsyncWorkThread *pwthread = reinterpret_cast<rdbAsyncWorkThread*>(r->chksum_arg); rdbAsyncWorkThread *pwthread = reinterpret_cast<rdbAsyncWorkThread*>(r->chksum_arg);
if (pwthread && g_pserver->fActiveReplica) if (pwthread)
pwthread->endWork(); // We can't have the work queue modifying the database while processEventsWhileBlocked does its thing pwthread->pauseExecution(); // We can't have the work queue modifying the database while processEventsWhileBlocked does its thing
listIter li; listIter li;
listNode *ln; listNode *ln;
listRewind(g_pserver->masters, &li); listRewind(g_pserver->masters, &li);
@ -2593,8 +2604,8 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
ping_argv[0] = createStringObject("PING",4); ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb, ping_argv, 1); replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb, ping_argv, 1);
decrRefCount(ping_argv[0]); decrRefCount(ping_argv[0]);
if (pwthread && g_pserver->fActiveReplica) if (pwthread)
pwthread->start(); pwthread->resumeExecution();
r->keys_since_last_callback = 0; r->keys_since_last_callback = 0;
} }

View File

@ -53,17 +53,6 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons
options.create_missing_column_families = true; options.create_missing_column_families = true;
rocksdb::DB *db = nullptr; rocksdb::DB *db = nullptr;
if (rgchConfig != nullptr)
{
std::string options_string(rgchConfig, cchConfig);
rocksdb::Status status;
if (!(status = rocksdb::GetDBOptionsFromString(options, options_string, &options)).ok())
{
fprintf(stderr, "Failed to parse FLASH options: %s\r\n", status.ToString().c_str());
exit(EXIT_FAILURE);
}
}
options.max_background_compactions = 4; options.max_background_compactions = 4;
options.max_background_flushes = 2; options.max_background_flushes = 2;
options.bytes_per_sync = 1048576; options.bytes_per_sync = 1048576;
@ -90,6 +79,17 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons
cf_options.level_compaction_dynamic_level_bytes = true; cf_options.level_compaction_dynamic_level_bytes = true;
veccoldesc.push_back(rocksdb::ColumnFamilyDescriptor(std::to_string(idb), cf_options)); veccoldesc.push_back(rocksdb::ColumnFamilyDescriptor(std::to_string(idb), cf_options));
} }
if (rgchConfig != nullptr)
{
std::string options_string(rgchConfig, cchConfig);
rocksdb::Status status;
if (!(status = rocksdb::GetDBOptionsFromString(options, options_string, &options)).ok())
{
fprintf(stderr, "Failed to parse FLASH options: %s\r\n", status.ToString().c_str());
exit(EXIT_FAILURE);
}
}
std::vector<rocksdb::ColumnFamilyHandle*> handles; std::vector<rocksdb::ColumnFamilyHandle*> handles;
status = rocksdb::DB::Open(options, dbfile, veccoldesc, &handles, &db); status = rocksdb::DB::Open(options, dbfile, veccoldesc, &handles, &db);