We need to refactor to gurantee the key is visible when loading subexpires. Keys may be temporarily invisible while waiting to be added to the storage

Former-commit-id: 222eecb95925f7c60e28a5717d73163ad64b522b
This commit is contained in:
John Sully 2021-06-01 23:59:22 +00:00
parent 5037a83257
commit 90ca602123

View File

@ -2343,13 +2343,41 @@ void stopSaving(int success) {
struct rdbInsertJob struct rdbInsertJob
{ {
redisDb *db; redisDb *db = nullptr;
sds key; sds key = nullptr;
robj *val; robj *val = nullptr;
long long lru_clock; long long lru_clock;
long long expiretime; long long expiretime;
long long lru_idle; long long lru_idle;
long long lfu_freq; long long lfu_freq;
std::vector<std::pair<robj_sharedptr, long long>> vecsubexpires;
void addSubexpireKey(robj *subkey, long long when) {
vecsubexpires.push_back(std::make_pair(robj_sharedptr(subkey), when));
decrRefCount(subkey);
}
rdbInsertJob() = default;
rdbInsertJob(rdbInsertJob &&src) {
db = src.db;
src.db = nullptr;
key = src.key;
src.key = nullptr;
val = src.val;
src.val = nullptr;
lru_clock = src.lru_clock;
expiretime = src.expiretime;
lru_idle = src.lru_idle;
lfu_freq = src.lfu_freq;
vecsubexpires = std::move(src.vecsubexpires);
}
~rdbInsertJob() {
if (key)
sdsfree(key);
if (val)
decrRefCount(val);
}
}; };
class rdbAsyncWorkThread class rdbAsyncWorkThread
@ -2402,7 +2430,7 @@ public:
cvThrottle.wait(l); cvThrottle.wait(l);
while (cstorageWritesInFlight.load(std::memory_order_relaxed) && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) { while (cstorageWritesInFlight.load(std::memory_order_relaxed) && (getMaxmemoryState(NULL,NULL,NULL,NULL) != C_OK)) {
l.unlock(); l.unlock();
usleep(100); usleep(10);
pauseExecution(); pauseExecution();
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar);
resumeExecution(); resumeExecution();
@ -2411,11 +2439,10 @@ public:
} }
} }
void enqueue(rdbInsertJob &job) { void enqueue(std::unique_ptr<rdbInsertJob> &spjob) {
rdbInsertJob *pjob = new rdbInsertJob(job);
std::unique_lock<std::mutex> l(mutex); std::unique_lock<std::mutex> l(mutex);
throttle(l); throttle(l);
listAddNodeTail(listJobs, pjob); listAddNodeTail(listJobs, spjob.release());
if (listLength(listJobs) == 1) if (listLength(listJobs) == 1)
cv.notify_one(); cv.notify_one();
} }
@ -2445,12 +2472,14 @@ public:
cv.notify_one(); cv.notify_one();
l.unlock(); l.unlock();
while (!workerThreadDone) { while (!workerThreadDone) {
usleep(100); usleep(10);
pauseExecution();
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar);
resumeExecution();
} }
m_thread.join(); m_thread.join();
while (cstorageWritesInFlight.load(std::memory_order_seq_cst)) { while (cstorageWritesInFlight.load(std::memory_order_seq_cst)) {
usleep(100); usleep(10);
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar);
} }
fLaunched = false; fLaunched = false;
@ -2534,17 +2563,15 @@ public:
moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, job.db->id); moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, job.db->id);
replicationNotifyLoadedKey(job.db, &keyobj, job.val, job.expiretime); replicationNotifyLoadedKey(job.db, &keyobj, job.val, job.expiretime);
}
else
{
decrRefCount(job.val);
}
}
for (auto &pair : job.vecsubexpires)
if (job.key != nullptr) {
{ setExpire(NULL, job.db, &keyobj, pair.first, pair.second);
sdsfree(job.key); replicateSubkeyExpire(job.db, &keyobj, pair.first.get(), pair.second);
}
job.val = nullptr; // don't free this as we moved ownership to the DB
}
} }
/* If we have a storage provider check if we need to evict some keys to stay under our memory limit, /* If we have a storage provider check if we need to evict some keys to stay under our memory limit,
@ -2606,6 +2633,7 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
(r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys))) (r->keys_since_last_callback >= g_pserver->loading_process_events_interval_keys)))
{ {
rdbAsyncWorkThread *pwthread = reinterpret_cast<rdbAsyncWorkThread*>(r->chksum_arg); rdbAsyncWorkThread *pwthread = reinterpret_cast<rdbAsyncWorkThread*>(r->chksum_arg);
listIter li; listIter li;
listNode *ln; listNode *ln;
listRewind(g_pserver->masters, &li); listRewind(g_pserver->masters, &li);
@ -2622,7 +2650,6 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar); processEventsWhileBlocked(serverTL - g_pserver->rgthreadvar);
if (pwthread) if (pwthread)
pwthread->resumeExecution(); pwthread->resumeExecution();
processModuleLoadingProgressEvent(0); processModuleLoadingProgressEvent(0);
@ -2653,6 +2680,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
robj *subexpireKey = nullptr; robj *subexpireKey = nullptr;
sds key = nullptr; sds key = nullptr;
bool fLastKeyExpired = false; bool fLastKeyExpired = false;
std::unique_ptr<rdbInsertJob> spjob;
rdb->update_cksum = rdbLoadProgressCallback; rdb->update_cksum = rdbLoadProgressCallback;
rdb->chksum_arg = &wqueue; rdb->chksum_arg = &wqueue;
@ -2809,15 +2837,9 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
} }
else { else {
long long expireT = strtoll(szFromObj(auxval), nullptr, 10); long long expireT = strtoll(szFromObj(auxval), nullptr, 10);
sds keyT = sdsdupshared(key); serverAssert(spjob != nullptr);
wqueue.enqueue([dbCur, subexpireKey, keyT, expireT]{ serverAssert(sdscmp(key, spjob->key) == 0);
redisObjectStack keyobj; spjob->addSubexpireKey(subexpireKey, expireT);
initStaticStringObject(keyobj,keyT);
setExpire(NULL, dbCur, &keyobj, subexpireKey, expireT);
replicateSubkeyExpire(dbCur, &keyobj, subexpireKey, expireT);
decrRefCount(subexpireKey);
sdsfree(keyT);
});
subexpireKey = nullptr; subexpireKey = nullptr;
} }
} else { } else {
@ -2901,15 +2923,16 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
bool fStaleMvccKey = (rsi) ? mvccFromObj(val) < rsi->mvccMinThreshold : false; bool fStaleMvccKey = (rsi) ? mvccFromObj(val) < rsi->mvccMinThreshold : false;
rdbInsertJob job; if (spjob != nullptr)
job.db = dbCur; wqueue.enqueue(spjob);
job.key = sdsdupshared(key); spjob = std::make_unique<rdbInsertJob>();
job.val = val; spjob->db = dbCur;
job.lru_clock = lru_clock; spjob->key = sdsdupshared(key);
job.expiretime = expiretime; spjob->val = val;
job.lru_idle = lru_idle; spjob->lru_clock = lru_clock;
job.lfu_freq = lfu_freq; spjob->expiretime = expiretime;
wqueue.enqueue(job); spjob->lru_idle = lru_idle;
spjob->lfu_freq = lfu_freq;
val = nullptr; val = nullptr;
/* Check if the key already expired. This function is used when loading /* Check if the key already expired. This function is used when loading
@ -2932,6 +2955,9 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
lru_idle = -1; lru_idle = -1;
} }
if (spjob != nullptr)
wqueue.enqueue(spjob);
if (key != nullptr) if (key != nullptr)
{ {
sdsfree(key); sdsfree(key);