diff --git a/src/AsyncWorkQueue.cpp b/src/AsyncWorkQueue.cpp index 48252ac97..dae36cc06 100644 --- a/src/AsyncWorkQueue.cpp +++ b/src/AsyncWorkQueue.cpp @@ -49,6 +49,11 @@ void AsyncWorkQueue::WorkerThreadMain() } listRelease(vars.clients_pending_asyncwrite); + + std::unique_lock lockf(serverTL->lockPendingWrite); + serverTL->vecclientsProcess.clear(); + serverTL->clients_pending_write.clear(); + std::atomic_thread_fence(std::memory_order_seq_cst); } bool AsyncWorkQueue::removeClientAsyncWrites(client *c) diff --git a/src/cron.cpp b/src/cron.cpp index 584407fac..876c69d88 100644 --- a/src/cron.cpp +++ b/src/cron.cpp @@ -31,7 +31,8 @@ void cronCommand(client *c) if (getLongLongFromObjectOrReply(c, c->argv[ARG_EXPIRE], &interval, "missing expire time") != C_OK) return; - long long base = g_pserver->mstime; + long long base; + __atomic_load(&g_pserver->mstime, &base, __ATOMIC_ACQUIRE); if (getLongLongFromObject(c->argv[ARG_EXPIRE+1], &base) == C_OK) { arg_offset++; std::swap(base, interval); @@ -120,18 +121,20 @@ void executeCronJobExpireHook(const char *key, robj *o) else { job->startTime += job->interval; - if (job->startTime < (uint64_t)g_pserver->mstime) + mstime_t mstime; + __atomic_load(&g_pserver->mstime, &mstime, __ATOMIC_ACQUIRE); + if (job->startTime < (uint64_t)mstime) { // If we are more than one interval in the past then fast forward to // the first interval still in the future. If startTime wasn't zero align // this to the original startTime, if it was zero align to now if (job->startTime == job->interval) { // startTime was 0 - job->startTime = g_pserver->mstime + job->interval; + job->startTime = mstime + job->interval; } else { - auto delta = g_pserver->mstime - job->startTime; + auto delta = mstime - job->startTime; auto multiple = (delta / job->interval)+1; job->startTime += job->interval * multiple; } diff --git a/src/db.cpp b/src/db.cpp index b1aa2bad3..20b70fe02 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1713,7 +1713,8 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) /* Update TTL stats (exponential moving average) */ /* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */ - long long now = g_pserver->mstime; + mstime_t now; + __atomic_load(&g_pserver->mstime, &now, __ATOMIC_ACQUIRE); db->avg_ttl -= (now - db->last_expire_set); // reduce the TTL by the time that has elapsed if (db->expireSize() == 0) db->avg_ttl = 0; @@ -1894,7 +1895,7 @@ int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key) { * open object in a next call, if the next call will see the key expired, * while the first did not. */ else if (serverTL->fixed_time_expire > 0) { - now = g_pserver->mstime; + __atomic_load(&g_pserver->mstime, &now, __ATOMIC_ACQUIRE); } /* For the other cases, we want to use the most fresh time we have. */ else { @@ -2854,16 +2855,16 @@ bool redisDbPersistentData::removeCachedValue(const char *key) return true; } +redisDbPersistentData::redisDbPersistentData() { + m_dictChanged = dictCreate(&dictChangeDescType, nullptr); +} + void redisDbPersistentData::trackChanges(bool fBulk, size_t sizeHint) { m_fTrackingChanges.fetch_add(1, std::memory_order_relaxed); if (fBulk) m_fAllChanged.fetch_add(1, std::memory_order_acq_rel); - if (m_dictChanged == nullptr) { - m_dictChanged = dictCreate(&dictChangeDescType, nullptr); - } - if (sizeHint > 0) dictExpand(m_dictChanged, sizeHint, false); } diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 6e6b6e4d2..38afaa075 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -433,7 +433,7 @@ extern "C" void fastlock_unlock(struct fastlock *lock) serverAssert(pidT >= 0); // unlock after free int t = -1; __atomic_store(&lock->m_pidOwner, &t, __ATOMIC_RELEASE); - std::atomic_thread_fence(std::memory_order_release); + std::atomic_thread_fence(std::memory_order_acq_rel); ANNOTATE_RWLOCK_RELEASED(lock, true); uint16_t activeNew = __atomic_add_fetch(&lock->m_ticket.m_active, 1, __ATOMIC_RELEASE); // on x86 the atomic is not required here, but ASM handles that case #ifdef __linux__ diff --git a/src/gc.h b/src/gc.h index 5c0596963..4715bc8de 100644 --- a/src/gc.h +++ b/src/gc.h @@ -29,6 +29,11 @@ class GarbageCollector }; public: + ~GarbageCollector() { + // Silence TSAN errors + m_lock.lock(); + } + uint64_t startEpoch() { std::unique_lock lock(m_lock); @@ -41,6 +46,7 @@ public: { std::unique_lock lock(m_lock); m_vecepochs.clear(); + m_setepochOutstanding.clear(); } void endEpoch(uint64_t epoch, bool fNoFree = false) diff --git a/src/rdb.cpp b/src/rdb.cpp index cedf787ce..ec510546d 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2370,21 +2370,6 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { } } -class EvictionPolicyCleanup -{ - int oldpolicy; - -public: - EvictionPolicyCleanup() { - oldpolicy = g_pserver->maxmemory_policy; - g_pserver->maxmemory_policy = MAXMEMORY_ALLKEYS_RANDOM; - } - - ~EvictionPolicyCleanup() { - g_pserver->maxmemory_policy = oldpolicy; - } -}; - /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, * otherwise C_ERR is returned and 'errno' is set accordingly. */ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { @@ -2401,10 +2386,6 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { sds key = nullptr; bool fLastKeyExpired = false; - // If we're running flash we may evict during load. We want a fast eviction function - // because there isn't any difference in use times between keys anyways - EvictionPolicyCleanup ecleanup; - for (int idb = 0; idb < cserver.dbnum; ++idb) { g_pserver->db[idb]->trackChanges(true, 1024); diff --git a/src/server.cpp b/src/server.cpp index 56c476aa8..225e0cd2c 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3926,7 +3926,7 @@ void call(client *c, int flags) { dirty = g_pserver->dirty; incrementMvccTstamp(); __atomic_load(&g_pserver->ustime, &start, __ATOMIC_SEQ_CST); - start = g_pserver->ustime; + try { c->cmd->proc(c); } catch (robj_roptr o) { @@ -5403,10 +5403,12 @@ sds genRedisInfoString(const char *section) { vkeys = g_pserver->db[j]->expireSize(); // Adjust TTL by the current time - g_pserver->db[j]->avg_ttl -= (g_pserver->mstime - g_pserver->db[j]->last_expire_set); + mstime_t mstime; + __atomic_load(&g_pserver->mstime, &mstime, __ATOMIC_ACQUIRE); + g_pserver->db[j]->avg_ttl -= (mstime - g_pserver->db[j]->last_expire_set); if (g_pserver->db[j]->avg_ttl < 0) g_pserver->db[j]->avg_ttl = 0; - g_pserver->db[j]->last_expire_set = g_pserver->mstime; + g_pserver->db[j]->last_expire_set = mstime; if (keys || vkeys) { info = sdscatprintf(info, diff --git a/src/server.h b/src/server.h index 42a32ecde..55666e612 100644 --- a/src/server.h +++ b/src/server.h @@ -1045,9 +1045,9 @@ class redisDbPersistentData friend class redisDbPersistentDataSnapshot; public: + redisDbPersistentData(); virtual ~redisDbPersistentData(); - redisDbPersistentData() = default; redisDbPersistentData(redisDbPersistentData &&) = default; size_t slots() const { return dictSlots(m_pdict); }