Make async rehash behave with snapshots (thread safety issues)
Former-commit-id: 372adf39a80252b8035e3c948fcaf7d5ef6f928f
parent 25f56c59b0
commit 4f06fb2b4f
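
The fix replaces the old `release` flag with a reference count on the dict itself: `_dictInit()` starts `refcount` at 1, every in-flight `dictAsyncRehashCtl` takes a reference in its constructor, and `dictRelease()` only tears the tables down once the count reaches zero, so a dict can no longer be freed out from under a background rehash. Below is a minimal standalone sketch of that lifetime pattern, using `std::atomic` in place of the `__atomic_*` builtins from the diff and simplified stand-in types (not the real dict/dictAsyncRehashCtl):

    #include <atomic>
    #include <cstdio>

    struct Dict {
        std::atomic<unsigned> refcount{1};   // owner's reference, mirroring d->refcount = 1 in _dictInit
        // hash tables elided
    };

    void dictReleaseSketch(Dict *d) {
        // The last reference (owner or an in-flight rehash) frees the dict.
        if (d->refcount.fetch_sub(1, std::memory_order_acq_rel) == 1) {
            std::puts("freeing dict");
            delete d;
        }
    }

    struct AsyncRehashCtl {
        Dict *d;
        explicit AsyncRehashCtl(Dict *dict) : d(dict) {
            d->refcount.fetch_add(1, std::memory_order_release);  // keep the dict alive while work is in flight
        }
    };

    void completeRehashSketch(AsyncRehashCtl *ctl) {
        Dict *d = ctl->d;
        delete ctl;               // control block is done (queue applied or discarded)
        dictReleaseSketch(d);     // drop the reference taken in the constructor
    }

    int main() {
        Dict *d = new Dict;
        auto *ctl = new AsyncRehashCtl(d);  // async rehash starts: refcount == 2
        dictReleaseSketch(d);               // owner releases mid-flight: nothing freed yet
        completeRehashSketch(ctl);          // last reference dropped: dict freed here
    }
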
src/dict.cpp (54 changed lines)
@@ -129,6 +129,7 @@ int _dictInit(dict *d, dictType *type,
     d->rehashidx = -1;
     d->iterators = 0;
     d->asyncdata = nullptr;
+    d->refcount = 1;
     return DICT_OK;
 }

@@ -369,8 +370,14 @@ int dictRehash(dict *d, int n) {
     return 1;
 }

+dictAsyncRehashCtl::dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) {
+    queue.reserve(c_targetQueueSize);
+    __atomic_fetch_add(&d->refcount, 1, __ATOMIC_RELEASE);
+}
+
 dictAsyncRehashCtl *dictRehashAsyncStart(dict *d, int buckets) {
-    if (!dictIsRehashing(d)) return 0;
+    assert(d->type->asyncfree != nullptr);
+    if (!dictIsRehashing(d) || d->iterators != 0) return nullptr;

     d->asyncdata = new dictAsyncRehashCtl(d, d->asyncdata);

@@ -454,7 +461,7 @@ void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) {
         }
     }

-    if (fUnlinked && !ctl->release) {
+    if (fUnlinked && !ctl->abondon) {
         if (d->ht[0].table != nullptr) { // can be null if we're cleared during the rehash
             for (auto &wi : ctl->queue) {
                 // We need to remove it from the source hash table, and store it in the dest.
@@ -487,23 +494,10 @@ void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) {
     }

     if (fFree) {
-        while (ctl->deGCList != nullptr) {
-            auto next = ctl->deGCList->next;
-            dictFreeKey(d, ctl->deGCList);
-            dictFreeVal(d, ctl->deGCList);
-            zfree(ctl->deGCList);
-            ctl->deGCList = next;
-        }
-
-        // Was the dictionary free'd while we were in flight?
-        if (ctl->release) {
-            if (d->asyncdata != nullptr)
-                d->asyncdata->release = true;
-            else
-                dictRelease(d);
-        }
-
-        delete ctl;
+        d->type->asyncfree(ctl);
+
+        // Remove our reference
+        dictRelease(d);
     }
 }

@@ -514,6 +508,16 @@ long long timeInMilliseconds(void) {
     return (((long long)tv.tv_sec)*1000)+(tv.tv_usec/1000);
 }

+dictAsyncRehashCtl::~dictAsyncRehashCtl() {
+    while (deGCList != nullptr) {
+        auto next = deGCList->next;
+        dictFreeKey(dict, deGCList);
+        dictFreeVal(dict, deGCList);
+        zfree(deGCList);
+        deGCList = next;
+    }
+}
+
 /* Rehash in ms+"delta" milliseconds. The value of "delta" is larger
  * than 0, and is smaller than 1 in most cases. The exact upper bound
  * depends on the running time of dictRehash(d,100).*/
@@ -537,7 +541,7 @@ int dictRehashMilliseconds(dict *d, int ms) {
  * dictionary so that the hash table automatically migrates from H1 to H2
  * while it is actively used. */
 static void _dictRehashStep(dict *d) {
-    unsigned long iterators;
+    unsigned iterators;
     __atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED);
     if (iterators == 0) dictRehash(d,2);
 }
@@ -766,13 +770,11 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
 /* Clear & Release the hash table */
 void dictRelease(dict *d)
 {
-    if (d->asyncdata) {
-        d->asyncdata->release = true;
-        return;
+    if (__atomic_sub_fetch(&d->refcount, 1, __ATOMIC_ACQ_REL) == 0) {
+        _dictClear(d,&d->ht[0],NULL);
+        _dictClear(d,&d->ht[1],NULL);
+        zfree(d);
     }
-    _dictClear(d,&d->ht[0],NULL);
-    _dictClear(d,&d->ht[1],NULL);
-    zfree(d);
 }

 dictEntry *dictFindWithPrev(dict *d, const void *key, uint64_t h, dictEntry ***dePrevPtr, dictht **pht, bool fShallowCompare)
@@ -1460,7 +1462,7 @@ void dictGetStats(char *buf, size_t bufsize, dict *d) {

 void dictForceRehash(dict *d)
 {
-    unsigned long iterators;
+    unsigned iterators;
     __atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED);
     while (iterators == 0 && dictIsRehashing(d)) _dictRehashStep(d);
 }
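
The `!ctl->abondon` check in `dictCompleteRehashAsync()` is the completion side of the snapshot handshake: speculative rehash work computed off-thread is applied only if no snapshot invalidated it in the meantime, and is thrown away otherwise. A standalone sketch of that apply-or-discard step, with the queued work reduced to bucket indices (hypothetical names, not the real work items):

    #include <atomic>
    #include <cstdio>
    #include <vector>

    // Speculative work computed off-thread; applied later on the main thread.
    struct RehashCtl {
        std::vector<int> queue;               // stand-in for the queued bucket work
        std::atomic<bool> done{false};        // set by the worker when hashing finishes
        std::atomic<bool> abandoned{false};   // set if a snapshot invalidated the work
    };

    // Main-thread completion step: apply the queue only if nobody abandoned it.
    void completeRehash(RehashCtl *ctl) {
        if (!ctl->abandoned.load(std::memory_order_acquire)) {
            for (int bucket : ctl->queue)
                std::printf("moving bucket %d to the new table\n", bucket);
        } else {
            std::puts("snapshot in progress: discarding speculative rehash work");
        }
        delete ctl;   // the real code also drops its dict reference here
    }

    int main() {
        auto *ctl = new RehashCtl;
        ctl->queue = {3, 7, 11};              // pretend a worker filled this in
        ctl->done.store(true, std::memory_order_release);
        ctl->abandoned.store(true);           // a snapshot was taken mid-flight
        completeRehash(ctl);                  // work is discarded, not applied
    }
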

src/dict.h (12 changed lines)
@@ -53,6 +53,7 @@ extern "C" {

 /* Unused arguments generate annoying warnings... */
 #define DICT_NOTUSED(V) ((void) V)
+struct dictAsyncRehashCtl;

 typedef struct dictEntry {
     void *key;
@@ -72,6 +73,7 @@ typedef struct dictType {
     int (*keyCompare)(void *privdata, const void *key1, const void *key2);
     void (*keyDestructor)(void *privdata, void *key);
     void (*valDestructor)(void *privdata, void *obj);
+    void (*asyncfree)(dictAsyncRehashCtl *);
 } dictType;

 /* This is our hash table structure. Every dictionary has two of this as we
@@ -98,13 +100,12 @@ struct dictAsyncRehashCtl {
     struct dict *dict = nullptr;
     std::vector<workItem> queue;
     size_t hashIdx = 0;
-    bool release = false;
     dictAsyncRehashCtl *next = nullptr;
     std::atomic<bool> done { false };
+    std::atomic<bool> abondon { false };

-    dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next) : dict(d), next(next) {
-        queue.reserve(c_targetQueueSize);
-    }
+    dictAsyncRehashCtl(struct dict *d, dictAsyncRehashCtl *next);
+    ~dictAsyncRehashCtl();
 };
 #else
 struct dictAsyncRehashCtl;
@@ -115,7 +116,8 @@ typedef struct dict {
     void *privdata;
     dictht ht[2];
     long rehashidx; /* rehashing not in progress if rehashidx == -1 */
-    unsigned long iterators; /* number of iterators currently running */
+    unsigned iterators; /* number of iterators currently running */
+    unsigned refcount;
     dictAsyncRehashCtl *asyncdata;
 } dict;

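
dict.h also gives `dictType` an `asyncfree` hook, and the hunks below register `dictGCAsyncFree` in `dbDictType`, so each table type decides how the garbage from an async rehash is reclaimed instead of `dictCompleteRehashAsync()` freeing it inline. A small sketch of such an optional per-type cleanup callback (illustrative names only, not the real dictType):

    #include <cstdio>

    struct AsyncCtl { int pendingFrees; };       // stand-in for dictAsyncRehashCtl

    // Per-type vtable in the spirit of dictType: the usual destructors plus an async-free hook.
    struct TableType {
        void (*valDestructor)(void *obj);
        void (*asyncfree)(AsyncCtl *ctl);        // new hook; may be null for types that never rehash async
    };

    void freeVal(void *obj)         { std::printf("freeing value at %p\n", obj); }
    void gcAsyncFree(AsyncCtl *ctl) { std::printf("deferring %d frees to the GC\n", ctl->pendingFrees); delete ctl; }

    // Completion path hands the control block to the type's hook instead of freeing inline.
    void completeRehash(const TableType &type, AsyncCtl *ctl) {
        type.asyncfree(ctl);    // the real code asserts the hook is set before starting an async rehash
    }

    int main() {
        TableType dbLikeType = { freeVal, gcAsyncFree };   // like dbDictType gaining dictGCAsyncFree
        completeRehash(dbLikeType, new AsyncCtl{42});
    }
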
@@ -1359,6 +1359,8 @@ uint64_t dictEncObjHash(const void *key) {
     }
 }

+void dictGCAsyncFree(dictAsyncRehashCtl *async);
+
 /* Generic hash table type where keys are Redis Objects, Values
  * dummy pointers. */
 dictType objectKeyPointerValueDictType = {
@@ -1407,8 +1409,9 @@ dictType dbDictType = {
     NULL,                   /* key dup */
     NULL,                   /* val dup */
     dictSdsKeyCompare,      /* key compare */
     dictDbKeyDestructor,    /* key destructor */
-    dictObjectDestructor    /* val destructor */
+    dictObjectDestructor,   /* val destructor */
+    dictGCAsyncFree         /* async free destructor */
 };

 /* db->pdict, keys are sds strings, vals are Redis objects. */

@@ -26,6 +26,17 @@ public:
     std::vector<dictEntry*> vecde;
 };

+void discontinueAsyncRehash(dict *d) {
+    if (d->asyncdata != nullptr) {
+        auto adata = d->asyncdata;
+        while (adata != nullptr) {
+            adata->abondon = true;
+            adata = adata->next;
+        }
+        d->rehashidx = 0;
+    }
+}
+
 const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional)
 {
     serverAssert(GlobalLocksAcquired());
@@ -67,14 +78,8 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6

     // We can't have async rehash modifying these. Setting the asyncdata list to null
     // will cause us to throw away the async work rather than modify the tables in flight
-    if (m_pdict->asyncdata != nullptr) {
-        m_pdict->asyncdata = nullptr;
-        m_pdict->rehashidx = 0;
-    }
-    if (m_pdictTombstone->asyncdata != nullptr) {
-        m_pdictTombstone->rehashidx = 0;
-        m_pdictTombstone->asyncdata = nullptr;
-    }
+    discontinueAsyncRehash(m_pdict);
+    discontinueAsyncRehash(m_pdictTombstone);

     spdb->m_fAllChanged = false;
     spdb->m_fTrackingChanges = 0;
@@ -124,6 +129,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
         m_pdbSnapshotASYNC = nullptr;
     }

+    std::atomic_thread_fence(std::memory_order_seq_cst);
     return m_pdbSnapshot;
 }

@@ -318,6 +324,12 @@ bool redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth)

     dictForceRehash(dictTombstoneNew);
     aeAcquireLock();
+    if (m_pdbSnapshot->m_pdict->asyncdata != nullptr) {
+        // In this case we use the asyncdata to free us, not our own lazy free
+        for (auto de : splazy->vecde)
+            dictFreeUnlinkedEntry(m_pdbSnapshot->m_pdict, de);
+        splazy->vecde.clear();
+    }
     dict *dT = m_pdbSnapshot->m_pdict;
     splazy->vecdictLazyFree.push_back(m_pdictTombstone);
     __atomic_store(&m_pdictTombstone, &dictTombstoneNew, __ATOMIC_RELEASE);
@@ -402,9 +414,14 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
         }

         // Delete the object from the source dict, we don't use dictDelete to avoid a second search
-        if (deSnapshot != nullptr)
-            splazy->vecde.push_back(deSnapshot);
-        *dePrev = deSnapshot->next;
+        *dePrev = deSnapshot->next; // Unlink it first
+        if (deSnapshot != nullptr) {
+            if (m_spdbSnapshotHOLDER->m_pdict->asyncdata != nullptr) {
+                dictFreeUnlinkedEntry(m_spdbSnapshotHOLDER->m_pdict, deSnapshot);
+            } else {
+                splazy->vecde.push_back(deSnapshot);
+            }
+        }
         ht->used--;
     }

@@ -641,3 +658,17 @@ bool redisDbPersistentDataSnapshot::FStale() const
     static const uint64_t msStale = 500;
     return ((getMvccTstamp() - m_mvccCheckpoint) >> MVCC_MS_SHIFT) >= msStale;
 }
+
+void dictGCAsyncFree(dictAsyncRehashCtl *async) {
+    if (async->deGCList != nullptr && serverTL != nullptr && !serverTL->gcEpoch.isReset()) {
+        auto splazy = std::make_unique<LazyFree>();
+        auto *de = async->deGCList;
+        while (de != nullptr) {
+            splazy->vecde.push_back(de);
+            de = de->next;
+        }
+        async->deGCList = nullptr;
+        g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy));
+    }
+    delete async;
+}
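
`dictGCAsyncFree()` defers the key/value frees accumulated on `deGCList` to the garbage collector when a GC epoch is available, while `~dictAsyncRehashCtl()` (added in dict.cpp above) frees whatever is left inline. A standalone sketch of that defer-if-possible, destructor-as-fallback pattern; the GC queue here is just a vector, not the real epoch-based collector:

    #include <cstdio>
    #include <memory>
    #include <vector>

    // Intrusive list of entries whose frees were deferred during the async rehash.
    struct Entry { int id; Entry *next; };

    struct AsyncCtl {
        Entry *deGCList = nullptr;
        ~AsyncCtl() {                        // fallback: free whatever was not handed to the GC
            while (deGCList != nullptr) {
                Entry *next = deGCList->next;
                std::printf("destructor freeing entry %d\n", deGCList->id);
                delete deGCList;
                deGCList = next;
            }
        }
    };

    // Stand-in for the garbage collector: a batch freed later, off the hot path.
    std::vector<std::unique_ptr<Entry>> g_gcQueue;

    void gcAsyncFree(AsyncCtl *ctl, bool gcAvailable) {
        if (ctl->deGCList != nullptr && gcAvailable) {
            for (Entry *de = ctl->deGCList; de != nullptr; ) {
                Entry *next = de->next;
                g_gcQueue.emplace_back(de);      // defer the free to the collector
                de = next;
            }
            ctl->deGCList = nullptr;
        }
        delete ctl;                              // destructor handles anything left over
    }

    int main() {
        auto *ctl = new AsyncCtl;
        ctl->deGCList = new Entry{1, new Entry{2, nullptr}};
        gcAsyncFree(ctl, /*gcAvailable=*/true);  // entries end up on the GC queue
        std::printf("%zu entries deferred to GC\n", g_gcQueue.size());
    }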