diff --git a/src/Makefile b/src/Makefile index 87016208d..6966f8ede 100644 --- a/src/Makefile +++ b/src/Makefile @@ -103,7 +103,7 @@ endif FINAL_CFLAGS=$(STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(REDIS_CFLAGS) FINAL_CXXFLAGS=$(CXX_STD) $(WARN) $(OPT) $(DEBUG) $(CFLAGS) $(CXXFLAGS) $(REDIS_CFLAGS) FINAL_LDFLAGS=$(LDFLAGS) $(REDIS_LDFLAGS) $(DEBUG) -FINAL_LIBS=-lm -lz +FINAL_LIBS=-lm -lz -latomic DEBUG=-g -ggdb ifeq ($(uname_S),SunOS) @@ -187,7 +187,7 @@ ifeq ($(MALLOC),memkind) endif REDIS_CC=$(QUIET_CC)$(CC) $(FINAL_CFLAGS) -REDIS_CXX=$(QUIET_CC)$(CC) $(FINAL_CXXFLAGS) +REDIS_CXX=$(QUIET_CC)$(CXX) $(FINAL_CXXFLAGS) KEYDB_AS=$(QUIET_CC) as --64 -g REDIS_LD=$(QUIET_LINK)$(CXX) $(FINAL_LDFLAGS) REDIS_INSTALL=$(QUIET_INSTALL)$(INSTALL) diff --git a/src/compactvector.h b/src/compactvector.h index 4571f9528..222eaab2a 100644 --- a/src/compactvector.h +++ b/src/compactvector.h @@ -69,7 +69,7 @@ public: inline T* end() { return m_data + m_celem; } inline const T* end() const { return m_data + m_celem; } - T* insert(T* where, T &val) + T* insert(T* where, const T &val) { assert(where >= m_data); size_t idx = where - m_data; diff --git a/src/cowptr.h b/src/cowptr.h new file mode 100644 index 000000000..b0c64b4bd --- /dev/null +++ b/src/cowptr.h @@ -0,0 +1,55 @@ +#pragma once + +template +class CowPtr +{ + public: + typedef std::shared_ptr RefPtr; + + private: + mutable RefPtr m_sp; + + void detach() + { + if( !( m_sp == nullptr || m_sp.unique() ) ) { + m_sp = std::make_shared(*m_sp); + } + } + + public: + CowPtr() = default; + + CowPtr(const RefPtr& refptr) + : m_sp(refptr) + { + } + + bool operator==(std::nullptr_t) const + { + return m_sp == nullptr; + } + + bool operator!=(std::nullptr_t) const + { + return m_sp != nullptr; + } + + const T& operator*() const + { + return *m_sp; + } + T& operator*() + { + detach(); + return *m_sp; + } + const T* operator->() const + { + return m_sp.operator->(); + } + T* operator->() + { + detach(); + return m_sp.operator->(); + } +}; diff --git a/src/rdb.cpp b/src/rdb.cpp index c8a8c5797..fbb011dfe 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1438,7 +1438,6 @@ int rdbSaveBackground(rdbSaveInfo *rsi) { g_pserver->stat_fork_time = ustime()-start; g_pserver->stat_fork_rate = (double) zmalloc_used_memory() * 1000000 / g_pserver->stat_fork_time / (1024*1024*1024); /* GB per second. */ - latencyAddSampleIfNeeded("fork",g_pserver->stat_fork_time/1000); if (launchRdbSaveThread(child, rsi) != C_OK) { closeChildInfoPipe(); g_pserver->lastbgsave_status = C_ERR; @@ -1446,6 +1445,7 @@ int rdbSaveBackground(rdbSaveInfo *rsi) { strerror(errno)); return C_ERR; } + latencyAddSampleIfNeeded("fork",g_pserver->stat_fork_time/1000); serverLog(LL_NOTICE,"Background saving started"); g_pserver->rdb_save_time_start = time(NULL); g_pserver->rdbThreadVars.fRdbThreadActive = true; diff --git a/src/semiorderedset.h b/src/semiorderedset.h index fe04cdae0..e9ec80281 100644 --- a/src/semiorderedset.h +++ b/src/semiorderedset.h @@ -1,6 +1,7 @@ #pragma once #include #include "compactvector.h" +#include "cowptr.h" /**************************************** * semiorderedset.h: @@ -30,13 +31,15 @@ namespace keydbutils template class semiorderedset { + typedef compactvector vector_type; + friend struct setiter; - std::vector> m_data; + std::vector> m_data; size_t celem = 0; static const size_t bits_min = 8; size_t bits = bits_min; size_t idxRehash = (1ULL << bits_min); - bool fPauseRehash = false; + int cfPauseRehash = 0; constexpr size_t targetElementsPerBucket() { @@ -57,9 +60,9 @@ public: size_t idxPrimary = 0; size_t idxSecondary = 0; - setiter(semiorderedset *set) + setiter(const semiorderedset *set) { - this->set = set; + this->set = (semiorderedset*)set; } bool operator==(const setiter &other) const @@ -69,22 +72,29 @@ public: bool operator!=(const setiter &other) const { return !operator==(other); } - inline T &operator*() { return set->m_data[idxPrimary][idxSecondary]; } - inline const T &operator*() const { return set->m_data[idxPrimary][idxSecondary]; } + inline T &operator*() { return set->m_data[idxPrimary]->operator[](idxSecondary); } + inline const T &operator*() const { return set->m_data[idxPrimary]->operator[](idxSecondary); } - inline T *operator->() { return &set->m_data[idxPrimary][idxSecondary]; } - inline const T *operator->() const { return &set->m_data[idxPrimary][idxSecondary]; } + inline T *operator->() { return &set->m_data[idxPrimary]->operator[](idxSecondary); } + inline const T *operator->() const { return &set->m_data[idxPrimary]->operator[](idxSecondary); } }; setiter find(const T_KEY &key) { RehashStep(); + return const_cast(this)->find(key); + } + + setiter find(const T_KEY &key) const + { setiter itr(this); itr.idxPrimary = idxFromObj(key); for (int hashset = 0; hashset < 2; ++hashset) // rehashing may only be 1 resize behind, so we check up to two slots { - auto &vecBucket = m_data[itr.idxPrimary]; + if (m_data[itr.idxPrimary] == nullptr) + continue; + const auto &vecBucket = *m_data[itr.idxPrimary]; auto itrFind = std::find(vecBucket.begin(), vecBucket.end(), key); if (itrFind != vecBucket.end()) @@ -118,7 +128,7 @@ public: return itr; } - void insert(T &e, bool fRehash = false) + void insert(const T &e, bool fRehash = false) { if (!fRehash) RehashStep(); @@ -127,12 +137,15 @@ public: if (!fRehash) ++celem; - typename compactvector::iterator itrInsert; - if (!m_data[idx].empty() && !(e < m_data[idx].back())) - itrInsert = m_data[idx].end(); + if (m_data[idx] == nullptr) + m_data[idx] = std::make_shared(); + + typename vector_type::iterator itrInsert; + if (!m_data[idx]->empty() && !(e < m_data[idx]->back())) + itrInsert = m_data[idx]->end(); else - itrInsert = std::upper_bound(m_data[idx].begin(), m_data[idx].end(), e); - itrInsert = m_data[idx].insert(itrInsert, e); + itrInsert = std::upper_bound(m_data[idx]->begin(), m_data[idx]->end(), e); + itrInsert = m_data[idx]->insert(itrInsert, e); if (celem > ((1ULL << bits)*targetElementsPerBucket())) grow(); @@ -147,7 +160,7 @@ public: if (itrStart.set != this) // really if this case isn't true its probably a bug itr.set = this; // but why crash the program when we can easily fix this? - fPauseRehash = true; + cfPauseRehash++; if (itr.idxPrimary >= m_data.size()) itr.idxPrimary = 0; @@ -161,7 +174,7 @@ public: if (itr.idxPrimary >= m_data.size()) itr.idxPrimary = 0; } - fPauseRehash = false; + cfPauseRehash--; return itr; } @@ -178,11 +191,11 @@ public: for (size_t idxPrimaryCount = 0; idxPrimaryCount < m_data.size(); ++idxPrimaryCount) { size_t idxPrimary = (basePrimary + idxPrimaryCount) % m_data.size(); - if (idxSecondary < m_data[idxPrimary].size()) + if (m_data[idxPrimary] != nullptr && idxSecondary < m_data[idxPrimary]->size()) { ++visited; fSawAny = true; - if (!fn(m_data[idxPrimary][idxSecondary])) + if (!fn(m_data[idxPrimary]->operator[](idxSecondary))) return visited; } } @@ -196,16 +209,16 @@ public: for (;;) { size_t idxPrimary = rand() % m_data.size(); - if (m_data[idxPrimary].empty()) + if (m_data[idxPrimary] == nullptr || m_data[idxPrimary]->empty()) continue; - return m_data[idxPrimary][rand() % m_data[idxPrimary].size()]; + return (*m_data[idxPrimary])[rand() % m_data[idxPrimary]->size()]; } } void erase(const setiter &itr) { - auto &vecRow = m_data[itr.idxPrimary]; + auto &vecRow = *m_data[itr.idxPrimary]; vecRow.erase(vecRow.begin() + itr.idxSecondary); --celem; RehashStep(); @@ -227,7 +240,8 @@ public: size_t cb = sizeof(this) + (m_data.capacity()-m_data.size())*sizeof(T); for (auto &vec : m_data) { - cb += vec.bytes_used(); + if (vec != nullptr) + cb += vec->bytes_used(); } return cb; } @@ -246,7 +260,10 @@ public: } /* Compute stats. */ - for (auto &vec : m_data) { + for (const auto &spvec : m_data) { + if (spvec == nullptr) + continue; + const auto &vec = *spvec; if (vec.empty()) { clvector[0]++; continue; @@ -288,10 +305,13 @@ public: return strlen(buf); } + void pause_rehash() { ++cfPauseRehash; } + void unpause_rehash() { --cfPauseRehash; RehashStep(); } + private: inline size_t hashmask() const { return (1ULL << bits) - 1; } - size_t idxFromObj(const T_KEY &key) + size_t idxFromObj(const T_KEY &key) const { size_t v = keydbutils::hash(key); return v & hashmask(); @@ -304,16 +324,18 @@ private: void RehashStep() { - if (fPauseRehash) + if (cfPauseRehash) return; int steps = 0; for (; idxRehash < (m_data.size()/2); ++idxRehash) { - compactvector vecT; - std::swap(m_data[idxRehash], vecT); + if (m_data[idxRehash] == nullptr) + continue; + CowPtr spvecT = std::make_shared(); + std::swap(m_data[idxRehash], spvecT); - for (auto &v : vecT) + for (const auto &v : *spvecT) insert(v, true); if (++steps > 1024) @@ -334,7 +356,10 @@ private: template inline bool enumerate_bucket(setiter &itr, const T_MAX &max, T_VISITOR &fn, long long *pcheckLimit) { - auto &vec = m_data[itr.idxPrimary]; + if (m_data[itr.idxPrimary] == nullptr) + return true; + + auto &vec = *m_data[itr.idxPrimary]; for (; itr.idxSecondary < vec.size(); ++itr.idxSecondary) { // Assert we're ordered by T_MAX diff --git a/src/server.h b/src/server.h index ea57b140f..c3752acb2 100644 --- a/src/server.h +++ b/src/server.h @@ -1057,10 +1057,7 @@ public: expireEntry(const expireEntry &e) { - u.m_key = e.u.m_key; - m_when = e.m_when; - if (e.FFat()) - u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(*e.u.m_pfatentry); + *this = e; } expireEntry(expireEntry &&e) @@ -1076,6 +1073,15 @@ public: delete u.m_pfatentry; } + expireEntry &operator=(const expireEntry &e) + { + u.m_key = e.u.m_key; + m_when = e.m_when; + if (e.FFat()) + u.m_pfatentry = new (MALLOC_LOCAL) expireEntryFat(*e.u.m_pfatentry); + return *this; + } + void setKeyUnsafe(sds key) { if (FFat()) diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 0664f8f45..7c0a653a5 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -47,7 +47,10 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 spdb->m_refCount = 1; spdb->mvccCheckpoint = getMvccTstamp(); if (m_setexpire != nullptr) + { spdb->m_setexpire = new (MALLOC_LOCAL) expireset(*m_setexpire); + spdb->m_setexpire->pause_rehash(); // needs to be const + } m_pdict = dictCreate(&dbDictType,this); m_pdictTombstone = dictCreate(&dbDictType, this); @@ -163,16 +166,7 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn continue; } - robj *obj = (robj*)dictGetVal(deSnapshot); const char *key = (const char*)dictGetKey(deSnapshot); - if (obj == nullptr || obj->FExpires()) - { - auto itrExpire = m_spdbSnapshotHOLDER->m_setexpire->find(key); - if (itrExpire != m_spdbSnapshotHOLDER->m_setexpire->end()) - { - m_spdbSnapshotHOLDER->m_setexpire->erase(itrExpire); // Note: normally we would have to set obj::fexpire false but we're deleting it anyways... - } - } dictDelete(m_spdbSnapshotHOLDER->m_pdict, key); } dictReleaseIterator(di); @@ -289,7 +283,7 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function