Reduce lock contention when loading to a storage provider

Former-commit-id: 58bc777f2215918043325753b6e2bf89dc3108f7
This commit is contained in:
John Sully 2021-06-01 20:01:41 +00:00
parent 3675a59496
commit d0e69e4c47
3 changed files with 67 additions and 22 deletions

View File

@ -2765,6 +2765,34 @@ bool redisDbPersistentData::processChanges(bool fSnapshot)
return (m_spstorage != nullptr);
}
void redisDbPersistentData::processChangesAsync(std::atomic<int> &pendingJobs)
{
++pendingJobs;
dictEmpty(m_dictChanged, nullptr);
dict *dictNew = dictCreate(&dbDictType, nullptr);
std::swap(dictNew, m_pdict);
m_cnewKeysPending = 0;
g_pserver->asyncworkqueue->AddWorkFunction([dictNew, this, &pendingJobs]{
dictIterator *di = dictGetIterator(dictNew);
dictEntry *de;
std::vector<sds> veckeys;
std::vector<sds> vecvals;
while ((de = dictNext(di)) != nullptr)
{
robj *o = (robj*)dictGetVal(de);
sds temp = serializeStoredObjectAndExpire(this, (const char*) dictGetKey(de), o);
veckeys.push_back((sds)dictGetKey(de));
vecvals.push_back(temp);
}
m_spstorage->bulkInsert(veckeys.data(), vecvals.data(), veckeys.size());
for (auto val : vecvals)
sdsfree(val);
dictReleaseIterator(di);
dictRelease(dictNew);
--pendingJobs;
});
}
void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree)
{
if (m_pdbSnapshotStorageFlush)

View File

@ -2365,6 +2365,8 @@ class rdbAsyncWorkThread
bool fLaunched = false;
bool fExit = false;
std::atomic<size_t> ckeysLoaded;
std::atomic<int> cstorageWritesInFlight;
std::atomic<bool> workerThreadDone;
std::thread m_thread;
long long now;
@ -2378,6 +2380,7 @@ public:
: rsi(rsi), rdbflags(rdbflags), now(now)
{
ckeysLoaded = 0;
cstorageWritesInFlight = 0;
listJobs = listCreate();
listSetFreeMethod(listJobs, listFreeMethod);
}
@ -2397,6 +2400,14 @@ public:
void throttle(std::unique_lock<std::mutex> &l) {
if (listLength(listJobs) > 0 && (listLength(listJobs) % 1024 == 0) && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) {
cvThrottle.wait(l);
while (cstorageWritesInFlight.load(std::memory_order_relaxed) && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) {
l.unlock();
usleep(100);
pauseExecution();
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar);
resumeExecution();
l.lock();
}
}
}
@ -2404,9 +2415,8 @@ public:
rdbInsertJob *pjob = new rdbInsertJob(job);
std::unique_lock<std::mutex> l(mutex);
throttle(l);
bool fNotify = listLength(listJobs) == 0;
listAddNodeTail(listJobs, pjob);
if (fNotify)
if (listLength(listJobs) == 1)
cv.notify_one();
}
@ -2434,7 +2444,15 @@ public:
fExit = true;
cv.notify_one();
l.unlock();
while (!workerThreadDone) {
usleep(100);
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar);
}
m_thread.join();
while (cstorageWritesInFlight.load(std::memory_order_seq_cst)) {
usleep(100);
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar);
}
fLaunched = false;
fExit = false;
serverAssert(listLength(listJobs) == 0);
@ -2538,13 +2556,10 @@ public:
{
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
if (g_pserver->db[idb]->processChanges(false))
g_pserver->db[idb]->commitChanges();
if (fHighMemory && !(queue.rsi && queue.rsi->fForceSetKey)) {
g_pserver->db[idb]->removeAllCachedValues(); // During load we don't go through the normal eviction unless we're merging (i.e. an active replica)
fHighMemory = false; // we took care of it
if (g_pserver->m_pstorageFactory) {
g_pserver->db[idb]->processChangesAsync(queue.cstorageWritesInFlight);
fHighMemory = false;
}
g_pserver->db[idb]->trackChanges(false, 1024);
}
if (fHighMemory)
freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/);
@ -2563,6 +2578,13 @@ public:
g_pserver->garbageCollector.endEpoch(vars.gcEpoch);
}
if (g_pserver->m_pstorageFactory) {
for (int idb = 0; idb < cserver.dbnum; ++idb)
g_pserver->db[idb]->processChangesAsync(queue.cstorageWritesInFlight);
}
queue.workerThreadDone = true;
std::unique_lock<std::mutex> lock(queue.mutex);
serverAssert(queue.queuefn.empty());
serverAssert(listLength(queue.listJobs) == 0);
@ -2584,8 +2606,6 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
(r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys)))
{
rdbAsyncWorkThread *pwthread = reinterpret_cast<rdbAsyncWorkThread*>(r->chksum_arg);
if (pwthread)
pwthread->pauseExecution(); // We can't have the work queue modifying the database while processEventsWhileBlocked does its thing
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
@ -2596,7 +2616,14 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
replicationSendNewlineToMaster(mi);
}
loadingProgress(r->processed_bytes);
if (pwthread)
pwthread->pauseExecution();
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar);
if (pwthread)
pwthread->resumeExecution();
processModuleLoadingProgressEvent(0);
robj *ping_argv[1];
@ -2604,8 +2631,6 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb, ping_argv, 1);
decrRefCount(ping_argv[0]);
if (pwthread)
pwthread->resumeExecution();
r->keys_since_last_callback = 0;
}
@ -2629,11 +2654,6 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
sds key = nullptr;
bool fLastKeyExpired = false;
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
g_pserver->db[idb]->trackChanges(true, 1024);
}
rdb->update_cksum = rdbLoadProgressCallback;
rdb->chksum_arg = &wqueue;
rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes;
@ -2946,11 +2966,6 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
wqueue.endWork();
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
if (g_pserver->db[idb]->processChanges(false))
g_pserver->db[idb]->commitChanges();
}
return C_OK;
/* Unexpected end of file is handled here calling rdbReportReadError():

View File

@ -1114,6 +1114,7 @@ public:
// either release the global lock or keep the same global lock between the two functions as
// a second look is kept to ensure writes to secondary storage are ordered
bool processChanges(bool fSnapshot);
void processChangesAsync(std::atomic<int> &pendingJobs);
void commitChanges(const redisDbPersistentDataSnapshot **psnapshotFree = nullptr);
// This should only be used if you look at the key, we do not fixup
@ -1278,6 +1279,7 @@ struct redisDb : public redisDbPersistentDataSnapshot
using redisDbPersistentData::setExpire;
using redisDbPersistentData::trackChanges;
using redisDbPersistentData::processChanges;
using redisDbPersistentData::processChangesAsync;
using redisDbPersistentData::commitChanges;
using redisDbPersistentData::setexpireUnsafe;
using redisDbPersistentData::setexpire;