Initial implementation of multithread load

Former-commit-id: 87b0657c3acd7a3c89964afe1702851b44467c9a
This commit is contained in:
John Sully 2021-04-30 17:32:54 +00:00
parent 0c7589580d
commit 3023bf4e6e

View File

@ -2370,18 +2370,169 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
} }
} }
struct rdbInsertJob
{
redisDb *db;
sds key;
robj *val;
long long lru_clock;
long long expiretime;
long long lru_idle;
long long lfu_freq;
};
class rdbAsyncWorkThread
{
rdbSaveInfo *rsi;
int rdbflags;
std::vector<rdbInsertJob> queuejobs;
std::vector<std::function<void()>> queuefn; // for custom jobs
std::mutex mutex;
std::condition_variable cv;
bool fExit = false;
std::atomic<size_t> ckeysLoaded;
std::thread m_thread;
public:
rdbAsyncWorkThread(rdbSaveInfo *rsi, int rdbflags)
: rsi(rsi), rdbflags(rdbflags)
{
ckeysLoaded = 0;
}
~rdbAsyncWorkThread() {
if (!fExit && m_thread.joinable())
endWork();
}
void start() {
m_thread = std::thread(&rdbAsyncWorkThread::loadWorkerThreadMain, this);
}
void enqueue(rdbInsertJob &job) {
std::unique_lock<std::mutex> l(mutex);
bool fNotify = queuejobs.empty();
queuejobs.push_back(job);
if (fNotify)
cv.notify_one();
}
void enqueue(std::function<void()> &&fn) {
std::unique_lock<std::mutex> l(mutex);
bool fNotify = queuefn.empty();
queuefn.push_back(std::move(fn));
if (fNotify)
cv.notify_one();
}
size_t ckeys() { return ckeysLoaded; }
size_t endWork() {
std::unique_lock<std::mutex> l(mutex);
fExit = true;
cv.notify_one();
l.unlock();
m_thread.join();
return ckeysLoaded;
}
static void loadWorkerThreadMain(rdbAsyncWorkThread *pqueue) {
rdbAsyncWorkThread &queue = *pqueue;
for (;;) {
std::unique_lock<std::mutex> lock(queue.mutex);
if (queue.queuejobs.empty() && queue.queuefn.empty()) {
if (queue.fExit)
break;
queue.cv.wait(lock);
if (queue.fExit)
break;
}
auto queuejobs = std::move(queue.queuejobs);
queue.queuejobs.reserve(1024);
auto queuefn = std::move(queue.queuefn);
lock.unlock();
for (auto &fn : queuefn) {
fn();
}
bool f1024thKey = false;
for (auto &job : queuejobs) {
redisObjectStack keyobj;
initStaticStringObject(keyobj,job.key);
/* Add the new object in the hash table */
int fInserted = dbMerge(job.db, &keyobj, job.val, (queue.rsi && queue.rsi->fForceSetKey) || (queue.rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef
if (fInserted)
{
auto ckeys = queue.ckeysLoaded.fetch_add(1, std::memory_order_relaxed);
f1024thKey = f1024thKey || (ckeys % 1024) == 0;
/* Set the expire time if needed */
if (job.expiretime != -1)
{
setExpire(NULL,job.db,&keyobj,nullptr,job.expiretime);
}
/* Set usage information (for eviction). */
objectSetLRUOrLFU(job.val,job.lfu_freq,job.lru_idle,job.lru_clock,1000);
/* call key space notification on key loaded for modules only */
moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, job.db->id);
replicationNotifyLoadedKey(job.db, &keyobj, job.val, job.expiretime);
}
else
{
decrRefCount(job.val);
}
if (job.key != nullptr)
{
sdsfree(job.key);
}
}
/* If we have a storage provider check if we need to evict some keys to stay under our memory limit,
do this every 16 keys to limit the perf impact */
if (g_pserver->m_pstorageFactory && f1024thKey)
{
bool fHighMemory = (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK);
if (fHighMemory || f1024thKey)
{
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
if (g_pserver->db[idb]->processChanges(false))
g_pserver->db[idb]->commitChanges();
if (fHighMemory && !(queue.rsi && queue.rsi->fForceSetKey)) {
g_pserver->db[idb]->removeAllCachedValues(); // During load we don't go through the normal eviction unless we're merging (i.e. an active replica)
fHighMemory = false; // we took care of it
}
g_pserver->db[idb]->trackChanges(false, 1024);
}
if (fHighMemory)
freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/);
}
}
}
}
};
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */ * otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
uint64_t dbid = 0; uint64_t dbid = 0;
int type, rdbver; int type, rdbver;
redisDb *db = g_pserver->db[dbid]; redisDb *dbCur = g_pserver->db[dbid];
char buf[1024]; char buf[1024];
/* Key-specific attributes, set by opcodes before the key type. */ /* Key-specific attributes, set by opcodes before the key type. */
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now; long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now;
long long lru_clock = 0; long long lru_clock = 0;
uint64_t mvcc_tstamp = OBJ_MVCC_INVALID; uint64_t mvcc_tstamp = OBJ_MVCC_INVALID;
size_t ckeysLoaded = 0; rdbAsyncWorkThread wqueue(rsi, rdbflags);
robj *subexpireKey = nullptr; robj *subexpireKey = nullptr;
sds key = nullptr; sds key = nullptr;
bool fLastKeyExpired = false; bool fLastKeyExpired = false;
@ -2409,6 +2560,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
now = mstime(); now = mstime();
lru_clock = LRU_CLOCK(); lru_clock = LRU_CLOCK();
wqueue.start();
while(1) { while(1) {
robj *val; robj *val;
@ -2456,7 +2608,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
"databases. Exiting\n", cserver.dbnum); "databases. Exiting\n", cserver.dbnum);
exit(1); exit(1);
} }
db = g_pserver->db[dbid]; dbCur = g_pserver->db[dbid];
continue; /* Read next opcode. */ continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_RESIZEDB) { } else if (type == RDB_OPCODE_RESIZEDB) {
/* RESIZEDB: Hint about the size of the keys in the currently /* RESIZEDB: Hint about the size of the keys in the currently
@ -2466,7 +2618,9 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
goto eoferr; goto eoferr;
if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR) if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
goto eoferr; goto eoferr;
db->expand(db_size); wqueue.enqueue([dbCur, db_size]{
dbCur->expand(db_size);
});
continue; /* Read next opcode. */ continue; /* Read next opcode. */
} else if (type == RDB_OPCODE_AUX) { } else if (type == RDB_OPCODE_AUX) {
/* AUX: generic string-string fields. Use to add state to RDB /* AUX: generic string-string fields. Use to add state to RDB
@ -2540,12 +2694,14 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
} }
} }
else { else {
redisObjectStack keyobj;
initStaticStringObject(keyobj,key);
long long expireT = strtoll(szFromObj(auxval), nullptr, 10); long long expireT = strtoll(szFromObj(auxval), nullptr, 10);
setExpire(NULL, db, &keyobj, subexpireKey, expireT); wqueue.enqueue([dbCur, subexpireKey, key, expireT]{
replicateSubkeyExpire(db, &keyobj, subexpireKey, expireT); redisObjectStack keyobj;
decrRefCount(subexpireKey); initStaticStringObject(keyobj,key);
setExpire(NULL, dbCur, &keyobj, subexpireKey, expireT);
replicateSubkeyExpire(dbCur, &keyobj, subexpireKey, expireT);
decrRefCount(subexpireKey);
});
subexpireKey = nullptr; subexpireKey = nullptr;
} }
} else { } else {
@ -2637,6 +2793,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
initStaticStringObject(keyobj,key); initStaticStringObject(keyobj,key);
bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now; bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now;
if (fStaleMvccKey || fExpiredKey) { if (fStaleMvccKey || fExpiredKey) {
#if 0 // TODO!
if (fStaleMvccKey && !fExpiredKey && rsi != nullptr && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, &keyobj) == nullptr) { if (fStaleMvccKey && !fExpiredKey && rsi != nullptr && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, &keyobj) == nullptr) {
// We have a key that we've already deleted and is not back in our database. // We have a key that we've already deleted and is not back in our database.
// We'll need to inform the sending master of the delete if it is also a replica of us // We'll need to inform the sending master of the delete if it is also a replica of us
@ -2648,56 +2805,21 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
key = nullptr; key = nullptr;
decrRefCount(val); decrRefCount(val);
val = nullptr; val = nullptr;
#endif
} else { } else {
/* If we have a storage provider check if we need to evict some keys to stay under our memory limit,
do this every 16 keys to limit the perf impact */
if (g_pserver->m_pstorageFactory && (ckeysLoaded % 128) == 0)
{
bool fHighMemory = (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK);
if (fHighMemory || (ckeysLoaded % (1024)) == 0)
{
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
if (g_pserver->db[idb]->processChanges(false))
g_pserver->db[idb]->commitChanges();
if (fHighMemory && !(rsi && rsi->fForceSetKey)) {
g_pserver->db[idb]->removeAllCachedValues(); // During load we don't go through the normal eviction unless we're merging (i.e. an active replica)
fHighMemory = false; // we took care of it
}
g_pserver->db[idb]->trackChanges(false, 1024);
}
if (fHighMemory)
freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/);
}
}
/* Add the new object in the hash table */
int fInserted = dbMerge(db, &keyobj, val, (rsi && rsi->fForceSetKey) || (rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef
fLastKeyExpired = false; fLastKeyExpired = false;
rdbInsertJob job;
job.db = dbCur;
job.key = key;
job.val = val;
job.lru_clock = lru_clock;
job.expiretime = expiretime;
job.lru_idle = lru_idle;
job.lfu_freq = lfu_freq;
wqueue.enqueue(job);
if (fInserted) key = nullptr;
{ val = nullptr;
++ckeysLoaded;
/* Set the expire time if needed */
if (expiretime != -1)
{
setExpire(NULL,db,&keyobj,nullptr,expiretime);
}
/* Set usage information (for eviction). */
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000);
/* call key space notification on key loaded for modules only */
moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, db->id);
replicationNotifyLoadedKey(db, &keyobj, val, expiretime);
}
else
{
decrRefCount(val);
val = nullptr;
}
} }
if (g_pserver->key_load_delay) if (g_pserver->key_load_delay)
@ -2744,6 +2866,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
} }
} }
wqueue.endWork();
for (int idb = 0; idb < cserver.dbnum; ++idb) for (int idb = 0; idb < cserver.dbnum; ++idb)
{ {
if (g_pserver->db[idb]->processChanges(false)) if (g_pserver->db[idb]->processChanges(false))
@ -2756,6 +2880,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
* the RDB file from a socket during initial SYNC (diskless replica mode), * the RDB file from a socket during initial SYNC (diskless replica mode),
* we'll report the error to the caller, so that we can retry. */ * we'll report the error to the caller, so that we can retry. */
eoferr: eoferr:
wqueue.endWork();
if (key != nullptr) if (key != nullptr)
{ {
sdsfree(key); sdsfree(key);