From 3023bf4e6e3767f7e03b34edabaea2b0bda3a667 Mon Sep 17 00:00:00 2001
From: John Sully
Date: Fri, 30 Apr 2021 17:32:54 +0000
Subject: [PATCH] Initial implementation of multithread load

Former-commit-id: 87b0657c3acd7a3c89964afe1702851b44467c9a
---
 src/rdb.cpp | 248 +++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 192 insertions(+), 56 deletions(-)

diff --git a/src/rdb.cpp b/src/rdb.cpp
index ec510546d..a9d701933 100644
--- a/src/rdb.cpp
+++ b/src/rdb.cpp
@@ -2370,18 +2370,180 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
     }
 }
 
+
+/* One decoded key/value pair handed from the RDB reader to the insert thread. */
+struct rdbInsertJob
+{
+    redisDb *db;
+    sds key;               /* freed with sdsfree() by the worker after the job ran */
+    robj *val;
+    long long lru_clock;
+    long long expiretime;  /* -1: no expire is set for the key */
+    long long lru_idle;
+    long long lfu_freq;
+};
+
+/* Background worker used by rdbLoadRio(): the reader enqueues insert jobs and
+ * custom callbacks, and a single thread drains both queues in batches.
+ * NOTE(review): each batch runs every queued callback before any insert job, so
+ * enqueue order between a callback and an insert job is not preserved. */
+class rdbAsyncWorkThread
+{
+    rdbSaveInfo *rsi;
+    int rdbflags;
+    std::vector<rdbInsertJob> queuejobs;
+    std::vector<std::function<void()>> queuefn; // for custom jobs
+    std::mutex mutex;   // guards queuejobs, queuefn and fExit
+    std::condition_variable cv;
+    bool fExit = false;
+    std::atomic<size_t> ckeysLoaded;
+    std::thread m_thread;
+
+public:
+    rdbAsyncWorkThread(rdbSaveInfo *rsi, int rdbflags)
+        : rsi(rsi), rdbflags(rdbflags), ckeysLoaded(0)
+    {}
+
+    /* Stops and joins the worker if it is still running. */
+    ~rdbAsyncWorkThread() {
+        if (m_thread.joinable())
+            endWork();
+    }
+
+    /* Launches the worker thread. Must not be called while a worker is running. */
+    void start() {
+        m_thread = std::thread(&rdbAsyncWorkThread::loadWorkerThreadMain, this);
+    }
+
+    /* Queues a key insert. The worker later frees job.key and drops job.val if the key is not inserted. */
+    void enqueue(rdbInsertJob &job) {
+        std::unique_lock<std::mutex> l(mutex);
+        bool fNotify = queuejobs.empty();
+        queuejobs.push_back(job);
+        if (fNotify)
+            cv.notify_one();
+    }
+
+    /* Queues an arbitrary callback to be run on the worker thread. */
+    void enqueue(std::function<void()> &&fn) {
+        std::unique_lock<std::mutex> l(mutex);
+        bool fNotify = queuefn.empty();
+        queuefn.push_back(std::move(fn));
+        if (fNotify)
+            cv.notify_one();
+    }
+
+    /* Number of keys inserted so far. */
+    size_t ckeys() { return ckeysLoaded; }
+
+    /* Asks the worker to exit once everything queued has been processed, waits
+     * for it and returns the number of keys inserted. Safe to call when the
+     * thread was never started or has already been joined. */
+    size_t endWork() {
+        std::unique_lock<std::mutex> l(mutex);
+        fExit = true;
+        cv.notify_one();
+        l.unlock();
+        if (m_thread.joinable())
+            m_thread.join();
+        return ckeysLoaded;
+    }
+
+    /* Worker entry point: processes batches until exit has been requested and
+     * both queues are empty. */
+    static void loadWorkerThreadMain(rdbAsyncWorkThread *pqueue) {
+        rdbAsyncWorkThread &queue = *pqueue;
+        for (;;) {
+            std::unique_lock<std::mutex> lock(queue.mutex);
+            queue.cv.wait(lock, [&queue] {
+                return queue.fExit || !queue.queuejobs.empty() || !queue.queuefn.empty();
+            });
+            /* Only leave once nothing is queued: work enqueued right before
+             * endWork() must still be processed, not dropped. */
+            if (queue.queuejobs.empty() && queue.queuefn.empty())
+                break;
+
+            auto queuejobs = std::move(queue.queuejobs);
+            queue.queuejobs.reserve(1024);
+            auto queuefn = std::move(queue.queuefn);
+            lock.unlock();
+
+            for (auto &fn : queuefn) {
+                fn();
+            }
+
+            bool f1024thKey = false;
+            for (auto &job : queuejobs) {
+                redisObjectStack keyobj;
+                initStaticStringObject(keyobj,job.key);
+
+                /* Add the new object in the hash table */
+                int fInserted = dbMerge(job.db, &keyobj, job.val, (queue.rsi && queue.rsi->fForceSetKey) || (queue.rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef
+
+                if (fInserted)
+                {
+                    auto ckeys = queue.ckeysLoaded.fetch_add(1, std::memory_order_relaxed);
+                    f1024thKey = f1024thKey || (ckeys % 1024) == 0;
+
+                    /* Set the expire time if needed */
+                    if (job.expiretime != -1)
+                    {
+                        setExpire(NULL,job.db,&keyobj,nullptr,job.expiretime);
+                    }
+
+                    /* Set usage information (for eviction). */
+                    objectSetLRUOrLFU(job.val,job.lfu_freq,job.lru_idle,job.lru_clock,1000);
+
+                    /* call key space notification on key loaded for modules only */
+                    moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, job.db->id);
+
+                    replicationNotifyLoadedKey(job.db, &keyobj, job.val, job.expiretime);
+                }
+                else
+                {
+                    decrRefCount(job.val);
+                }
+                if (job.key != nullptr)
+                {
+                    sdsfree(job.key);
+                }
+            }
+
+            /* If we have a storage provider, flush pending changes and check the
+             * memory limit, but only after a batch in which the loaded-key count
+             * crossed a multiple of 1024, to limit the perf impact */
+            if (g_pserver->m_pstorageFactory && f1024thKey)
+            {
+                bool fHighMemory = (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK);
+                for (int idb = 0; idb < cserver.dbnum; ++idb)
+                {
+                    if (g_pserver->db[idb]->processChanges(false))
+                        g_pserver->db[idb]->commitChanges();
+                    if (fHighMemory && !(queue.rsi && queue.rsi->fForceSetKey)) {
+                        g_pserver->db[idb]->removeAllCachedValues(); // During load we don't go through the normal eviction unless we're merging (i.e. an active replica)
+                        fHighMemory = false; // we took care of it
+                    }
+                    g_pserver->db[idb]->trackChanges(false, 1024);
+                }
+                if (fHighMemory)
+                    freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/);
+            }
+        }
+    }
+};
+
 /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
  * otherwise C_ERR is returned and 'errno' is set accordingly. */
 int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
     uint64_t dbid = 0;
     int type, rdbver;
-    redisDb *db = g_pserver->db[dbid];
+    redisDb *dbCur = g_pserver->db[dbid];
     char buf[1024];
     /* Key-specific attributes, set by opcodes before the key type. */
     long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now;
     long long lru_clock = 0;
     uint64_t mvcc_tstamp = OBJ_MVCC_INVALID;
-    size_t ckeysLoaded = 0;
+    rdbAsyncWorkThread wqueue(rsi, rdbflags);
     robj *subexpireKey = nullptr;
     sds key = nullptr;
     bool fLastKeyExpired = false;
@@ -2409,6 +2560,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
 
     now = mstime();
     lru_clock = LRU_CLOCK();
+    wqueue.start();
 
     while(1) {
         robj *val;
@@ -2456,7 +2608,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
                     "databases. Exiting\n", cserver.dbnum);
                 exit(1);
             }
-            db = g_pserver->db[dbid];
+            dbCur = g_pserver->db[dbid];
             continue; /* Read next opcode. */
         } else if (type == RDB_OPCODE_RESIZEDB) {
             /* RESIZEDB: Hint about the size of the keys in the currently
@@ -2466,7 +2618,9 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
                 goto eoferr;
             if ((expires_size = rdbLoadLen(rdb,NULL)) == RDB_LENERR)
                 goto eoferr;
-            db->expand(db_size);
+            wqueue.enqueue([dbCur, db_size]{
+                dbCur->expand(db_size);
+            });
             continue; /* Read next opcode. */
         } else if (type == RDB_OPCODE_AUX) {
             /* AUX: generic string-string fields. 
Use to add state to RDB @@ -2540,12 +2694,14 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } } else { - redisObjectStack keyobj; - initStaticStringObject(keyobj,key); long long expireT = strtoll(szFromObj(auxval), nullptr, 10); - setExpire(NULL, db, &keyobj, subexpireKey, expireT); - replicateSubkeyExpire(db, &keyobj, subexpireKey, expireT); - decrRefCount(subexpireKey); + wqueue.enqueue([dbCur, subexpireKey, key, expireT]{ + redisObjectStack keyobj; + initStaticStringObject(keyobj,key); + setExpire(NULL, dbCur, &keyobj, subexpireKey, expireT); + replicateSubkeyExpire(dbCur, &keyobj, subexpireKey, expireT); + decrRefCount(subexpireKey); + }); subexpireKey = nullptr; } } else { @@ -2637,6 +2793,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { initStaticStringObject(keyobj,key); bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now; if (fStaleMvccKey || fExpiredKey) { + #if 0 // TODO! if (fStaleMvccKey && !fExpiredKey && rsi != nullptr && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, &keyobj) == nullptr) { // We have a key that we've already deleted and is not back in our database. 
// We'll need to inform the sending master of the delete if it is also a replica of us @@ -2648,56 +2805,21 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { key = nullptr; decrRefCount(val); val = nullptr; + #endif } else { - /* If we have a storage provider check if we need to evict some keys to stay under our memory limit, - do this every 16 keys to limit the perf impact */ - if (g_pserver->m_pstorageFactory && (ckeysLoaded % 128) == 0) - { - bool fHighMemory = (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK); - if (fHighMemory || (ckeysLoaded % (1024)) == 0) - { - for (int idb = 0; idb < cserver.dbnum; ++idb) - { - if (g_pserver->db[idb]->processChanges(false)) - g_pserver->db[idb]->commitChanges(); - if (fHighMemory && !(rsi && rsi->fForceSetKey)) { - g_pserver->db[idb]->removeAllCachedValues(); // During load we don't go through the normal eviction unless we're merging (i.e. an active replica) - fHighMemory = false; // we took care of it - } - g_pserver->db[idb]->trackChanges(false, 1024); - } - if (fHighMemory) - freeMemoryIfNeeded(false /*fQuickCycle*/, false /* fPreSnapshot*/); - } - } - - /* Add the new object in the hash table */ - int fInserted = dbMerge(db, &keyobj, val, (rsi && rsi->fForceSetKey) || (rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef fLastKeyExpired = false; + rdbInsertJob job; + job.db = dbCur; + job.key = key; + job.val = val; + job.lru_clock = lru_clock; + job.expiretime = expiretime; + job.lru_idle = lru_idle; + job.lfu_freq = lfu_freq; + wqueue.enqueue(job); - if (fInserted) - { - ++ckeysLoaded; - - /* Set the expire time if needed */ - if (expiretime != -1) - { - setExpire(NULL,db,&keyobj,nullptr,expiretime); - } - - /* Set usage information (for eviction). 
*/ - objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock,1000); - - /* call key space notification on key loaded for modules only */ - moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, db->id); - - replicationNotifyLoadedKey(db, &keyobj, val, expiretime); - } - else - { - decrRefCount(val); - val = nullptr; - } + key = nullptr; + val = nullptr; } if (g_pserver->key_load_delay) @@ -2744,6 +2866,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } } + wqueue.endWork(); + for (int idb = 0; idb < cserver.dbnum; ++idb) { if (g_pserver->db[idb]->processChanges(false)) @@ -2756,6 +2880,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { * the RDB file from a socket during initial SYNC (diskless replica mode), * we'll report the error to the caller, so that we can retry. */ eoferr: + wqueue.endWork(); if (key != nullptr) { sdsfree(key);