Fix multithreading data races
Former-commit-id: 80f6e5818fd575cb08a5f620c35eed1cd862eb57
This commit is contained in:
parent
5f64e227a4
commit
da4adb261b
27
src/db.cpp
27
src/db.cpp
@ -674,6 +674,7 @@ bool redisDbPersistentData::iterate_threadsafe(std::function<bool(const char*, r
|
|||||||
dictIterator *di = dictGetIterator(m_pdict);
|
dictIterator *di = dictGetIterator(m_pdict);
|
||||||
dictEntry *de = nullptr;
|
dictEntry *de = nullptr;
|
||||||
bool fResult = true;
|
bool fResult = true;
|
||||||
|
serverAssert(m_pdict->iterators > 0);
|
||||||
|
|
||||||
while((de = dictNext(di)) != nullptr)
|
while((de = dictNext(di)) != nullptr)
|
||||||
{
|
{
|
||||||
@ -789,7 +790,7 @@ void keysCommand(client *c) {
|
|||||||
|
|
||||||
const redisDbPersistentData *snapshot = nullptr;
|
const redisDbPersistentData *snapshot = nullptr;
|
||||||
if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
|
if (!(c->flags & (CLIENT_MULTI | CLIENT_BLOCKED)))
|
||||||
snapshot = c->db->createSnapshot(c->mvccCheckpoint);
|
snapshot = c->db->createSnapshot(c->mvccCheckpoint, true /* fOptional */);
|
||||||
if (snapshot != nullptr)
|
if (snapshot != nullptr)
|
||||||
{
|
{
|
||||||
sds patternCopy = sdsdup(pattern);
|
sds patternCopy = sdsdup(pattern);
|
||||||
@ -2176,11 +2177,22 @@ void redisDbPersistentData::processChanges()
|
|||||||
m_setchanged.clear();
|
m_setchanged.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
const redisDbPersistentData *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint)
|
const redisDbPersistentData *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional)
|
||||||
{
|
{
|
||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
serverAssert(m_refCount == 0); // do not call this on a snapshot
|
serverAssert(m_refCount == 0); // do not call this on a snapshot
|
||||||
|
|
||||||
|
// First see if we have too many levels and can bail out of this to reduce load
|
||||||
|
int levels = 1;
|
||||||
|
redisDbPersistentData *psnapshot = m_spdbSnapshotHOLDER.get();
|
||||||
|
while (psnapshot != nullptr)
|
||||||
|
{
|
||||||
|
++levels;
|
||||||
|
psnapshot = psnapshot->m_spdbSnapshotHOLDER.get();
|
||||||
|
}
|
||||||
|
if (fOptional && (levels > 8))
|
||||||
|
return nullptr;
|
||||||
|
|
||||||
if (m_spdbSnapshotHOLDER != nullptr)
|
if (m_spdbSnapshotHOLDER != nullptr)
|
||||||
{
|
{
|
||||||
if (mvccCheckpoint <= m_spdbSnapshotHOLDER->mvccCheckpoint)
|
if (mvccCheckpoint <= m_spdbSnapshotHOLDER->mvccCheckpoint)
|
||||||
@ -2188,15 +2200,17 @@ const redisDbPersistentData *redisDbPersistentData::createSnapshot(uint64_t mvcc
|
|||||||
m_spdbSnapshotHOLDER->m_refCount++;
|
m_spdbSnapshotHOLDER->m_refCount++;
|
||||||
return m_spdbSnapshotHOLDER.get();
|
return m_spdbSnapshotHOLDER.get();
|
||||||
}
|
}
|
||||||
serverLog(LL_WARNING, "Nested snapshot created");
|
serverLog(levels > 5 ? LL_NOTICE : LL_VERBOSE, "Nested snapshot created: %d levels", levels);
|
||||||
}
|
}
|
||||||
auto spdb = std::unique_ptr<redisDbPersistentData>(new (MALLOC_LOCAL) redisDbPersistentData());
|
auto spdb = std::unique_ptr<redisDbPersistentData>(new (MALLOC_LOCAL) redisDbPersistentData());
|
||||||
|
|
||||||
spdb->m_fAllChanged = false;
|
spdb->m_fAllChanged = false;
|
||||||
spdb->m_fTrackingChanges = 0;
|
spdb->m_fTrackingChanges = 0;
|
||||||
spdb->m_pdict = m_pdict;
|
spdb->m_pdict = m_pdict;
|
||||||
spdb->m_pdict->iterators++;
|
|
||||||
spdb->m_pdictTombstone = m_pdictTombstone;
|
spdb->m_pdictTombstone = m_pdictTombstone;
|
||||||
|
// Add a fake iterator so the dicts don't rehash (they need to be read only)
|
||||||
|
spdb->m_pdict->iterators++;
|
||||||
|
dictForceRehash(spdb->m_pdictTombstone); // prevent rehashing by finishing the rehash now
|
||||||
spdb->m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER);
|
spdb->m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER);
|
||||||
spdb->m_pdbSnapshot = m_pdbSnapshot;
|
spdb->m_pdbSnapshot = m_pdbSnapshot;
|
||||||
spdb->m_refCount = 1;
|
spdb->m_refCount = 1;
|
||||||
@ -2264,7 +2278,9 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentData *psnapshot)
|
|||||||
|
|
||||||
serverAssert(m_spdbSnapshotHOLDER->m_pdict->iterators == 1); // All iterators should have been free'd except the fake one from createSnapshot
|
serverAssert(m_spdbSnapshotHOLDER->m_pdict->iterators == 1); // All iterators should have been free'd except the fake one from createSnapshot
|
||||||
if (m_refCount == 0)
|
if (m_refCount == 0)
|
||||||
|
{
|
||||||
m_spdbSnapshotHOLDER->m_pdict->iterators--;
|
m_spdbSnapshotHOLDER->m_pdict->iterators--;
|
||||||
|
}
|
||||||
|
|
||||||
if (m_pdbSnapshot == nullptr)
|
if (m_pdbSnapshot == nullptr)
|
||||||
{
|
{
|
||||||
@ -2336,7 +2352,9 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentData *psnapshot)
|
|||||||
|
|
||||||
// Fixup the about to free'd snapshots iterator count so the dtor doesn't complain
|
// Fixup the about to free'd snapshots iterator count so the dtor doesn't complain
|
||||||
if (m_refCount)
|
if (m_refCount)
|
||||||
|
{
|
||||||
m_spdbSnapshotHOLDER->m_pdict->iterators--;
|
m_spdbSnapshotHOLDER->m_pdict->iterators--;
|
||||||
|
}
|
||||||
|
|
||||||
m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER->m_spdbSnapshotHOLDER);
|
m_spdbSnapshotHOLDER = std::move(m_spdbSnapshotHOLDER->m_spdbSnapshotHOLDER);
|
||||||
serverAssert(m_spdbSnapshotHOLDER != nullptr || m_pdbSnapshot == nullptr);
|
serverAssert(m_spdbSnapshotHOLDER != nullptr || m_pdbSnapshot == nullptr);
|
||||||
@ -2350,6 +2368,7 @@ redisDbPersistentData::~redisDbPersistentData()
|
|||||||
serverAssert(m_pdbSnapshot == nullptr);
|
serverAssert(m_pdbSnapshot == nullptr);
|
||||||
serverAssert(m_refCount == 0);
|
serverAssert(m_refCount == 0);
|
||||||
serverAssert(m_pdict->iterators == 0);
|
serverAssert(m_pdict->iterators == 0);
|
||||||
|
serverAssert(m_pdictTombstone == nullptr || m_pdictTombstone->iterators == 0);
|
||||||
dictRelease(m_pdict);
|
dictRelease(m_pdict);
|
||||||
if (m_pdictTombstone)
|
if (m_pdictTombstone)
|
||||||
dictRelease(m_pdictTombstone);
|
dictRelease(m_pdictTombstone);
|
||||||
|
@ -1132,6 +1132,11 @@ void dictGetStats(char *buf, size_t bufsize, dict *d) {
|
|||||||
if (orig_bufsize) orig_buf[orig_bufsize-1] = '\0';
|
if (orig_bufsize) orig_buf[orig_bufsize-1] = '\0';
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void dictForceRehash(dict *d)
|
||||||
|
{
|
||||||
|
while (dictIsRehashing(d)) _dictRehashStep(d);
|
||||||
|
}
|
||||||
|
|
||||||
/* ------------------------------- Benchmark ---------------------------------*/
|
/* ------------------------------- Benchmark ---------------------------------*/
|
||||||
|
|
||||||
#ifdef DICT_BENCHMARK_MAIN
|
#ifdef DICT_BENCHMARK_MAIN
|
||||||
|
@ -189,6 +189,7 @@ uint8_t *dictGetHashFunctionSeed(void);
|
|||||||
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanBucketFunction *bucketfn, void *privdata);
|
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, dictScanBucketFunction *bucketfn, void *privdata);
|
||||||
uint64_t dictGetHash(dict *d, const void *key);
|
uint64_t dictGetHash(dict *d, const void *key);
|
||||||
dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);
|
dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t hash);
|
||||||
|
void dictForceRehash(dict *d);
|
||||||
|
|
||||||
/* Hash table types */
|
/* Hash table types */
|
||||||
extern dictType dictTypeHeapStringCopyKey;
|
extern dictType dictTypeHeapStringCopyKey;
|
||||||
|
@ -1403,7 +1403,7 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi)
|
|||||||
args->rsi.master_repl_offset = g_pserver->master_repl_offset;
|
args->rsi.master_repl_offset = g_pserver->master_repl_offset;
|
||||||
|
|
||||||
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||||
args->rgpdb[idb] = g_pserver->db[idb].createSnapshot(getMvccTstamp());
|
args->rgpdb[idb] = g_pserver->db[idb].createSnapshot(getMvccTstamp(), false /* fOptional */);
|
||||||
|
|
||||||
g_pserver->rdbThreadVars.tmpfileNum++;
|
g_pserver->rdbThreadVars.tmpfileNum++;
|
||||||
g_pserver->rdbThreadVars.fRdbThreadCancel = false;
|
g_pserver->rdbThreadVars.fRdbThreadCancel = false;
|
||||||
@ -2592,7 +2592,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
|
|||||||
start = ustime();
|
start = ustime();
|
||||||
|
|
||||||
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||||
args->rgpdb[idb] = g_pserver->db[idb].createSnapshot(getMvccTstamp());
|
args->rgpdb[idb] = g_pserver->db[idb].createSnapshot(getMvccTstamp(), false /*fOptional*/);
|
||||||
|
|
||||||
g_pserver->rdbThreadVars.tmpfileNum++;
|
g_pserver->rdbThreadVars.tmpfileNum++;
|
||||||
g_pserver->rdbThreadVars.fRdbThreadCancel = false;
|
g_pserver->rdbThreadVars.fRdbThreadCancel = false;
|
||||||
|
@ -1278,7 +1278,7 @@ public:
|
|||||||
expireset *setexpireUnsafe() { return m_setexpire; }
|
expireset *setexpireUnsafe() { return m_setexpire; }
|
||||||
const expireset *setexpire() { return m_setexpire; }
|
const expireset *setexpire() { return m_setexpire; }
|
||||||
|
|
||||||
const redisDbPersistentData *createSnapshot(uint64_t mvccCheckpoint);
|
const redisDbPersistentData *createSnapshot(uint64_t mvccCheckpoint, bool fOptional);
|
||||||
void endSnapshot(const redisDbPersistentData *psnapshot);
|
void endSnapshot(const redisDbPersistentData *psnapshot);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
Loading…
x
Reference in New Issue
Block a user