Drastically improve perf when loading an RDB with a storage provider
Former-commit-id: 0133b42d54676e8fac2c5cb006cc87988dced268
This commit is contained in:
parent
6065f276c4
commit
76698beeaf
77
src/db.cpp
77
src/db.cpp
@ -49,10 +49,25 @@ struct dbBackup {
|
||||
int keyIsExpired(const redisDbPersistentDataSnapshot *db, robj *key);
|
||||
int expireIfNeeded(redisDb *db, robj *key, robj *o);
|
||||
void slotToKeyUpdateKeyCore(const char *key, size_t keylen, int add);
|
||||
void changedescDtor(void *privdata, void *obj);
|
||||
|
||||
std::unique_ptr<expireEntry> deserializeExpire(sds key, const char *str, size_t cch, size_t *poffset);
|
||||
sds serializeStoredObjectAndExpire(redisDbPersistentData *db, const char *key, robj_roptr o);
|
||||
|
||||
dictType dictChangeDescType {
|
||||
dictSdsHash, /* hash function */
|
||||
NULL, /* key dup */
|
||||
NULL, /* val dup */
|
||||
dictSdsKeyCompare, /* key compare */
|
||||
nullptr, /* key destructor */
|
||||
changedescDtor /* val destructor */
|
||||
};
|
||||
|
||||
void changedescDtor(void *, void *obj) {
|
||||
redisDbPersistentData::changedesc *desc = (redisDbPersistentData::changedesc*)obj;
|
||||
delete desc;
|
||||
}
|
||||
|
||||
/* Update LFU when an object is accessed.
|
||||
* Firstly, decrement the counter if the decrement time is reached.
|
||||
* Then logarithmically increment the counter, and update the access time. */
|
||||
@ -419,12 +434,13 @@ bool redisDbPersistentData::syncDelete(robj *key)
|
||||
fDeleted = (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) || fDeleted;
|
||||
|
||||
if (fDeleted) {
|
||||
auto itrChange = m_setchanged.find(szFromObj(key));
|
||||
if (itrChange != m_setchanged.end())
|
||||
dictEntry *de = dictUnlink(m_dictChanged, szFromObj(key));
|
||||
if (de != nullptr)
|
||||
{
|
||||
if (!itrChange->fUpdate)
|
||||
changedesc *desc = (changedesc*)dictGetVal(de);
|
||||
if (!desc->fUpdate)
|
||||
--m_cnewKeysPending;
|
||||
m_setchanged.erase(itrChange);
|
||||
dictFreeUnlinkedEntry(m_dictChanged, de);
|
||||
}
|
||||
|
||||
if (m_pdbSnapshot != nullptr)
|
||||
@ -2505,7 +2521,7 @@ void redisDbPersistentData::clear(void(callback)(void*))
|
||||
dictEmpty(m_pdict,callback);
|
||||
if (m_fTrackingChanges)
|
||||
{
|
||||
m_setchanged.clear();
|
||||
dictEmpty(m_dictChanged, nullptr);
|
||||
m_cnewKeysPending = 0;
|
||||
m_fAllChanged++;
|
||||
}
|
||||
@ -2708,17 +2724,20 @@ bool redisDbPersistentData::processChanges(bool fSnapshot)
|
||||
|
||||
if (m_spstorage != nullptr)
|
||||
{
|
||||
if (!m_fAllChanged && m_setchanged.empty() && m_cnewKeysPending == 0)
|
||||
if (!m_fAllChanged && dictSize(m_dictChanged) == 0 && m_cnewKeysPending == 0)
|
||||
return false;
|
||||
m_spstorage->beginWriteBatch();
|
||||
serverAssert(m_pdbSnapshotStorageFlush == nullptr);
|
||||
if (fSnapshot && !m_fAllChanged && m_setchanged.size() > 100)
|
||||
if (fSnapshot && !m_fAllChanged && dictSize(m_dictChanged) > 100)
|
||||
{
|
||||
// Do a snapshot based process if possible
|
||||
m_pdbSnapshotStorageFlush = createSnapshot(getMvccTstamp(), true /* optional */);
|
||||
if (m_pdbSnapshotStorageFlush)
|
||||
{
|
||||
m_setchangedStorageFlush = std::move(m_setchanged);
|
||||
if (m_dictChangedStorageFlush)
|
||||
dictRelease(m_dictChangedStorageFlush);
|
||||
m_dictChangedStorageFlush = m_dictChanged;
|
||||
m_dictChanged = dictCreate(&dictChangeDescType, nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
@ -2732,13 +2751,17 @@ bool redisDbPersistentData::processChanges(bool fSnapshot)
|
||||
}
|
||||
else
|
||||
{
|
||||
for (auto &change : m_setchanged)
|
||||
dictIterator *di = dictGetIterator(m_dictChanged);
|
||||
dictEntry *de;
|
||||
while ((de = dictNext(di)) != nullptr)
|
||||
{
|
||||
serializeAndStoreChange(m_spstorage.get(), this, change);
|
||||
changedesc *change = (changedesc*)dictGetVal(de);
|
||||
serializeAndStoreChange(m_spstorage.get(), this, *change);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
}
|
||||
}
|
||||
m_setchanged.clear();
|
||||
dictEmpty(m_dictChanged, nullptr);
|
||||
m_cnewKeysPending = 0;
|
||||
}
|
||||
return (m_spstorage != nullptr);
|
||||
@ -2748,12 +2771,16 @@ void redisDbPersistentData::commitChanges(const redisDbPersistentDataSnapshot **
|
||||
{
|
||||
if (m_pdbSnapshotStorageFlush)
|
||||
{
|
||||
|
||||
for (auto &change : m_setchangedStorageFlush)
|
||||
dictIterator *di = dictGetIterator(m_dictChangedStorageFlush);
|
||||
dictEntry *de;
|
||||
while ((de = dictNext(di)) != nullptr)
|
||||
{
|
||||
serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, change);
|
||||
changedesc *change = (changedesc*)dictGetVal(de);
|
||||
serializeAndStoreChange(m_spstorage.get(), (redisDbPersistentData*)m_pdbSnapshotStorageFlush, *change);
|
||||
}
|
||||
m_setchangedStorageFlush.clear();
|
||||
dictReleaseIterator(di);
|
||||
dictRelease(m_dictChangedStorageFlush);
|
||||
m_dictChangedStorageFlush = nullptr;
|
||||
*psnapshotFree = m_pdbSnapshotStorageFlush;
|
||||
m_pdbSnapshotStorageFlush = nullptr;
|
||||
}
|
||||
@ -2773,6 +2800,12 @@ redisDbPersistentData::~redisDbPersistentData()
|
||||
dictRelease(m_pdict);
|
||||
if (m_pdictTombstone)
|
||||
dictRelease(m_pdictTombstone);
|
||||
|
||||
if (m_dictChanged)
|
||||
dictRelease(m_dictChanged);
|
||||
if (m_dictChangedStorageFlush)
|
||||
dictRelease(m_dictChangedStorageFlush);
|
||||
|
||||
delete m_setexpire;
|
||||
}
|
||||
|
||||
@ -2815,8 +2848,8 @@ bool redisDbPersistentData::removeCachedValue(const char *key)
|
||||
if (m_spstorage != nullptr)
|
||||
m_spstorage->batch_lock();
|
||||
|
||||
auto itr = m_setchanged.find(key);
|
||||
if (itr != m_setchanged.end())
|
||||
dictEntry *de = dictFind(m_dictChanged, key);
|
||||
if (de != nullptr)
|
||||
{
|
||||
if (m_spstorage != nullptr)
|
||||
m_spstorage->batch_unlock();
|
||||
@ -2837,6 +2870,9 @@ void redisDbPersistentData::trackChanges(bool fBulk)
|
||||
m_fTrackingChanges.fetch_add(1, std::memory_order_relaxed);
|
||||
if (fBulk)
|
||||
m_fAllChanged.fetch_add(1, std::memory_order_acq_rel);
|
||||
|
||||
if (m_dictChanged == nullptr)
|
||||
m_dictChanged = dictCreate(&dictChangeDescType, nullptr);
|
||||
}
|
||||
|
||||
void redisDbPersistentData::removeAllCachedValues()
|
||||
@ -2855,9 +2891,10 @@ void redisDbPersistentData::removeAllCachedValues()
|
||||
void redisDbPersistentData::trackkey(const char *key, bool fUpdate)
|
||||
{
|
||||
if (m_fTrackingChanges && !m_fAllChanged && m_spstorage) {
|
||||
auto itr = m_setchanged.find(key);
|
||||
if (itr == m_setchanged.end()) {
|
||||
m_setchanged.emplace(sdsdupshared(key), fUpdate);
|
||||
dictEntry *de = dictFind(m_dictChanged, key);
|
||||
if (de == nullptr) {
|
||||
changedesc *desc = new changedesc(sdsdupshared(key), fUpdate);
|
||||
dictAdd(m_dictChanged, (void*)desc->strkey.get(), desc);
|
||||
if (!fUpdate)
|
||||
++m_cnewKeysPending;
|
||||
}
|
||||
|
12
src/server.h
12
src/server.h
@ -1042,6 +1042,7 @@ class redisDbPersistentDataSnapshot;
|
||||
class redisDbPersistentData
|
||||
{
|
||||
friend void dictDbKeyDestructor(void *privdata, void *key);
|
||||
friend void changedescDtor(void*, void*);
|
||||
friend class redisDbPersistentDataSnapshot;
|
||||
|
||||
public:
|
||||
@ -1153,13 +1154,6 @@ private:
|
||||
|
||||
changedesc(const char *strkey, bool fUpdate) : strkey(strkey), fUpdate(fUpdate) {}
|
||||
};
|
||||
struct changedescCmp
|
||||
{
|
||||
using is_transparent = void; // C++14 to allow comparisons with different types
|
||||
bool operator()(const changedesc &a, const changedesc &b) const { return a.strkey < b.strkey; }
|
||||
bool operator()(const changedesc &a, const char *key) const { return a.strkey < sdsview(key); }
|
||||
bool operator()(const char *key, const changedesc &b) const { return sdsview(key) < b.strkey; }
|
||||
};
|
||||
|
||||
static void serializeAndStoreChange(StorageCache *storage, redisDbPersistentData *db, const changedesc &change);
|
||||
|
||||
@ -1174,7 +1168,7 @@ private:
|
||||
dict *m_pdictTombstone = nullptr; /* Track deletes when we have a snapshot */
|
||||
std::atomic<int> m_fTrackingChanges {0}; // Note: Stack based
|
||||
std::atomic<int> m_fAllChanged {0};
|
||||
std::set<changedesc, changedescCmp> m_setchanged;
|
||||
dict *m_dictChanged = nullptr;
|
||||
size_t m_cnewKeysPending = 0;
|
||||
std::shared_ptr<StorageCache> m_spstorage = nullptr;
|
||||
|
||||
@ -1189,7 +1183,7 @@ private:
|
||||
const redisDbPersistentDataSnapshot *m_pdbSnapshotASYNC = nullptr;
|
||||
|
||||
const redisDbPersistentDataSnapshot *m_pdbSnapshotStorageFlush = nullptr;
|
||||
std::set<changedesc, changedescCmp> m_setchangedStorageFlush;
|
||||
dict *m_dictChangedStorageFlush = nullptr;
|
||||
|
||||
int m_refCount = 0;
|
||||
};
|
||||
|
Loading…
x
Reference in New Issue
Block a user