Merge branch 'keydbpro' into PRO_RELEASE_6

Former-commit-id: ed98be0ba81ffdc501847ea0d2486f5f01391319
This commit is contained in:
John Sully 2020-08-17 02:26:51 +00:00
commit dcf607622c
16 changed files with 454 additions and 86 deletions

View File

@ -1,6 +1,5 @@
![Current Release](https://img.shields.io/github/release/JohnSully/KeyDB.svg) ![Current Release](https://img.shields.io/github/release/JohnSully/KeyDB.svg)
![CI](https://github.com/JohnSully/KeyDB/workflows/CI/badge.svg?branch=unstable) ![CI](https://github.com/JohnSully/KeyDB/workflows/CI/badge.svg?branch=unstable)
[![Join the chat at https://gitter.im/KeyDB/community](https://badges.gitter.im/KeyDB/community.svg)](https://gitter.im/KeyDB/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![StackShare](http://img.shields.io/badge/tech-stack-0690fa.svg?style=flat)](https://stackshare.io/eq-alpha-technology-inc/eq-alpha-technology-inc) [![StackShare](http://img.shields.io/badge/tech-stack-0690fa.svg?style=flat)](https://stackshare.io/eq-alpha-technology-inc/eq-alpha-technology-inc)
##### New! Want to extend KeyDB with Javascript? Try [ModJS](https://github.com/JohnSully/ModJS) ##### New! Want to extend KeyDB with Javascript? Try [ModJS](https://github.com/JohnSully/ModJS)

5
deps/Makefile vendored
View File

@ -1,6 +1,7 @@
# Redis dependency Makefile # Redis dependency Makefile
uname_S:= $(shell sh -c 'uname -s 2>/dev/null || echo not') uname_S:= $(shell sh -c 'uname -s 2>/dev/null || echo not')
uname_M := $(shell sh -c 'uname -m 2>/dev/null || echo not')
CCCOLOR="\033[34m" CCCOLOR="\033[34m"
LINKCOLOR="\033[34;1m" LINKCOLOR="\033[34;1m"
@ -94,6 +95,10 @@ jemalloc: .make-prerequisites
rocksdb: .make-prerequisites rocksdb: .make-prerequisites
@printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR) @printf '%b %b\n' $(MAKECOLOR)MAKE$(ENDCOLOR) $(BINCOLOR)$@$(ENDCOLOR)
ifeq ($(uname_M),x86_64)
cd rocksdb && PORTABLE=1 USE_SSE=1 FORCE_SSE42=1 $(MAKE) static_lib cd rocksdb && PORTABLE=1 USE_SSE=1 FORCE_SSE42=1 $(MAKE) static_lib
else
cd rocksdb && PORTABLE=1 $(MAKE) static_lib
endif
.PHONY: rocksdb .PHONY: rocksdb

View File

@ -45,7 +45,7 @@ void AsyncWorkQueue::WorkerThreadMain()
ProcessPendingAsyncWrites(); ProcessPendingAsyncWrites();
aeReleaseLock(); aeReleaseLock();
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch); g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch);
serverTL->gcEpoch = 0; serverTL->gcEpoch.reset();
} }
listRelease(vars.clients_pending_asyncwrite); listRelease(vars.clients_pending_asyncwrite);

View File

@ -47,6 +47,11 @@ endif
USEASM?=true USEASM?=true
ifeq ($(NOMVCC),)
CFLAGS+= -DENABLE_MVCC
CXXFLAGS+= -DENABLE_MVCC
endif
ifneq ($(SANITIZE),) ifneq ($(SANITIZE),)
CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE
CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE

View File

@ -260,10 +260,12 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
} }
if (fSynchronous) if (fSynchronous)
{
{ {
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock); std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
cmd.pctl->cv.wait(ulock); cmd.pctl->cv.wait(ulock);
ret = cmd.pctl->rval; ret = cmd.pctl->rval;
}
delete cmd.pctl; delete cmd.pctl;
} }
@ -314,10 +316,12 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
int ret = AE_OK; int ret = AE_OK;
if (fSynchronous) if (fSynchronous)
{
{ {
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock); std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
cmd.pctl->cv.wait(ulock); cmd.pctl->cv.wait(ulock);
ret = cmd.pctl->rval; ret = cmd.pctl->rval;
}
delete cmd.pctl; delete cmd.pctl;
} }
return ret; return ret;

View File

@ -4920,9 +4920,11 @@ void createDumpPayload(rio *payload, robj_roptr o, robj *key) {
rioInitWithBuffer(payload,sdsempty()); rioInitWithBuffer(payload,sdsempty());
serverAssert(rdbSaveObjectType(payload,o)); serverAssert(rdbSaveObjectType(payload,o));
serverAssert(rdbSaveObject(payload,o,key)); serverAssert(rdbSaveObject(payload,o,key));
#ifdef ENABLE_MVCC
char szT[32]; char szT[32];
snprintf(szT, 32, "%" PRIu64, o->mvcc_tstamp); snprintf(szT, 32, "%" PRIu64, o->mvcc_tstamp);
serverAssert(rdbSaveAuxFieldStrStr(payload,"mvcc-tstamp", szT) != -1); serverAssert(rdbSaveAuxFieldStrStr(payload,"mvcc-tstamp", szT) != -1);
#endif
/* Write the footer, this is how it looks like: /* Write the footer, this is how it looks like:
* ----------------+---------------------+---------------+ * ----------------+---------------------+---------------+
@ -5064,9 +5066,11 @@ void restoreCommand(client *c) {
decrRefCount(auxkey); decrRefCount(auxkey);
goto eoferr; goto eoferr;
} }
#ifdef ENABLE_MVCC
if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) { if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) {
obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10);
} }
#endif
decrRefCount(auxkey); decrRefCount(auxkey);
decrRefCount(auxval); decrRefCount(auxval);
} }

View File

@ -91,7 +91,9 @@ static robj* lookupKey(redisDb *db, robj *key, int flags) {
robj *val = itr.val(); robj *val = itr.val();
lookupKeyUpdateObj(val, flags); lookupKeyUpdateObj(val, flags);
if (flags & LOOKUP_UPDATEMVCC) { if (flags & LOOKUP_UPDATEMVCC) {
#ifdef ENABLE_MVCC
val->mvcc_tstamp = getMvccTstamp(); val->mvcc_tstamp = getMvccTstamp();
#endif
db->trackkey(key, true /* fUpdate */); db->trackkey(key, true /* fUpdate */);
} }
return val; return val;
@ -218,8 +220,10 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
bool dbAddCore(redisDb *db, robj *key, robj *val, bool fAssumeNew = false) { bool dbAddCore(redisDb *db, robj *key, robj *val, bool fAssumeNew = false) {
serverAssert(!val->FExpires()); serverAssert(!val->FExpires());
sds copy = sdsdupshared(szFromObj(key)); sds copy = sdsdupshared(szFromObj(key));
#ifdef ENABLE_MVCC
if (g_pserver->fActiveReplica) if (g_pserver->fActiveReplica)
val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp(); val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp();
#endif
bool fInserted = db->insert(copy, val, fAssumeNew); bool fInserted = db->insert(copy, val, fAssumeNew);
@ -270,7 +274,9 @@ void redisDb::dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpd
if (fUpdateMvcc) { if (fUpdateMvcc) {
if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
val = dupStringObject(val); val = dupStringObject(val);
#ifdef ENABLE_MVCC
val->mvcc_tstamp = getMvccTstamp(); val->mvcc_tstamp = getMvccTstamp();
#endif
} }
if (g_pserver->lazyfree_lazy_server_del) if (g_pserver->lazyfree_lazy_server_del)
@ -303,12 +309,14 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
if (itr == nullptr) if (itr == nullptr)
return (dbAddCore(db, key, val) == true); return (dbAddCore(db, key, val) == true);
#ifdef ENABLE_MVCC
robj *old = itr.val(); robj *old = itr.val();
if (old->mvcc_tstamp <= val->mvcc_tstamp) if (old->mvcc_tstamp <= val->mvcc_tstamp)
{ {
db->dbOverwriteCore(itr, key, val, false, true); db->dbOverwriteCore(itr, key, val, false, true);
return true; return true;
} }
#endif
return false; return false;
} }
@ -330,6 +338,7 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
* The client 'c' argument may be set to NULL if the operation is performed * The client 'c' argument may be set to NULL if the operation is performed
* in a context where there is no clear client performing the operation. */ * in a context where there is no clear client performing the operation. */
void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) { void genericSetKey(client *c, redisDb *db, robj *key, robj *val, int keepttl, int signal) {
db->prepOverwriteForSnapshot(szFromObj(key));
if (!dbAddCore(db, key, val)) { if (!dbAddCore(db, key, val)) {
dbOverwrite(db, key, val, !keepttl); dbOverwrite(db, key, val, !keepttl);
} }
@ -421,8 +430,9 @@ bool redisDbPersistentData::syncDelete(robj *key)
auto itr = m_pdbSnapshot->find_cached_threadsafe(szFromObj(key)); auto itr = m_pdbSnapshot->find_cached_threadsafe(szFromObj(key));
if (itr != nullptr) if (itr != nullptr)
{ {
sds keyTombstone = sdsdup(szFromObj(key)); sds keyTombstone = sdsdupshared(itr.key());
if (dictAdd(m_pdictTombstone, keyTombstone, nullptr) != DICT_OK) uint64_t hash = dictGetHash(m_pdict, keyTombstone);
if (dictAdd(m_pdictTombstone, keyTombstone, (void*)hash) != DICT_OK)
sdsfree(keyTombstone); sdsfree(keyTombstone);
} }
} }
@ -2290,7 +2300,7 @@ void redisDbPersistentData::initialize()
{ {
m_pdbSnapshot = nullptr; m_pdbSnapshot = nullptr;
m_pdict = dictCreate(&dbDictType,this); m_pdict = dictCreate(&dbDictType,this);
m_pdictTombstone = dictCreate(&dbDictType,this); m_pdictTombstone = dictCreate(&dbTombstoneDictType,this);
m_setexpire = new(MALLOC_LOCAL) expireset(); m_setexpire = new(MALLOC_LOCAL) expireset();
m_fAllChanged = 0; m_fAllChanged = 0;
m_fTrackingChanges = 0; m_fTrackingChanges = 0;
@ -2349,6 +2359,24 @@ bool redisDbPersistentData::insert(char *key, robj *o, bool fAssumeNew)
return (res == DICT_OK); return (res == DICT_OK);
} }
// This is a performance tool to prevent us copying over an object we're going to overwrite anyways
void redisDbPersistentData::prepOverwriteForSnapshot(char *key)
{
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU)
return;
if (m_pdbSnapshot != nullptr)
{
auto itr = m_pdbSnapshot->find_cached_threadsafe(key);
if (itr.key() != nullptr)
{
sds keyNew = sdsdupshared(itr.key());
if (dictAdd(m_pdictTombstone, keyNew, (void*)dictHashKey(m_pdict, key)) != DICT_OK)
sdsfree(keyNew);
}
}
}
void redisDbPersistentData::tryResize() void redisDbPersistentData::tryResize()
{ {
if (htNeedsResize(m_pdict)) if (htNeedsResize(m_pdict))
@ -2470,15 +2498,20 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
sdsfree(strT); sdsfree(strT);
dictAdd(m_pdict, keyNew, objNew); dictAdd(m_pdict, keyNew, objNew);
serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1); serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1);
#ifdef ENABLE_MVCC
serverAssert(objNew->mvcc_tstamp == itr.val()->mvcc_tstamp); serverAssert(objNew->mvcc_tstamp == itr.val()->mvcc_tstamp);
#endif
} }
} }
else else
{ {
dictAdd(m_pdict, keyNew, nullptr); dictAdd(m_pdict, keyNew, nullptr);
} }
*pde = dictFind(m_pdict, sdsKey); uint64_t hash = dictGetHash(m_pdict, sdsKey);
dictAdd(m_pdictTombstone, sdsdupshared(itr.key()), nullptr); dictEntry **deT;
dictht *ht;
*pde = dictFindWithPrev(m_pdict, sdsKey, hash, &deT, &ht);
dictAdd(m_pdictTombstone, sdsdupshared(itr.key()), (void*)hash);
} }
} }

View File

@ -179,6 +179,9 @@ int dictExpand(dict *d, unsigned long size)
int dictMerge(dict *dst, dict *src) int dictMerge(dict *dst, dict *src)
{ {
#define MERGE_BLOCK_SIZE 4
dictEntry *rgdeT[MERGE_BLOCK_SIZE];
assert(dst != src); assert(dst != src);
if (dictSize(src) == 0) if (dictSize(src) == 0)
return DICT_OK; return DICT_OK;
@ -197,6 +200,8 @@ int dictMerge(dict *dst, dict *src)
std::swap(dst->iterators, src->iterators); std::swap(dst->iterators, src->iterators);
} }
src->rehashidx = -1;
if (!dictIsRehashing(dst) && !dictIsRehashing(src)) if (!dictIsRehashing(dst) && !dictIsRehashing(src))
{ {
if (dst->ht[0].size >= src->ht[0].size) if (dst->ht[0].size >= src->ht[0].size)
@ -210,6 +215,50 @@ int dictMerge(dict *dst, dict *src)
} }
_dictReset(&src->ht[0]); _dictReset(&src->ht[0]);
dst->rehashidx = 0; dst->rehashidx = 0;
assert(dictIsRehashing(dst));
assert((dictSize(src)+dictSize(dst)) == expectedSize);
return DICT_OK;
}
if (!dictIsRehashing(src) && dictSize(src) > 0 &&
(src->ht[0].size == dst->ht[0].size || src->ht[0].size == dst->ht[1].size))
{
auto &htDst = (src->ht[0].size == dst->ht[0].size) ? dst->ht[0] : dst->ht[1];
assert(src->ht[0].size == htDst.size);
for (size_t ide = 0; ide < src->ht[0].size; ide += MERGE_BLOCK_SIZE)
{
if (src->ht[0].used == 0)
break;
for (int dde = 0; dde < MERGE_BLOCK_SIZE; ++dde) {
rgdeT[dde] = src->ht[0].table[ide + dde];
src->ht[0].table[ide + dde] = nullptr;
}
for (;;) {
bool fAnyFound = false;
for (int dde = 0; dde < MERGE_BLOCK_SIZE; ++dde) {
if (rgdeT[dde] == nullptr)
continue;
dictEntry *deNext = rgdeT[dde]->next;
rgdeT[dde]->next = htDst.table[ide+dde];
htDst.table[ide+dde] = rgdeT[dde];
rgdeT[dde] = deNext;
htDst.used++;
src->ht[0].used--;
fAnyFound = fAnyFound || (deNext != nullptr);
}
if (!fAnyFound)
break;
}
}
// If we copied to the base hash table of a rehashing dst, reset the rehash
if (dictIsRehashing(dst) && src->ht[0].size == dst->ht[0].size)
dst->rehashidx = 0;
assert(dictSize(src) == 0);
assert((dictSize(src)+dictSize(dst)) == expectedSize); assert((dictSize(src)+dictSize(dst)) == expectedSize);
return DICT_OK; return DICT_OK;
} }
@ -218,10 +267,34 @@ int dictMerge(dict *dst, dict *src)
auto &htDst = dictIsRehashing(dst) ? dst->ht[1] : dst->ht[0]; auto &htDst = dictIsRehashing(dst) ? dst->ht[1] : dst->ht[0];
for (int iht = 0; iht < 2; ++iht) for (int iht = 0; iht < 2; ++iht)
{ {
for (size_t ide = 0; ide < src->ht[iht].size; ++ide) for (size_t ide = 0; ide < src->ht[iht].size; ide += MERGE_BLOCK_SIZE)
{ {
if (src->ht[iht].used == 0) if (src->ht[iht].used == 0)
break; break;
for (int dde = 0; dde < MERGE_BLOCK_SIZE; ++dde) {
rgdeT[dde] = src->ht[iht].table[ide + dde];
src->ht[iht].table[ide + dde] = nullptr;
}
for (;;) {
bool fAnyFound = false;
for (int dde = 0; dde < MERGE_BLOCK_SIZE; ++dde) {
if (rgdeT[dde] == nullptr)
continue;
uint64_t h = dictHashKey(dst, rgdeT[dde]->key) & htDst.sizemask;
dictEntry *deNext = rgdeT[dde]->next;
rgdeT[dde]->next = htDst.table[h];
htDst.table[h] = rgdeT[dde];
rgdeT[dde] = deNext;
htDst.used++;
src->ht[iht].used--;
fAnyFound = fAnyFound || (deNext != nullptr);
}
if (!fAnyFound)
break;
}
#if 0
dictEntry *de = src->ht[iht].table[ide]; dictEntry *de = src->ht[iht].table[ide];
src->ht[iht].table[ide] = nullptr; src->ht[iht].table[ide] = nullptr;
while (de != nullptr) while (de != nullptr)
@ -236,6 +309,7 @@ int dictMerge(dict *dst, dict *src)
de = deNext; de = deNext;
src->ht[iht].used--; src->ht[iht].used--;
} }
#endif
} }
} }
assert((dictSize(src)+dictSize(dst)) == expectedSize); assert((dictSize(src)+dictSize(dst)) == expectedSize);
@ -326,7 +400,7 @@ int dictRehashMilliseconds(dict *d, int ms) {
static void _dictRehashStep(dict *d) { static void _dictRehashStep(dict *d) {
unsigned long iterators; unsigned long iterators;
__atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED); __atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED);
if (iterators == 0) dictRehash(d,1); if (iterators == 0) dictRehash(d,2);
} }
/* Add an element to the target hash table */ /* Add an element to the target hash table */
@ -541,21 +615,20 @@ void dictRelease(dict *d)
zfree(d); zfree(d);
} }
dictEntry *dictFindWithPrev(dict *d, const void *key, dictEntry ***dePrevPtr, dictht **pht) dictEntry *dictFindWithPrev(dict *d, const void *key, uint64_t h, dictEntry ***dePrevPtr, dictht **pht, bool fShallowCompare)
{ {
dictEntry *he; dictEntry *he;
uint64_t h, idx, table; uint64_t idx, table;
if (dictSize(d) == 0) return NULL; /* dict is empty */ if (dictSize(d) == 0) return NULL; /* dict is empty */
if (dictIsRehashing(d)) _dictRehashStep(d); if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);
for (table = 0; table <= 1; table++) { for (table = 0; table <= 1; table++) {
*pht = d->ht + table; *pht = d->ht + table;
idx = h & d->ht[table].sizemask; idx = h & d->ht[table].sizemask;
he = d->ht[table].table[idx]; he = d->ht[table].table[idx];
*dePrevPtr = &d->ht[table].table[idx]; *dePrevPtr = &d->ht[table].table[idx];
while(he) { while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) { if (key==he->key || (!fShallowCompare && dictCompareKeys(d, key, he->key))) {
return he; return he;
} }
*dePrevPtr = &he->next; *dePrevPtr = &he->next;
@ -570,7 +643,8 @@ dictEntry *dictFind(dict *d, const void *key)
{ {
dictEntry **deT; dictEntry **deT;
dictht *ht; dictht *ht;
return dictFindWithPrev(d, key, &deT, &ht); uint64_t h = dictHashKey(d, key);
return dictFindWithPrev(d, key, h, &deT, &ht);
} }
void *dictFetchValue(dict *d, const void *key) { void *dictFetchValue(dict *d, const void *key) {
@ -1220,7 +1294,9 @@ void dictGetStats(char *buf, size_t bufsize, dict *d) {
void dictForceRehash(dict *d) void dictForceRehash(dict *d)
{ {
while (dictIsRehashing(d)) _dictRehashStep(d); unsigned long iterators;
__atomic_load(&d->iterators, &iterators, __ATOMIC_RELAXED);
while (iterators == 0 && dictIsRehashing(d)) _dictRehashStep(d);
} }
/* ------------------------------- Benchmark ---------------------------------*/ /* ------------------------------- Benchmark ---------------------------------*/

View File

@ -167,7 +167,7 @@ dictEntry *dictUnlink(dict *ht, const void *key);
void dictFreeUnlinkedEntry(dict *d, dictEntry *he); void dictFreeUnlinkedEntry(dict *d, dictEntry *he);
void dictRelease(dict *d); void dictRelease(dict *d);
dictEntry * dictFind(dict *d, const void *key); dictEntry * dictFind(dict *d, const void *key);
dictEntry * dictFindWithPrev(dict *d, const void *key, dictEntry ***dePrevPtr, dictht **ht); dictEntry * dictFindWithPrev(dict *d, const void *key, uint64_t h, dictEntry ***dePrevPtr, dictht **ht, bool fShallowCompare = false);
void *dictFetchValue(dict *d, const void *key); void *dictFetchValue(dict *d, const void *key);
int dictResize(dict *d); int dictResize(dict *d);
dictIterator *dictGetIterator(dict *d); dictIterator *dictGetIterator(dict *d);

View File

@ -3,6 +3,12 @@
#include <assert.h> #include <assert.h>
#include <unordered_set> #include <unordered_set>
struct ICollectable
{
virtual ~ICollectable() {}
bool FWillFreeChildDebug() { return false; }
};
template<typename T> template<typename T>
class GarbageCollector class GarbageCollector
{ {

View File

@ -62,7 +62,10 @@ bool redisDbPersistentData::asyncDelete(robj *key) {
dictEntry *de = dictUnlink(m_pdict,ptrFromObj(key)); dictEntry *de = dictUnlink(m_pdict,ptrFromObj(key));
if (de) { if (de) {
if (m_pdbSnapshot != nullptr && m_pdbSnapshot->find_cached_threadsafe(szFromObj(key)) != nullptr) if (m_pdbSnapshot != nullptr && m_pdbSnapshot->find_cached_threadsafe(szFromObj(key)) != nullptr)
dictAdd(m_pdictTombstone, sdsdup((sds)dictGetKey(de)), nullptr); {
uint64_t hash = dictGetHash(m_pdict, szFromObj(key));
dictAdd(m_pdictTombstone, sdsdup((sds)dictGetKey(de)), (void*)hash);
}
robj *val = (robj*)dictGetVal(de); robj *val = (robj*)dictGetVal(de);
if (val->FExpires()) if (val->FExpires())

View File

@ -46,7 +46,9 @@ robj *createObject(int type, void *ptr) {
o->encoding = OBJ_ENCODING_RAW; o->encoding = OBJ_ENCODING_RAW;
o->m_ptr = ptr; o->m_ptr = ptr;
o->setrefcount(1); o->setrefcount(1);
#ifdef ENABLE_MVCC
o->mvcc_tstamp = OBJ_MVCC_INVALID; o->mvcc_tstamp = OBJ_MVCC_INVALID;
#endif
/* Set the LRU to the current lruclock (minutes resolution), or /* Set the LRU to the current lruclock (minutes resolution), or
* alternatively the LFU counter. */ * alternatively the LFU counter. */
@ -92,6 +94,7 @@ robj *createRawStringObject(const char *ptr, size_t len) {
* an object where the sds string is actually an unmodifiable string * an object where the sds string is actually an unmodifiable string
* allocated in the same chunk as the object itself. */ * allocated in the same chunk as the object itself. */
robj *createEmbeddedStringObject(const char *ptr, size_t len) { robj *createEmbeddedStringObject(const char *ptr, size_t len) {
serverAssert(len <= UINT8_MAX);
size_t allocsize = sizeof(struct sdshdr8)+len+1; size_t allocsize = sizeof(struct sdshdr8)+len+1;
if (allocsize < sizeof(void*)) if (allocsize < sizeof(void*))
allocsize = sizeof(void*); allocsize = sizeof(void*);
@ -101,7 +104,9 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
o->type = OBJ_STRING; o->type = OBJ_STRING;
o->encoding = OBJ_ENCODING_EMBSTR; o->encoding = OBJ_ENCODING_EMBSTR;
o->setrefcount(1); o->setrefcount(1);
#ifdef ENABLE_MVCC
o->mvcc_tstamp = OBJ_MVCC_INVALID; o->mvcc_tstamp = OBJ_MVCC_INVALID;
#endif
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL; o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
@ -129,7 +134,12 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
* *
* The current limit of 52 is chosen so that the biggest string object * The current limit of 52 is chosen so that the biggest string object
* we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */ * we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */
#ifdef ENABLE_MVCC
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48 #define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48
#else
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 56
#endif
static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total"); static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total");
robj *createStringObject(const char *ptr, size_t len) { robj *createStringObject(const char *ptr, size_t len) {
if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT)
@ -1316,10 +1326,12 @@ NULL
* because we update the access time only * because we update the access time only
* when the key is read or overwritten. */ * when the key is read or overwritten. */
addReplyLongLong(c,LFUDecrAndReturn(o.unsafe_robjcast())); addReplyLongLong(c,LFUDecrAndReturn(o.unsafe_robjcast()));
#ifdef ENABLE_MVCC
} else if (!strcasecmp(szFromObj(c->argv[1]), "lastmodified") && c->argc == 3) { } else if (!strcasecmp(szFromObj(c->argv[1]), "lastmodified") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== nullptr) return; == nullptr) return;
addReplyLongLong(c, (g_pserver->mstime - (o->mvcc_tstamp >> MVCC_MS_SHIFT)) / 1000); addReplyLongLong(c, (g_pserver->mstime - (o->mvcc_tstamp >> MVCC_MS_SHIFT)) / 1000);
#endif
} else { } else {
addReplySubcommandSyntaxError(c); addReplySubcommandSyntaxError(c);
} }
@ -1579,9 +1591,11 @@ robj *deserializeStoredObjectCore(const void *data, size_t cb)
decrRefCount(auxkey); decrRefCount(auxkey);
goto eoferr; goto eoferr;
} }
#ifdef ENABLE_MVCC
if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) { if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) {
obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10);
} }
#endif
decrRefCount(auxkey); decrRefCount(auxkey);
decrRefCount(auxval); decrRefCount(auxval);
} }

View File

@ -349,19 +349,23 @@ writeerr:
} }
ssize_t rdbSaveLzfStringObject(rio *rdb, const unsigned char *s, size_t len) { ssize_t rdbSaveLzfStringObject(rio *rdb, const unsigned char *s, size_t len) {
char rgbuf[2048];
size_t comprlen, outlen; size_t comprlen, outlen;
void *out; void *out = rgbuf;
/* We require at least four bytes compression for this to be worth it */ /* We require at least four bytes compression for this to be worth it */
if (len <= 4) return 0; if (len <= 4) return 0;
outlen = len-4; outlen = len-4;
if (outlen >= sizeof(rgbuf))
if ((out = zmalloc(outlen+1, MALLOC_LOCAL)) == NULL) return 0; if ((out = zmalloc(outlen+1, MALLOC_LOCAL)) == NULL) return 0;
comprlen = lzf_compress(s, len, out, outlen); comprlen = lzf_compress(s, len, out, outlen);
if (comprlen == 0) { if (comprlen == 0) {
if (out != rgbuf)
zfree(out); zfree(out);
return 0; return 0;
} }
ssize_t nwritten = rdbSaveLzfBlob(rdb, out, comprlen, len); ssize_t nwritten = rdbSaveLzfBlob(rdb, out, comprlen, len);
if (out != rgbuf)
zfree(out); zfree(out);
return nwritten; return nwritten;
} }
@ -1092,8 +1096,12 @@ int rdbSaveKeyValuePair(rio *rdb, robj_roptr key, robj_roptr val, const expireEn
} }
char szT[32]; char szT[32];
#ifdef ENABLE_MVCC
if (g_pserver->fActiveReplica) {
snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp);
if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1;
}
#endif
/* Save type, key, value */ /* Save type, key, value */
if (rdbSaveObjectType(rdb,val) == -1) return -1; if (rdbSaveObjectType(rdb,val) == -1) return -1;
@ -2131,7 +2139,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) {
return NULL; return NULL;
} }
#ifdef ENABLE_MVCC
o->mvcc_tstamp = mvcc_tstamp; o->mvcc_tstamp = mvcc_tstamp;
#endif
serverAssert(!o->FExpires()); serverAssert(!o->FExpires());
return o; return o;
} }
@ -2489,7 +2499,11 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
key = nullptr; key = nullptr;
goto eoferr; goto eoferr;
} }
#ifdef ENABLE_MVCC
bool fStaleMvccKey = (rsi) ? val->mvcc_tstamp < rsi->mvccMinThreshold : false; bool fStaleMvccKey = (rsi) ? val->mvcc_tstamp < rsi->mvccMinThreshold : false;
#else
bool fStaleMvccKey = false;
#endif
/* Check if the key already expired. This function is used when loading /* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was * an RDB file from disk, either at startup, or when an RDB was

View File

@ -1387,8 +1387,8 @@ dictType dbDictType = {
dictObjectDestructor /* val destructor */ dictObjectDestructor /* val destructor */
}; };
/* db->pdict, keys are sds strings, vals uints. */ /* db->pdict, keys are sds strings, vals are Redis objects. */
dictType dbDictTypeTombstone = { dictType dbTombstoneDictType = {
dictSdsHash, /* hash function */ dictSdsHash, /* hash function */
NULL, /* key dup */ NULL, /* key dup */
NULL, /* val dup */ NULL, /* val dup */
@ -1539,8 +1539,9 @@ void tryResizeHashTables(int dbid) {
* is returned. */ * is returned. */
int redisDbPersistentData::incrementallyRehash() { int redisDbPersistentData::incrementallyRehash() {
/* Keys dictionary */ /* Keys dictionary */
if (dictIsRehashing(m_pdict)) { if (dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone)) {
dictRehashMilliseconds(m_pdict,1); dictRehashMilliseconds(m_pdict,1);
dictRehashMilliseconds(m_pdictTombstone,1);
return 1; /* already used our millisecond for this loop... */ return 1; /* already used our millisecond for this loop... */
} }
return 0; return 0;
@ -2219,11 +2220,22 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
CONFIG_BGSAVE_RETRY_DELAY || CONFIG_BGSAVE_RETRY_DELAY ||
g_pserver->lastbgsave_status == C_OK)) g_pserver->lastbgsave_status == C_OK))
{ {
// Ensure rehashing is complete
bool fRehashInProgress = false;
if (g_pserver->activerehashing) {
for (int idb = 0; idb < cserver.dbnum && !fRehashInProgress; ++idb) {
if (g_pserver->db[idb]->FRehashing())
fRehashInProgress = true;
}
}
if (!fRehashInProgress) {
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...", serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds); sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr; rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi); rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(rsiptr); rdbSaveBackground(rsiptr);
}
break; break;
} }
} }
@ -2312,6 +2324,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
} }
} }
run_with_period(100) {
bool fAnySnapshots = false; bool fAnySnapshots = false;
for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots; ++idb) for (int idb = 0; idb < cserver.dbnum && !fAnySnapshots; ++idb)
fAnySnapshots = fAnySnapshots || g_pserver->db[0]->FSnapshot(); fAnySnapshots = fAnySnapshots || g_pserver->db[0]->FSnapshot();
@ -2321,6 +2334,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
g_pserver->db[0]->consolidate_snapshot(); g_pserver->db[0]->consolidate_snapshot();
}, true /*HiPri*/); }, true /*HiPri*/);
} }
}
/* Fire the cron loop modules event. */ /* Fire the cron loop modules event. */
RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,g_pserver->hz}; RedisModuleCronLoopV1 ei = {REDISMODULE_CRON_LOOP_VERSION,g_pserver->hz};
@ -2477,17 +2491,17 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
latencyAddSampleIfNeeded("storage-commit", commit_latency); latencyAddSampleIfNeeded("storage-commit", commit_latency);
handleClientsWithPendingWrites(iel, aof_state); handleClientsWithPendingWrites(iel, aof_state);
if (serverTL->gcEpoch != 0) if (!serverTL->gcEpoch.isReset())
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/); g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/);
serverTL->gcEpoch = 0; serverTL->gcEpoch.reset();
aeAcquireLock(); aeAcquireLock();
/* Close clients that need to be closed asynchronous */ /* Close clients that need to be closed asynchronous */
freeClientsInAsyncFreeQueue(iel); freeClientsInAsyncFreeQueue(iel);
if (serverTL->gcEpoch != 0) if (!serverTL->gcEpoch.isReset())
g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/); g_pserver->garbageCollector.endEpoch(serverTL->gcEpoch, true /*fNoFree*/);
serverTL->gcEpoch = 0; serverTL->gcEpoch.reset();
/* Before we are going to sleep, let the threads access the dataset by /* Before we are going to sleep, let the threads access the dataset by
* releasing the GIL. Redis main thread will not touch anything at this * releasing the GIL. Redis main thread will not touch anything at this
@ -2503,7 +2517,7 @@ void afterSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop); UNUSED(eventLoop);
if (moduleCount()) moduleAcquireGIL(TRUE /*fServerThread*/); if (moduleCount()) moduleAcquireGIL(TRUE /*fServerThread*/);
serverAssert(serverTL->gcEpoch == 0); serverAssert(serverTL->gcEpoch.isReset());
serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch(); serverTL->gcEpoch = g_pserver->garbageCollector.startEpoch();
aeAcquireLock(); aeAcquireLock();
for (int idb = 0; idb < cserver.dbnum; ++idb) for (int idb = 0; idb < cserver.dbnum; ++idb)
@ -5159,12 +5173,20 @@ sds genRedisInfoString(const char *section) {
} }
if (allsections || defsections || !strcasecmp(section,"keydb")) { if (allsections || defsections || !strcasecmp(section,"keydb")) {
// Compute the MVCC depth
int mvcc_depth = 0;
for (int idb = 0; idb < cserver.dbnum; ++idb) {
mvcc_depth = std::max(mvcc_depth, g_pserver->db[idb]->snapshot_depth());
}
if (sections++) info = sdscat(info,"\r\n"); if (sections++) info = sdscat(info,"\r\n");
info = sdscatprintf(info, info = sdscatprintf(info,
"# KeyDB\r\n" "# KeyDB\r\n"
"variant:pro\r\n" "variant:pro\r\n"
"license_status:%s\r\n", "license_status:%s\r\n"
cserver.license_key ? "OK" : "Trial" "mvcc_depth:%d\r\n",
cserver.license_key ? "OK" : "Trial",
mvcc_depth
); );
} }

View File

@ -877,7 +877,9 @@ typedef struct redisObject {
private: private:
mutable std::atomic<unsigned> refcount {0}; mutable std::atomic<unsigned> refcount {0};
public: public:
#ifdef ENABLE_MVCC
uint64_t mvcc_tstamp; uint64_t mvcc_tstamp;
#endif
void *m_ptr; void *m_ptr;
inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; } inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; }
@ -888,7 +890,11 @@ public:
void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); } void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); }
unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); } unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); }
} robj; } robj;
#ifdef ENABLE_MVCC
static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase"); static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase");
#else
static_assert(sizeof(redisObject) == 16, "object size is critical, don't increase");
#endif
__attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o) __attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o)
{ {
@ -1320,6 +1326,9 @@ public:
void setExpire(robj *key, robj *subkey, long long when); void setExpire(robj *key, robj *subkey, long long when);
void setExpire(expireEntry &&e); void setExpire(expireEntry &&e);
void initialize(); void initialize();
void prepOverwriteForSnapshot(char *key);
bool FRehashing() const { return dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone); }
void setStorageProvider(StorageCache *pstorage); void setStorageProvider(StorageCache *pstorage);
@ -1411,14 +1420,16 @@ private:
class redisDbPersistentDataSnapshot : protected redisDbPersistentData class redisDbPersistentDataSnapshot : protected redisDbPersistentData
{ {
friend class redisDbPersistentData; friend class redisDbPersistentData;
private:
bool iterate_threadsafe_core(std::function<bool(const char*, robj_roptr o)> &fn, bool fKeyOnly, bool fCacheOnly, bool fTop) const;
protected: protected:
bool m_fConsolidated = false;
static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot); static void gcDisposeSnapshot(redisDbPersistentDataSnapshot *psnapshot);
int snapshot_depth() const;
void consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce); void consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce);
void freeTombstoneObjects(int depth); bool freeTombstoneObjects(int depth);
public: public:
int snapshot_depth() const;
bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; } bool FWillFreeChildDebug() const { return m_spdbSnapshotHOLDER != nullptr; }
bool iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly = false, bool fCacheOnly = false) const; bool iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly = false, bool fCacheOnly = false) const;
@ -1521,6 +1532,8 @@ struct redisDb : public redisDbPersistentDataSnapshot
using redisDbPersistentData::dictUnsafeKeyOnly; using redisDbPersistentData::dictUnsafeKeyOnly;
using redisDbPersistentData::resortExpire; using redisDbPersistentData::resortExpire;
using redisDbPersistentData::prefetchKeysAsync; using redisDbPersistentData::prefetchKeysAsync;
using redisDbPersistentData::prepOverwriteForSnapshot;
using redisDbPersistentData::FRehashing;
public: public:
expireset::setiter expireitr; expireset::setiter expireitr;
@ -1949,6 +1962,58 @@ struct clusterState;
#define MAX_EVENT_LOOPS 16 #define MAX_EVENT_LOOPS 16
#define IDX_EVENT_LOOP_MAIN 0 #define IDX_EVENT_LOOP_MAIN 0
class GarbageCollectorCollection
{
GarbageCollector<redisDbPersistentDataSnapshot> garbageCollectorSnapshot;
GarbageCollector<ICollectable> garbageCollectorGeneric;
public:
struct Epoch
{
uint64_t epochSnapshot = 0;
uint64_t epochGeneric = 0;
void reset() {
epochSnapshot = 0;
epochGeneric = 0;
}
bool isReset() const {
return epochSnapshot == 0 && epochGeneric == 0;
}
};
Epoch startEpoch()
{
Epoch e;
e.epochSnapshot = garbageCollectorSnapshot.startEpoch();
e.epochGeneric = garbageCollectorGeneric.startEpoch();
return e;
}
void endEpoch(Epoch e, bool fNoFree = false)
{
garbageCollectorSnapshot.endEpoch(e.epochSnapshot, fNoFree);
garbageCollectorGeneric.endEpoch(e.epochGeneric, fNoFree);
}
void shutdown()
{
garbageCollectorSnapshot.shutdown();
garbageCollectorGeneric.shutdown();
}
void enqueue(Epoch e, std::unique_ptr<redisDbPersistentDataSnapshot> &&sp)
{
garbageCollectorSnapshot.enqueue(e.epochSnapshot, std::move(sp));
}
void enqueue(Epoch e, std::unique_ptr<ICollectable> &&sp)
{
garbageCollectorGeneric.enqueue(e.epochGeneric, std::move(sp));
}
};
// Per-thread variabels that may be accessed without a lock // Per-thread variabels that may be accessed without a lock
struct redisServerThreadVars { struct redisServerThreadVars {
aeEventLoop *el; aeEventLoop *el;
@ -1970,7 +2035,7 @@ struct redisServerThreadVars {
struct fastlock lockPendingWrite { "thread pending write" }; struct fastlock lockPendingWrite { "thread pending write" };
char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */
long unsigned commandsExecuted = 0; long unsigned commandsExecuted = 0;
uint64_t gcEpoch = 0; GarbageCollectorCollection::Epoch gcEpoch;
const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr; const redisDbPersistentDataSnapshot **rgdbSnapshot = nullptr;
bool fRetrySetAofEvent = false; bool fRetrySetAofEvent = false;
@ -2424,7 +2489,7 @@ struct redisServer {
/* System hardware info */ /* System hardware info */
size_t system_memory_size; /* Total memory in system as reported by OS */ size_t system_memory_size; /* Total memory in system as reported by OS */
GarbageCollector<redisDbPersistentDataSnapshot> garbageCollector; GarbageCollectorCollection garbageCollector;
IStorageFactory *m_pstorageFactory = nullptr; IStorageFactory *m_pstorageFactory = nullptr;
int storage_flush_period; // The time between flushes in the CRON job int storage_flush_period; // The time between flushes in the CRON job
@ -2553,7 +2618,7 @@ extern dictType zsetDictType;
extern dictType clusterNodesDictType; extern dictType clusterNodesDictType;
extern dictType clusterNodesBlackListDictType; extern dictType clusterNodesBlackListDictType;
extern dictType dbDictType; extern dictType dbDictType;
extern dictType dbDictTypeTombstone; extern dictType dbTombstoneDictType;
extern dictType dbSnapshotDictType; extern dictType dbSnapshotDictType;
extern dictType shaScriptObjectDictType; extern dictType shaScriptObjectDictType;
extern double R_Zero, R_PosInf, R_NegInf, R_Nan; extern double R_Zero, R_PosInf, R_NegInf, R_Nan;

View File

@ -2,6 +2,29 @@
#include "aelocker.h" #include "aelocker.h"
static const size_t c_elementsSmallLimit = 500000; static const size_t c_elementsSmallLimit = 500000;
static fastlock s_lock {"consolidate_children"}; // this lock ensures only one thread is consolidating at a time
class LazyFree : public ICollectable
{
public:
virtual ~LazyFree()
{
for (auto *de : vecde)
{
dbDictType.keyDestructor(nullptr, dictGetKey(de));
dbDictType.valDestructor(nullptr, dictGetVal(de));
zfree(de);
}
for (robj *o : vecobjLazyFree)
decrRefCount(o);
for (dict *d : vecdictLazyFree)
dictRelease(d);
}
std::vector<dict*> vecdictLazyFree;
std::vector<robj*> vecobjLazyFree;
std::vector<dictEntry*> vecde;
};
const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional) const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint64_t mvccCheckpoint, bool fOptional)
{ {
@ -70,6 +93,7 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
auto spdb = std::unique_ptr<redisDbPersistentDataSnapshot>(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot()); auto spdb = std::unique_ptr<redisDbPersistentDataSnapshot>(new (MALLOC_LOCAL) redisDbPersistentDataSnapshot());
dictRehashMilliseconds(m_pdict, 50); // Give us the best chance at a fast cleanup
spdb->m_fAllChanged = false; spdb->m_fAllChanged = false;
spdb->m_fTrackingChanges = 0; spdb->m_fTrackingChanges = 0;
spdb->m_pdict = m_pdict; spdb->m_pdict = m_pdict;
@ -90,8 +114,13 @@ const redisDbPersistentDataSnapshot *redisDbPersistentData::createSnapshot(uint6
spdb->m_setexpire->pause_rehash(); // needs to be const spdb->m_setexpire->pause_rehash(); // needs to be const
} }
if (dictIsRehashing(spdb->m_pdict) || dictIsRehashing(spdb->m_pdictTombstone)) {
serverLog(LL_NOTICE, "NOTICE: Suboptimal snapshot");
}
m_pdict = dictCreate(&dbDictType,this); m_pdict = dictCreate(&dbDictType,this);
m_pdictTombstone = dictCreate(&dbDictTypeTombstone, this); dictExpand(m_pdict, 1024); // minimize rehash overhead
m_pdictTombstone = dictCreate(&dbTombstoneDictType, this);
serverAssert(spdb->m_pdict->iterators == 1); serverAssert(spdb->m_pdict->iterators == 1);
@ -183,7 +212,18 @@ void redisDbPersistentData::restoreSnapshot(const redisDbPersistentDataSnapshot
void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot) void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot *psnapshot)
{ {
mstime_t latency; mstime_t latency;
aeAcquireLock(); latencyStartMonitor(latency);
aeAcquireLock();
while (dictIsRehashing(m_pdict) || dictIsRehashing(m_pdictTombstone)) {
dictRehashMilliseconds(m_pdict, 1);
dictRehashMilliseconds(m_pdictTombstone, 1);
// Give someone else a chance
aeReleaseLock();
usleep(300);
aeAcquireLock();
}
latencyStartMonitor(latency);
if (m_pdbSnapshotASYNC && m_pdbSnapshotASYNC->m_mvccCheckpoint <= psnapshot->m_mvccCheckpoint) if (m_pdbSnapshotASYNC && m_pdbSnapshotASYNC->m_mvccCheckpoint <= psnapshot->m_mvccCheckpoint)
{ {
// Free a stale async snapshot so consolidate_children can clean it up later // Free a stale async snapshot so consolidate_children can clean it up later
@ -209,11 +249,22 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot
auto psnapshotT = createSnapshot(LLONG_MAX, false); auto psnapshotT = createSnapshot(LLONG_MAX, false);
endSnapshot(psnapshot); // this will just dec the ref count since our new snapshot has a ref endSnapshot(psnapshot); // this will just dec the ref count since our new snapshot has a ref
psnapshot = nullptr; psnapshot = nullptr;
aeReleaseLock(); latencyEndMonitor(latency);
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("end-snapshot-async-phase-1", latency); latencyAddSampleIfNeeded("end-snapshot-async-phase-1", latency);
aeReleaseLock();
// do the expensive work of merging snapshots outside the ref // do the expensive work of merging snapshots outside the ref
const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->freeTombstoneObjects(1); // depth is one because we just creted it if (const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->freeTombstoneObjects(1)) // depth is one because we just creted it
{
aeAcquireLock();
if (m_pdbSnapshotASYNC != nullptr)
endSnapshot(m_pdbSnapshotASYNC);
m_pdbSnapshotASYNC = nullptr;
endSnapshot(psnapshotT);
aeReleaseLock();
return;
}
const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->consolidate_children(this, true); const_cast<redisDbPersistentDataSnapshot*>(psnapshotT)->consolidate_children(this, true);
// Final Cleanup // Final Cleanup
@ -222,33 +273,80 @@ void redisDbPersistentData::endSnapshotAsync(const redisDbPersistentDataSnapshot
m_pdbSnapshotASYNC = psnapshotT; m_pdbSnapshotASYNC = psnapshotT;
else else
endSnapshot(psnapshotT); // finally clean up our temp snapshot endSnapshot(psnapshotT); // finally clean up our temp snapshot
aeReleaseLock(); latencyEndMonitor(latency);
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("end-snapshot-async-phase-2", latency); latencyAddSampleIfNeeded("end-snapshot-async-phase-2", latency);
aeReleaseLock();
} }
void redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth) bool redisDbPersistentDataSnapshot::freeTombstoneObjects(int depth)
{ {
if (m_pdbSnapshot == nullptr) if (m_pdbSnapshot == nullptr)
return; {
serverAssert(dictSize(m_pdictTombstone) == 0);
return true;
}
const_cast<redisDbPersistentDataSnapshot*>(m_pdbSnapshot)->freeTombstoneObjects(depth+1); if (!const_cast<redisDbPersistentDataSnapshot*>(m_pdbSnapshot)->freeTombstoneObjects(depth+1))
return false;
{
AeLocker ae;
ae.arm(nullptr);
if (m_pdbSnapshot->m_refCount != depth && (m_pdbSnapshot->m_refCount != (m_refCount+1))) if (m_pdbSnapshot->m_refCount != depth && (m_pdbSnapshot->m_refCount != (m_refCount+1)))
return; return false;
ae.disarm();
}
std::unique_lock<fastlock> lock(s_lock, std::defer_lock);
if (!lock.try_lock())
return false; // this is a best effort function
std::unique_ptr<LazyFree> splazy = std::make_unique<LazyFree>();
dict *dictTombstoneNew = dictCreate(&dbTombstoneDictType, nullptr);
dictIterator *di = dictGetIterator(m_pdictTombstone); dictIterator *di = dictGetIterator(m_pdictTombstone);
dictEntry *de; dictEntry *de;
std::vector<dictEntry*> vecdeFree;
vecdeFree.reserve(dictSize(m_pdictTombstone));
unsigned rgcremoved[2] = {0};
while ((de = dictNext(di)) != nullptr) while ((de = dictNext(di)) != nullptr)
{ {
dictEntry *deObj = dictFind(m_pdbSnapshot->m_pdict, dictGetKey(de)); dictEntry **dePrev = nullptr;
if (deObj != nullptr && dictGetVal(deObj) != nullptr) dictht *ht = nullptr;
sds key = (sds)dictGetKey(de);
// BUG BUG: Why can't we do a shallow search here?
dictEntry *deObj = dictFindWithPrev(m_pdbSnapshot->m_pdict, key, (uint64_t)dictGetVal(de), &dePrev, &ht, false);
if (deObj != nullptr)
{ {
decrRefCount((robj*)dictGetVal(deObj)); // Now unlink the DE
void *ptrSet = nullptr; __atomic_store(dePrev, &deObj->next, __ATOMIC_RELEASE);
__atomic_store(&deObj->v.val, &ptrSet, __ATOMIC_RELAXED); if (ht == &m_pdbSnapshot->m_pdict->ht[0])
rgcremoved[0]++;
else
rgcremoved[1]++;
splazy->vecde.push_back(deObj);
} else {
serverAssert(dictFind(m_pdbSnapshot->m_pdict, key) == nullptr);
serverAssert(m_pdbSnapshot->find_cached_threadsafe(key) != nullptr);
dictAdd(dictTombstoneNew, sdsdupshared((sds)dictGetKey(de)), dictGetVal(de));
} }
} }
dictReleaseIterator(di); dictReleaseIterator(di);
dictForceRehash(dictTombstoneNew);
aeAcquireLock();
dict *dT = m_pdbSnapshot->m_pdict;
splazy->vecdictLazyFree.push_back(m_pdictTombstone);
__atomic_store(&m_pdictTombstone, &dictTombstoneNew, __ATOMIC_RELEASE);
__atomic_fetch_sub(&dT->ht[0].used, rgcremoved[0], __ATOMIC_RELEASE);
__atomic_fetch_sub(&dT->ht[1].used, rgcremoved[1], __ATOMIC_RELEASE);
serverLog(LL_WARNING, "tombstones removed: %u, remain: %lu", rgcremoved[0]+rgcremoved[1], dictSize(m_pdictTombstone));
g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy));
aeReleaseLock();
return true;
} }
void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot) void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psnapshot)
@ -299,15 +397,20 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
// Stage 1 Loop through all the tracked deletes and remove them from the snapshot DB // Stage 1 Loop through all the tracked deletes and remove them from the snapshot DB
dictIterator *di = dictGetIterator(m_pdictTombstone); dictIterator *di = dictGetIterator(m_pdictTombstone);
dictEntry *de; dictEntry *de;
m_spdbSnapshotHOLDER->m_pdict->iterators++;
auto splazy = std::make_unique<LazyFree>();
while ((de = dictNext(di)) != NULL) while ((de = dictNext(di)) != NULL)
{ {
dictEntry **dePrev; dictEntry **dePrev;
dictht *ht; dictht *ht;
dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), &dePrev, &ht); // BUG BUG Why not a shallow search?
dictEntry *deSnapshot = dictFindWithPrev(m_spdbSnapshotHOLDER->m_pdict, dictGetKey(de), (uint64_t)dictGetVal(de), &dePrev, &ht, false /*!!sdsisshared((sds)dictGetKey(de))*/);
if (deSnapshot == nullptr && m_spdbSnapshotHOLDER->m_pdbSnapshot) if (deSnapshot == nullptr && m_spdbSnapshotHOLDER->m_pdbSnapshot)
{ {
// The tombstone is for a grand child, propogate it (or possibly in the storage provider - but an extra tombstone won't hurt) // The tombstone is for a grand child, propogate it (or possibly in the storage provider - but an extra tombstone won't hurt)
#ifdef CHECKED_BUILD
serverAssert(m_spdbSnapshotHOLDER->m_pdbSnapshot->find_cached_threadsafe((const char*)dictGetKey(de)) != nullptr); serverAssert(m_spdbSnapshotHOLDER->m_pdbSnapshot->find_cached_threadsafe((const char*)dictGetKey(de)) != nullptr);
#endif
dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), nullptr); dictAdd(m_spdbSnapshotHOLDER->m_pdictTombstone, sdsdupshared((sds)dictGetKey(de)), nullptr);
continue; continue;
} }
@ -318,15 +421,16 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
} }
// Delete the object from the source dict, we don't use dictDelete to avoid a second search // Delete the object from the source dict, we don't use dictDelete to avoid a second search
dictFreeKey(m_spdbSnapshotHOLDER->m_pdict, deSnapshot); splazy->vecde.push_back(deSnapshot);
dictFreeVal(m_spdbSnapshotHOLDER->m_pdict, deSnapshot);
serverAssert(*dePrev == deSnapshot);
*dePrev = deSnapshot->next; *dePrev = deSnapshot->next;
zfree(deSnapshot);
ht->used--; ht->used--;
} }
m_spdbSnapshotHOLDER->m_pdict->iterators--;
dictReleaseIterator(di); dictReleaseIterator(di);
dictEmpty(m_pdictTombstone, nullptr); splazy->vecdictLazyFree.push_back(m_pdictTombstone);
m_pdictTombstone = dictCreate(&dbTombstoneDictType, nullptr);
// Stage 2 Move all new keys to the snapshot DB // Stage 2 Move all new keys to the snapshot DB
dictMerge(m_spdbSnapshotHOLDER->m_pdict, m_pdict); dictMerge(m_spdbSnapshotHOLDER->m_pdict, m_pdict);
@ -355,8 +459,10 @@ void redisDbPersistentData::endSnapshot(const redisDbPersistentDataSnapshot *psn
auto spsnapshotFree = std::move(m_spdbSnapshotHOLDER); auto spsnapshotFree = std::move(m_spdbSnapshotHOLDER);
m_spdbSnapshotHOLDER = std::move(spsnapshotFree->m_spdbSnapshotHOLDER); m_spdbSnapshotHOLDER = std::move(spsnapshotFree->m_spdbSnapshotHOLDER);
if (serverTL != nullptr) if (serverTL != nullptr) {
g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(spsnapshotFree)); g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(spsnapshotFree));
g_pserver->garbageCollector.enqueue(serverTL->gcEpoch, std::move(splazy));
}
// Sanity Checks // Sanity Checks
serverAssert(m_spdbSnapshotHOLDER != nullptr || m_pdbSnapshot == nullptr); serverAssert(m_spdbSnapshotHOLDER != nullptr || m_pdbSnapshot == nullptr);
@ -393,8 +499,10 @@ dict_iter redisDbPersistentDataSnapshot::random_cache_threadsafe(bool fPrimaryOn
dict_iter redisDbPersistentData::find_cached_threadsafe(const char *key) const dict_iter redisDbPersistentData::find_cached_threadsafe(const char *key) const
{ {
dict *dictTombstone;
__atomic_load(&m_pdictTombstone, &dictTombstone, __ATOMIC_ACQUIRE);
dictEntry *de = dictFind(m_pdict, key); dictEntry *de = dictFind(m_pdict, key);
if (de == nullptr && m_pdbSnapshot != nullptr && dictFind(m_pdictTombstone, key) == nullptr) if (de == nullptr && m_pdbSnapshot != nullptr && dictFind(dictTombstone, key) == nullptr)
{ {
auto itr = m_pdbSnapshot->find_cached_threadsafe(key); auto itr = m_pdbSnapshot->find_cached_threadsafe(key);
if (itr != nullptr) if (itr != nullptr)
@ -460,11 +568,20 @@ unsigned long redisDbPersistentDataSnapshot::scan_threadsafe(unsigned long itera
} }
bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly, bool fCacheOnly) const bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const char*, robj_roptr o)> fn, bool fKeyOnly, bool fCacheOnly) const
{
return iterate_threadsafe_core(fn, fKeyOnly, fCacheOnly, true);
}
bool redisDbPersistentDataSnapshot::iterate_threadsafe_core(std::function<bool(const char*, robj_roptr o)> &fn, bool fKeyOnly, bool fCacheOnly, bool fFirst) const
{ {
// Take the size so we can ensure we visited every element exactly once // Take the size so we can ensure we visited every element exactly once
// use volatile to ensure it's not checked too late. This makes it more // use volatile to ensure it's not checked too late. This makes it more
// likely we'll detect races (but it won't gurantee it) // likely we'll detect races (but it won't gurantee it)
aeAcquireLock();
dict *dictTombstone;
__atomic_load(&m_pdictTombstone, &dictTombstone, __ATOMIC_ACQUIRE);
volatile ssize_t celem = (ssize_t)size(); volatile ssize_t celem = (ssize_t)size();
aeReleaseLock();
dictEntry *de = nullptr; dictEntry *de = nullptr;
bool fResult = true; bool fResult = true;
@ -510,19 +627,22 @@ bool redisDbPersistentDataSnapshot::iterate_threadsafe(std::function<bool(const
__atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE); __atomic_load(&m_pdbSnapshot, &psnapshot, __ATOMIC_ACQUIRE);
if (fResult && psnapshot != nullptr) if (fResult && psnapshot != nullptr)
{ {
fResult = psnapshot->iterate_threadsafe([this, &fn, &celem](const char *key, robj_roptr o) { std::function<bool(const char*, robj_roptr o)> fnNew = [this, &fn, &celem, dictTombstone](const char *key, robj_roptr o) {
dictEntry *deTombstone = dictFind(m_pdictTombstone, key); dictEntry *deTombstone = dictFind(dictTombstone, key);
if (deTombstone != nullptr) if (deTombstone != nullptr)
return true; return true;
// Alright it's a key in the use keyspace, lets ensure it and then pass it off // Alright it's a key in the use keyspace, lets ensure it and then pass it off
--celem; --celem;
return fn(key, o); return fn(key, o);
}, fKeyOnly, fCacheOnly); };
fResult = psnapshot->iterate_threadsafe_core(fnNew, fKeyOnly, fCacheOnly, false);
} }
// we should have hit all keys or had a good reason not to // we should have hit all keys or had a good reason not to
serverAssert(!fResult || celem == 0 || (m_spstorage && fCacheOnly)); if (!(!fResult || celem == 0 || (m_spstorage && fCacheOnly)))
serverLog(LL_WARNING, "celem: %ld", celem);
serverAssert(!fResult || celem == 0 || (m_spstorage && fCacheOnly) || !fFirst);
return fResult; return fResult;
} }
@ -538,11 +658,12 @@ void redisDbPersistentData::consolidate_snapshot()
{ {
aeAcquireLock(); aeAcquireLock();
auto psnapshot = (m_pdbSnapshot != nullptr) ? m_spdbSnapshotHOLDER.get() : nullptr; auto psnapshot = (m_pdbSnapshot != nullptr) ? m_spdbSnapshotHOLDER.get() : nullptr;
if (psnapshot == nullptr) if (psnapshot == nullptr || psnapshot->snapshot_depth() == 0)
{ {
aeReleaseLock(); aeReleaseLock();
return; return;
} }
psnapshot->m_refCount++; // ensure it's not free'd psnapshot->m_refCount++; // ensure it's not free'd
aeReleaseLock(); aeReleaseLock();
psnapshot->consolidate_children(this, false /* fForce */); psnapshot->consolidate_children(this, false /* fForce */);
@ -554,8 +675,6 @@ void redisDbPersistentData::consolidate_snapshot()
// only call this on the "real" database to consolidate the first child // only call this on the "real" database to consolidate the first child
void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce) void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *pdbPrimary, bool fForce)
{ {
static fastlock s_lock {"consolidate_children"}; // this lock ensures only one thread is consolidating at a time
std::unique_lock<fastlock> lock(s_lock, std::defer_lock); std::unique_lock<fastlock> lock(s_lock, std::defer_lock);
if (!lock.try_lock()) if (!lock.try_lock())
return; // this is a best effort function return; // this is a best effort function
@ -615,7 +734,6 @@ void redisDbPersistentDataSnapshot::consolidate_children(redisDbPersistentData *
serverLog(LL_VERBOSE, "cleaned %d snapshots", snapshot_depth()-1); serverLog(LL_VERBOSE, "cleaned %d snapshots", snapshot_depth()-1);
spdb->m_refCount = depth; spdb->m_refCount = depth;
spdb->m_fConsolidated = true;
// Drop our refs from this snapshot and its children // Drop our refs from this snapshot and its children
psnapshotT = this; psnapshotT = this;
std::vector<redisDbPersistentDataSnapshot*> vecT; std::vector<redisDbPersistentDataSnapshot*> vecT;