From 90ca602123bfadeb042dcd8b225e2b8bb4411c4e Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 1 Jun 2021 23:59:22 +0000 Subject: [PATCH] We need to refactor to gurantee the key is visible when loading subexpires. Keys may be temporarily invisible while waiting to be added to the storage Former-commit-id: 222eecb95925f7c60e28a5717d73163ad64b522b --- src/rdb.cpp | 102 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 64 insertions(+), 38 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index c940a11cc..962eba589 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2343,13 +2343,41 @@ void stopSaving(int success) { struct rdbInsertJob { - redisDb *db; - sds key; - robj *val; + redisDb *db = nullptr; + sds key = nullptr; + robj *val = nullptr; long long lru_clock; long long expiretime; long long lru_idle; long long lfu_freq; + std::vector> vecsubexpires; + + void addSubexpireKey(robj *subkey, long long when) { + vecsubexpires.push_back(std::make_pair(robj_sharedptr(subkey), when)); + decrRefCount(subkey); + } + + rdbInsertJob() = default; + rdbInsertJob(rdbInsertJob &&src) { + db = src.db; + src.db = nullptr; + key = src.key; + src.key = nullptr; + val = src.val; + src.val = nullptr; + lru_clock = src.lru_clock; + expiretime = src.expiretime; + lru_idle = src.lru_idle; + lfu_freq = src.lfu_freq; + vecsubexpires = std::move(src.vecsubexpires); + } + + ~rdbInsertJob() { + if (key) + sdsfree(key); + if (val) + decrRefCount(val); + } }; class rdbAsyncWorkThread @@ -2402,7 +2430,7 @@ public: cvThrottle.wait(l); while (cstorageWritesInFlight.load(std::memory_order_relaxed) && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) { l.unlock(); - usleep(100); + usleep(10); pauseExecution(); processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); resumeExecution(); @@ -2411,11 +2439,10 @@ public: } } - void enqueue(rdbInsertJob &job) { - rdbInsertJob *pjob = new rdbInsertJob(job); + void enqueue(std::unique_ptr &spjob) { std::unique_lock l(mutex); throttle(l); - listAddNodeTail(listJobs, pjob); + listAddNodeTail(listJobs, spjob.release()); if (listLength(listJobs) == 1) cv.notify_one(); } @@ -2445,12 +2472,14 @@ public: cv.notify_one(); l.unlock(); while (!workerThreadDone) { - usleep(100); + usleep(10); + pauseExecution(); processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); + resumeExecution(); } m_thread.join(); while (cstorageWritesInFlight.load(std::memory_order_seq_cst)) { - usleep(100); + usleep(10); processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); } fLaunched = false; @@ -2534,17 +2563,15 @@ public: moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, job.db->id); replicationNotifyLoadedKey(job.db, &keyobj, job.val, job.expiretime); - } - else - { - decrRefCount(job.val); - } - } - - if (job.key != nullptr) - { - sdsfree(job.key); + for (auto &pair : job.vecsubexpires) + { + setExpire(NULL, job.db, &keyobj, pair.first, pair.second); + replicateSubkeyExpire(job.db, &keyobj, pair.first.get(), pair.second); + } + + job.val = nullptr; // don't free this as we moved ownership to the DB + } } /* If we have a storage provider check if we need to evict some keys to stay under our memory limit, @@ -2606,6 +2633,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { (r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys))) { rdbAsyncWorkThread *pwthread = reinterpret_cast(r->chksum_arg); + listIter li; listNode *ln; listRewind(g_pserver->masters, &li); @@ -2622,7 +2650,6 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); if (pwthread) pwthread->resumeExecution(); - processModuleLoadingProgressEvent(0); @@ -2653,6 +2680,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { robj *subexpireKey = nullptr; sds key = nullptr; bool fLastKeyExpired = false; + std::unique_ptr spjob; rdb->update_cksum = rdbLoadProgressCallback; rdb->chksum_arg = &wqueue; @@ -2809,15 +2837,9 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } else { long long expireT = strtoll(szFromObj(auxval), nullptr, 10); - sds keyT = sdsdupshared(key); - wqueue.enqueue([dbCur, subexpireKey, keyT, expireT]{ - redisObjectStack keyobj; - initStaticStringObject(keyobj,keyT); - setExpire(NULL, dbCur, &keyobj, subexpireKey, expireT); - replicateSubkeyExpire(dbCur, &keyobj, subexpireKey, expireT); - decrRefCount(subexpireKey); - sdsfree(keyT); - }); + serverAssert(spjob != nullptr); + serverAssert(sdscmp(key, spjob->key) == 0); + spjob->addSubexpireKey(subexpireKey, expireT); subexpireKey = nullptr; } } else { @@ -2901,15 +2923,16 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { bool fStaleMvccKey = (rsi) ? mvccFromObj(val) < rsi->mvccMinThreshold : false; - rdbInsertJob job; - job.db = dbCur; - job.key = sdsdupshared(key); - job.val = val; - job.lru_clock = lru_clock; - job.expiretime = expiretime; - job.lru_idle = lru_idle; - job.lfu_freq = lfu_freq; - wqueue.enqueue(job); + if (spjob != nullptr) + wqueue.enqueue(spjob); + spjob = std::make_unique(); + spjob->db = dbCur; + spjob->key = sdsdupshared(key); + spjob->val = val; + spjob->lru_clock = lru_clock; + spjob->expiretime = expiretime; + spjob->lru_idle = lru_idle; + spjob->lfu_freq = lfu_freq; val = nullptr; /* Check if the key already expired. This function is used when loading @@ -2932,6 +2955,9 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { lru_idle = -1; } + if (spjob != nullptr) + wqueue.enqueue(spjob); + if (key != nullptr) { sdsfree(key);