Threading fixes

Former-commit-id: 4f1cec6abd72d6e215880f2ce7812e88509cd218
This commit is contained in:
John Sully 2019-12-16 22:08:18 -05:00
parent d040a7a3d7
commit d95ae909a9
6 changed files with 17 additions and 10 deletions

View File

@ -2076,6 +2076,12 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
dictSetVal(m_pdict, *pde, o);
});
}
if (*pde != nullptr && dictGetVal(*pde) != nullptr)
{
robj *o = (robj*)dictGetVal(*pde);
serverAssert(o->FExpires() == (m_setexpire->find(sdsKey) != m_setexpire->end()));
}
}
void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o)

View File

@ -258,7 +258,9 @@ int dictRehashMilliseconds(dict *d, int ms) {
* dictionary so that the hash table automatically migrates from H1 to H2
* while it is actively used. */
static void _dictRehashStep(dict *d) {
if (d->iterators == 0) dictRehash(d,1);
unsigned long iterators;
__atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED);
if (iterators == 0) dictRehash(d,1);
}
/* Add an element to the target hash table */
@ -566,7 +568,7 @@ dictEntry *dictNext(dictIterator *iter)
dictht *ht = &iter->d->ht[iter->table];
if (iter->index == -1 && iter->table == 0) {
if (iter->safe)
iter->d->iterators++;
__atomic_fetch_add(&iter->d->iterators, 1, __ATOMIC_RELAXED);
else
iter->fingerprint = dictFingerprint(iter->d);
}
@ -598,7 +600,7 @@ void dictReleaseIterator(dictIterator *iter)
{
if (!(iter->index == -1 && iter->table == 0)) {
if (iter->safe)
iter->d->iterators--;
__atomic_fetch_sub(&iter->d->iterators, 1, __ATOMIC_RELAXED);
else
assert(iter->fingerprint == dictFingerprint(iter->d));
}

View File

@ -165,7 +165,7 @@ public:
auto itr = m_mapwait.find(pidCheck);
if (itr == m_mapwait.end())
break;
pidCheck = itr->second->m_pidOwner;
__atomic_load(&itr->second->m_pidOwner, &pidCheck, __ATOMIC_RELAXED);
if (pidCheck == thispid)
{
// Deadlock detected, printout some debugging info and crash

View File

@ -1121,7 +1121,7 @@ int saveKey(rio *rdb, const redisDbPersistentDataSnapshot *db, int flags, size_t
initStaticStringObject(key,(char*)keystr);
const expireEntry *pexpire = db->getExpire(&key);
serverAssert(!o->FExpires() || pexpire != nullptr);
serverAssert((o->FExpires() && pexpire != nullptr) || (!o->FExpires() && pexpire == nullptr));
if (rdbSaveKeyValuePair(rdb,&key,o,pexpire) == -1)
return 0;
@ -1183,11 +1183,8 @@ int rdbSaveRio(rio *rdb, const redisDbPersistentDataSnapshot **rgpdb, int *error
/* Iterate this DB writing every entry */
size_t ckeysExpired = 0;
bool fSavedAll = db->iterate_threadsafe([&](const char *keystr, robj_roptr o)->bool {
if (o->FExpires()) {
if (o->FExpires())
++ckeysExpired;
} else {
serverAssert(db->getExpire(keystr) == nullptr);
}
if (!saveKey(rdb, db, flags, &processed, keystr, o))
return false;

View File

@ -5091,7 +5091,7 @@ void incrementMvccTstamp()
}
else
{
atomicSet(g_pserver->mvcc_tstamp, ((uint64_t)g_pserver->mstime) << MVCC_MS_SHIFT);
atomicSet(g_pserver->mvcc_tstamp, ((uint64_t)mst) << MVCC_MS_SHIFT);
}
}

View File

@ -96,6 +96,8 @@ typedef long long mstime_t; /* millisecond time type. */
#include "AsyncWorkQueue.h"
#include "gc.h"
#define FImplies(x, y) (!(x) || (y))
extern int g_fTestMode;
struct redisObject;