Track key count accurately before we've committed to the database. Also use this to optimize DB lookups when all keys are in memory
Former-commit-id: 78c07f0276109a085761b7044d4edc63af26d3da
This commit is contained in:
parent
2f46f18c35
commit
fb2d81395d
24
src/db.cpp
24
src/db.cpp
@ -388,7 +388,11 @@ bool redisDbPersistentData::syncDelete(robj *key)
|
|||||||
if (fDeleted) {
|
if (fDeleted) {
|
||||||
auto itrChange = m_setchanged.find(szFromObj(key));
|
auto itrChange = m_setchanged.find(szFromObj(key));
|
||||||
if (itrChange != m_setchanged.end())
|
if (itrChange != m_setchanged.end())
|
||||||
|
{
|
||||||
|
if (!itrChange->fUpdate)
|
||||||
|
--m_cnewKeysPending;
|
||||||
m_setchanged.erase(itrChange);
|
m_setchanged.erase(itrChange);
|
||||||
|
}
|
||||||
|
|
||||||
if (m_pdbSnapshot != nullptr)
|
if (m_pdbSnapshot != nullptr)
|
||||||
{
|
{
|
||||||
@ -2093,7 +2097,8 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
|
|||||||
// If we haven't found it yet check our storage engine
|
// If we haven't found it yet check our storage engine
|
||||||
if (*pde == nullptr && m_spstorage != nullptr)
|
if (*pde == nullptr && m_spstorage != nullptr)
|
||||||
{
|
{
|
||||||
serverAssert(m_spstorage != nullptr);
|
if (dictSize(m_pdict) != size()) // if all keys are cached then no point in looking up the database
|
||||||
|
{
|
||||||
m_spstorage->retrieve(sdsKey, sdslen(sdsKey), [&](const char *, size_t, const void *data, size_t cb){
|
m_spstorage->retrieve(sdsKey, sdslen(sdsKey), [&](const char *, size_t, const void *data, size_t cb){
|
||||||
robj *o = deserializeStoredObject(this, sdsKey, data, cb);
|
robj *o = deserializeStoredObject(this, sdsKey, data, cb);
|
||||||
serverAssert(o != nullptr);
|
serverAssert(o != nullptr);
|
||||||
@ -2101,6 +2106,7 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
|
|||||||
});
|
});
|
||||||
*pde = dictFind(m_pdict, sdsKey);
|
*pde = dictFind(m_pdict, sdsKey);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (*pde != nullptr && dictGetVal(*pde) != nullptr)
|
if (*pde != nullptr && dictGetVal(*pde) != nullptr)
|
||||||
{
|
{
|
||||||
@ -2161,6 +2167,7 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
m_setchanged.clear();
|
m_setchanged.clear();
|
||||||
|
m_cnewKeysPending = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2217,7 +2224,7 @@ dict_iter redisDbPersistentData::random()
|
|||||||
size_t redisDbPersistentData::size() const
|
size_t redisDbPersistentData::size() const
|
||||||
{
|
{
|
||||||
if (m_spstorage != nullptr)
|
if (m_spstorage != nullptr)
|
||||||
return m_spstorage->count();
|
return m_spstorage->count() + m_cnewKeysPending;
|
||||||
|
|
||||||
return dictSize(m_pdict)
|
return dictSize(m_pdict)
|
||||||
+ (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0);
|
+ (m_pdbSnapshot ? (m_pdbSnapshot->size() - dictSize(m_pdictTombstone)) : 0);
|
||||||
@ -2227,11 +2234,9 @@ bool redisDbPersistentData::removeCachedValue(const char *key)
|
|||||||
{
|
{
|
||||||
serverAssert(m_spstorage != nullptr);
|
serverAssert(m_spstorage != nullptr);
|
||||||
// First ensure its not a pending key
|
// First ensure its not a pending key
|
||||||
for (auto &change : m_setchanged)
|
auto itr = m_setchanged.find(key);
|
||||||
{
|
if (itr != m_setchanged.end())
|
||||||
if (sdscmp(change.strkey.get(), (sds)key) == 0)
|
return false; // can't evict
|
||||||
return false; // NOP
|
|
||||||
}
|
|
||||||
|
|
||||||
// since we write ASAP the database already has a valid copy so safe to delete
|
// since we write ASAP the database already has a valid copy so safe to delete
|
||||||
dictDelete(m_pdict, key);
|
dictDelete(m_pdict, key);
|
||||||
@ -2262,7 +2267,10 @@ void redisDbPersistentData::trackkey(const char *key, bool fUpdate)
|
|||||||
{
|
{
|
||||||
if (m_fTrackingChanges && !m_fAllChanged && m_spstorage) {
|
if (m_fTrackingChanges && !m_fAllChanged && m_spstorage) {
|
||||||
auto itr = m_setchanged.find(key);
|
auto itr = m_setchanged.find(key);
|
||||||
if (itr == m_setchanged.end())
|
if (itr == m_setchanged.end()) {
|
||||||
m_setchanged.emplace(sdsdupshared(key), fUpdate);
|
m_setchanged.emplace(sdsdupshared(key), fUpdate);
|
||||||
|
if (!fUpdate)
|
||||||
|
++m_cnewKeysPending;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -1368,6 +1368,7 @@ private:
|
|||||||
int m_fTrackingChanges = 0; // Note: Stack based
|
int m_fTrackingChanges = 0; // Note: Stack based
|
||||||
int m_fAllChanged = 0;
|
int m_fAllChanged = 0;
|
||||||
std::set<changedesc, changedescCmp> m_setchanged;
|
std::set<changedesc, changedescCmp> m_setchanged;
|
||||||
|
size_t m_cnewKeysPending = 0;
|
||||||
std::shared_ptr<IStorage> m_spstorage = nullptr;
|
std::shared_ptr<IStorage> m_spstorage = nullptr;
|
||||||
uint64_t mvccCheckpoint = 0;
|
uint64_t mvccCheckpoint = 0;
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user