Perf fixes for expire snapshots

Former-commit-id: da805e4442815c89e85ca7b9fc855dd11ef2a6c8
This commit is contained in:
John Sully 2019-12-17 17:39:04 -05:00
parent d95ae909a9
commit 4466b2dba4
7 changed files with 129 additions and 49 deletions

View File

@ -103,7 +103,7 @@ endif
FINAL_CFLAGS=$(STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(REDIS_CFLAGS)
FINAL_CXXFLAGS=$(CXX_STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(CXXFLAGS) $(REDIS_CFLAGS)
FINAL_LDFLAGS=$(LDFLAGS) $(REDIS_LDFLAGS) $(DEBUG)
FINAL_LIBS=-lm -lz
FINAL_LIBS=-lm -lz -latomic
DEBUG=-g -ggdb
ifeq ($(uname_S),SunOS)
@ -187,7 +187,7 @@ ifeq ($(MALLOC),memkind)
endif
REDIS_CC=$(QUIET_CC)$(CC) $(FINAL_CFLAGS)
REDIS_CXX=$(QUIET_CC)$(CC) $(FINAL_CXXFLAGS)
REDIS_CXX=$(QUIET_CC)$(CXX) $(FINAL_CXXFLAGS)
KEYDB_AS=$(QUIET_CC) as --64 -g
REDIS_LD=$(QUIET_LINK)$(CXX) $(FINAL_LDFLAGS)
REDIS_INSTALL=$(QUIET_INSTALL)$(INSTALL)

View File

@ -69,7 +69,7 @@ public:
inline T* end() { return m_data + m_celem; }
inline const T* end() const { return m_data + m_celem; }
T* insert(T* where, T &val)
T* insert(T* where, const T &val)
{
assert(where >= m_data);
size_t idx = where - m_data;

55
src/cowptr.h Normal file
View File

@ -0,0 +1,55 @@
#pragma once
template <class T>
class CowPtr
{
public:
typedef std::shared_ptr<T> RefPtr;
private:
mutable RefPtr m_sp;
void detach()
{
if( !( m_sp == nullptr || m_sp.unique() ) ) {
m_sp = std::make_shared<T>(*m_sp);
}
}
public:
CowPtr() = default;
CowPtr(const RefPtr& refptr)
: m_sp(refptr)
{
}
bool operator==(std::nullptr_t) const
{
return m_sp == nullptr;
}
bool operator!=(std::nullptr_t) const
{
return m_sp != nullptr;
}
const T& operator*() const
{
return *m_sp;
}
T& operator*()
{
detach();
return *m_sp;
}
const T* operator->() const
{
return m_sp.operator->();
}
T* operator->()
{
detach();
return m_sp.operator->();
}
};

View File

@ -1438,7 +1438,6 @@ int rdbSaveBackground(rdbSaveInfo *rsi) {
g_pserver->stat_fork_time = ustime()-start;
g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */
latencyAddSampleIfNeeded("fork",g_pserver->stat_fork_time/1000);
if (launchRdbSaveThread(child, rsi) != C_OK) {
closeChildInfoPipe();
g_pserver->lastbgsave_status = C_ERR;
@ -1446,6 +1445,7 @@ int rdbSaveBackground(rdbSaveInfo *rsi) {
strerror(errno));
return C_ERR;
}
latencyAddSampleIfNeeded("fork",g_pserver->stat_fork_time/1000);
serverLog(LL_NOTICE,"Background saving started");
g_pserver->rdb_save_time_start = time(NULL);
g_pserver->rdbThreadVars.fRdbThreadActive = true;

View File

@ -1,6 +1,7 @@
#pragma once
#include <assert.h>
#include "compactvector.h"
#include "cowptr.h"
/****************************************
* semiorderedset.h:
@ -30,13 +31,15 @@ namespace keydbutils
template<typename T, typename T_KEY = T, bool MEMMOVE_SAFE = false>
class semiorderedset
{
typedef compactvector<T, MEMMOVE_SAFE> vector_type;
friend struct setiter;
std::vector<compactvector<T, MEMMOVE_SAFE>> m_data;
std::vector<CowPtr<vector_type>> m_data;
size_t celem = 0;
static const size_t bits_min = 8;
size_t bits = bits_min;
size_t idxRehash = (1ULL << bits_min);
bool fPauseRehash = false;
int cfPauseRehash = 0;
constexpr size_t targetElementsPerBucket()
{
@ -57,9 +60,9 @@ public:
size_t idxPrimary = 0;
size_t idxSecondary = 0;
setiter(semiorderedset *set)
setiter(const semiorderedset *set)
{
this->set = set;
this->set = (semiorderedset*)set;
}
bool operator==(const setiter &other) const
@ -69,22 +72,29 @@ public:
bool operator!=(const setiter &other) const { return !operator==(other); }
inline T &operator*() { return set->m_data[idxPrimary][idxSecondary]; }
inline const T &operator*() const { return set->m_data[idxPrimary][idxSecondary]; }
inline T &operator*() { return set->m_data[idxPrimary]->operator[](idxSecondary); }
inline const T &operator*() const { return set->m_data[idxPrimary]->operator[](idxSecondary); }
inline T *operator->() { return &set->m_data[idxPrimary][idxSecondary]; }
inline const T *operator->() const { return &set->m_data[idxPrimary][idxSecondary]; }
inline T *operator->() { return &set->m_data[idxPrimary]->operator[](idxSecondary); }
inline const T *operator->() const { return &set->m_data[idxPrimary]->operator[](idxSecondary); }
};
setiter find(const T_KEY &key)
{
RehashStep();
return const_cast<const semiorderedset*>(this)->find(key);
}
setiter find(const T_KEY &key) const
{
setiter itr(this);
itr.idxPrimary = idxFromObj(key);
for (int hashset = 0; hashset < 2; ++hashset) // rehashing may only be 1 resize behind, so we check up to two slots
{
auto &vecBucket = m_data[itr.idxPrimary];
if (m_data[itr.idxPrimary] == nullptr)
continue;
const auto &vecBucket = *m_data[itr.idxPrimary];
auto itrFind = std::find(vecBucket.begin(), vecBucket.end(), key);
if (itrFind != vecBucket.end())
@ -118,7 +128,7 @@ public:
return itr;
}
void insert(T &e, bool fRehash = false)
void insert(const T &e, bool fRehash = false)
{
if (!fRehash)
RehashStep();
@ -127,12 +137,15 @@ public:
if (!fRehash)
++celem;
typename compactvector<T, MEMMOVE_SAFE>::iterator itrInsert;
if (!m_data[idx].empty() && !(e < m_data[idx].back()))
itrInsert = m_data[idx].end();
if (m_data[idx] == nullptr)
m_data[idx] = std::make_shared<vector_type>();
typename vector_type::iterator itrInsert;
if (!m_data[idx]->empty() && !(e < m_data[idx]->back()))
itrInsert = m_data[idx]->end();
else
itrInsert = std::upper_bound(m_data[idx].begin(), m_data[idx].end(), e);
itrInsert = m_data[idx].insert(itrInsert, e);
itrInsert = std::upper_bound(m_data[idx]->begin(), m_data[idx]->end(), e);
itrInsert = m_data[idx]->insert(itrInsert, e);
if (celem > ((1ULL << bits)*targetElementsPerBucket()))
grow();
@ -147,7 +160,7 @@ public:
if (itrStart.set != this) // really if this case isn't true its probably a bug
itr.set = this; // but why crash the program when we can easily fix this?
fPauseRehash = true;
cfPauseRehash++;
if (itr.idxPrimary >= m_data.size())
itr.idxPrimary = 0;
@ -161,7 +174,7 @@ public:
if (itr.idxPrimary >= m_data.size())
itr.idxPrimary = 0;
}
fPauseRehash = false;
cfPauseRehash--;
return itr;
}
@ -178,11 +191,11 @@ public:
for (size_t idxPrimaryCount = 0; idxPrimaryCount < m_data.size(); ++idxPrimaryCount)
{
size_t idxPrimary = (basePrimary + idxPrimaryCount) % m_data.size();
if (idxSecondary < m_data[idxPrimary].size())
if (m_data[idxPrimary] != nullptr && idxSecondary < m_data[idxPrimary]->size())
{
++visited;
fSawAny = true;
if (!fn(m_data[idxPrimary][idxSecondary]))
if (!fn(m_data[idxPrimary]->operator[](idxSecondary)))
return visited;
}
}
@ -196,16 +209,16 @@ public:
for (;;)
{
size_t idxPrimary = rand() % m_data.size();
if (m_data[idxPrimary].empty())
if (m_data[idxPrimary] == nullptr || m_data[idxPrimary]->empty())
continue;
return m_data[idxPrimary][rand() % m_data[idxPrimary].size()];
return (*m_data[idxPrimary])[rand() % m_data[idxPrimary]->size()];
}
}
void erase(const setiter &itr)
{
auto &vecRow = m_data[itr.idxPrimary];
auto &vecRow = *m_data[itr.idxPrimary];
vecRow.erase(vecRow.begin() + itr.idxSecondary);
--celem;
RehashStep();
@ -227,7 +240,8 @@ public:
size_t cb = sizeof(this) + (m_data.capacity()-m_data.size())*sizeof(T);
for (auto &vec : m_data)
{
cb += vec.bytes_used();
if (vec != nullptr)
cb += vec->bytes_used();
}
return cb;
}
@ -246,7 +260,10 @@ public:
}
/* Compute stats. */
for (auto &vec : m_data) {
for (const auto &spvec : m_data) {
if (spvec == nullptr)
continue;
const auto &vec = *spvec;
if (vec.empty()) {
clvector[0]++;
continue;
@ -288,10 +305,13 @@ public:
return strlen(buf);
}
void pause_rehash() { ++cfPauseRehash; }
void unpause_rehash() { --cfPauseRehash; RehashStep(); }
private:
inline size_t hashmask() const { return (1ULL << bits) - 1; }
size_t idxFromObj(const T_KEY &key)
size_t idxFromObj(const T_KEY &key) const
{
size_t v = keydbutils::hash(key);
return v & hashmask();
@ -304,16 +324,18 @@ private:
void RehashStep()
{
if (fPauseRehash)
if (cfPauseRehash)
return;
int steps = 0;
for (; idxRehash < (m_data.size()/2); ++idxRehash)
{
compactvector<T, MEMMOVE_SAFE> vecT;
std::swap(m_data[idxRehash], vecT);
if (m_data[idxRehash] == nullptr)
continue;
CowPtr<vector_type> spvecT = std::make_shared<vector_type>();
std::swap(m_data[idxRehash], spvecT);
for (auto &v : vecT)
for (const auto &v : *spvecT)
insert(v, true);
if (++steps > 1024)
@ -334,7 +356,10 @@ private:
template<typename T_VISITOR, typename T_MAX>
inline bool enumerate_bucket(setiter &itr, const T_MAX &max, T_VISITOR &fn, long long *pcheckLimit)
{
auto &vec = m_data[itr.idxPrimary];
if (m_data[itr.idxPrimary] == nullptr)
return true;
auto &vec = *m_data[itr.idxPrimary];
for (; itr.idxSecondary < vec.size(); ++itr.idxSecondary)
{
// Assert we're ordered by T_MAX

View File

@ -1057,10 +1057,7 @@ public:
expireEntry(const expireEntry &e)
{
u.m_key = e.u.m_key;
m_when = e.m_when;
if (e.FFat())
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(*e.u.m_pfatentry);
*this = e;
}
expireEntry(expireEntry &&e)
@ -1076,6 +1073,15 @@ public:
delete u.m_pfatentry;
}
expireEntry &operator=(const expireEntry &e)
{
u.m_key = e.u.m_key;
m_when = e.m_when;
if (e.FFat())
u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(*e.u.m_pfatentry);
return *this;
}
void setKeyUnsafe(sds key)
{
if (FFat())

View File

@ -47,7 +47,10 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
spdb->m_refCount = 1;
spdb->mvccCheckpoint = getMvccTstamp();
if (m_setexpire != nullptr)
{
spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_setexpire);
spdb->m_setexpire->pause_rehash(); // needs to be const
}
m_pdict = dictCreate(&dbDictType,this);
m_pdictTombstone = dictCreate(&dbDictType, this);
@ -163,16 +166,7 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
continue;
}
robj *obj = (robj*)dictGetVal(deSnapshot);
const char *key = (const char*)dictGetKey(deSnapshot);
if (obj == nullptr || obj->FExpires())
{
auto itrExpire = m_spdbSnapshotHOLDER->m_setexpire->find(key);
if (itrExpire != m_spdbSnapshotHOLDER->m_setexpire->end())
{
m_spdbSnapshotHOLDER->m_setexpire->erase(itrExpire); // Note: normally we would have to set obj::fexpire false but we're deleting it anyways...
}
}
dictDelete(m_spdbSnapshotHOLDER->m_pdict, key);
}
dictReleaseIterator(di);
@ -289,7 +283,7 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const
}
dictReleaseIterator(di);
redisDbPersistentDataSnapshot *psnapshot;
const redisDbPersistentDataSnapshot *psnapshot;
__atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE);
if (fResult && psnapshot != nullptr)
{
@ -405,7 +399,7 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *
std::atomic_thread_fence(std::memory_order_seq_cst);
m_spdbSnapshotHOLDER.release(); // GC has responsibility for it now
m_spdbSnapshotHOLDER = std::move(spdb);
auto ptrT = m_spdbSnapshotHOLDER.get();
const redisDbPersistentDataSnapshot *ptrT = m_spdbSnapshotHOLDER.get();
__atomic_store(&m_pdbSnapshot, &ptrT, __ATOMIC_SEQ_CST);
locker.disarm(); // ensure we're not locked for any dtors
}