Fix TSAN errors in tests

Former-commit-id: 30207d2ab34f175b94f430f97581191343d23f1e
This commit is contained in:
John Sully 2021-03-16 02:29:24 +00:00
parent dc658ba47b
commit 15e1ee620f
8 changed files with 32 additions and 34 deletions

View File

@ -49,6 +49,11 @@ void AsyncWorkQueue::WorkerThreadMain()
} }
listRelease(vars.clients_pending_asyncwrite); listRelease(vars.clients_pending_asyncwrite);
std::unique_lock<fastlock> lockf(serverTL->lockPendingWrite);
serverTL->vecclientsProcess.clear();
serverTL->clients_pending_write.clear();
std::atomic_thread_fence(std::memory_order_seq_cst);
} }
bool AsyncWorkQueue::removeClientAsyncWrites(client *c) bool AsyncWorkQueue::removeClientAsyncWrites(client *c)

View File

@ -31,7 +31,8 @@ void cronCommand(client *c)
if (getLongLongFromObjectOrReply(c, c->argv[ARG_EXPIRE], &interval, "missing expire time") != C_OK) if (getLongLongFromObjectOrReply(c, c->argv[ARG_EXPIRE], &interval, "missing expire time") != C_OK)
return; return;
long long base = g_pserver->mstime; long long base;
__atomic_load(&g_pserver->mstime, &base, __ATOMIC_ACQUIRE);
if (getLongLongFromObject(c->argv[ARG_EXPIRE+1], &base) == C_OK) { if (getLongLongFromObject(c->argv[ARG_EXPIRE+1], &base) == C_OK) {
arg_offset++; arg_offset++;
std::swap(base, interval); std::swap(base, interval);
@ -120,18 +121,20 @@ void executeCronJobExpireHook(const char *key, robj *o)
else else
{ {
job->startTime += job->interval; job->startTime += job->interval;
if (job->startTime < (uint64_t)g_pserver->mstime) mstime_t mstime;
__atomic_load(&g_pserver->mstime, &mstime, __ATOMIC_ACQUIRE);
if (job->startTime < (uint64_t)mstime)
{ {
// If we are more than one interval in the past then fast forward to // If we are more than one interval in the past then fast forward to
// the first interval still in the future. If startTime wasn't zero align // the first interval still in the future. If startTime wasn't zero align
// this to the original startTime, if it was zero align to now // this to the original startTime, if it was zero align to now
if (job->startTime == job->interval) if (job->startTime == job->interval)
{ // startTime was 0 { // startTime was 0
job->startTime = g_pserver->mstime + job->interval; job->startTime = mstime + job->interval;
} }
else else
{ {
auto delta = g_pserver->mstime - job->startTime; auto delta = mstime - job->startTime;
auto multiple = (delta / job->interval)+1; auto multiple = (delta / job->interval)+1;
job->startTime += job->interval * multiple; job->startTime += job->interval * multiple;
} }

View File

@ -1713,7 +1713,8 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when)
/* Update TTL stats (exponential moving average) */ /* Update TTL stats (exponential moving average) */
/* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */ /* Note: We never have to update this on expiry since we reduce it by the current elapsed time here */
long long now = g_pserver->mstime; mstime_t now;
__atomic_load(&g_pserver->mstime, &now, __ATOMIC_ACQUIRE);
db->avg_ttl -= (now - db->last_expire_set); // reduce the TTL by the time that has elapsed db->avg_ttl -= (now - db->last_expire_set); // reduce the TTL by the time that has elapsed
if (db->expireSize() == 0) if (db->expireSize() == 0)
db->avg_ttl = 0; db->avg_ttl = 0;
@ -1894,7 +1895,7 @@ int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key) {
* open object in a next call, if the next call will see the key expired, * open object in a next call, if the next call will see the key expired,
* while the first did not. */ * while the first did not. */
else if (serverTL->fixed_time_expire > 0) { else if (serverTL->fixed_time_expire > 0) {
now = g_pserver->mstime; __atomic_load(&g_pserver->mstime, &now, __ATOMIC_ACQUIRE);
} }
/* For the other cases, we want to use the most fresh time we have. */ /* For the other cases, we want to use the most fresh time we have. */
else { else {
@ -2854,16 +2855,16 @@ bool redisDbPersistentData::removeCachedValue(const char *key)
return true; return true;
} }
redisDbPersistentData::redisDbPersistentData() {
m_dictChanged = dictCreate(&dictChangeDescType, nullptr);
}
void redisDbPersistentData::trackChanges(bool fBulk, size_t sizeHint) void redisDbPersistentData::trackChanges(bool fBulk, size_t sizeHint)
{ {
m_fTrackingChanges.fetch_add(1, std::memory_order_relaxed); m_fTrackingChanges.fetch_add(1, std::memory_order_relaxed);
if (fBulk) if (fBulk)
m_fAllChanged.fetch_add(1, std::memory_order_acq_rel); m_fAllChanged.fetch_add(1, std::memory_order_acq_rel);
if (m_dictChanged == nullptr) {
m_dictChanged = dictCreate(&dictChangeDescType, nullptr);
}
if (sizeHint > 0) if (sizeHint > 0)
dictExpand(m_dictChanged, sizeHint, false); dictExpand(m_dictChanged, sizeHint, false);
} }

View File

@ -433,7 +433,7 @@ extern "C" void fastlock_unlock(struct fastlock *lock)
serverAssert(pidT >= 0); // unlock after free serverAssert(pidT >= 0); // unlock after free
int t = -1; int t = -1;
__atomic_store(&lock->m_pidOwner, &t, __ATOMIC_RELEASE); __atomic_store(&lock->m_pidOwner, &t, __ATOMIC_RELEASE);
std::atomic_thread_fence(std::memory_order_release); std::atomic_thread_fence(std::memory_order_acq_rel);
ANNOTATE_RWLOCK_RELEASED(lock, true); ANNOTATE_RWLOCK_RELEASED(lock, true);
uint16_t activeNew = __atomic_add_fetch(&lock->m_ticket.m_active, 1, __ATOMIC_RELEASE); // on x86 the atomic is not required here, but ASM handles that case uint16_t activeNew = __atomic_add_fetch(&lock->m_ticket.m_active, 1, __ATOMIC_RELEASE); // on x86 the atomic is not required here, but ASM handles that case
#ifdef __linux__ #ifdef __linux__

View File

@ -29,6 +29,11 @@ class GarbageCollector
}; };
public: public:
~GarbageCollector() {
// Silence TSAN errors
m_lock.lock();
}
uint64_t startEpoch() uint64_t startEpoch()
{ {
std::unique_lock<fastlock> lock(m_lock); std::unique_lock<fastlock> lock(m_lock);
@ -41,6 +46,7 @@ public:
{ {
std::unique_lock<fastlock> lock(m_lock); std::unique_lock<fastlock> lock(m_lock);
m_vecepochs.clear(); m_vecepochs.clear();
m_setepochOutstanding.clear();
} }
void endEpoch(uint64_t epoch, bool fNoFree = false) void endEpoch(uint64_t epoch, bool fNoFree = false)

View File

@ -2370,21 +2370,6 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
} }
} }
class EvictionPolicyCleanup
{
int oldpolicy;
public:
EvictionPolicyCleanup() {
oldpolicy = g_pserver->maxmemory_policy;
g_pserver->maxmemory_policy = MAXMEMORY_ALLKEYS_RANDOM;
}
~EvictionPolicyCleanup() {
g_pserver->maxmemory_policy = oldpolicy;
}
};
/* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned, /* Load an RDB file from the rio stream 'rdb'. On success C_OK is returned,
* otherwise C_ERR is returned and 'errno' is set accordingly. */ * otherwise C_ERR is returned and 'errno' is set accordingly. */
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
@ -2401,10 +2386,6 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
sds key = nullptr; sds key = nullptr;
bool fLastKeyExpired = false; bool fLastKeyExpired = false;
// If we're running flash we may evict during load. We want a fast eviction function
// because there isn't any difference in use times between keys anyways
EvictionPolicyCleanup ecleanup;
for (int idb = 0; idb < cserver.dbnum; ++idb) for (int idb = 0; idb < cserver.dbnum; ++idb)
{ {
g_pserver->db[idb]->trackChanges(true, 1024); g_pserver->db[idb]->trackChanges(true, 1024);

View File

@ -3926,7 +3926,7 @@ void call(client *c, int flags) {
dirty = g_pserver->dirty; dirty = g_pserver->dirty;
incrementMvccTstamp(); incrementMvccTstamp();
__atomic_load(&g_pserver->ustime, &start, __ATOMIC_SEQ_CST); __atomic_load(&g_pserver->ustime, &start, __ATOMIC_SEQ_CST);
start = g_pserver->ustime;
try { try {
c->cmd->proc(c); c->cmd->proc(c);
} catch (robj_roptr o) { } catch (robj_roptr o) {
@ -5403,10 +5403,12 @@ sds genRedisInfoString(const char *section) {
vkeys = g_pserver->db[j]->expireSize(); vkeys = g_pserver->db[j]->expireSize();
// Adjust TTL by the current time // Adjust TTL by the current time
g_pserver->db[j]->avg_ttl -= (g_pserver->mstime - g_pserver->db[j]->last_expire_set); mstime_t mstime;
__atomic_load(&g_pserver->mstime, &mstime, __ATOMIC_ACQUIRE);
g_pserver->db[j]->avg_ttl -= (mstime - g_pserver->db[j]->last_expire_set);
if (g_pserver->db[j]->avg_ttl < 0) if (g_pserver->db[j]->avg_ttl < 0)
g_pserver->db[j]->avg_ttl = 0; g_pserver->db[j]->avg_ttl = 0;
g_pserver->db[j]->last_expire_set = g_pserver->mstime; g_pserver->db[j]->last_expire_set = mstime;
if (keys || vkeys) { if (keys || vkeys) {
info = sdscatprintf(info, info = sdscatprintf(info,

View File

@ -1045,9 +1045,9 @@ class redisDbPersistentData
friend class redisDbPersistentDataSnapshot; friend class redisDbPersistentDataSnapshot;
public: public:
redisDbPersistentData();
virtual ~redisDbPersistentData(); virtual ~redisDbPersistentData();
redisDbPersistentData() = default;
redisDbPersistentData(redisDbPersistentData &&) = default; redisDbPersistentData(redisDbPersistentData &&) = default;
size_t slots() const { return dictSlots(m_pdict); } size_t slots() const { return dictSlots(m_pdict); }