Perf remove extra lookup in rocksdb
Former-commit-id: 8074472c7a25572a53f1166911920c2cb168c141
This commit is contained in:
parent
3d265fc3df
commit
2f46f18c35
@ -17,7 +17,7 @@ public:
|
|||||||
|
|
||||||
virtual ~IStorage();
|
virtual ~IStorage();
|
||||||
|
|
||||||
virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) = 0;
|
virtual void insert(const char *key, size_t cchKey, void *data, size_t cb, bool fOverwire) = 0;
|
||||||
virtual bool erase(const char *key, size_t cchKey) = 0;
|
virtual bool erase(const char *key, size_t cchKey) = 0;
|
||||||
virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const = 0;
|
virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const = 0;
|
||||||
virtual size_t clear() = 0;
|
virtual size_t clear() = 0;
|
||||||
|
10
src/aof.cpp
10
src/aof.cpp
@ -764,6 +764,11 @@ int loadAppendOnlyFile(char *filename) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||||
|
{
|
||||||
|
g_pserver->db[idb]->trackChanges(true);
|
||||||
|
}
|
||||||
|
|
||||||
/* Read the actual AOF file, in REPL format, command by command. */
|
/* Read the actual AOF file, in REPL format, command by command. */
|
||||||
while(1) {
|
while(1) {
|
||||||
int argc, j;
|
int argc, j;
|
||||||
@ -864,6 +869,11 @@ int loadAppendOnlyFile(char *filename) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
|
loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */
|
||||||
|
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||||
|
{
|
||||||
|
auto vec = g_pserver->db[idb]->processChanges();
|
||||||
|
g_pserver->db[idb]->commitChanges(vec);
|
||||||
|
}
|
||||||
fclose(fp);
|
fclose(fp);
|
||||||
freeFakeClient(fakeClient);
|
freeFakeClient(fakeClient);
|
||||||
g_pserver->aof_state = old_aof_state;
|
g_pserver->aof_state = old_aof_state;
|
||||||
|
43
src/db.cpp
43
src/db.cpp
@ -91,7 +91,7 @@ static robj* lookupKey(redisDb *db, robj *key, int flags) {
|
|||||||
lookupKeyUpdateObj(val, flags);
|
lookupKeyUpdateObj(val, flags);
|
||||||
if (flags & LOOKUP_UPDATEMVCC) {
|
if (flags & LOOKUP_UPDATEMVCC) {
|
||||||
val->mvcc_tstamp = getMvccTstamp();
|
val->mvcc_tstamp = getMvccTstamp();
|
||||||
db->trackkey(key);
|
db->trackkey(key, true /* fUpdate */);
|
||||||
}
|
}
|
||||||
return val;
|
return val;
|
||||||
} else {
|
} else {
|
||||||
@ -386,6 +386,10 @@ bool redisDbPersistentData::syncDelete(robj *key)
|
|||||||
fDeleted = (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) || fDeleted;
|
fDeleted = (dictDelete(m_pdict,ptrFromObj(key)) == DICT_OK) || fDeleted;
|
||||||
|
|
||||||
if (fDeleted) {
|
if (fDeleted) {
|
||||||
|
auto itrChange = m_setchanged.find(szFromObj(key));
|
||||||
|
if (itrChange != m_setchanged.end())
|
||||||
|
m_setchanged.erase(itrChange);
|
||||||
|
|
||||||
if (m_pdbSnapshot != nullptr)
|
if (m_pdbSnapshot != nullptr)
|
||||||
{
|
{
|
||||||
auto itr = m_pdbSnapshot->find_cached_threadsafe(szFromObj(key));
|
auto itr = m_pdbSnapshot->find_cached_threadsafe(szFromObj(key));
|
||||||
@ -1340,7 +1344,7 @@ int redisDbPersistentData::removeExpire(robj *key, dict_iter itr) {
|
|||||||
if (!val->FExpires())
|
if (!val->FExpires())
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
trackkey(key);
|
trackkey(key, true /* fUpdate */);
|
||||||
auto itrExpire = m_setexpire->find(itr.key());
|
auto itrExpire = m_setexpire->find(itr.key());
|
||||||
serverAssert(itrExpire != m_setexpire->end());
|
serverAssert(itrExpire != m_setexpire->end());
|
||||||
m_setexpire->erase(itrExpire);
|
m_setexpire->erase(itrExpire);
|
||||||
@ -1957,7 +1961,7 @@ bool redisDbPersistentData::insert(char *key, robj *o)
|
|||||||
{
|
{
|
||||||
serverAssert(dictFind(m_pdictTombstone, key) != nullptr);
|
serverAssert(dictFind(m_pdictTombstone, key) != nullptr);
|
||||||
}
|
}
|
||||||
trackkey(key);
|
trackkey(key, false /* fUpdate */);
|
||||||
}
|
}
|
||||||
return (res == DICT_OK);
|
return (res == DICT_OK);
|
||||||
}
|
}
|
||||||
@ -1997,7 +2001,7 @@ void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when)
|
|||||||
/* Reuse the sds from the main dict in the expire dict */
|
/* Reuse the sds from the main dict in the expire dict */
|
||||||
dictEntry *kde = dictFind(m_pdict,ptrFromObj(key));
|
dictEntry *kde = dictFind(m_pdict,ptrFromObj(key));
|
||||||
serverAssertWithInfo(NULL,key,kde != NULL);
|
serverAssertWithInfo(NULL,key,kde != NULL);
|
||||||
trackkey(key);
|
trackkey(key, true /* fUpdate */);
|
||||||
|
|
||||||
if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
|
if (((robj*)dictGetVal(kde))->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
|
||||||
{
|
{
|
||||||
@ -2024,7 +2028,7 @@ void redisDbPersistentData::setExpire(robj *key, robj *subkey, long long when)
|
|||||||
|
|
||||||
void redisDbPersistentData::setExpire(expireEntry &&e)
|
void redisDbPersistentData::setExpire(expireEntry &&e)
|
||||||
{
|
{
|
||||||
trackkey(e.key());
|
trackkey(e.key(), true /* fUpdate */);
|
||||||
m_setexpire->insert(e);
|
m_setexpire->insert(e);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2035,7 +2039,7 @@ bool redisDb::FKeyExpires(const char *key)
|
|||||||
|
|
||||||
void redisDbPersistentData::updateValue(dict_iter itr, robj *val)
|
void redisDbPersistentData::updateValue(dict_iter itr, robj *val)
|
||||||
{
|
{
|
||||||
trackkey(itr.key());
|
trackkey(itr.key(), true /* fUpdate */);
|
||||||
dictSetVal(m_pdict, itr.de, val);
|
dictSetVal(m_pdict, itr.de, val);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2105,10 +2109,10 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o)
|
void redisDbPersistentData::storeKey(const char *szKey, size_t cchKey, robj *o, bool fOverwrite)
|
||||||
{
|
{
|
||||||
sds temp = serializeStoredObject(o);
|
sds temp = serializeStoredObject(o);
|
||||||
m_spstorage->insert(szKey, cchKey, temp, sdslen(temp));
|
m_spstorage->insert(szKey, cchKey, temp, sdslen(temp), fOverwrite);
|
||||||
sdsfree(temp);
|
sdsfree(temp);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2119,7 +2123,7 @@ void redisDbPersistentData::storeDatabase()
|
|||||||
while ((de = dictNext(di)) != NULL) {
|
while ((de = dictNext(di)) != NULL) {
|
||||||
sds key = (sds)dictGetKey(de);
|
sds key = (sds)dictGetKey(de);
|
||||||
robj *o = (robj*)dictGetVal(de);
|
robj *o = (robj*)dictGetVal(de);
|
||||||
storeKey(key, sdslen(key), o);
|
storeKey(key, sdslen(key), o, false);
|
||||||
}
|
}
|
||||||
dictReleaseIterator(di);
|
dictReleaseIterator(di);
|
||||||
}
|
}
|
||||||
@ -2146,14 +2150,14 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges()
|
|||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
for (auto &key : m_setchanged)
|
for (auto &change : m_setchanged)
|
||||||
{
|
{
|
||||||
dictEntry *de = dictFind(m_pdict, key.get());
|
dictEntry *de = dictFind(m_pdict, change.strkey.get());
|
||||||
if (de == nullptr)
|
if (de == nullptr)
|
||||||
continue;
|
continue;
|
||||||
robj *o = (robj*)dictGetVal(de);
|
robj *o = (robj*)dictGetVal(de);
|
||||||
sds temp = serializeStoredObject(o);
|
sds temp = serializeStoredObject(o);
|
||||||
vecRet.emplace_back(std::move(key), unique_sds_ptr(temp));
|
vecRet.emplace_back(std::move(change), unique_sds_ptr(temp));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
m_setchanged.clear();
|
m_setchanged.clear();
|
||||||
@ -2167,7 +2171,7 @@ void redisDbPersistentData::commitChanges(const changelist &vec)
|
|||||||
{
|
{
|
||||||
for (auto &pair : vec)
|
for (auto &pair : vec)
|
||||||
{
|
{
|
||||||
m_spstorage->insert(pair.first.get(), sdslen(pair.first.get()), pair.second.get(), sdslen(pair.second.get()));
|
m_spstorage->insert(pair.first.strkey.get(), sdslen(pair.first.strkey.get()), pair.second.get(), sdslen(pair.second.get()), pair.first.fUpdate);
|
||||||
}
|
}
|
||||||
if (m_spstorage != nullptr)
|
if (m_spstorage != nullptr)
|
||||||
m_spstorage->endWriteBatch();
|
m_spstorage->endWriteBatch();
|
||||||
@ -2223,9 +2227,9 @@ bool redisDbPersistentData::removeCachedValue(const char *key)
|
|||||||
{
|
{
|
||||||
serverAssert(m_spstorage != nullptr);
|
serverAssert(m_spstorage != nullptr);
|
||||||
// First ensure its not a pending key
|
// First ensure its not a pending key
|
||||||
for (auto &strkey : m_setchanged)
|
for (auto &change : m_setchanged)
|
||||||
{
|
{
|
||||||
if (sdscmp(strkey.get(), (sds)key) == 0)
|
if (sdscmp(change.strkey.get(), (sds)key) == 0)
|
||||||
return false; // NOP
|
return false; // NOP
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2253,3 +2257,12 @@ void redisDbPersistentData::removeAllCachedValues()
|
|||||||
|
|
||||||
dictEmpty(m_pdict, nullptr);
|
dictEmpty(m_pdict, nullptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void redisDbPersistentData::trackkey(const char *key, bool fUpdate)
|
||||||
|
{
|
||||||
|
if (m_fTrackingChanges && !m_fAllChanged && m_spstorage) {
|
||||||
|
auto itr = m_setchanged.find(key);
|
||||||
|
if (itr == m_setchanged.end())
|
||||||
|
m_setchanged.emplace(sdsdupshared(key), fUpdate);
|
||||||
|
}
|
||||||
|
}
|
31
src/server.h
31
src/server.h
@ -1263,16 +1263,12 @@ public:
|
|||||||
size_t size() const;
|
size_t size() const;
|
||||||
void expand(uint64_t slots) { dictExpand(m_pdict, slots); }
|
void expand(uint64_t slots) { dictExpand(m_pdict, slots); }
|
||||||
|
|
||||||
void trackkey(robj_roptr o)
|
void trackkey(robj_roptr o, bool fUpdate)
|
||||||
{
|
{
|
||||||
trackkey(szFromObj(o));
|
trackkey(szFromObj(o), fUpdate);
|
||||||
}
|
}
|
||||||
|
|
||||||
void trackkey(const char *key)
|
void trackkey(const char *key, bool fUpdate);
|
||||||
{
|
|
||||||
if (m_fTrackingChanges && !m_fAllChanged && m_spstorage)
|
|
||||||
m_setchanged.emplace(sdsdupshared(key));
|
|
||||||
}
|
|
||||||
|
|
||||||
dict_iter find(const char *key)
|
dict_iter find(const char *key)
|
||||||
{
|
{
|
||||||
@ -1324,7 +1320,14 @@ public:
|
|||||||
// to allow you to release the global lock before commiting. To prevent deadlocks you *must*
|
// to allow you to release the global lock before commiting. To prevent deadlocks you *must*
|
||||||
// either release the global lock or keep the same global lock between the two functions as
|
// either release the global lock or keep the same global lock between the two functions as
|
||||||
// a second look is kept to ensure writes to secondary storage are ordered
|
// a second look is kept to ensure writes to secondary storage are ordered
|
||||||
typedef std::vector<std::pair<sdsimmutablestring, unique_sds_ptr>> changelist;
|
struct changedesc
|
||||||
|
{
|
||||||
|
sdsimmutablestring strkey;
|
||||||
|
bool fUpdate;
|
||||||
|
|
||||||
|
changedesc(const char *strkey, bool fUpdate) : strkey(strkey), fUpdate(fUpdate) {}
|
||||||
|
};
|
||||||
|
typedef std::vector<std::pair<changedesc, unique_sds_ptr>> changelist;
|
||||||
changelist processChanges();
|
changelist processChanges();
|
||||||
void commitChanges(const changelist &vec);
|
void commitChanges(const changelist &vec);
|
||||||
|
|
||||||
@ -1345,10 +1348,18 @@ public:
|
|||||||
void removeAllCachedValues();
|
void removeAllCachedValues();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
struct changedescCmp
|
||||||
|
{
|
||||||
|
using is_transparent = void; // C++14 to allow comparisons with different types
|
||||||
|
bool operator()(const changedesc &a, const changedesc &b) const { return a.strkey < b.strkey; }
|
||||||
|
bool operator()(const changedesc &a, const char *key) const { return a.strkey < sdsview(key); }
|
||||||
|
bool operator()(const char *key, const changedesc &b) const { return sdsview(key) < b.strkey; }
|
||||||
|
};
|
||||||
|
|
||||||
void ensure(const char *key);
|
void ensure(const char *key);
|
||||||
void ensure(const char *key, dictEntry **de);
|
void ensure(const char *key, dictEntry **de);
|
||||||
void storeDatabase();
|
void storeDatabase();
|
||||||
void storeKey(const char *key, size_t cchKey, robj *o);
|
void storeKey(const char *key, size_t cchKey, robj *o, bool fOverwrite);
|
||||||
void recursiveFreeSnapshots(redisDbPersistentDataSnapshot *psnapshot);
|
void recursiveFreeSnapshots(redisDbPersistentDataSnapshot *psnapshot);
|
||||||
|
|
||||||
// Keyspace
|
// Keyspace
|
||||||
@ -1356,7 +1367,7 @@ private:
|
|||||||
dict *m_pdictTombstone = nullptr; /* Track deletes when we have a snapshot */
|
dict *m_pdictTombstone = nullptr; /* Track deletes when we have a snapshot */
|
||||||
int m_fTrackingChanges = 0; // Note: Stack based
|
int m_fTrackingChanges = 0; // Note: Stack based
|
||||||
int m_fAllChanged = 0;
|
int m_fAllChanged = 0;
|
||||||
std::set<sdsimmutablestring> m_setchanged;
|
std::set<changedesc, changedescCmp> m_setchanged;
|
||||||
std::shared_ptr<IStorage> m_spstorage = nullptr;
|
std::shared_ptr<IStorage> m_spstorage = nullptr;
|
||||||
uint64_t mvccCheckpoint = 0;
|
uint64_t mvccCheckpoint = 0;
|
||||||
|
|
||||||
|
@ -9,10 +9,9 @@ RocksDBStorageProvider::RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spd
|
|||||||
m_readOptionsTemplate.snapshot = m_psnapshot;
|
m_readOptionsTemplate.snapshot = m_psnapshot;
|
||||||
}
|
}
|
||||||
|
|
||||||
void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data, size_t cb)
|
void RocksDBStorageProvider::insert(const char *key, size_t cchKey, void *data, size_t cb, bool fOverwrite)
|
||||||
{
|
{
|
||||||
rocksdb::Status status;
|
rocksdb::Status status;
|
||||||
bool fOverwrite = FKeyExists(key, cchKey);
|
|
||||||
if (m_spbatch != nullptr)
|
if (m_spbatch != nullptr)
|
||||||
status = m_spbatch->Put(m_spcolfamily.get(), rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb));
|
status = m_spbatch->Put(m_spcolfamily.get(), rocksdb::Slice(key, cchKey), rocksdb::Slice((const char*)data, cb));
|
||||||
else
|
else
|
||||||
|
@ -17,7 +17,7 @@ public:
|
|||||||
RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count);
|
RocksDBStorageProvider(std::shared_ptr<rocksdb::DB> &spdb, std::shared_ptr<rocksdb::ColumnFamilyHandle> &spcolfam, const rocksdb::Snapshot *psnapshot, size_t count);
|
||||||
~RocksDBStorageProvider();
|
~RocksDBStorageProvider();
|
||||||
|
|
||||||
virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) override;
|
virtual void insert(const char *key, size_t cchKey, void *data, size_t cb, bool fOverwrite) override;
|
||||||
virtual bool erase(const char *key, size_t cchKey) override;
|
virtual bool erase(const char *key, size_t cchKey) override;
|
||||||
virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const override;
|
virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const override;
|
||||||
virtual size_t clear() override;
|
virtual size_t clear() override;
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
#include "teststorageprovider.h"
|
#include "teststorageprovider.h"
|
||||||
#include <assert.h>
|
#include "../server.h"
|
||||||
|
|
||||||
IStorage *TestStorageFactory::create(int)
|
IStorage *TestStorageFactory::create(int)
|
||||||
{
|
{
|
||||||
return new TestStorageProvider();
|
return new (MALLOC_LOCAL) TestStorageProvider();
|
||||||
}
|
}
|
||||||
|
|
||||||
const char *TestStorageFactory::name() const
|
const char *TestStorageFactory::name() const
|
||||||
@ -19,9 +19,12 @@ TestStorageProvider::~TestStorageProvider()
|
|||||||
{
|
{
|
||||||
}
|
}
|
||||||
|
|
||||||
void TestStorageProvider::insert(const char *key, size_t cchKey, void *data, size_t cb)
|
void TestStorageProvider::insert(const char *key, size_t cchKey, void *data, size_t cb, bool fOverwrite)
|
||||||
{
|
{
|
||||||
m_map.insert(std::make_pair(std::string(key, cchKey), std::string((char*)data, cb)));
|
auto strkey = std::string(key, cchKey);
|
||||||
|
bool fActuallyExists = m_map.find(strkey) != m_map.end();
|
||||||
|
serverAssert(fActuallyExists == fOverwrite);
|
||||||
|
m_map.insert(std::make_pair(strkey, std::string((char*)data, cb)));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -77,5 +80,5 @@ void TestStorageProvider::flush()
|
|||||||
/* This is permitted to be a shallow clone */
|
/* This is permitted to be a shallow clone */
|
||||||
const IStorage *TestStorageProvider::clone() const
|
const IStorage *TestStorageProvider::clone() const
|
||||||
{
|
{
|
||||||
return new TestStorageProvider(*this);
|
return new (MALLOC_LOCAL) TestStorageProvider(*this);
|
||||||
}
|
}
|
@ -16,7 +16,7 @@ public:
|
|||||||
TestStorageProvider();
|
TestStorageProvider();
|
||||||
virtual ~TestStorageProvider();
|
virtual ~TestStorageProvider();
|
||||||
|
|
||||||
virtual void insert(const char *key, size_t cchKey, void *data, size_t cb) override;
|
virtual void insert(const char *key, size_t cchKey, void *data, size_t cb, bool fHintOverwrite) override;
|
||||||
virtual bool erase(const char *key, size_t cchKey) override;
|
virtual bool erase(const char *key, size_t cchKey) override;
|
||||||
virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const override;
|
virtual void retrieve(const char *key, size_t cchKey, callbackSingle fn) const override;
|
||||||
virtual size_t clear() override;
|
virtual size_t clear() override;
|
||||||
|
@ -48,6 +48,25 @@ start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.
|
|||||||
assert [expr [r ttl testkey] > 0]
|
assert [expr [r ttl testkey] > 0]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test { CREATE and UPDATE in transaction, key count is accurate } {
|
||||||
|
r flushall
|
||||||
|
r multi
|
||||||
|
r set testkey 2
|
||||||
|
r incr testkey
|
||||||
|
r exec
|
||||||
|
assert_equal {1} [r dbsize]
|
||||||
|
assert_equal {3} [r get testkey]
|
||||||
|
}
|
||||||
|
|
||||||
|
test { EXPIRE key count is accurate } {
|
||||||
|
r flushall
|
||||||
|
r set testkey foo ex 1
|
||||||
|
r flushall cache
|
||||||
|
assert_equal {1} [r dbsize]
|
||||||
|
after 1500
|
||||||
|
assert_equal {0} [r dbsize]
|
||||||
|
}
|
||||||
|
|
||||||
r flushall
|
r flushall
|
||||||
foreach policy {
|
foreach policy {
|
||||||
allkeys-random allkeys-lru allkeys-lfu
|
allkeys-random allkeys-lru allkeys-lfu
|
||||||
|
Loading…
x
Reference in New Issue
Block a user