diff --git a/src/ae.cpp b/src/ae.cpp index 29d687077..3b27f43dd 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -867,7 +867,7 @@ void aeReleaseLock() g_lock.unlock(); } -void aeSetThreadOwnsLockOverride(bool fOverride) +void aeSetThreadOwnsLockOverride(int fOverride) { fOwnLockOverride = fOverride; } diff --git a/src/ae.h b/src/ae.h index 9d8821143..8a1cdc304 100644 --- a/src/ae.h +++ b/src/ae.h @@ -169,7 +169,7 @@ void aeAcquireLock(); int aeTryAcquireLock(int fWeak); void aeReleaseLock(); int aeThreadOwnsLock(); -void aeSetThreadOwnsLockOverride(bool fOverride); +void aeSetThreadOwnsLockOverride(int fOverride); int aeLockContested(int threshold); int aeLockContention(); // returns the number of instantaneous threads waiting on the lock diff --git a/src/rdb.cpp b/src/rdb.cpp index 92a466f68..d969b4f54 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -519,7 +519,12 @@ void *rdbGenericLoadStringObject(rio *rdb, int flags, size_t *lenptr) { if (len == RDB_LENERR) return NULL; if (plain || sds) { - void *buf = plain ? zmalloc(len, MALLOC_SHARED) : sdsnewlen(SDS_NOINIT,len); + ssize_t lenSigned = (ssize_t)len; + if (flags & RDB_LOAD_SDS_SHARED) + lenSigned = -lenSigned; + void *buf = plain ? zmalloc(len, MALLOC_SHARED) : sdsnewlen(SDS_NOINIT, lenSigned); + if (buf == nullptr) + return nullptr; if (lenptr) *lenptr = len; if (len && rioRead(rdb,buf,len) == 0) { if (plain) @@ -2355,30 +2360,29 @@ class rdbAsyncWorkThread std::vector> queuefn; // for custom jobs std::mutex mutex; std::condition_variable cv; + bool fLaunched = false; bool fExit = false; std::atomic ckeysLoaded; std::thread m_thread; - list *clients_pending_async_write = nullptr; + long long now; public: - rdbAsyncWorkThread(rdbSaveInfo *rsi, int rdbflags) - : rsi(rsi), rdbflags(rdbflags) + rdbAsyncWorkThread(rdbSaveInfo *rsi, int rdbflags, long long now) + : rsi(rsi), rdbflags(rdbflags), now(now) { ckeysLoaded = 0; } ~rdbAsyncWorkThread() { - if (!fExit && m_thread.joinable()) - endWork(); - if (clients_pending_async_write) - listRelease(clients_pending_async_write); + if (m_thread.joinable()) + endWork(); } void start() { - if (clients_pending_async_write == nullptr) - clients_pending_async_write = listCreate(); - m_thread = std::thread(&rdbAsyncWorkThread::loadWorkerThreadMain, this, clients_pending_async_write); + serverAssert(!fLaunched); + m_thread = std::thread(&rdbAsyncWorkThread::loadWorkerThreadMain, this); + fLaunched = true; } void enqueue(rdbInsertJob &job) { @@ -2401,25 +2405,28 @@ public: size_t endWork() { std::unique_lock l(mutex); + serverAssert(fLaunched); fExit = true; cv.notify_one(); l.unlock(); m_thread.join(); - listJoin(serverTL->clients_pending_asyncwrite, clients_pending_async_write); - ProcessPendingAsyncWrites(); + fLaunched = false; + fExit = false; + serverAssert(queuejobs.empty()); + serverAssert(queuefn.empty()); return ckeysLoaded; } - static void loadWorkerThreadMain(rdbAsyncWorkThread *pqueue, list *clients_pending_asyncwrite) { + static void loadWorkerThreadMain(rdbAsyncWorkThread *pqueue) { rdbAsyncWorkThread &queue = *pqueue; - redisServerThreadVars vars; - vars.clients_pending_asyncwrite = clients_pending_asyncwrite; + redisServerThreadVars vars = {}; + vars.clients_pending_asyncwrite = listCreate(); serverTL = &vars; aeSetThreadOwnsLockOverride(true); for (;;) { std::unique_lock lock(queue.mutex); if (queue.queuejobs.empty() && queue.queuefn.empty()) { - if (queue.queuejobs.empty() && queue.queuefn.empty() && queue.fExit) + if (queue.fExit) break; queue.cv.wait(lock); if (queue.queuejobs.empty() && queue.queuefn.empty() && queue.fExit) @@ -2430,47 +2437,70 @@ public: queue.queuejobs.reserve(1024); auto queuefn = std::move(queue.queuefn); lock.unlock(); - - for (auto &fn : queuefn) { - fn(); - } bool f1024thKey = false; for (auto &job : queuejobs) { redisObjectStack keyobj; initStaticStringObject(keyobj,job.key); - /* Add the new object in the hash table */ - int fInserted = dbMerge(job.db, &keyobj, job.val, (queue.rsi && queue.rsi->fForceSetKey) || (queue.rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef + bool fStaleMvccKey = (pqueue->rsi) ? mvccFromObj(job.val) < pqueue->rsi->mvccMinThreshold : false; - if (fInserted) - { - auto ckeys = queue.ckeysLoaded.fetch_add(1, std::memory_order_relaxed); - f1024thKey = f1024thKey || (ckeys % 1024) == 0; - - /* Set the expire time if needed */ - if (job.expiretime != -1) - { - setExpire(NULL,job.db,&keyobj,nullptr,job.expiretime); + /* Check if the key already expired. This function is used when loading + * an RDB file from disk, either at startup, or when an RDB was + * received from the master. In the latter case, the master is + * responsible for key expiry. If we would expire keys here, the + * snapshot taken by the master may not be reflected on the replica. */ + bool fExpiredKey = iAmMaster() && !(pqueue->rdbflags&RDBFLAGS_AOF_PREAMBLE) && job.expiretime != -1 && job.expiretime < pqueue->now; + if (fStaleMvccKey || fExpiredKey) { + if (fStaleMvccKey && !fExpiredKey && pqueue->rsi != nullptr && pqueue->rsi->mi != nullptr && pqueue->rsi->mi->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) { + // We have a key that we've already deleted and is not back in our database. + // We'll need to inform the sending master of the delete if it is also a replica of us + robj_sharedptr objKeyDup(createStringObject(job.key, sdslen(job.key))); + pqueue->rsi->mi->staleKeyMap->operator[](job.db->id).push_back(objKeyDup); } - - /* Set usage information (for eviction). */ - objectSetLRUOrLFU(job.val,job.lfu_freq,job.lru_idle,job.lru_clock,1000); - - /* call key space notification on key loaded for modules only */ - moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, job.db->id); - - replicationNotifyLoadedKey(job.db, &keyobj, job.val, job.expiretime); - } - else - { + sdsfree(job.key); + job.key = nullptr; decrRefCount(job.val); + job.val = nullptr; + } else { + /* Add the new object in the hash table */ + int fInserted = dbMerge(job.db, &keyobj, job.val, (queue.rsi && queue.rsi->fForceSetKey) || (queue.rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef + + if (fInserted) + { + auto ckeys = queue.ckeysLoaded.fetch_add(1, std::memory_order_relaxed); + f1024thKey = f1024thKey || (ckeys % 1024) == 0; + + /* Set the expire time if needed */ + if (job.expiretime != -1) + { + setExpire(NULL,job.db,&keyobj,nullptr,job.expiretime); + } + + /* Set usage information (for eviction). */ + objectSetLRUOrLFU(job.val,job.lfu_freq,job.lru_idle,job.lru_clock,1000); + + /* call key space notification on key loaded for modules only */ + moduleNotifyKeyspaceEvent(NOTIFY_LOADED, "loaded", &keyobj, job.db->id); + + replicationNotifyLoadedKey(job.db, &keyobj, job.val, job.expiretime); + } + else + { + decrRefCount(job.val); + } } + + if (job.key != nullptr) { sdsfree(job.key); } } + + for (auto &fn : queuefn) { + fn(); + } /* If we have a storage provider check if we need to evict some keys to stay under our memory limit, do this every 16 keys to limit the perf impact */ @@ -2494,6 +2524,11 @@ public: } } } + std::unique_lock lock(queue.mutex); + serverAssert(queue.queuefn.empty()); + serverAssert(queue.queuejobs.empty()); + ProcessPendingAsyncWrites(); + listRelease(vars.clients_pending_asyncwrite); aeSetThreadOwnsLockOverride(false); } }; @@ -2547,7 +2582,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now; long long lru_clock = 0; uint64_t mvcc_tstamp = OBJ_MVCC_INVALID; - rdbAsyncWorkThread wqueue(rsi, rdbflags); + now = mstime(); + rdbAsyncWorkThread wqueue(rsi, rdbflags, now); robj *subexpireKey = nullptr; sds key = nullptr; bool fLastKeyExpired = false; @@ -2574,7 +2610,6 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { return C_ERR; } - now = mstime(); lru_clock = LRU_CLOCK(); wqueue.start(); @@ -2711,12 +2746,14 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } else { long long expireT = strtoll(szFromObj(auxval), nullptr, 10); - wqueue.enqueue([dbCur, subexpireKey, key, expireT]{ + sds keyT = sdsdupshared(key); + wqueue.enqueue([dbCur, subexpireKey, keyT, expireT]{ redisObjectStack keyobj; - initStaticStringObject(keyobj,key); + initStaticStringObject(keyobj,keyT); setExpire(NULL, dbCur, &keyobj, subexpireKey, expireT); replicateSubkeyExpire(dbCur, &keyobj, subexpireKey, expireT); decrRefCount(subexpireKey); + sdsfree(keyT); }); subexpireKey = nullptr; } @@ -2790,7 +2827,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { key = nullptr; } - if ((key = (sds)rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) + if ((key = (sds)rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS_SHARED,NULL)) == NULL) goto eoferr; /* Read value */ if ((val = rdbLoadObject(type,rdb,key,mvcc_tstamp)) == NULL) { @@ -2798,45 +2835,27 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { key = nullptr; goto eoferr; } + + bool fStaleMvccKey = (rsi) ? mvccFromObj(val) < rsi->mvccMinThreshold : false; + rdbInsertJob job; + job.db = dbCur; + job.key = sdsdupshared(key); + job.val = val; + job.lru_clock = lru_clock; + job.expiretime = expiretime; + job.lru_idle = lru_idle; + job.lfu_freq = lfu_freq; + wqueue.enqueue(job); + val = nullptr; /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the replica. */ - redisObjectStack keyobj; - initStaticStringObject(keyobj,key); bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now; - if (fStaleMvccKey || fExpiredKey) { - #if 0 // TODO! - if (fStaleMvccKey && !fExpiredKey && rsi != nullptr && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, &keyobj) == nullptr) { - // We have a key that we've already deleted and is not back in our database. - // We'll need to inform the sending master of the delete if it is also a replica of us - robj_sharedptr objKeyDup(createStringObject(key, sdslen(key))); - rsi->mi->staleKeyMap->operator[](db->id).push_back(objKeyDup); - } - fLastKeyExpired = true; - sdsfree(key); - key = nullptr; - decrRefCount(val); - val = nullptr; - #endif - } else { - fLastKeyExpired = false; - rdbInsertJob job; - job.db = dbCur; - job.key = key; - job.val = val; - job.lru_clock = lru_clock; - job.expiretime = expiretime; - job.lru_idle = lru_idle; - job.lfu_freq = lfu_freq; - wqueue.enqueue(job); - - key = nullptr; - val = nullptr; - } + fLastKeyExpired = fStaleMvccKey || fExpiredKey; if (g_pserver->key_load_delay) usleep(g_pserver->key_load_delay); diff --git a/src/rdb.h b/src/rdb.h index c561a8799..38fded807 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -119,10 +119,11 @@ #define RDB_MODULE_OPCODE_STRING 5 /* String. */ /* rdbLoad...() functions flags. */ -#define RDB_LOAD_NONE 0 -#define RDB_LOAD_ENC (1<<0) -#define RDB_LOAD_PLAIN (1<<1) -#define RDB_LOAD_SDS (1<<2) +#define RDB_LOAD_NONE 0 +#define RDB_LOAD_ENC (1<<0) +#define RDB_LOAD_PLAIN (1<<1) +#define RDB_LOAD_SDS (1<<2) +#define RDB_LOAD_SDS_SHARED ((1 << 3) | RDB_LOAD_SDS) /* flags on the purpose of rdb save or load */ #define RDBFLAGS_NONE 0 /* No special RDB loading. */