Fix thread safety issues with the cache prefetch logic
Former-commit-id: a80a128bb64b81115c095d6dd91896ff73048b3d
This commit is contained in:
parent
0d69b7684e
commit
74d73b4ca3
22
src/db.cpp
22
src/db.cpp
@ -3013,15 +3013,19 @@ void redisDbPersistentData::prefetchKeysAsync(client *c, parsed_command &command
|
||||
// Should the user do something weird like remap them then the worst that will
|
||||
// happen is we don't prefetch or we prefetch wrong data. A mild perf hit, but
|
||||
// not dangerous
|
||||
const char *cmd = szFromObj(command.argv[0]);
|
||||
if (!strcasecmp(cmd, "set") || !strcasecmp(cmd, "get")) {
|
||||
auto h = dictSdsHash(szFromObj(command.argv[1]));
|
||||
for (int iht = 0; iht < 2; ++iht) {
|
||||
auto hT = h & c->db->m_pdict->ht[iht].sizemask;
|
||||
if (c->db->m_pdict->ht[iht].table != nullptr)
|
||||
_mm_prefetch(c->db->m_pdict->ht[iht].table[hT], _MM_HINT_T1);
|
||||
if (!dictIsRehashing(c->db->m_pdict))
|
||||
break;
|
||||
if (command.argc >= 2) {
|
||||
const char *cmd = szFromObj(command.argv[0]);
|
||||
if (!strcasecmp(cmd, "set") || !strcasecmp(cmd, "get")) {
|
||||
auto h = dictSdsHash(szFromObj(command.argv[1]));
|
||||
for (int iht = 0; iht < 2; ++iht) {
|
||||
auto hT = h & c->db->m_pdict->ht[iht].sizemask;
|
||||
dictEntry **table;
|
||||
__atomic_load(&c->db->m_pdict->ht[iht].table, &table, __ATOMIC_RELAXED);
|
||||
if (table != nullptr)
|
||||
_mm_prefetch(table[hT], _MM_HINT_T2);
|
||||
if (!dictIsRehashing(c->db->m_pdict))
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
@ -74,6 +74,8 @@ static int _dictInit(dict *ht, dictType *type, void *privDataPtr);
|
||||
|
||||
static uint8_t dict_hash_function_seed[16];
|
||||
|
||||
extern "C" void asyncFreeDictTable(dictEntry **de);
|
||||
|
||||
void dictSetHashFunctionSeed(uint8_t *seed) {
|
||||
memcpy(dict_hash_function_seed,seed,sizeof(dict_hash_function_seed));
|
||||
}
|
||||
@ -359,7 +361,7 @@ int dictRehash(dict *d, int n) {
|
||||
|
||||
/* Check if we already rehashed the whole table... */
|
||||
if (d->ht[0].used == 0) {
|
||||
zfree(d->ht[0].table);
|
||||
asyncFreeDictTable(d->ht[0].table);
|
||||
d->ht[0] = d->ht[1];
|
||||
_dictReset(&d->ht[1]);
|
||||
d->rehashidx = -1;
|
||||
@ -487,7 +489,7 @@ void dictCompleteRehashAsync(dictAsyncRehashCtl *ctl, bool fFree) {
|
||||
|
||||
/* Check if we already rehashed the whole table... */
|
||||
if (d->ht[0].used == 0 && d->asyncdata == nullptr) {
|
||||
zfree(d->ht[0].table);
|
||||
asyncFreeDictTable(d->ht[0].table);
|
||||
d->ht[0] = d->ht[1];
|
||||
_dictReset(&d->ht[1]);
|
||||
d->rehashidx = -1;
|
||||
@ -762,7 +764,7 @@ int _dictClear(dict *d, dictht *ht, void(callback)(void *)) {
|
||||
}
|
||||
}
|
||||
/* Free the table and the allocated cache structure */
|
||||
zfree(ht->table);
|
||||
asyncFreeDictTable(ht->table);
|
||||
/* Re-initialize the table */
|
||||
_dictReset(ht);
|
||||
return DICT_OK; /* never fails */
|
||||
|
@ -813,6 +813,10 @@ static int ipow(int base, int exp) {
|
||||
return result;
|
||||
}
|
||||
|
||||
extern "C" void asyncFreeDictTable(dictEntry **de) {
|
||||
zfree(de);
|
||||
}
|
||||
|
||||
static void showLatencyReport(void) {
|
||||
int i, curlat = 0;
|
||||
int usbetweenlat = ipow(10, MAX_LATENCY_PRECISION-config.precision);
|
||||
|
@ -144,6 +144,11 @@ static void cliRefreshPrompt(void) {
|
||||
sdsfree(prompt);
|
||||
}
|
||||
|
||||
struct dictEntry;
|
||||
void asyncFreeDictTable(struct dictEntry **de) {
|
||||
zfree(de);
|
||||
}
|
||||
|
||||
/* Return the name of the dotfile for the specified 'dotfilename'.
|
||||
* Normally it just concatenates user $HOME to the file specified
|
||||
* in 'dotfilename'. However if the environment variable 'envoverride'
|
||||
|
@ -2484,6 +2484,15 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
|
||||
return 1000/g_pserver->hz;
|
||||
}
|
||||
|
||||
extern "C" void asyncFreeDictTable(dictEntry **de)
|
||||
{
|
||||
if (de == nullptr || serverTL == nullptr || serverTL->gcEpoch.isReset()) {
|
||||
zfree(de);
|
||||
} else {
|
||||
g_pserver->garbageCollector.enqueueCPtr(serverTL->gcEpoch, de);
|
||||
}
|
||||
}
|
||||
|
||||
extern int ProcessingEventsWhileBlocked;
|
||||
void processClients();
|
||||
|
||||
|
20
src/server.h
20
src/server.h
@ -1786,6 +1786,19 @@ class GarbageCollectorCollection
|
||||
GarbageCollector<redisDbPersistentDataSnapshot> garbageCollectorSnapshot;
|
||||
GarbageCollector<ICollectable> garbageCollectorGeneric;
|
||||
|
||||
class CPtrCollectable : public ICollectable
|
||||
{
|
||||
void *m_pv;
|
||||
|
||||
public:
|
||||
CPtrCollectable(void *pv)
|
||||
: m_pv(pv)
|
||||
{}
|
||||
~CPtrCollectable() {
|
||||
zfree(m_pv);
|
||||
}
|
||||
};
|
||||
|
||||
public:
|
||||
struct Epoch
|
||||
{
|
||||
@ -1831,6 +1844,13 @@ public:
|
||||
{
|
||||
garbageCollectorGeneric.enqueue(e.epochGeneric, std::move(sp));
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
void enqueueCPtr(Epoch e, T p)
|
||||
{
|
||||
auto sp = std::make_unique<CPtrCollectable>(reinterpret_cast<void*>(p));
|
||||
enqueue(e, std::move(sp));
|
||||
}
|
||||
};
|
||||
|
||||
// Per-thread variabels that may be accessed without a lock
|
||||
|
Loading…
x
Reference in New Issue
Block a user