diff --git a/README.md b/README.md index aa10a47dc..0c91bd76d 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,5 @@ ![Current Release](https://img.shields.io/github/release/JohnSully/KeyDB.svg) ![CI](https://github.com/JohnSully/KeyDB/workflows/CI/badge.svg?branch=unstable) -[![Join the chat at https://gitter.im/KeyDB/community](https://badges.gitter.im/KeyDB/community.svg)](https://gitter.im/KeyDB/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) [![StackShare](http://img.shields.io/badge/tech-stack-0690fa.svg?style=flat)](https://stackshare.io/eq-alpha-technology-inc/eq-alpha-technology-inc) ##### New! Want to extend KeyDB with Javascript? Try [ModJS](https://github.com/JohnSully/ModJS) diff --git a/deps/Makefile b/deps/Makefile index b2c021227..c1c30ae54 100644 --- a/deps/Makefile +++ b/deps/Makefile @@ -1,6 +1,7 @@ # Redis dependency Makefile uname_S:= $(shell sh -c 'uname -s 2>/dev/null || echo not') +uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not') CCCOLOR="\033[34m" LINKCOLOR="\033[34;1m" @@ -94,6 +95,10 @@ jemalloc: .make-prerequisites rocksdb: .make-prerequisites @printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR) +ifeq ($(uname_M),x86_64) cd rocksdb && PORTABLE=1 USE_SSE=1 FORCE_SSE42=1 $(MAKE) static_lib +else + cd rocksdb && PORTABLE=1 $(MAKE) static_lib +endif .PHONY: rocksdb diff --git a/src/AsyncWorkQueue.cpp b/src/AsyncWorkQueue.cpp index be85f5ac2..48252ac97 100644 --- a/src/AsyncWorkQueue.cpp +++ b/src/AsyncWorkQueue.cpp @@ -45,7 +45,7 @@ void AsyncWorkQueue::WorkerThreadMain() ProcessPendingAsyncWrites(); aeReleaseLock(); g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch); - serverTL->gcEpoch = 0; + serverTL->gcEpoch.reset(); } listRelease(vars.clients_pending_asyncwrite); diff --git a/src/Makefile b/src/Makefile index ce3ab9ab9..d21a2052f 100644 --- a/src/Makefile +++ b/src/Makefile @@ -47,6 +47,11 @@ endif USEASM?=true +ifeq ($(NOMVCC),) + CFLAGS+= -DENABLE_MVCC + CXXFLAGS+= -DENABLE_MVCC +endif + ifneq ($(SANITIZE),) CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE diff --git a/src/ae.cpp b/src/ae.cpp index cccf2a130..c67ff803e 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -261,9 +261,11 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, if (fSynchronous) { + { std::unique_lock ulock(cmd.pctl->mutexcv, std::adopt_lock); cmd.pctl->cv.wait(ulock); ret = cmd.pctl->rval; + } delete cmd.pctl; } @@ -315,9 +317,11 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch int ret = AE_OK; if (fSynchronous) { + { std::unique_lock ulock(cmd.pctl->mutexcv, std::adopt_lock); cmd.pctl->cv.wait(ulock); ret = cmd.pctl->rval; + } delete cmd.pctl; } return ret; diff --git a/src/cluster.cpp b/src/cluster.cpp index 3a20c9897..a9e1cc9ef 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -4920,9 +4920,11 @@ void createDumpPayload(rio *payload, robj_roptr o, robj *key) { rioInitWithBuffer(payload,sdsempty()); serverAssert(rdbSaveObjectType(payload,o)); serverAssert(rdbSaveObject(payload,o,key)); +#ifdef ENABLE_MVCC char szT[32]; snprintf(szT, 32, "%" PRIu64, o->mvcc_tstamp); serverAssert(rdbSaveAuxFieldStrStr(payload,"mvcc-tstamp", szT) != -1); +#endif /* Write the footer, this is how it looks like: * ----------------+---------------------+---------------+ @@ -5064,9 +5066,11 @@ void restoreCommand(client *c) { decrRefCount(auxkey); goto eoferr; } +#ifdef ENABLE_MVCC if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) { obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); } +#endif decrRefCount(auxkey); decrRefCount(auxval); } diff --git a/src/db.cpp b/src/db.cpp index b7b36b518..1cc7e66f7 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -91,7 +91,9 @@ static robj* lookupKey(redisDb *db, robj *key, int flags) { robj *val = itr.val(); lookupKeyUpdateObj(val, flags); if (flags & LOOKUP_UPDATEMVCC) { +#ifdef ENABLE_MVCC val->mvcc_tstamp = getMvccTstamp(); +#endif db->trackkey(key, true /* fUpdate */); } return val; @@ -218,8 +220,10 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { bool dbAddCore(redisDb *db, robj *key, robj *val, bool fAssumeNew = false) { serverAssert(!val->FExpires()); sds copy = sdsdupshared(szFromObj(key)); +#ifdef ENABLE_MVCC if (g_pserver->fActiveReplica) val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp(); +#endif bool fInserted = db->insert(copy, val, fAssumeNew); @@ -270,7 +274,9 @@ void redisDb::dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpd if (fUpdateMvcc) { if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) val = dupStringObject(val); +#ifdef ENABLE_MVCC val->mvcc_tstamp = getMvccTstamp(); +#endif } if (g_pserver->lazyfree_lazy_server_del) @@ -303,13 +309,15 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) if (itr == nullptr) return (dbAddCore(db, key, val) == true); +#ifdef ENABLE_MVCC robj *old = itr.val(); if (old->mvcc_tstamp <= val->mvcc_tstamp) { db->dbOverwriteCore(itr, key, val, false, true); return true; } - +#endif + return false; } else @@ -330,6 +338,7 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) * The client 'c' argument may be set to NULL if the operation is performed * in a context where there is no clear client performing the operation. */ void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) { + db->prepOverwriteForSnapshot(szFromObj(key)); if (!dbAddCore(db, key, val)) { dbOverwrite(db, key, val, !keepttl); } @@ -421,8 +430,9 @@ bool redisDbPersistentData::syncDelete(robj *key) auto itr = m_pdbSnapshot->find_cached_threadsafe(szFromObj(key)); if (itr != nullptr) { - sds keyTombstone = sdsdup(szFromObj(key)); - if (dictAdd(m_pdictTombstone, keyTombstone, nullptr) != DICT_OK) + sds keyTombstone = sdsdupshared(itr.key()); + uint64_t hash = dictGetHash(m_pdict, keyTombstone); + if (dictAdd(m_pdictTombstone, keyTombstone, (void*)hash) != DICT_OK) sdsfree(keyTombstone); } } @@ -2290,7 +2300,7 @@ void redisDbPersistentData::initialize() { m_pdbSnapshot = nullptr; m_pdict = dictCreate(&dbDictType,this); - m_pdictTombstone = dictCreate(&dbDictType,this); + m_pdictTombstone = dictCreate(&dbTombstoneDictType,this); m_setexpire = new(MALLOC_LOCAL) expireset(); m_fAllChanged = 0; m_fTrackingChanges = 0; @@ -2349,6 +2359,24 @@ bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew) return (res == DICT_OK); } +// This is a performance tool to prevent us copying over an object we're going to overwrite anyways +void redisDbPersistentData::prepOverwriteForSnapshot(char *key) +{ + if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) + return; + + if (m_pdbSnapshot != nullptr) + { + auto itr = m_pdbSnapshot->find_cached_threadsafe(key); + if (itr.key() != nullptr) + { + sds keyNew = sdsdupshared(itr.key()); + if (dictAdd(m_pdictTombstone, keyNew, (void*)dictHashKey(m_pdict, key)) != DICT_OK) + sdsfree(keyNew); + } + } +} + void redisDbPersistentData::tryResize() { if (htNeedsResize(m_pdict)) @@ -2470,15 +2498,20 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) sdsfree(strT); dictAdd(m_pdict, keyNew, objNew); serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1); +#ifdef ENABLE_MVCC serverAssert(objNew->mvcc_tstamp == itr.val()->mvcc_tstamp); +#endif } } else { dictAdd(m_pdict, keyNew, nullptr); } - *pde = dictFind(m_pdict, sdsKey); - dictAdd(m_pdictTombstone, sdsdupshared(itr.key()), nullptr); + uint64_t hash = dictGetHash(m_pdict, sdsKey); + dictEntry **deT; + dictht *ht; + *pde = dictFindWithPrev(m_pdict, sdsKey, hash, &deT, &ht); + dictAdd(m_pdictTombstone, sdsdupshared(itr.key()), (void*)hash); } } diff --git a/src/dict.cpp b/src/dict.cpp index ef7365fdb..2c254b313 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -179,6 +179,9 @@ int dictExpand(dict *d, unsigned long size) int dictMerge(dict *dst, dict *src) { +#define MERGE_BLOCK_SIZE 4 + dictEntry *rgdeT[MERGE_BLOCK_SIZE]; + assert(dst != src); if (dictSize(src) == 0) return DICT_OK; @@ -197,6 +200,8 @@ int dictMerge(dict *dst, dict *src) std::swap(dst->iterators, src->iterators); } + src->rehashidx = -1; + if (!dictIsRehashing(dst) && !dictIsRehashing(src)) { if (dst->ht[0].size >= src->ht[0].size) @@ -210,6 +215,50 @@ int dictMerge(dict *dst, dict *src) } _dictReset(&src->ht[0]); dst->rehashidx = 0; + assert(dictIsRehashing(dst)); + assert((dictSize(src)+dictSize(dst)) == expectedSize); + return DICT_OK; + } + + if (!dictIsRehashing(src) && dictSize(src) > 0 && + (src->ht[0].size == dst->ht[0].size || src->ht[0].size == dst->ht[1].size)) + { + auto &htDst = (src->ht[0].size == dst->ht[0].size) ? dst->ht[0] : dst->ht[1]; + + assert(src->ht[0].size == htDst.size); + for (size_t ide = 0; ide < src->ht[0].size; ide += MERGE_BLOCK_SIZE) + { + if (src->ht[0].used == 0) + break; + + for (int dde = 0; dde < MERGE_BLOCK_SIZE; ++dde) { + rgdeT[dde] = src->ht[0].table[ide + dde]; + src->ht[0].table[ide + dde] = nullptr; + } + + for (;;) { + bool fAnyFound = false; + for (int dde = 0; dde < MERGE_BLOCK_SIZE; ++dde) { + if (rgdeT[dde] == nullptr) + continue; + dictEntry *deNext = rgdeT[dde]->next; + rgdeT[dde]->next = htDst.table[ide+dde]; + htDst.table[ide+dde] = rgdeT[dde]; + rgdeT[dde] = deNext; + htDst.used++; + src->ht[0].used--; + + fAnyFound = fAnyFound || (deNext != nullptr); + } + + if (!fAnyFound) + break; + } + } + // If we copied to the base hash table of a rehashing dst, reset the rehash + if (dictIsRehashing(dst) && src->ht[0].size == dst->ht[0].size) + dst->rehashidx = 0; + assert(dictSize(src) == 0); assert((dictSize(src)+dictSize(dst)) == expectedSize); return DICT_OK; } @@ -218,10 +267,34 @@ int dictMerge(dict *dst, dict *src) auto &htDst = dictIsRehashing(dst) ? dst->ht[1] : dst->ht[0]; for (int iht = 0; iht < 2; ++iht) { - for (size_t ide = 0; ide < src->ht[iht].size; ++ide) + for (size_t ide = 0; ide < src->ht[iht].size; ide += MERGE_BLOCK_SIZE) { if (src->ht[iht].used == 0) break; + + for (int dde = 0; dde < MERGE_BLOCK_SIZE; ++dde) { + rgdeT[dde] = src->ht[iht].table[ide + dde]; + src->ht[iht].table[ide + dde] = nullptr; + } + + for (;;) { + bool fAnyFound = false; + for (int dde = 0; dde < MERGE_BLOCK_SIZE; ++dde) { + if (rgdeT[dde] == nullptr) + continue; + uint64_t h = dictHashKey(dst, rgdeT[dde]->key) & htDst.sizemask; + dictEntry *deNext = rgdeT[dde]->next; + rgdeT[dde]->next = htDst.table[h]; + htDst.table[h] = rgdeT[dde]; + rgdeT[dde] = deNext; + htDst.used++; + src->ht[iht].used--; + fAnyFound = fAnyFound || (deNext != nullptr); + } + if (!fAnyFound) + break; + } +#if 0 dictEntry *de = src->ht[iht].table[ide]; src->ht[iht].table[ide] = nullptr; while (de != nullptr) @@ -236,6 +309,7 @@ int dictMerge(dict *dst, dict *src) de = deNext; src->ht[iht].used--; } +#endif } } assert((dictSize(src)+dictSize(dst)) == expectedSize); @@ -326,7 +400,7 @@ int dictRehashMilliseconds(dict *d, int ms) { static void _dictRehashStep(dict *d) { unsigned long iterators; __atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED); - if (iterators == 0) dictRehash(d,1); + if (iterators == 0) dictRehash(d,2); } /* Add an element to the target hash table */ @@ -541,21 +615,20 @@ void dictRelease(dict *d) zfree(d); } -dictEntry *dictFindWithPrev(dict *d, const void *key, dictEntry ***dePrevPtr, dictht **pht) +dictEntry *dictFindWithPrev(dict *d, const void *key, uint64_t h, dictEntry ***dePrevPtr, dictht **pht, bool fShallowCompare) { dictEntry *he; - uint64_t h, idx, table; + uint64_t idx, table; if (dictSize(d) == 0) return NULL; /* dict is empty */ if (dictIsRehashing(d)) _dictRehashStep(d); - h = dictHashKey(d, key); for (table = 0; table <= 1; table++) { *pht = d->ht + table; idx = h & d->ht[table].sizemask; he = d->ht[table].table[idx]; *dePrevPtr = &d->ht[table].table[idx]; while(he) { - if (key==he->key || dictCompareKeys(d, key, he->key)) { + if (key==he->key || (!fShallowCompare && dictCompareKeys(d, key, he->key))) { return he; } *dePrevPtr = &he->next; @@ -570,7 +643,8 @@ dictEntry *dictFind(dict *d, const void *key) { dictEntry **deT; dictht *ht; - return dictFindWithPrev(d, key, &deT, &ht); + uint64_t h = dictHashKey(d, key); + return dictFindWithPrev(d, key, h, &deT, &ht); } void *dictFetchValue(dict *d, const void *key) { @@ -1220,7 +1294,9 @@ void dictGetStats(char *buf, size_t bufsize, dict *d) { void dictForceRehash(dict *d) { - while (dictIsRehashing(d)) _dictRehashStep(d); + unsigned long iterators; + __atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED); + while (iterators == 0 && dictIsRehashing(d)) _dictRehashStep(d); } /* ------------------------------- Benchmark ---------------------------------*/ diff --git a/src/dict.h b/src/dict.h index c9118d35f..13f9d2aaa 100644 --- a/src/dict.h +++ b/src/dict.h @@ -167,7 +167,7 @@ dictEntry *dictUnlink(dict *ht, const void *key); void dictFreeUnlinkedEntry(dict *d, dictEntry *he); void dictRelease(dict *d); dictEntry * dictFind(dict *d, const void *key); -dictEntry * dictFindWithPrev(dict *d, const void *key, dictEntry ***dePrevPtr, dictht **ht); +dictEntry * dictFindWithPrev(dict *d, const void *key, uint64_t h, dictEntry ***dePrevPtr, dictht **ht, bool fShallowCompare = false); void *dictFetchValue(dict *d, const void *key); int dictResize(dict *d); dictIterator *dictGetIterator(dict *d); diff --git a/src/gc.h b/src/gc.h index d2c4066f7..5c0596963 100644 --- a/src/gc.h +++ b/src/gc.h @@ -3,6 +3,12 @@ #include #include +struct ICollectable +{ + virtual ~ICollectable() {} + bool FWillFreeChildDebug() { return false; } +}; + template class GarbageCollector { diff --git a/src/lazyfree.cpp b/src/lazyfree.cpp index 1ee49103a..b5f341ed3 100644 --- a/src/lazyfree.cpp +++ b/src/lazyfree.cpp @@ -62,7 +62,10 @@ bool redisDbPersistentData::asyncDelete(robj *key) { dictEntry *de = dictUnlink(m_pdict,ptrFromObj(key)); if (de) { if (m_pdbSnapshot != nullptr && m_pdbSnapshot->find_cached_threadsafe(szFromObj(key)) != nullptr) - dictAdd(m_pdictTombstone, sdsdup((sds)dictGetKey(de)), nullptr); + { + uint64_t hash = dictGetHash(m_pdict, szFromObj(key)); + dictAdd(m_pdictTombstone, sdsdup((sds)dictGetKey(de)), (void*)hash); + } robj *val = (robj*)dictGetVal(de); if (val->FExpires()) diff --git a/src/object.cpp b/src/object.cpp index a45a91db8..fbbbae339 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -46,7 +46,9 @@ robj *createObject(int type, void *ptr) { o->encoding = OBJ_ENCODING_RAW; o->m_ptr = ptr; o->setrefcount(1); +#ifdef ENABLE_MVCC o->mvcc_tstamp = OBJ_MVCC_INVALID; +#endif /* Set the LRU to the current lruclock (minutes resolution), or * alternatively the LFU counter. */ @@ -92,6 +94,7 @@ robj *createRawStringObject(const char *ptr, size_t len) { * an object where the sds string is actually an unmodifiable string * allocated in the same chunk as the object itself. */ robj *createEmbeddedStringObject(const char *ptr, size_t len) { + serverAssert(len <= UINT8_MAX); size_t allocsize = sizeof(struct sdshdr8)+len+1; if (allocsize < sizeof(void*)) allocsize = sizeof(void*); @@ -101,7 +104,9 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { o->type = OBJ_STRING; o->encoding = OBJ_ENCODING_EMBSTR; o->setrefcount(1); +#ifdef ENABLE_MVCC o->mvcc_tstamp = OBJ_MVCC_INVALID; +#endif if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL; @@ -129,7 +134,12 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { * * The current limit of 52 is chosen so that the biggest string object * we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */ +#ifdef ENABLE_MVCC #define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48 +#else +#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 56 +#endif + static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total"); robj *createStringObject(const char *ptr, size_t len) { if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) @@ -1316,10 +1326,12 @@ NULL * because we update the access time only * when the key is read or overwritten. */ addReplyLongLong(c,LFUDecrAndReturn(o.unsafe_robjcast())); +#ifdef ENABLE_MVCC } else if (!strcasecmp(szFromObj(c->argv[1]), "lastmodified") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) == nullptr) return; addReplyLongLong(c, (g_pserver->mstime - (o->mvcc_tstamp >> MVCC_MS_SHIFT)) / 1000); +#endif } else { addReplySubcommandSyntaxError(c); } @@ -1579,9 +1591,11 @@ robj *deserializeStoredObjectCore(const void *data, size_t cb) decrRefCount(auxkey); goto eoferr; } +#ifdef ENABLE_MVCC if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) { obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); } +#endif decrRefCount(auxkey); decrRefCount(auxval); } diff --git a/src/rdb.cpp b/src/rdb.cpp index 729fd0f59..413a39b24 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -349,20 +349,24 @@ writeerr: } ssize_t rdbSaveLzfStringObject(rio *rdb, const unsigned char *s, size_t len) { + char rgbuf[2048]; size_t comprlen, outlen; - void *out; + void *out = rgbuf; /* We require at least four bytes compression for this to be worth it */ if (len <= 4) return 0; outlen = len-4; - if ((out = zmalloc(outlen+1, MALLOC_LOCAL)) == NULL) return 0; + if (outlen >= sizeof(rgbuf)) + if ((out = zmalloc(outlen+1, MALLOC_LOCAL)) == NULL) return 0; comprlen = lzf_compress(s, len, out, outlen); if (comprlen == 0) { - zfree(out); + if (out != rgbuf) + zfree(out); return 0; } ssize_t nwritten = rdbSaveLzfBlob(rdb, out, comprlen, len); - zfree(out); + if (out != rgbuf) + zfree(out); return nwritten; } @@ -1092,8 +1096,12 @@ int rdbSaveKeyValuePair(rio *rdb, robj_roptr key, robj_roptr val, const expireEn } char szT[32]; - snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); - if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; +#ifdef ENABLE_MVCC + if (g_pserver->fActiveReplica) { + snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); + if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; + } +#endif /* Save type, key, value */ if (rdbSaveObjectType(rdb,val) == -1) return -1; @@ -2131,7 +2139,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) { return NULL; } +#ifdef ENABLE_MVCC o->mvcc_tstamp = mvcc_tstamp; +#endif serverAssert(!o->FExpires()); return o; } @@ -2489,7 +2499,11 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { key = nullptr; goto eoferr; } +#ifdef ENABLE_MVCC bool fStaleMvccKey = (rsi) ? val->mvcc_tstamp < rsi->mvccMinThreshold : false; +#else + bool fStaleMvccKey = false; +#endif /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was diff --git a/src/server.cpp b/src/server.cpp index 52648e7bd..2d47cab05 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1387,14 +1387,14 @@ dictType dbDictType = { dictObjectDestructor /* val destructor */ }; -/* db->pdict, keys are sds strings, vals uints. */ -dictType dbDictTypeTombstone = { +/* db->pdict, keys are sds strings, vals are Redis objects. */ +dictType dbTombstoneDictType = { dictSdsHash, /* hash function */ NULL, /* key dup */ NULL, /* val dup */ dictSdsKeyCompare, /* key compare */ - dictDbKeyDestructor, /* key destructor */ - NULL /* val destructor */ + dictDbKeyDestructor, /* key destructor */ + NULL /* val destructor */ }; dictType dbSnapshotDictType = { @@ -1539,8 +1539,9 @@ void tryResizeHashTables(int dbid) { * is returned. */ int redisDbPersistentData::incrementallyRehash() { /* Keys dictionary */ - if (dictIsRehashing(m_pdict)) { + if (dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone)) { dictRehashMilliseconds(m_pdict,1); + dictRehashMilliseconds(m_pdictTombstone,1); return 1; /* already used our millisecond for this loop... */ } return 0; @@ -2219,11 +2220,22 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { CONFIG_BGSAVE_RETRY_DELAY || g_pserver->lastbgsave_status == C_OK)) { - serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...", - sp->changes, (int)sp->seconds); - rdbSaveInfo rsi, *rsiptr; - rsiptr = rdbPopulateSaveInfo(&rsi); - rdbSaveBackground(rsiptr); + // Ensure rehashing is complete + bool fRehashInProgress = false; + if (g_pserver->activerehashing) { + for (int idb = 0; idb < cserver.dbnum && !fRehashInProgress; ++idb) { + if (g_pserver->db[idb]->FRehashing()) + fRehashInProgress = true; + } + } + + if (!fRehashInProgress) { + serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...", + sp->changes, (int)sp->seconds); + rdbSaveInfo rsi, *rsiptr; + rsiptr = rdbPopulateSaveInfo(&rsi); + rdbSaveBackground(rsiptr); + } break; } } @@ -2312,14 +2324,16 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { } } - bool fAnySnapshots = false; - for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots; ++idb) - fAnySnapshots = fAnySnapshots || g_pserver->db[0]->FSnapshot(); - if (fAnySnapshots) - { - g_pserver->asyncworkqueue->AddWorkFunction([]{ - g_pserver->db[0]->consolidate_snapshot(); - }, true /*HiPri*/); + run_with_period(100) { + bool fAnySnapshots = false; + for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots; ++idb) + fAnySnapshots = fAnySnapshots || g_pserver->db[0]->FSnapshot(); + if (fAnySnapshots) + { + g_pserver->asyncworkqueue->AddWorkFunction([]{ + g_pserver->db[0]->consolidate_snapshot(); + }, true /*HiPri*/); + } } /* Fire the cron loop modules event. */ @@ -2477,17 +2491,17 @@ void beforeSleep(struct aeEventLoop *eventLoop) { latencyAddSampleIfNeeded("storage-commit", commit_latency); handleClientsWithPendingWrites(iel, aof_state); - if (serverTL->gcEpoch != 0) + if (!serverTL->gcEpoch.isReset()) g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/); - serverTL->gcEpoch = 0; + serverTL->gcEpoch.reset(); aeAcquireLock(); /* Close clients that need to be closed asynchronous */ freeClientsInAsyncFreeQueue(iel); - if (serverTL->gcEpoch != 0) + if (!serverTL->gcEpoch.isReset()) g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/); - serverTL->gcEpoch = 0; + serverTL->gcEpoch.reset(); /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this @@ -2503,7 +2517,7 @@ void afterSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); if (moduleCount()) moduleAcquireGIL(TRUE /*fServerThread*/); - serverAssert(serverTL->gcEpoch == 0); + serverAssert(serverTL->gcEpoch.isReset()); serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); aeAcquireLock(); for (int idb = 0; idb < cserver.dbnum; ++idb) @@ -5159,12 +5173,20 @@ sds genRedisInfoString(const char *section) { } if (allsections || defsections || !strcasecmp(section,"keydb")) { + // Compute the MVCC depth + int mvcc_depth = 0; + for (int idb = 0; idb < cserver.dbnum; ++idb) { + mvcc_depth = std::max(mvcc_depth, g_pserver->db[idb]->snapshot_depth()); + } + if (sections++) info = sdscat(info,"\r\n"); info = sdscatprintf(info, "# KeyDB\r\n" "variant:pro\r\n" - "license_status:%s\r\n", - cserver.license_key ? "OK" : "Trial" + "license_status:%s\r\n" + "mvcc_depth:%d\r\n", + cserver.license_key ? "OK" : "Trial", + mvcc_depth ); } diff --git a/src/server.h b/src/server.h index 86dcb872f..193004b52 100644 --- a/src/server.h +++ b/src/server.h @@ -877,7 +877,9 @@ typedef struct redisObject { private: mutable std::atomic refcount {0}; public: +#ifdef ENABLE_MVCC uint64_t mvcc_tstamp; +#endif void *m_ptr; inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; } @@ -888,7 +890,11 @@ public: void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); } unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); } } robj; +#ifdef ENABLE_MVCC static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase"); +#else +static_assert(sizeof(redisObject) == 16, "object size is critical, don't increase"); +#endif __attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o) { @@ -1320,6 +1326,9 @@ public: void setExpire(robj *key, robj *subkey, long long when); void setExpire(expireEntry &&e); void initialize(); + void prepOverwriteForSnapshot(char *key); + + bool FRehashing() const { return dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone); } void setStorageProvider(StorageCache *pstorage); @@ -1411,14 +1420,16 @@ private: class redisDbPersistentDataSnapshot : protected redisDbPersistentData { friend class redisDbPersistentData; +private: + bool iterate_threadsafe_core(std::function &fn, bool fKeyOnly, bool fCacheOnly, bool fTop) const; + protected: - bool m_fConsolidated = false; static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot); - int snapshot_depth() const; void consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce); - void freeTombstoneObjects(int depth); + bool freeTombstoneObjects(int depth); public: + int snapshot_depth() const; bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; } bool iterate_threadsafe(std::function fn, bool fKeyOnly = false, bool fCacheOnly = false) const; @@ -1521,6 +1532,8 @@ struct redisDb : public redisDbPersistentDataSnapshot using redisDbPersistentData::dictUnsafeKeyOnly; using redisDbPersistentData::resortExpire; using redisDbPersistentData::prefetchKeysAsync; + using redisDbPersistentData::prepOverwriteForSnapshot; + using redisDbPersistentData::FRehashing; public: expireset::setiter expireitr; @@ -1949,6 +1962,58 @@ struct clusterState; #define MAX_EVENT_LOOPS 16 #define IDX_EVENT_LOOP_MAIN 0 +class GarbageCollectorCollection +{ + GarbageCollector garbageCollectorSnapshot; + GarbageCollector garbageCollectorGeneric; + +public: + struct Epoch + { + uint64_t epochSnapshot = 0; + uint64_t epochGeneric = 0; + + void reset() { + epochSnapshot = 0; + epochGeneric = 0; + } + + bool isReset() const { + return epochSnapshot == 0 && epochGeneric == 0; + } + }; + + Epoch startEpoch() + { + Epoch e; + e.epochSnapshot = garbageCollectorSnapshot.startEpoch(); + e.epochGeneric = garbageCollectorGeneric.startEpoch(); + return e; + } + + void endEpoch(Epoch e, bool fNoFree = false) + { + garbageCollectorSnapshot.endEpoch(e.epochSnapshot, fNoFree); + garbageCollectorGeneric.endEpoch(e.epochGeneric, fNoFree); + } + + void shutdown() + { + garbageCollectorSnapshot.shutdown(); + garbageCollectorGeneric.shutdown(); + } + + void enqueue(Epoch e, std::unique_ptr &&sp) + { + garbageCollectorSnapshot.enqueue(e.epochSnapshot, std::move(sp)); + } + + void enqueue(Epoch e, std::unique_ptr &&sp) + { + garbageCollectorGeneric.enqueue(e.epochGeneric, std::move(sp)); + } +}; + // Per-thread variabels that may be accessed without a lock struct redisServerThreadVars { aeEventLoop *el; @@ -1970,7 +2035,7 @@ struct redisServerThreadVars { struct fastlock lockPendingWrite { "thread pending write" }; char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ long unsigned commandsExecuted = 0; - uint64_t gcEpoch = 0; + GarbageCollectorCollection::Epoch gcEpoch; const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr; bool fRetrySetAofEvent = false; @@ -2424,7 +2489,7 @@ struct redisServer { /* System hardware info */ size_t system_memory_size; /* Total memory in system as reported by OS */ - GarbageCollector garbageCollector; + GarbageCollectorCollection garbageCollector; IStorageFactory *m_pstorageFactory = nullptr; int storage_flush_period; // The time between flushes in the CRON job @@ -2553,7 +2618,7 @@ extern dictType zsetDictType; extern dictType clusterNodesDictType; extern dictType clusterNodesBlackListDictType; extern dictType dbDictType; -extern dictType dbDictTypeTombstone; +extern dictType dbTombstoneDictType; extern dictType dbSnapshotDictType; extern dictType shaScriptObjectDictType; extern double R_Zero, R_PosInf, R_NegInf, R_Nan; diff --git a/src/snapshot.cpp b/src/snapshot.cpp index 0ad1b5282..0f84e483e 100644 --- a/src/snapshot.cpp +++ b/src/snapshot.cpp @@ -2,6 +2,29 @@ #include "aelocker.h" static const size_t c_elementsSmallLimit = 500000; +static fastlock s_lock {"consolidate_children"}; // this lock ensures only one thread is consolidating at a time + +class LazyFree : public ICollectable +{ +public: + virtual ~LazyFree() + { + for (auto *de : vecde) + { + dbDictType.keyDestructor(nullptr, dictGetKey(de)); + dbDictType.valDestructor(nullptr, dictGetVal(de)); + zfree(de); + } + for (robj *o : vecobjLazyFree) + decrRefCount(o); + for (dict *d : vecdictLazyFree) + dictRelease(d); + } + + std::vector vecdictLazyFree; + std::vector vecobjLazyFree; + std::vector vecde; +}; const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional) { @@ -70,6 +93,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 auto spdb = std::unique_ptr(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot()); + dictRehashMilliseconds(m_pdict, 50); // Give us the best chance at a fast cleanup spdb->m_fAllChanged = false; spdb->m_fTrackingChanges = 0; spdb->m_pdict = m_pdict; @@ -90,8 +114,13 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6 spdb->m_setexpire->pause_rehash(); // needs to be const } + if (dictIsRehashing(spdb->m_pdict) || dictIsRehashing(spdb->m_pdictTombstone)) { + serverLog(LL_NOTICE, "NOTICE: Suboptimal snapshot"); + } + m_pdict = dictCreate(&dbDictType,this); - m_pdictTombstone = dictCreate(&dbDictTypeTombstone, this); + dictExpand(m_pdict, 1024); // minimize rehash overhead + m_pdictTombstone = dictCreate(&dbTombstoneDictType, this); serverAssert(spdb->m_pdict->iterators == 1); @@ -183,7 +212,18 @@ void redisDbPersistentData::restoreSnapshot(const redisDbPersistentDataSnapshot void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot) { mstime_t latency; - aeAcquireLock(); latencyStartMonitor(latency); + + aeAcquireLock(); + while (dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone)) { + dictRehashMilliseconds(m_pdict, 1); + dictRehashMilliseconds(m_pdictTombstone, 1); + // Give someone else a chance + aeReleaseLock(); + usleep(300); + aeAcquireLock(); + } + + latencyStartMonitor(latency); if (m_pdbSnapshotASYNC && m_pdbSnapshotASYNC->m_mvccCheckpoint <= psnapshot->m_mvccCheckpoint) { // Free a stale async snapshot so consolidate_children can clean it up later @@ -209,11 +249,22 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot auto psnapshotT = createSnapshot(LLONG_MAX, false); endSnapshot(psnapshot); // this will just dec the ref count since our new snapshot has a ref psnapshot = nullptr; - aeReleaseLock(); latencyEndMonitor(latency); + + latencyEndMonitor(latency); latencyAddSampleIfNeeded("end-snapshot-async-phase-1", latency); + aeReleaseLock(); // do the expensive work of merging snapshots outside the ref - const_cast(psnapshotT)->freeTombstoneObjects(1); // depth is one because we just creted it + if (const_cast(psnapshotT)->freeTombstoneObjects(1)) // depth is one because we just creted it + { + aeAcquireLock(); + if (m_pdbSnapshotASYNC != nullptr) + endSnapshot(m_pdbSnapshotASYNC); + m_pdbSnapshotASYNC = nullptr; + endSnapshot(psnapshotT); + aeReleaseLock(); + return; + } const_cast(psnapshotT)->consolidate_children(this, true); // Final Cleanup @@ -222,33 +273,80 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot m_pdbSnapshotASYNC = psnapshotT; else endSnapshot(psnapshotT); // finally clean up our temp snapshot - aeReleaseLock(); latencyEndMonitor(latency); - + + latencyEndMonitor(latency); latencyAddSampleIfNeeded("end-snapshot-async-phase-2", latency); + aeReleaseLock(); } -void redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) +bool redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) { if (m_pdbSnapshot == nullptr) - return; + { + serverAssert(dictSize(m_pdictTombstone) == 0); + return true; + } - const_cast(m_pdbSnapshot)->freeTombstoneObjects(depth+1); + if (!const_cast(m_pdbSnapshot)->freeTombstoneObjects(depth+1)) + return false; + + { + AeLocker ae; + ae.arm(nullptr); if (m_pdbSnapshot->m_refCount != depth && (m_pdbSnapshot->m_refCount != (m_refCount+1))) - return; + return false; + ae.disarm(); + } + + std::unique_lock lock(s_lock, std::defer_lock); + if (!lock.try_lock()) + return false; // this is a best effort function + std::unique_ptr splazy = std::make_unique(); + + dict *dictTombstoneNew = dictCreate(&dbTombstoneDictType, nullptr); dictIterator *di = dictGetIterator(m_pdictTombstone); dictEntry *de; + std::vector vecdeFree; + vecdeFree.reserve(dictSize(m_pdictTombstone)); + unsigned rgcremoved[2] = {0}; while ((de = dictNext(di)) != nullptr) { - dictEntry *deObj = dictFind(m_pdbSnapshot->m_pdict, dictGetKey(de)); - if (deObj != nullptr && dictGetVal(deObj) != nullptr) + dictEntry **dePrev = nullptr; + dictht *ht = nullptr; + sds key = (sds)dictGetKey(de); + // BUG BUG: Why can't we do a shallow search here? + dictEntry *deObj = dictFindWithPrev(m_pdbSnapshot->m_pdict, key, (uint64_t)dictGetVal(de), &dePrev, &ht, false); + + if (deObj != nullptr) { - decrRefCount((robj*)dictGetVal(deObj)); - void *ptrSet = nullptr; - __atomic_store(&deObj->v.val, &ptrSet, __ATOMIC_RELAXED); + // Now unlink the DE + __atomic_store(dePrev, &deObj->next, __ATOMIC_RELEASE); + if (ht == &m_pdbSnapshot->m_pdict->ht[0]) + rgcremoved[0]++; + else + rgcremoved[1]++; + splazy->vecde.push_back(deObj); + } else { + serverAssert(dictFind(m_pdbSnapshot->m_pdict, key) == nullptr); + serverAssert(m_pdbSnapshot->find_cached_threadsafe(key) != nullptr); + dictAdd(dictTombstoneNew, sdsdupshared((sds)dictGetKey(de)), dictGetVal(de)); } } dictReleaseIterator(di); + + dictForceRehash(dictTombstoneNew); + aeAcquireLock(); + dict *dT = m_pdbSnapshot->m_pdict; + splazy->vecdictLazyFree.push_back(m_pdictTombstone); + __atomic_store(&m_pdictTombstone, &dictTombstoneNew, __ATOMIC_RELEASE); + __atomic_fetch_sub(&dT->ht[0].used, rgcremoved[0], __ATOMIC_RELEASE); + __atomic_fetch_sub(&dT->ht[1].used, rgcremoved[1], __ATOMIC_RELEASE); + serverLog(LL_WARNING, "tombstones removed: %u, remain: %lu", rgcremoved[0]+rgcremoved[1], dictSize(m_pdictTombstone)); + g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy)); + aeReleaseLock(); + + return true; } void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot) @@ -299,15 +397,20 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn // Stage 1 Loop through all the tracked deletes and remove them from the snapshot DB dictIterator *di = dictGetIterator(m_pdictTombstone); dictEntry *de; + m_spdbSnapshotHOLDER->m_pdict->iterators++; + auto splazy = std::make_unique(); while ((de = dictNext(di)) != NULL) { dictEntry **dePrev; dictht *ht; - dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), &dePrev, &ht); + // BUG BUG Why not a shallow search? + dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), (uint64_t)dictGetVal(de), &dePrev, &ht, false /*!!sdsisshared((sds)dictGetKey(de))*/); if (deSnapshot == nullptr && m_spdbSnapshotHOLDER->m_pdbSnapshot) { // The tombstone is for a grand child, propogate it (or possibly in the storage provider - but an extra tombstone won't hurt) +#ifdef CHECKED_BUILD serverAssert(m_spdbSnapshotHOLDER->m_pdbSnapshot->find_cached_threadsafe((const char*)dictGetKey(de)) != nullptr); +#endif dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), nullptr); continue; } @@ -318,15 +421,16 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn } // Delete the object from the source dict, we don't use dictDelete to avoid a second search - dictFreeKey(m_spdbSnapshotHOLDER->m_pdict, deSnapshot); - dictFreeVal(m_spdbSnapshotHOLDER->m_pdict, deSnapshot); - serverAssert(*dePrev == deSnapshot); + splazy->vecde.push_back(deSnapshot); *dePrev = deSnapshot->next; - zfree(deSnapshot); ht->used--; } + + + m_spdbSnapshotHOLDER->m_pdict->iterators--; dictReleaseIterator(di); - dictEmpty(m_pdictTombstone, nullptr); + splazy->vecdictLazyFree.push_back(m_pdictTombstone); + m_pdictTombstone = dictCreate(&dbTombstoneDictType, nullptr); // Stage 2 Move all new keys to the snapshot DB dictMerge(m_spdbSnapshotHOLDER->m_pdict, m_pdict); @@ -355,8 +459,10 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn auto spsnapshotFree = std::move(m_spdbSnapshotHOLDER); m_spdbSnapshotHOLDER = std::move(spsnapshotFree->m_spdbSnapshotHOLDER); - if (serverTL != nullptr) + if (serverTL != nullptr) { g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(spsnapshotFree)); + g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy)); + } // Sanity Checks serverAssert(m_spdbSnapshotHOLDER != nullptr || m_pdbSnapshot == nullptr); @@ -393,8 +499,10 @@ dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe(bool fPrimaryOn dict_iter redisDbPersistentData::find_cached_threadsafe(const char *key) const { + dict *dictTombstone; + __atomic_load(&m_pdictTombstone, &dictTombstone, __ATOMIC_ACQUIRE); dictEntry *de = dictFind(m_pdict, key); - if (de == nullptr && m_pdbSnapshot != nullptr && dictFind(m_pdictTombstone, key) == nullptr) + if (de == nullptr && m_pdbSnapshot != nullptr && dictFind(dictTombstone, key) == nullptr) { auto itr = m_pdbSnapshot->find_cached_threadsafe(key); if (itr != nullptr) @@ -460,11 +568,20 @@ unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long itera } bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function fn, bool fKeyOnly, bool fCacheOnly) const +{ + return iterate_threadsafe_core(fn, fKeyOnly, fCacheOnly, true); +} + +bool redisDbPersistentDataSnapshot::iterate_threadsafe_core(std::function &fn, bool fKeyOnly, bool fCacheOnly, bool fFirst) const { // Take the size so we can ensure we visited every element exactly once // use volatile to ensure it's not checked too late. This makes it more // likely we'll detect races (but it won't gurantee it) + aeAcquireLock(); + dict *dictTombstone; + __atomic_load(&m_pdictTombstone, &dictTombstone, __ATOMIC_ACQUIRE); volatile ssize_t celem = (ssize_t)size(); + aeReleaseLock(); dictEntry *de = nullptr; bool fResult = true; @@ -510,19 +627,22 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::functioniterate_threadsafe([this, &fn, &celem](const char *key, robj_roptr o) { - dictEntry *deTombstone = dictFind(m_pdictTombstone, key); + std::function fnNew = [this, &fn, &celem, dictTombstone](const char *key, robj_roptr o) { + dictEntry *deTombstone = dictFind(dictTombstone, key); if (deTombstone != nullptr) return true; // Alright it's a key in the use keyspace, lets ensure it and then pass it off --celem; return fn(key, o); - }, fKeyOnly, fCacheOnly); + }; + fResult = psnapshot->iterate_threadsafe_core(fnNew, fKeyOnly, fCacheOnly, false); } // we should have hit all keys or had a good reason not to - serverAssert(!fResult || celem == 0 || (m_spstorage && fCacheOnly)); + if (!(!fResult || celem == 0 || (m_spstorage && fCacheOnly))) + serverLog(LL_WARNING, "celem: %ld", celem); + serverAssert(!fResult || celem == 0 || (m_spstorage && fCacheOnly) || !fFirst); return fResult; } @@ -538,11 +658,12 @@ void redisDbPersistentData::consolidate_snapshot() { aeAcquireLock(); auto psnapshot = (m_pdbSnapshot != nullptr) ? m_spdbSnapshotHOLDER.get() : nullptr; - if (psnapshot == nullptr) + if (psnapshot == nullptr || psnapshot->snapshot_depth() == 0) { aeReleaseLock(); return; } + psnapshot->m_refCount++; // ensure it's not free'd aeReleaseLock(); psnapshot->consolidate_children(this, false /* fForce */); @@ -554,8 +675,6 @@ void redisDbPersistentData::consolidate_snapshot() // only call this on the "real" database to consolidate the first child void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce) { - static fastlock s_lock {"consolidate_children"}; // this lock ensures only one thread is consolidating at a time - std::unique_lock lock(s_lock, std::defer_lock); if (!lock.try_lock()) return; // this is a best effort function @@ -615,7 +734,6 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData * serverLog(LL_VERBOSE, "cleaned %d snapshots", snapshot_depth()-1); spdb->m_refCount = depth; - spdb->m_fConsolidated = true; // Drop our refs from this snapshot and its children psnapshotT = this; std::vector vecT;