From 198db651d20d3c40689aca59b701bc84eb3d4e8a Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 13 Jul 2020 21:14:03 -0400 Subject: [PATCH 01/17] Remove gitter, we don't check it often enough Former-commit-id: 119797014c09c9330e473b904f98353b32d549ab --- README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/README.md b/README.md index 88f5f0f7f..fe0030dff 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,5 @@ ![Current Release](https://img.shields.io/github/release/JohnSully/KeyDB.svg) ![CI](https://github.com/JohnSully/KeyDB/workflows/CI/badge.svg?branch=unstable) -[![Join the chat at https://gitter.im/KeyDB/community](https://badges.gitter.im/KeyDB/community.svg)](https://gitter.im/KeyDB/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) [![StackShare](http://img.shields.io/badge/tech-stack-0690fa.svg?style=flat)](https://stackshare.io/eq-alpha-technology-inc/eq-alpha-technology-inc) ##### New! Want to extend KeyDB with Javascript? Try [ModJS](https://github.com/JohnSully/ModJS) From 9928562dadcbaa336049c08c9653d8f14f9bd05c Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 17 Jul 2020 21:02:51 +0000 Subject: [PATCH 02/17] Fix lock after free in module API Former-commit-id: d88fd1588d292bffc0aa53c299cb52e7a4e91015 --- src/ae.cpp | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/ae.cpp b/src/ae.cpp index 44f302e69..789a6888b 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -258,9 +258,11 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, if (fSynchronous) { + { std::unique_lock ulock(cmd.pctl->mutexcv, std::adopt_lock); cmd.pctl->cv.wait(ulock); ret = cmd.pctl->rval; + } delete cmd.pctl; } @@ -300,7 +302,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch cmd.fLock = fLock; if (fSynchronous) { - cmd.pctl = new (MALLOC_LOCAL) aeCommandControl(); + cmd.pctl = new (MALLOC_LOCAL) aeCommandControl; cmd.pctl->mutexcv.lock(); } @@ -311,9 +313,11 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch int ret = AE_OK; if (fSynchronous) { + { std::unique_lock ulock(cmd.pctl->mutexcv, std::adopt_lock); cmd.pctl->cv.wait(ulock); ret = cmd.pctl->rval; + } delete cmd.pctl; } return ret; From 9090e26aca828ccd9fd8a82166ec32e08265ba77 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 10 Aug 2020 06:10:06 +0000 Subject: [PATCH 03/17] Add build flag to disable MVCC tstamps Former-commit-id: f17d178d03f44abcdaddd851a313dd3f7ec87ed5 --- src/Makefile | 5 +++++ src/db.cpp | 10 +++++++++- src/object.cpp | 13 ++++++++++++- src/rdb.cpp | 8 ++++++++ src/server.h | 6 ++++++ src/storage.h | 2 -- 6 files changed, 40 insertions(+), 4 deletions(-) diff --git a/src/Makefile b/src/Makefile index 038ac51f8..f3c72db27 100644 --- a/src/Makefile +++ b/src/Makefile @@ -47,6 +47,11 @@ endif USEASM?=true +ifeq ($(NOMVCC),) + CFLAGS+= -DENABLE_MVCC + CXXFLAGS+= -DENABLE_MVCC +endif + ifneq ($(SANITIZE),) CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE diff --git a/src/db.cpp b/src/db.cpp index 11816262b..33eae9fc3 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -92,9 +92,11 @@ static robj *lookupKey(redisDb *db, robj *key, int flags) { updateDbValAccess(de, flags); +#ifdef ENABLE_MVCC if (flags & LOOKUP_UPDATEMVCC) { val->mvcc_tstamp = getMvccTstamp(); } +#endif return val; } else { return NULL; @@ -206,7 +208,9 @@ int dbAddCore(redisDb *db, robj *key, robj *val) { serverAssert(!val->FExpires()); sds copy = sdsdup(szFromObj(key)); int retval = dictAdd(db->pdict, copy, val); +#ifdef ENABLE_MVCC val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp(); +#endif if (retval == DICT_OK) { @@ -256,7 +260,9 @@ void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpd if (fUpdateMvcc) { if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) val = dupStringObject(val); +#ifdef ENABLE_MVCC val->mvcc_tstamp = getMvccTstamp(); +#endif } dictSetVal(db->pdict, de, val); @@ -290,13 +296,15 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) if (de == nullptr) return (dbAddCore(db, key, val) == DICT_OK); +#ifdef ENABLE_MVCC robj *old = (robj*)dictGetVal(de); if (old->mvcc_tstamp <= val->mvcc_tstamp) { dbOverwriteCore(db, de, key, val, false, true); return true; } - +#endif + return false; } else diff --git a/src/object.cpp b/src/object.cpp index 091e3d4da..3befa73d7 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -46,7 +46,9 @@ robj *createObject(int type, void *ptr) { o->encoding = OBJ_ENCODING_RAW; o->m_ptr = ptr; o->setrefcount(1); +#ifdef ENABLE_MVCC o->mvcc_tstamp = OBJ_MVCC_INVALID; +#endif /* Set the LRU to the current lruclock (minutes resolution), or * alternatively the LFU counter. */ @@ -101,7 +103,9 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { o->type = OBJ_STRING; o->encoding = OBJ_ENCODING_EMBSTR; o->setrefcount(1); +#ifdef ENABLE_MVCC o->mvcc_tstamp = OBJ_MVCC_INVALID; +#endif if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL; @@ -129,8 +133,13 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { * * The current limit of 52 is chosen so that the biggest string object * we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */ +#ifdef ENABLE_MVCC #define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48 -static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total"); +#else +#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 256 +#endif + +//static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total"); robj *createStringObject(const char *ptr, size_t len) { if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) return createEmbeddedStringObject(ptr,len); @@ -1317,10 +1326,12 @@ NULL * because we update the access time only * when the key is read or overwritten. */ addReplyLongLong(c,LFUDecrAndReturn(o)); +#ifdef ENABLE_MVCC } else if (!strcasecmp(szFromObj(c->argv[1]), "lastmodified") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) == NULL) return; addReplyLongLong(c, (g_pserver->mstime - (o->mvcc_tstamp >> MVCC_MS_SHIFT)) / 1000); +#endif } else { addReplySubcommandSyntaxError(c); } diff --git a/src/rdb.cpp b/src/rdb.cpp index 5ee6baa25..c6d39a010 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1089,8 +1089,10 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, expireEntry *pexpire) { } char szT[32]; +#ifdef ENABLE_MVCC snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; +#endif /* Save type, key, value */ if (rdbSaveObjectType(rdb,val) == -1) return -1; @@ -2046,7 +2048,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) { return NULL; } +#ifdef ENABLE_MVCC o->mvcc_tstamp = mvcc_tstamp; +#endif serverAssert(!o->FExpires()); return o; } @@ -2398,7 +2402,11 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { key = nullptr; goto eoferr; } +#ifdef ENABLE_MVCC bool fStaleMvccKey = (rsi) ? val->mvcc_tstamp < rsi->mvccMinThreshold : false; +#else + bool fStaleMvccKey = false; +#endif /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was diff --git a/src/server.h b/src/server.h index 7479b4557..4ed40339f 100644 --- a/src/server.h +++ b/src/server.h @@ -805,7 +805,9 @@ typedef struct redisObject { private: mutable std::atomic refcount {0}; public: +#ifdef ENABLE_MVCC uint64_t mvcc_tstamp; +#endif void *m_ptr; inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; } @@ -816,7 +818,11 @@ public: void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); } unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); } } robj; +#ifdef ENABLE_MVCC static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase"); +#else +static_assert(sizeof(redisObject) == 16, "object size is critical, don't increase"); +#endif __attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o) { diff --git a/src/storage.h b/src/storage.h index e9106aca2..2d46c0223 100644 --- a/src/storage.h +++ b/src/storage.h @@ -1,8 +1,6 @@ #ifndef __STORAGE_H__ #define __STORAGE_H__ -#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48 // Note: also defined in object.c - should always match - #ifdef __cplusplus extern "C" { #endif From 6b8e979434d15390d4c3d0acfbb7acdbc6c9d9bc Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 14 Aug 2020 16:05:39 +0000 Subject: [PATCH 04/17] Prehash the tombstone for cleanup Former-commit-id: c9d97a7c7448fc769486175bea1648589487c87c --- src/db.cpp | 12 ++++++++---- src/dict.cpp | 10 +++++----- src/dict.h | 2 +- src/lazyfree.cpp | 5 ++++- src/snapshot.cpp | 3 ++- 5 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index b7b36b518..b91041f66 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -422,7 +422,8 @@ bool redisDbPersistentData::syncDelete(robj *key) if (itr != nullptr) { sds keyTombstone = sdsdup(szFromObj(key)); - if (dictAdd(m_pdictTombstone, keyTombstone, nullptr) != DICT_OK) + uint64_t hash = dictGetHash(m_pdict, keyTombstone); + if (dictAdd(m_pdictTombstone, keyTombstone, (void*)hash) != DICT_OK) sdsfree(keyTombstone); } } @@ -2290,7 +2291,7 @@ void redisDbPersistentData::initialize() { m_pdbSnapshot = nullptr; m_pdict = dictCreate(&dbDictType,this); - m_pdictTombstone = dictCreate(&dbDictType,this); + m_pdictTombstone = dictCreate(&dbDictTypeTombstone,this); m_setexpire = new(MALLOC_LOCAL) expireset(); m_fAllChanged = 0; m_fTrackingChanges = 0; @@ -2477,8 +2478,11 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) { dictAdd(m_pdict, keyNew, nullptr); } - *pde = dictFind(m_pdict, sdsKey); - dictAdd(m_pdictTombstone, sdsdupshared(itr.key()), nullptr); + uint64_t hash = dictGetHash(m_pdict, sdsKey); + dictEntry **deT; + dictht *ht; + *pde = dictFindWithPrev(m_pdict, sdsKey, hash, &deT, &ht); + dictAdd(m_pdictTombstone, sdsdupshared(itr.key()), (void*)hash); } } diff --git a/src/dict.cpp b/src/dict.cpp index ef7365fdb..c69c663a4 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -326,7 +326,7 @@ int dictRehashMilliseconds(dict *d, int ms) { static void _dictRehashStep(dict *d) { unsigned long iterators; __atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED); - if (iterators == 0) dictRehash(d,1); + if (iterators == 0) dictRehash(d,10); } /* Add an element to the target hash table */ @@ -541,14 +541,13 @@ void dictRelease(dict *d) zfree(d); } -dictEntry *dictFindWithPrev(dict *d, const void *key, dictEntry ***dePrevPtr, dictht **pht) +dictEntry *dictFindWithPrev(dict *d, const void *key, uint64_t h, dictEntry ***dePrevPtr, dictht **pht) { dictEntry *he; - uint64_t h, idx, table; + uint64_t idx, table; if (dictSize(d) == 0) return NULL; /* dict is empty */ if (dictIsRehashing(d)) _dictRehashStep(d); - h = dictHashKey(d, key); for (table = 0; table <= 1; table++) { *pht = d->ht + table; idx = h & d->ht[table].sizemask; @@ -570,7 +569,8 @@ dictEntry *dictFind(dict *d, const void *key) { dictEntry **deT; dictht *ht; - return dictFindWithPrev(d, key, &deT, &ht); + uint64_t h = dictHashKey(d, key); + return dictFindWithPrev(d, key, h, &deT, &ht); } void *dictFetchValue(dict *d, const void *key) { diff --git a/src/dict.h b/src/dict.h index c9118d35f..77ab832db 100644 --- a/src/dict.h +++ b/src/dict.h @@ -167,7 +167,7 @@ dictEntry *dictUnlink(dict *ht, const void *key); void dictFreeUnlinkedEntry(dict *d, dictEntry *he); void dictRelease(dict *d); dictEntry * dictFind(dict *d, const void *key); -dictEntry * dictFindWithPrev(dict *d, const void *key, dictEntry ***dePrevPtr, dictht **ht); +dictEntry * dictFindWithPrev(dict *d, const void *key, uint64_t h, dictEntry ***dePrevPtr, dictht **ht); void *dictFetchValue(dict *d, const void *key); int dictResize(dict *d); dictIterator *dictGetIterator(dict *d); diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index 1ee49103a..b5f341ed3 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -62,7 +62,10 @@ bool redisDbPersistentData::asyncDelete(robj *key) { dictEntry *de = dictUnlink(m_pdict,ptrFromObj(key)); if (de) { if (m_pdbSnapshot != nullptr && m_pdbSnapshot->find_cached_threadsafe(szFromObj(key)) != nullptr) - dictAdd(m_pdictTombstone, sdsdup((sds)dictGetKey(de)), nullptr); + { + uint64_t hash = dictGetHash(m_pdict, szFromObj(key)); + dictAdd(m_pdictTombstone, sdsdup((sds)dictGetKey(de)), (void*)hash); + } robj *val = (robj*)dictGetVal(de); if (val->FExpires()) diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 0ad1b5282..8dd4db367 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -70,6 +70,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 auto spdb = std::unique_ptr(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot()); + dictRehashMilliseconds(m_pdict, 50); // Give us the best chance at a fast cleanup spdb->m_fAllChanged = false; spdb->m_fTrackingChanges = 0; spdb->m_pdict = m_pdict; @@ -303,7 +304,7 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn { dictEntry **dePrev; dictht *ht; - dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), &dePrev, &ht); + dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), (uint64_t)dictGetVal(de), &dePrev, &ht); if (deSnapshot == nullptr && m_spdbSnapshotHOLDER->m_pdbSnapshot) { // The tombstone is for a grand child, propogate it (or possibly in the storage provider - but an extra tombstone won't hurt) From 6c6ca43eefa7479ede92f8fd3fab375fbd62de57 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 15 Aug 2020 22:59:01 +0000 Subject: [PATCH 05/17] Prevent unnecessary copy when overwriting a value from a snapshot Former-commit-id: 654a7bc6ea82f4ac45a1c1a25c794e1c27c0d902 --- src/db.cpp | 19 +++++++++++++++++++ src/server.h | 5 +++++ 2 files changed, 24 insertions(+) diff --git a/src/db.cpp b/src/db.cpp index 12c23ca21..e2a50164c 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -338,6 +338,7 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) * The client 'c' argument may be set to NULL if the operation is performed * in a context where there is no clear client performing the operation. */ void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) { + db->prepOverwriteForSnapshot(szFromObj(key)); if (!dbAddCore(db, key, val)) { dbOverwrite(db, key, val, !keepttl); } @@ -2358,6 +2359,24 @@ bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew) return (res == DICT_OK); } +// This is a performance tool to prevent us copying over an object we're going to overwrite anyways +void redisDbPersistentData::prepOverwriteForSnapshot(char *key) +{ + if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) + return; + + if (m_pdbSnapshot != nullptr) + { + auto itr = m_pdbSnapshot->find_cached_threadsafe(key); + if (itr.key() != nullptr) + { + sds keyNew = sdsdupshared(itr.key()); + if (dictAdd(m_pdictTombstone, keyNew, (void*)dictHashKey(m_pdict, key)) != DICT_OK) + sdsfree(keyNew); + } + } +} + void redisDbPersistentData::tryResize() { if (htNeedsResize(m_pdict)) diff --git a/src/server.h b/src/server.h index 597ac8310..4f9375834 100644 --- a/src/server.h +++ b/src/server.h @@ -1326,6 +1326,9 @@ public: void setExpire(robj *key, robj *subkey, long long when); void setExpire(expireEntry &&e); void initialize(); + void prepOverwriteForSnapshot(char *key); + + bool FRehashing() const { return dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone); } void setStorageProvider(StorageCache *pstorage); @@ -1527,6 +1530,8 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::dictUnsafeKeyOnly; using redisDbPersistentData::resortExpire; using redisDbPersistentData::prefetchKeysAsync; + using redisDbPersistentData::prepOverwriteForSnapshot; + using redisDbPersistentData::FRehashing; public: expireset::setiter expireitr; From eb572c57e2525066e6d88174ffb2a06576878726 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 15 Aug 2020 23:05:56 +0000 Subject: [PATCH 06/17] Rehash efficiency Former-commit-id: fab383156626ec683881101c22eb2f6c2cea4c5d --- src/db.cpp | 4 ++-- src/dict.cpp | 6 ++++-- src/server.cpp | 11 ++++++----- src/server.h | 2 +- src/snapshot.cpp | 7 ++++++- 5 files changed, 19 insertions(+), 11 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index e2a50164c..1cc7e66f7 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -430,7 +430,7 @@ bool redisDbPersistentData::syncDelete(robj *key) auto itr = m_pdbSnapshot->find_cached_threadsafe(szFromObj(key)); if (itr != nullptr) { - sds keyTombstone = sdsdup(szFromObj(key)); + sds keyTombstone = sdsdupshared(itr.key()); uint64_t hash = dictGetHash(m_pdict, keyTombstone); if (dictAdd(m_pdictTombstone, keyTombstone, (void*)hash) != DICT_OK) sdsfree(keyTombstone); @@ -2300,7 +2300,7 @@ void redisDbPersistentData::initialize() { m_pdbSnapshot = nullptr; m_pdict = dictCreate(&dbDictType,this); - m_pdictTombstone = dictCreate(&dbDictTypeTombstone,this); + m_pdictTombstone = dictCreate(&dbTombstoneDictType,this); m_setexpire = new(MALLOC_LOCAL) expireset(); m_fAllChanged = 0; m_fTrackingChanges = 0; diff --git a/src/dict.cpp b/src/dict.cpp index c69c663a4..fda797363 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -326,7 +326,7 @@ int dictRehashMilliseconds(dict *d, int ms) { static void _dictRehashStep(dict *d) { unsigned long iterators; __atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED); - if (iterators == 0) dictRehash(d,10); + if (iterators == 0) dictRehash(d,2); } /* Add an element to the target hash table */ @@ -1220,7 +1220,9 @@ void dictGetStats(char *buf, size_t bufsize, dict *d) { void dictForceRehash(dict *d) { - while (dictIsRehashing(d)) _dictRehashStep(d); + unsigned long iterators; + __atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED); + while (iterators == 0 && dictIsRehashing(d)) _dictRehashStep(d); } /* ------------------------------- Benchmark ---------------------------------*/ diff --git a/src/server.cpp b/src/server.cpp index 52648e7bd..f72047c31 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1387,14 +1387,14 @@ dictType dbDictType = { dictObjectDestructor /* val destructor */ }; -/* db->pdict, keys are sds strings, vals uints. */ -dictType dbDictTypeTombstone = { +/* db->pdict, keys are sds strings, vals are Redis objects. */ +dictType dbTombstoneDictType = { dictSdsHash, /* hash function */ NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ - dictDbKeyDestructor, /* key destructor */ - NULL /* val destructor */ + dictDbKeyDestructor, /* key destructor */ + NULL /* val destructor */ }; dictType dbSnapshotDictType = { @@ -1539,8 +1539,9 @@ void tryResizeHashTables(int dbid) { * is returned. */ int redisDbPersistentData::incrementallyRehash() { /* Keys dictionary */ - if (dictIsRehashing(m_pdict)) { + if (dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone)) { dictRehashMilliseconds(m_pdict,1); + dictRehashMilliseconds(m_pdictTombstone,1); return 1; /* already used our millisecond for this loop... */ } return 0; diff --git a/src/server.h b/src/server.h index 4f9375834..ce521b2b6 100644 --- a/src/server.h +++ b/src/server.h @@ -2564,7 +2564,7 @@ extern dictType zsetDictType; extern dictType clusterNodesDictType; extern dictType clusterNodesBlackListDictType; extern dictType dbDictType; -extern dictType dbDictTypeTombstone; +extern dictType dbTombstoneDictType; extern dictType dbSnapshotDictType; extern dictType shaScriptObjectDictType; extern double R_Zero, R_PosInf, R_NegInf, R_Nan; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 8dd4db367..bd476be26 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -91,8 +91,13 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 spdb->m_setexpire->pause_rehash(); // needs to be const } + if (dictIsRehashing(spdb->m_pdict) || dictIsRehashing(spdb->m_pdictTombstone)) { + serverLog(LL_NOTICE, "NOTICE: Suboptimal snapshot"); + } + m_pdict = dictCreate(&dbDictType,this); - m_pdictTombstone = dictCreate(&dbDictTypeTombstone, this); + dictExpand(m_pdict, 1024); // minimize rehash overhead + m_pdictTombstone = dictCreate(&dbTombstoneDictType, this); serverAssert(spdb->m_pdict->iterators == 1); From 10555438173e8753c6ddad9924e01eaaf38d18a2 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 15 Aug 2020 23:14:29 +0000 Subject: [PATCH 07/17] dictMerge perf improvements Former-commit-id: 48401ec369c5693689ef658cca518dc94ab1402e --- src/dict.cpp | 74 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 73 insertions(+), 1 deletion(-) diff --git a/src/dict.cpp b/src/dict.cpp index fda797363..184e01ca8 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -179,6 +179,9 @@ int dictExpand(dict *d, unsigned long size) int dictMerge(dict *dst, dict *src) { +#define MERGE_BLOCK_SIZE 4 + dictEntry *rgdeT[MERGE_BLOCK_SIZE]; + assert(dst != src); if (dictSize(src) == 0) return DICT_OK; @@ -210,6 +213,50 @@ int dictMerge(dict *dst, dict *src) } _dictReset(&src->ht[0]); dst->rehashidx = 0; + assert(dictIsRehashing(dst)); + assert((dictSize(src)+dictSize(dst)) == expectedSize); + return DICT_OK; + } + + if (!dictIsRehashing(src) && dictSize(src) > 0 && + (src->ht[0].size == dst->ht[0].size || src->ht[0].size == dst->ht[1].size)) + { + auto &htDst = (src->ht[0].size == dst->ht[0].size) ? dst->ht[0] : dst->ht[1]; + + assert(src->ht[0].size == htDst.size); + for (size_t ide = 0; ide < src->ht[0].size; ide += MERGE_BLOCK_SIZE) + { + if (src->ht[0].used == 0) + break; + + for (int dde = 0; dde < MERGE_BLOCK_SIZE; ++dde) { + rgdeT[dde] = src->ht[0].table[ide + dde]; + src->ht[0].table[ide + dde] = nullptr; + } + + for (;;) { + bool fAnyFound = false; + for (int dde = 0; dde < MERGE_BLOCK_SIZE; ++dde) { + if (rgdeT[dde] == nullptr) + continue; + dictEntry *deNext = rgdeT[dde]->next; + rgdeT[dde]->next = htDst.table[ide+dde]; + htDst.table[ide+dde] = rgdeT[dde]; + rgdeT[dde] = deNext; + htDst.used++; + src->ht[0].used--; + + fAnyFound = fAnyFound || (deNext != nullptr); + } + + if (!fAnyFound) + break; + } + } + // If we copied to the base hash table of a rehashing dst, reset the rehash + if (dictIsRehashing(dst) && src->ht[0].size == dst->ht[0].size) + dst->rehashidx = 0; + assert(dictSize(src) == 0); assert((dictSize(src)+dictSize(dst)) == expectedSize); return DICT_OK; } @@ -218,10 +265,34 @@ int dictMerge(dict *dst, dict *src) auto &htDst = dictIsRehashing(dst) ? dst->ht[1] : dst->ht[0]; for (int iht = 0; iht < 2; ++iht) { - for (size_t ide = 0; ide < src->ht[iht].size; ++ide) + for (size_t ide = 0; ide < src->ht[iht].size; ide += MERGE_BLOCK_SIZE) { if (src->ht[iht].used == 0) break; + + for (int dde = 0; dde < MERGE_BLOCK_SIZE; ++dde) { + rgdeT[dde] = src->ht[iht].table[ide + dde]; + src->ht[iht].table[ide + dde] = nullptr; + } + + for (;;) { + bool fAnyFound = false; + for (int dde = 0; dde < MERGE_BLOCK_SIZE; ++dde) { + if (rgdeT[dde] == nullptr) + continue; + uint64_t h = dictHashKey(dst, rgdeT[dde]->key) & htDst.sizemask; + dictEntry *deNext = rgdeT[dde]->next; + rgdeT[dde]->next = htDst.table[h]; + htDst.table[h] = rgdeT[dde]; + rgdeT[dde] = deNext; + htDst.used++; + src->ht[iht].used--; + fAnyFound = fAnyFound || (deNext != nullptr); + } + if (!fAnyFound) + break; + } +#if 0 dictEntry *de = src->ht[iht].table[ide]; src->ht[iht].table[ide] = nullptr; while (de != nullptr) @@ -236,6 +307,7 @@ int dictMerge(dict *dst, dict *src) de = deNext; src->ht[iht].used--; } +#endif } } assert((dictSize(src)+dictSize(dst)) == expectedSize); From ce69a765f867611d737fb7efd6b046493142ac21 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 15 Aug 2020 23:25:58 +0000 Subject: [PATCH 08/17] Remove unnecessary key comparisons in perf critical snapshot paths Former-commit-id: 40f8a8d102fdca9443399ef03a47df609b146d58 --- src/dict.cpp | 4 ++-- src/dict.h | 2 +- src/snapshot.cpp | 9 ++++++--- 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/src/dict.cpp b/src/dict.cpp index 184e01ca8..86a29db4a 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -613,7 +613,7 @@ void dictRelease(dict *d) zfree(d); } -dictEntry *dictFindWithPrev(dict *d, const void *key, uint64_t h, dictEntry ***dePrevPtr, dictht **pht) +dictEntry *dictFindWithPrev(dict *d, const void *key, uint64_t h, dictEntry ***dePrevPtr, dictht **pht, bool fShallowCompare) { dictEntry *he; uint64_t idx, table; @@ -626,7 +626,7 @@ dictEntry *dictFindWithPrev(dict *d, const void *key, uint64_t h, dictEntry ***d he = d->ht[table].table[idx]; *dePrevPtr = &d->ht[table].table[idx]; while(he) { - if (key==he->key || dictCompareKeys(d, key, he->key)) { + if (key==he->key || (!fShallowCompare && dictCompareKeys(d, key, he->key))) { return he; } *dePrevPtr = &he->next; diff --git a/src/dict.h b/src/dict.h index 77ab832db..13f9d2aaa 100644 --- a/src/dict.h +++ b/src/dict.h @@ -167,7 +167,7 @@ dictEntry *dictUnlink(dict *ht, const void *key); void dictFreeUnlinkedEntry(dict *d, dictEntry *he); void dictRelease(dict *d); dictEntry * dictFind(dict *d, const void *key); -dictEntry * dictFindWithPrev(dict *d, const void *key, uint64_t h, dictEntry ***dePrevPtr, dictht **ht); +dictEntry * dictFindWithPrev(dict *d, const void *key, uint64_t h, dictEntry ***dePrevPtr, dictht **ht, bool fShallowCompare = false); void *dictFetchValue(dict *d, const void *key); int dictResize(dict *d); dictIterator *dictGetIterator(dict *d); diff --git a/src/snapshot.cpp b/src/snapshot.cpp index bd476be26..1eb8547ba 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -246,8 +246,11 @@ void redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) dictEntry *de; while ((de = dictNext(di)) != nullptr) { - dictEntry *deObj = dictFind(m_pdbSnapshot->m_pdict, dictGetKey(de)); - if (deObj != nullptr && dictGetVal(deObj) != nullptr) + dictEntry **dePrev = nullptr; + dictht *ht = nullptr; + sds key = (sds)dictGetKey(de); + dictEntry *deObj = dictFindWithPrev(m_pdbSnapshot->m_pdict, key, (uint64_t)dictGetVal(de), &dePrev, &ht, !!sdsisshared(key)); + if (deObj != nullptr) { decrRefCount((robj*)dictGetVal(deObj)); void *ptrSet = nullptr; @@ -309,7 +312,7 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn { dictEntry **dePrev; dictht *ht; - dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), (uint64_t)dictGetVal(de), &dePrev, &ht); + dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), (uint64_t)dictGetVal(de), &dePrev, &ht, !!sdsisshared((sds)dictGetKey(de))); if (deSnapshot == nullptr && m_spdbSnapshotHOLDER->m_pdbSnapshot) { // The tombstone is for a grand child, propogate it (or possibly in the storage provider - but an extra tombstone won't hurt) From 82989540c4cc750382d481a19dd5c6d098ba3aaa Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 15 Aug 2020 23:27:32 +0000 Subject: [PATCH 09/17] remove unnecessary allocation in rdb save Former-commit-id: dfa76f04fd862e0cc7b4d3284b3922e3a5e5549d --- src/rdb.cpp | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index 0ed4febf1..0781cb7b4 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -349,20 +349,24 @@ writeerr: } ssize_t rdbSaveLzfStringObject(rio *rdb, const unsigned char *s, size_t len) { + char rgbuf[2048]; size_t comprlen, outlen; - void *out; + void *out = rgbuf; /* We require at least four bytes compression for this to be worth it */ if (len <= 4) return 0; outlen = len-4; - if ((out = zmalloc(outlen+1, MALLOC_LOCAL)) == NULL) return 0; + if (outlen >= sizeof(rgbuf)) + if ((out = zmalloc(outlen+1, MALLOC_LOCAL)) == NULL) return 0; comprlen = lzf_compress(s, len, out, outlen); if (comprlen == 0) { - zfree(out); + if (out != rgbuf) + zfree(out); return 0; } ssize_t nwritten = rdbSaveLzfBlob(rdb, out, comprlen, len); - zfree(out); + if (out != rgbuf) + zfree(out); return nwritten; } From 311b286d41c4e165befd15f63126c1ef4bfd06b1 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 15 Aug 2020 23:30:22 +0000 Subject: [PATCH 10/17] BUG: Cannot have an EMBSTR bigger than 255 Former-commit-id: e2d6e2d9d585cb7a73f469a8580f9cb0ec71a429 --- src/object.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/object.cpp b/src/object.cpp index 382b40c02..fbbbae339 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -94,6 +94,7 @@ robj *createRawStringObject(const char *ptr, size_t len) { * an object where the sds string is actually an unmodifiable string * allocated in the same chunk as the object itself. */ robj *createEmbeddedStringObject(const char *ptr, size_t len) { + serverAssert(len <= UINT8_MAX); size_t allocsize = sizeof(struct sdshdr8)+len+1; if (allocsize < sizeof(void*)) allocsize = sizeof(void*); @@ -136,10 +137,10 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { #ifdef ENABLE_MVCC #define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48 #else -#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 256 +#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 56 #endif -//static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total"); +static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total"); robj *createStringObject(const char *ptr, size_t len) { if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) return createEmbeddedStringObject(ptr,len); From cf4e74006fb6e38cab1e674c527ff6f7b316235a Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Aug 2020 00:13:19 +0000 Subject: [PATCH 11/17] Don't free snapshot objects in a critical path (under the AE lock) Former-commit-id: d0da3d3cb74334cc8a2d14f4bdaef7935181700a --- src/server.cpp | 44 ++++++++++++++++++++++++++++++++++++-------- src/server.h | 6 +++++- src/snapshot.cpp | 48 ++++++++++++++++++++++++++++++++++++------------ 3 files changed, 77 insertions(+), 21 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index f72047c31..b98a07cc4 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2040,6 +2040,29 @@ void flushStorageWeak() } } +void freeSnapshotLazyFreesAsync() +{ + aeAcquireLock(); + std::vector vecObjs = std::move(g_pserver->vecobjLazyFree); + std::vector vecDicts = std::move(g_pserver->vecdictLazyFree); + std::vector> vecvecde = std::move(g_pserver->vecvecde); + aeReleaseLock(); + + for (auto &vecdeFree : vecvecde) + { + for (auto *de : vecdeFree) + { + dbDictType.keyDestructor(nullptr, dictGetKey(de)); + dbDictType.valDestructor(nullptr, dictGetVal(de)); + zfree(de); + } + } + for (robj *o : vecObjs) + decrRefCount(o); + for (dict *d : vecDicts) + dictRelease(d); +} + /* This is our timer interrupt, called g_pserver->hz times per second. * Here is where we do a number of things that need to be done asynchronously. * For instance: @@ -2313,14 +2336,18 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } } - bool fAnySnapshots = false; - for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots; ++idb) - fAnySnapshots = fAnySnapshots || g_pserver->db[0]->FSnapshot(); - if (fAnySnapshots) - { - g_pserver->asyncworkqueue->AddWorkFunction([]{ - g_pserver->db[0]->consolidate_snapshot(); - }, true /*HiPri*/); + run_with_period(100) { + bool fAsyncFrees = g_pserver->vecobjLazyFree.size() || g_pserver->vecdictLazyFree.size() || g_pserver->vecvecde.size(); + bool fAnySnapshots = false; + for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots && !fAsyncFrees; ++idb) + fAnySnapshots = fAnySnapshots || g_pserver->db[0]->FSnapshot(); + if (fAnySnapshots || fAsyncFrees) + { + g_pserver->asyncworkqueue->AddWorkFunction([fAsyncFrees]{ + g_pserver->db[0]->consolidate_snapshot(); + freeSnapshotLazyFreesAsync(); + }, true /*HiPri*/); + } } /* Fire the cron loop modules event. */ @@ -6049,6 +6076,7 @@ int main(int argc, char **argv) { serverAssert(fLockAcquired); g_pserver->garbageCollector.shutdown(); + freeSnapshotLazyFreesAsync(); delete g_pserver->m_pstorageFactory; return 0; diff --git a/src/server.h b/src/server.h index ce521b2b6..a84368044 100644 --- a/src/server.h +++ b/src/server.h @@ -1425,7 +1425,7 @@ protected: static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot); int snapshot_depth() const; void consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce); - void freeTombstoneObjects(int depth); + bool freeTombstoneObjects(int depth); public: bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; } @@ -2452,6 +2452,10 @@ struct redisServer { char *aof_rewrite_cpulist; /* cpu affinity list of aof rewrite process. */ char *bgsave_cpulist; /* cpu affinity list of bgsave process. */ + std::vector vecdictLazyFree; + std::vector vecobjLazyFree; + std::vector> vecvecde; + bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; } }; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 1eb8547ba..a0187f1c5 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -233,17 +233,23 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot latencyAddSampleIfNeeded("end-snapshot-async-phase-2", latency); } -void redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) +bool redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) { if (m_pdbSnapshot == nullptr) - return; + { + serverAssert(dictSize(m_pdictTombstone) == 0); + return true; + } - const_cast(m_pdbSnapshot)->freeTombstoneObjects(depth+1); + bool fPrevResult = const_cast(m_pdbSnapshot)->freeTombstoneObjects(depth+1); if (m_pdbSnapshot->m_refCount != depth && (m_pdbSnapshot->m_refCount != (m_refCount+1))) - return; + return false; dictIterator *di = dictGetIterator(m_pdictTombstone); dictEntry *de; + std::vector vecdeFree; + vecdeFree.reserve(dictSize(m_pdictTombstone)); + bool fAllCovered = true; while ((de = dictNext(di)) != nullptr) { dictEntry **dePrev = nullptr; @@ -252,12 +258,28 @@ void redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) dictEntry *deObj = dictFindWithPrev(m_pdbSnapshot->m_pdict, key, (uint64_t)dictGetVal(de), &dePrev, &ht, !!sdsisshared(key)); if (deObj != nullptr) { - decrRefCount((robj*)dictGetVal(deObj)); - void *ptrSet = nullptr; - __atomic_store(&deObj->v.val, &ptrSet, __ATOMIC_RELAXED); + // Now unlink the DE + __atomic_store(dePrev, &deObj->next, __ATOMIC_RELEASE); + ht->used--; + vecdeFree.push_back(deObj); + } + else + { + fAllCovered = fPrevResult; } } dictReleaseIterator(di); + + aeAcquireLock(); + if (fAllCovered) + { + g_pserver->vecdictLazyFree.push_back(m_pdictTombstone); + m_pdictTombstone = dictCreate(&dbTombstoneDictType, nullptr); + } + g_pserver->vecvecde.emplace_back(std::move(vecdeFree)); + aeReleaseLock(); + + return fAllCovered; } void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot) @@ -308,6 +330,8 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn // Stage 1 Loop through all the tracked deletes and remove them from the snapshot DB dictIterator *di = dictGetIterator(m_pdictTombstone); dictEntry *de; + m_spdbSnapshotHOLDER->m_pdict->iterators++; + std::vector vecde; while ((de = dictNext(di)) != NULL) { dictEntry **dePrev; @@ -327,15 +351,15 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn } // Delete the object from the source dict, we don't use dictDelete to avoid a second search - dictFreeKey(m_spdbSnapshotHOLDER->m_pdict, deSnapshot); - dictFreeVal(m_spdbSnapshotHOLDER->m_pdict, deSnapshot); - serverAssert(*dePrev == deSnapshot); + vecde.push_back(deSnapshot); *dePrev = deSnapshot->next; - zfree(deSnapshot); ht->used--; } + g_pserver->vecvecde.emplace_back(std::move(vecde)); + m_spdbSnapshotHOLDER->m_pdict->iterators--; dictReleaseIterator(di); - dictEmpty(m_pdictTombstone, nullptr); + g_pserver->vecdictLazyFree.push_back(m_pdictTombstone); + m_pdictTombstone = dictCreate(&dbTombstoneDictType, nullptr); // Stage 2 Move all new keys to the snapshot DB dictMerge(m_spdbSnapshotHOLDER->m_pdict, m_pdict); From 3093d9e1173d706caa3b4dd049f7ff33382a78ea Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Aug 2020 00:24:18 +0000 Subject: [PATCH 12/17] Only save MVCC tstamps if its an active replica Former-commit-id: 5aded7dbad055308dc3932f7797fc71684b60966 --- src/rdb.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index 0781cb7b4..413a39b24 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1097,8 +1097,10 @@ int rdbSaveKeyValuePair(rio *rdb, robj_roptr key, robj_roptr val, const expireEn char szT[32]; #ifdef ENABLE_MVCC - snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); - if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; + if (g_pserver->fActiveReplica) { + snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); + if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; + } #endif /* Save type, key, value */ From 4a356ac13fa456033b25e7b97201e1f82ce2a740 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Aug 2020 00:26:05 +0000 Subject: [PATCH 13/17] Don't try and consolidate snapshots with a depth of 1 Former-commit-id: 26c298bd9bc4e2c6981de5c20284120ea54580c3 --- src/server.h | 3 +-- src/snapshot.cpp | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/server.h b/src/server.h index a84368044..5c8c79a83 100644 --- a/src/server.h +++ b/src/server.h @@ -1421,13 +1421,12 @@ class redisDbPersistentDataSnapshot : protected redisDbPersistentData { friend class redisDbPersistentData; protected: - bool m_fConsolidated = false; static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot); - int snapshot_depth() const; void consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce); bool freeTombstoneObjects(int depth); public: + int snapshot_depth() const; bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; } bool iterate_threadsafe(std::function fn, bool fKeyOnly = false, bool fCacheOnly = false) const; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index a0187f1c5..c0ca378b1 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -571,11 +571,12 @@ void redisDbPersistentData::consolidate_snapshot() { aeAcquireLock(); auto psnapshot = (m_pdbSnapshot != nullptr) ? m_spdbSnapshotHOLDER.get() : nullptr; - if (psnapshot == nullptr) + if (psnapshot == nullptr || psnapshot->snapshot_depth() == 0) { aeReleaseLock(); return; } + psnapshot->m_refCount++; // ensure it's not free'd aeReleaseLock(); psnapshot->consolidate_children(this, false /* fForce */); @@ -648,7 +649,6 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData * serverLog(LL_VERBOSE, "cleaned %d snapshots", snapshot_depth()-1); spdb->m_refCount = depth; - spdb->m_fConsolidated = true; // Drop our refs from this snapshot and its children psnapshotT = this; std::vector vecT; From 00934cd3418aed34df9fb6ac4e9cc4fd565a7cec Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Aug 2020 00:27:10 +0000 Subject: [PATCH 14/17] Add snapshot depth to info string Former-commit-id: 66aef678e9d6d7ab6c28622d54ada0ea8cdd2a99 --- src/server.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/server.cpp b/src/server.cpp index b98a07cc4..bc9ee1e15 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -5187,12 +5187,20 @@ sds genRedisInfoString(const char *section) { } if (allsections || defsections || !strcasecmp(section,"keydb")) { + // Compute the MVCC depth + int mvcc_depth = 0; + for (int idb = 0; idb < cserver.dbnum; ++idb) { + mvcc_depth = std::max(mvcc_depth, g_pserver->db[idb]->snapshot_depth()); + } + if (sections++) info = sdscat(info,"\r\n"); info = sdscatprintf(info, "# KeyDB\r\n" "variant:pro\r\n" - "license_status:%s\r\n", - cserver.license_key ? "OK" : "Trial" + "license_status:%s\r\n" + "mvcc_depth:%d\r\n", + cserver.license_key ? "OK" : "Trial", + mvcc_depth ); } From c0586b3aeda15038a3865ecbd445be97615efa26 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 17 Aug 2020 00:32:48 +0000 Subject: [PATCH 15/17] Allow garbage collection of generic data Former-commit-id: feadb7fb1845027422bcfca43dbcb6097409b8dc --- src/AsyncWorkQueue.cpp | 2 +- src/gc.h | 6 +++++ src/server.cpp | 10 ++++---- src/server.h | 56 ++++++++++++++++++++++++++++++++++++++++-- 4 files changed, 66 insertions(+), 8 deletions(-) diff --git a/src/AsyncWorkQueue.cpp b/src/AsyncWorkQueue.cpp index be85f5ac2..48252ac97 100644 --- a/src/AsyncWorkQueue.cpp +++ b/src/AsyncWorkQueue.cpp @@ -45,7 +45,7 @@ void AsyncWorkQueue::WorkerThreadMain() ProcessPendingAsyncWrites(); aeReleaseLock(); g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch); - serverTL->gcEpoch = 0; + serverTL->gcEpoch.reset(); } listRelease(vars.clients_pending_asyncwrite); diff --git a/src/gc.h b/src/gc.h index d2c4066f7..5c0596963 100644 --- a/src/gc.h +++ b/src/gc.h @@ -3,6 +3,12 @@ #include #include +struct ICollectable +{ + virtual ~ICollectable() {} + bool FWillFreeChildDebug() { return false; } +}; + template class GarbageCollector { diff --git a/src/server.cpp b/src/server.cpp index bc9ee1e15..94477a886 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2505,17 +2505,17 @@ void beforeSleep(struct aeEventLoop *eventLoop) { latencyAddSampleIfNeeded("storage-commit", commit_latency); handleClientsWithPendingWrites(iel, aof_state); - if (serverTL->gcEpoch != 0) + if (!serverTL->gcEpoch.isReset()) g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/); - serverTL->gcEpoch = 0; + serverTL->gcEpoch.reset(); aeAcquireLock(); /* Close clients that need to be closed asynchronous */ freeClientsInAsyncFreeQueue(iel); - if (serverTL->gcEpoch != 0) + if (!serverTL->gcEpoch.isReset()) g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/); - serverTL->gcEpoch = 0; + serverTL->gcEpoch.reset(); /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this @@ -2531,7 +2531,7 @@ void afterSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); if (moduleCount()) moduleAcquireGIL(TRUE /*fServerThread*/); - serverAssert(serverTL->gcEpoch == 0); + serverAssert(serverTL->gcEpoch.isReset()); serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); aeAcquireLock(); for (int idb = 0; idb < cserver.dbnum; ++idb) diff --git a/src/server.h b/src/server.h index 5c8c79a83..07f7a4010 100644 --- a/src/server.h +++ b/src/server.h @@ -1959,6 +1959,58 @@ struct clusterState; #define MAX_EVENT_LOOPS 16 #define IDX_EVENT_LOOP_MAIN 0 +class GarbageCollectorCollection +{ + GarbageCollector garbageCollectorSnapshot; + GarbageCollector garbageCollectorGeneric; + +public: + struct Epoch + { + uint64_t epochSnapshot = 0; + uint64_t epochGeneric = 0; + + void reset() { + epochSnapshot = 0; + epochGeneric = 0; + } + + bool isReset() const { + return epochSnapshot == 0 && epochGeneric == 0; + } + }; + + Epoch startEpoch() + { + Epoch e; + e.epochSnapshot = garbageCollectorSnapshot.startEpoch(); + e.epochGeneric = garbageCollectorGeneric.startEpoch(); + return e; + } + + void endEpoch(Epoch e, bool fNoFree = false) + { + garbageCollectorSnapshot.endEpoch(e.epochSnapshot, fNoFree); + garbageCollectorGeneric.endEpoch(e.epochGeneric, fNoFree); + } + + void shutdown() + { + garbageCollectorSnapshot.shutdown(); + garbageCollectorGeneric.shutdown(); + } + + void enqueue(Epoch e, std::unique_ptr &&sp) + { + garbageCollectorSnapshot.enqueue(e.epochSnapshot, std::move(sp)); + } + + void enqueue(Epoch e, std::unique_ptr &&sp) + { + garbageCollectorGeneric.enqueue(e.epochGeneric, std::move(sp)); + } +}; + // Per-thread variabels that may be accessed without a lock struct redisServerThreadVars { aeEventLoop *el; @@ -1980,7 +2032,7 @@ struct redisServerThreadVars { struct fastlock lockPendingWrite { "thread pending write" }; char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ long unsigned commandsExecuted = 0; - uint64_t gcEpoch = 0; + GarbageCollectorCollection::Epoch gcEpoch; const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr; bool fRetrySetAofEvent = false; @@ -2434,7 +2486,7 @@ struct redisServer { /* System hardware info */ size_t system_memory_size; /* Total memory in system as reported by OS */ - GarbageCollector garbageCollector; + GarbageCollectorCollection garbageCollector; IStorageFactory *m_pstorageFactory = nullptr; int storage_flush_period; // The time between flushes in the CRON job From e8e7061a73cd9c33c8e17fb452c42b81fcc116e8 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 17 Aug 2020 00:33:37 +0000 Subject: [PATCH 16/17] Fast cleanup of snapshots without leaving them forever Former-commit-id: fdd83b2b49244ed2988b080892ee5cffe9fd2684 --- src/dict.cpp | 2 + src/server.cpp | 53 ++++++---------- src/server.h | 7 +-- src/snapshot.cpp | 153 ++++++++++++++++++++++++++++++++++++----------- 4 files changed, 143 insertions(+), 72 deletions(-) diff --git a/src/dict.cpp b/src/dict.cpp index 86a29db4a..2c254b313 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -200,6 +200,8 @@ int dictMerge(dict *dst, dict *src) std::swap(dst->iterators, src->iterators); } + src->rehashidx = -1; + if (!dictIsRehashing(dst) && !dictIsRehashing(src)) { if (dst->ht[0].size >= src->ht[0].size) diff --git a/src/server.cpp b/src/server.cpp index 94477a886..2d47cab05 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2040,29 +2040,6 @@ void flushStorageWeak() } } -void freeSnapshotLazyFreesAsync() -{ - aeAcquireLock(); - std::vector vecObjs = std::move(g_pserver->vecobjLazyFree); - std::vector vecDicts = std::move(g_pserver->vecdictLazyFree); - std::vector> vecvecde = std::move(g_pserver->vecvecde); - aeReleaseLock(); - - for (auto &vecdeFree : vecvecde) - { - for (auto *de : vecdeFree) - { - dbDictType.keyDestructor(nullptr, dictGetKey(de)); - dbDictType.valDestructor(nullptr, dictGetVal(de)); - zfree(de); - } - } - for (robj *o : vecObjs) - decrRefCount(o); - for (dict *d : vecDicts) - dictRelease(d); -} - /* This is our timer interrupt, called g_pserver->hz times per second. * Here is where we do a number of things that need to be done asynchronously. * For instance: @@ -2243,11 +2220,22 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { CONFIG_BGSAVE_RETRY_DELAY || g_pserver->lastbgsave_status == C_OK)) { - serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...", - sp->changes, (int)sp->seconds); - rdbSaveInfo rsi, *rsiptr; - rsiptr = rdbPopulateSaveInfo(&rsi); - rdbSaveBackground(rsiptr); + // Ensure rehashing is complete + bool fRehashInProgress = false; + if (g_pserver->activerehashing) { + for (int idb = 0; idb < cserver.dbnum && !fRehashInProgress; ++idb) { + if (g_pserver->db[idb]->FRehashing()) + fRehashInProgress = true; + } + } + + if (!fRehashInProgress) { + serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...", + sp->changes, (int)sp->seconds); + rdbSaveInfo rsi, *rsiptr; + rsiptr = rdbPopulateSaveInfo(&rsi); + rdbSaveBackground(rsiptr); + } break; } } @@ -2337,15 +2325,13 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } run_with_period(100) { - bool fAsyncFrees = g_pserver->vecobjLazyFree.size() || g_pserver->vecdictLazyFree.size() || g_pserver->vecvecde.size(); bool fAnySnapshots = false; - for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots && !fAsyncFrees; ++idb) + for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots; ++idb) fAnySnapshots = fAnySnapshots || g_pserver->db[0]->FSnapshot(); - if (fAnySnapshots || fAsyncFrees) + if (fAnySnapshots) { - g_pserver->asyncworkqueue->AddWorkFunction([fAsyncFrees]{ + g_pserver->asyncworkqueue->AddWorkFunction([]{ g_pserver->db[0]->consolidate_snapshot(); - freeSnapshotLazyFreesAsync(); }, true /*HiPri*/); } } @@ -6084,7 +6070,6 @@ int main(int argc, char **argv) { serverAssert(fLockAcquired); g_pserver->garbageCollector.shutdown(); - freeSnapshotLazyFreesAsync(); delete g_pserver->m_pstorageFactory; return 0; diff --git a/src/server.h b/src/server.h index 07f7a4010..193004b52 100644 --- a/src/server.h +++ b/src/server.h @@ -1420,6 +1420,9 @@ private: class redisDbPersistentDataSnapshot : protected redisDbPersistentData { friend class redisDbPersistentData; +private: + bool iterate_threadsafe_core(std::function &fn, bool fKeyOnly, bool fCacheOnly, bool fTop) const; + protected: static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot); void consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce); @@ -2503,10 +2506,6 @@ struct redisServer { char *aof_rewrite_cpulist; /* cpu affinity list of aof rewrite process. */ char *bgsave_cpulist; /* cpu affinity list of bgsave process. */ - std::vector vecdictLazyFree; - std::vector vecobjLazyFree; - std::vector> vecvecde; - bool FRdbSaveInProgress() const { return rdbThreadVars.fRdbThreadActive; } }; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index c0ca378b1..0f84e483e 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -2,6 +2,29 @@ #include "aelocker.h" static const size_t c_elementsSmallLimit = 500000; +static fastlock s_lock {"consolidate_children"}; // this lock ensures only one thread is consolidating at a time + +class LazyFree : public ICollectable +{ +public: + virtual ~LazyFree() + { + for (auto *de : vecde) + { + dbDictType.keyDestructor(nullptr, dictGetKey(de)); + dbDictType.valDestructor(nullptr, dictGetVal(de)); + zfree(de); + } + for (robj *o : vecobjLazyFree) + decrRefCount(o); + for (dict *d : vecdictLazyFree) + dictRelease(d); + } + + std::vector vecdictLazyFree; + std::vector vecobjLazyFree; + std::vector vecde; +}; const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional) { @@ -189,7 +212,18 @@ void redisDbPersistentData::restoreSnapshot(const redisDbPersistentDataSnapshot void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot) { mstime_t latency; - aeAcquireLock(); latencyStartMonitor(latency); + + aeAcquireLock(); + while (dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone)) { + dictRehashMilliseconds(m_pdict, 1); + dictRehashMilliseconds(m_pdictTombstone, 1); + // Give someone else a chance + aeReleaseLock(); + usleep(300); + aeAcquireLock(); + } + + latencyStartMonitor(latency); if (m_pdbSnapshotASYNC && m_pdbSnapshotASYNC->m_mvccCheckpoint <= psnapshot->m_mvccCheckpoint) { // Free a stale async snapshot so consolidate_children can clean it up later @@ -215,11 +249,22 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot auto psnapshotT = createSnapshot(LLONG_MAX, false); endSnapshot(psnapshot); // this will just dec the ref count since our new snapshot has a ref psnapshot = nullptr; - aeReleaseLock(); latencyEndMonitor(latency); + + latencyEndMonitor(latency); latencyAddSampleIfNeeded("end-snapshot-async-phase-1", latency); + aeReleaseLock(); // do the expensive work of merging snapshots outside the ref - const_cast(psnapshotT)->freeTombstoneObjects(1); // depth is one because we just creted it + if (const_cast(psnapshotT)->freeTombstoneObjects(1)) // depth is one because we just creted it + { + aeAcquireLock(); + if (m_pdbSnapshotASYNC != nullptr) + endSnapshot(m_pdbSnapshotASYNC); + m_pdbSnapshotASYNC = nullptr; + endSnapshot(psnapshotT); + aeReleaseLock(); + return; + } const_cast(psnapshotT)->consolidate_children(this, true); // Final Cleanup @@ -228,9 +273,10 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot m_pdbSnapshotASYNC = psnapshotT; else endSnapshot(psnapshotT); // finally clean up our temp snapshot - aeReleaseLock(); latencyEndMonitor(latency); - + + latencyEndMonitor(latency); latencyAddSampleIfNeeded("end-snapshot-async-phase-2", latency); + aeReleaseLock(); } bool redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) @@ -241,45 +287,66 @@ bool redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) return true; } - bool fPrevResult = const_cast(m_pdbSnapshot)->freeTombstoneObjects(depth+1); + if (!const_cast(m_pdbSnapshot)->freeTombstoneObjects(depth+1)) + return false; + + { + AeLocker ae; + ae.arm(nullptr); if (m_pdbSnapshot->m_refCount != depth && (m_pdbSnapshot->m_refCount != (m_refCount+1))) return false; + ae.disarm(); + } + + std::unique_lock lock(s_lock, std::defer_lock); + if (!lock.try_lock()) + return false; // this is a best effort function + std::unique_ptr splazy = std::make_unique(); + + dict *dictTombstoneNew = dictCreate(&dbTombstoneDictType, nullptr); dictIterator *di = dictGetIterator(m_pdictTombstone); dictEntry *de; std::vector vecdeFree; vecdeFree.reserve(dictSize(m_pdictTombstone)); - bool fAllCovered = true; + unsigned rgcremoved[2] = {0}; while ((de = dictNext(di)) != nullptr) { dictEntry **dePrev = nullptr; dictht *ht = nullptr; sds key = (sds)dictGetKey(de); - dictEntry *deObj = dictFindWithPrev(m_pdbSnapshot->m_pdict, key, (uint64_t)dictGetVal(de), &dePrev, &ht, !!sdsisshared(key)); + // BUG BUG: Why can't we do a shallow search here? + dictEntry *deObj = dictFindWithPrev(m_pdbSnapshot->m_pdict, key, (uint64_t)dictGetVal(de), &dePrev, &ht, false); + if (deObj != nullptr) { // Now unlink the DE __atomic_store(dePrev, &deObj->next, __ATOMIC_RELEASE); - ht->used--; - vecdeFree.push_back(deObj); - } - else - { - fAllCovered = fPrevResult; + if (ht == &m_pdbSnapshot->m_pdict->ht[0]) + rgcremoved[0]++; + else + rgcremoved[1]++; + splazy->vecde.push_back(deObj); + } else { + serverAssert(dictFind(m_pdbSnapshot->m_pdict, key) == nullptr); + serverAssert(m_pdbSnapshot->find_cached_threadsafe(key) != nullptr); + dictAdd(dictTombstoneNew, sdsdupshared((sds)dictGetKey(de)), dictGetVal(de)); } } dictReleaseIterator(di); + dictForceRehash(dictTombstoneNew); aeAcquireLock(); - if (fAllCovered) - { - g_pserver->vecdictLazyFree.push_back(m_pdictTombstone); - m_pdictTombstone = dictCreate(&dbTombstoneDictType, nullptr); - } - g_pserver->vecvecde.emplace_back(std::move(vecdeFree)); + dict *dT = m_pdbSnapshot->m_pdict; + splazy->vecdictLazyFree.push_back(m_pdictTombstone); + __atomic_store(&m_pdictTombstone, &dictTombstoneNew, __ATOMIC_RELEASE); + __atomic_fetch_sub(&dT->ht[0].used, rgcremoved[0], __ATOMIC_RELEASE); + __atomic_fetch_sub(&dT->ht[1].used, rgcremoved[1], __ATOMIC_RELEASE); + serverLog(LL_WARNING, "tombstones removed: %u, remain: %lu", rgcremoved[0]+rgcremoved[1], dictSize(m_pdictTombstone)); + g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy)); aeReleaseLock(); - return fAllCovered; + return true; } void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot) @@ -331,16 +398,19 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn dictIterator *di = dictGetIterator(m_pdictTombstone); dictEntry *de; m_spdbSnapshotHOLDER->m_pdict->iterators++; - std::vector vecde; + auto splazy = std::make_unique(); while ((de = dictNext(di)) != NULL) { dictEntry **dePrev; dictht *ht; - dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), (uint64_t)dictGetVal(de), &dePrev, &ht, !!sdsisshared((sds)dictGetKey(de))); + // BUG BUG Why not a shallow search? + dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), (uint64_t)dictGetVal(de), &dePrev, &ht, false /*!!sdsisshared((sds)dictGetKey(de))*/); if (deSnapshot == nullptr && m_spdbSnapshotHOLDER->m_pdbSnapshot) { // The tombstone is for a grand child, propogate it (or possibly in the storage provider - but an extra tombstone won't hurt) +#ifdef CHECKED_BUILD serverAssert(m_spdbSnapshotHOLDER->m_pdbSnapshot->find_cached_threadsafe((const char*)dictGetKey(de)) != nullptr); +#endif dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), nullptr); continue; } @@ -351,14 +421,15 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn } // Delete the object from the source dict, we don't use dictDelete to avoid a second search - vecde.push_back(deSnapshot); + splazy->vecde.push_back(deSnapshot); *dePrev = deSnapshot->next; ht->used--; } - g_pserver->vecvecde.emplace_back(std::move(vecde)); + + m_spdbSnapshotHOLDER->m_pdict->iterators--; dictReleaseIterator(di); - g_pserver->vecdictLazyFree.push_back(m_pdictTombstone); + splazy->vecdictLazyFree.push_back(m_pdictTombstone); m_pdictTombstone = dictCreate(&dbTombstoneDictType, nullptr); // Stage 2 Move all new keys to the snapshot DB @@ -388,8 +459,10 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn auto spsnapshotFree = std::move(m_spdbSnapshotHOLDER); m_spdbSnapshotHOLDER = std::move(spsnapshotFree->m_spdbSnapshotHOLDER); - if (serverTL != nullptr) + if (serverTL != nullptr) { g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(spsnapshotFree)); + g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy)); + } // Sanity Checks serverAssert(m_spdbSnapshotHOLDER != nullptr || m_pdbSnapshot == nullptr); @@ -426,8 +499,10 @@ dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe(bool fPrimaryOn dict_iter redisDbPersistentData::find_cached_threadsafe(const char *key) const { + dict *dictTombstone; + __atomic_load(&m_pdictTombstone, &dictTombstone, __ATOMIC_ACQUIRE); dictEntry *de = dictFind(m_pdict, key); - if (de == nullptr && m_pdbSnapshot != nullptr && dictFind(m_pdictTombstone, key) == nullptr) + if (de == nullptr && m_pdbSnapshot != nullptr && dictFind(dictTombstone, key) == nullptr) { auto itr = m_pdbSnapshot->find_cached_threadsafe(key); if (itr != nullptr) @@ -493,11 +568,20 @@ unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long itera } bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function fn, bool fKeyOnly, bool fCacheOnly) const +{ + return iterate_threadsafe_core(fn, fKeyOnly, fCacheOnly, true); +} + +bool redisDbPersistentDataSnapshot::iterate_threadsafe_core(std::function &fn, bool fKeyOnly, bool fCacheOnly, bool fFirst) const { // Take the size so we can ensure we visited every element exactly once // use volatile to ensure it's not checked too late. This makes it more // likely we'll detect races (but it won't gurantee it) + aeAcquireLock(); + dict *dictTombstone; + __atomic_load(&m_pdictTombstone, &dictTombstone, __ATOMIC_ACQUIRE); volatile ssize_t celem = (ssize_t)size(); + aeReleaseLock(); dictEntry *de = nullptr; bool fResult = true; @@ -543,19 +627,22 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::functioniterate_threadsafe([this, &fn, &celem](const char *key, robj_roptr o) { - dictEntry *deTombstone = dictFind(m_pdictTombstone, key); + std::function fnNew = [this, &fn, &celem, dictTombstone](const char *key, robj_roptr o) { + dictEntry *deTombstone = dictFind(dictTombstone, key); if (deTombstone != nullptr) return true; // Alright it's a key in the use keyspace, lets ensure it and then pass it off --celem; return fn(key, o); - }, fKeyOnly, fCacheOnly); + }; + fResult = psnapshot->iterate_threadsafe_core(fnNew, fKeyOnly, fCacheOnly, false); } // we should have hit all keys or had a good reason not to - serverAssert(!fResult || celem == 0 || (m_spstorage && fCacheOnly)); + if (!(!fResult || celem == 0 || (m_spstorage && fCacheOnly))) + serverLog(LL_WARNING, "celem: %ld", celem); + serverAssert(!fResult || celem == 0 || (m_spstorage && fCacheOnly) || !fFirst); return fResult; } @@ -588,8 +675,6 @@ void redisDbPersistentData::consolidate_snapshot() // only call this on the "real" database to consolidate the first child void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce) { - static fastlock s_lock {"consolidate_children"}; // this lock ensures only one thread is consolidating at a time - std::unique_lock lock(s_lock, std::defer_lock); if (!lock.try_lock()) return; // this is a best effort function From 93a67abe523a68a9de4663c442868a40ed3ca7e6 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 17 Aug 2020 02:25:45 +0000 Subject: [PATCH 17/17] Fix ARM build of RocksDB Former-commit-id: 7301d6d5e76b65115e2610a5fc5afee443782d7c --- deps/Makefile | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/deps/Makefile b/deps/Makefile index b2c021227..c1c30ae54 100644 --- a/deps/Makefile +++ b/deps/Makefile @@ -1,6 +1,7 @@ # Redis dependency Makefile uname_S:= $(shell sh -c 'uname -s 2>/dev/null || echo not') +uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not') CCCOLOR="\033[34m" LINKCOLOR="\033[34;1m" @@ -94,6 +95,10 @@ jemalloc: .make-prerequisites rocksdb: .make-prerequisites @printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR) +ifeq ($(uname_M),x86_64) cd rocksdb && PORTABLE=1 USE_SSE=1 FORCE_SSE42=1 $(MAKE) static_lib +else + cd rocksdb && PORTABLE=1 $(MAKE) static_lib +endif .PHONY: rocksdb