From d95ae909a904a5d9ccd98ef580a44c9596b29b0f Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 16 Dec 2019 22:08:18 -0500 Subject: [PATCH] Threading fixes Former-commit-id: 4f1cec6abd72d6e215880f2ce7812e88509cd218 --- src/db.cpp | 6 ++++++ src/dict.cpp | 8 +++++--- src/fastlock.cpp | 2 +- src/rdb.cpp | 7 ++----- src/server.cpp | 2 +- src/server.h | 2 ++ 6 files changed, 17 insertions(+), 10 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 0b294b4ce..7a9dbb2ef 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2076,6 +2076,12 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) dictSetVal(m_pdict, *pde, o); }); } + + if (*pde != nullptr && dictGetVal(*pde) != nullptr) + { + robj *o = (robj*)dictGetVal(*pde); + serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end())); + } } void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o) diff --git a/src/dict.cpp b/src/dict.cpp index 5217081ac..d3a9f735e 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -258,7 +258,9 @@ int dictRehashMilliseconds(dict *d, int ms) { * dictionary so that the hash table automatically migrates from H1 to H2 * while it is actively used. */ static void _dictRehashStep(dict *d) { - if (d->iterators == 0) dictRehash(d,1); + unsigned long iterators; + __atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED); + if (iterators == 0) dictRehash(d,1); } /* Add an element to the target hash table */ @@ -566,7 +568,7 @@ dictEntry *dictNext(dictIterator *iter) dictht *ht = &iter->d->ht[iter->table]; if (iter->index == -1 && iter->table == 0) { if (iter->safe) - iter->d->iterators++; + __atomic_fetch_add(&iter->d->iterators, 1, __ATOMIC_RELAXED); else iter->fingerprint = dictFingerprint(iter->d); } @@ -598,7 +600,7 @@ void dictReleaseIterator(dictIterator *iter) { if (!(iter->index == -1 && iter->table == 0)) { if (iter->safe) - iter->d->iterators--; + __atomic_fetch_sub(&iter->d->iterators, 1, __ATOMIC_RELAXED); else assert(iter->fingerprint == dictFingerprint(iter->d)); } diff --git a/src/fastlock.cpp b/src/fastlock.cpp index d566bb267..9acc854b3 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -165,7 +165,7 @@ public: auto itr = m_mapwait.find(pidCheck); if (itr == m_mapwait.end()) break; - pidCheck = itr->second->m_pidOwner; + __atomic_load(&itr->second->m_pidOwner, &pidCheck, __ATOMIC_RELAXED); if (pidCheck == thispid) { // Deadlock detected, printout some debugging info and crash diff --git a/src/rdb.cpp b/src/rdb.cpp index e1c5567fc..c8a8c5797 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1121,7 +1121,7 @@ int saveKey(rio *rdb, const redisDbPersistentDataSnapshot *db, int flags, size_t initStaticStringObject(key,(char*)keystr); const expireEntry *pexpire = db->getExpire(&key); - serverAssert(!o->FExpires() || pexpire != nullptr); + serverAssert((o->FExpires() && pexpire != nullptr) || (!o->FExpires() && pexpire == nullptr)); if (rdbSaveKeyValuePair(rdb,&key,o,pexpire) == -1) return 0; @@ -1183,11 +1183,8 @@ int rdbSaveRio(rio *rdb, const redisDbPersistentDataSnapshot **rgpdb, int *error /* Iterate this DB writing every entry */ size_t ckeysExpired = 0; bool fSavedAll = db->iterate_threadsafe([&](const char *keystr, robj_roptr o)->bool { - if (o->FExpires()) { + if (o->FExpires()) ++ckeysExpired; - } else { - serverAssert(db->getExpire(keystr) == nullptr); - } if (!saveKey(rdb, db, flags, &processed, keystr, o)) return false; diff --git a/src/server.cpp b/src/server.cpp index 115ca879f..92a5b6f6a 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -5091,7 +5091,7 @@ void incrementMvccTstamp() } else { - atomicSet(g_pserver->mvcc_tstamp, ((uint64_t)g_pserver->mstime) << MVCC_MS_SHIFT); + atomicSet(g_pserver->mvcc_tstamp, ((uint64_t)mst) << MVCC_MS_SHIFT); } } diff --git a/src/server.h b/src/server.h index c4666d650..ea57b140f 100644 --- a/src/server.h +++ b/src/server.h @@ -96,6 +96,8 @@ typedef long long mstime_t; /* millisecond time type. */ #include "AsyncWorkQueue.h" #include "gc.h" +#define FImplies(x, y) (!(x) || (y)) + extern int g_fTestMode; struct redisObject;