Merge branch 'unstable' into keydbpro

Former-commit-id: ae482585f0dc470efd73833f74111c2f87a172c5
This commit is contained in:
John Sully 2020-08-15 21:29:00 +00:00
commit db193a1ef1
8 changed files with 52 additions and 3 deletions

View File

@ -1,6 +1,5 @@
![Current Release](https://img.shields.io/github/release/JohnSully/KeyDB.svg) ![Current Release](https://img.shields.io/github/release/JohnSully/KeyDB.svg)
![CI](https://github.com/JohnSully/KeyDB/workflows/CI/badge.svg?branch=unstable) ![CI](https://github.com/JohnSully/KeyDB/workflows/CI/badge.svg?branch=unstable)
[![Join the chat at https://gitter.im/KeyDB/community](https://badges.gitter.im/KeyDB/community.svg)](https://gitter.im/KeyDB/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)
[![StackShare](http://img.shields.io/badge/tech-stack-0690fa.svg?style=flat)](https://stackshare.io/eq-alpha-technology-inc/eq-alpha-technology-inc) [![StackShare](http://img.shields.io/badge/tech-stack-0690fa.svg?style=flat)](https://stackshare.io/eq-alpha-technology-inc/eq-alpha-technology-inc)
##### New! Want to extend KeyDB with Javascript? Try [ModJS](https://github.com/JohnSully/ModJS) ##### New! Want to extend KeyDB with Javascript? Try [ModJS](https://github.com/JohnSully/ModJS)

View File

@ -47,6 +47,11 @@ endif
USEASM?=true USEASM?=true
ifeq ($(NOMVCC),)
CFLAGS+= -DENABLE_MVCC
CXXFLAGS+= -DENABLE_MVCC
endif
ifneq ($(SANITIZE),) ifneq ($(SANITIZE),)
CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE
CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE

View File

@ -261,9 +261,11 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
if (fSynchronous) if (fSynchronous)
{ {
{
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock); std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
cmd.pctl->cv.wait(ulock); cmd.pctl->cv.wait(ulock);
ret = cmd.pctl->rval; ret = cmd.pctl->rval;
}
delete cmd.pctl; delete cmd.pctl;
} }
@ -315,9 +317,11 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
int ret = AE_OK; int ret = AE_OK;
if (fSynchronous) if (fSynchronous)
{ {
{
std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock); std::unique_lock<std::mutex> ulock(cmd.pctl->mutexcv, std::adopt_lock);
cmd.pctl->cv.wait(ulock); cmd.pctl->cv.wait(ulock);
ret = cmd.pctl->rval; ret = cmd.pctl->rval;
}
delete cmd.pctl; delete cmd.pctl;
} }
return ret; return ret;

View File

@ -4920,9 +4920,11 @@ void createDumpPayload(rio *payload, robj_roptr o, robj *key) {
rioInitWithBuffer(payload,sdsempty()); rioInitWithBuffer(payload,sdsempty());
serverAssert(rdbSaveObjectType(payload,o)); serverAssert(rdbSaveObjectType(payload,o));
serverAssert(rdbSaveObject(payload,o,key)); serverAssert(rdbSaveObject(payload,o,key));
#ifdef ENABLE_MVCC
char szT[32]; char szT[32];
snprintf(szT, 32, "%" PRIu64, o->mvcc_tstamp); snprintf(szT, 32, "%" PRIu64, o->mvcc_tstamp);
serverAssert(rdbSaveAuxFieldStrStr(payload,"mvcc-tstamp", szT) != -1); serverAssert(rdbSaveAuxFieldStrStr(payload,"mvcc-tstamp", szT) != -1);
#endif
/* Write the footer, this is how it looks like: /* Write the footer, this is how it looks like:
* ----------------+---------------------+---------------+ * ----------------+---------------------+---------------+
@ -5064,9 +5066,11 @@ void restoreCommand(client *c) {
decrRefCount(auxkey); decrRefCount(auxkey);
goto eoferr; goto eoferr;
} }
#ifdef ENABLE_MVCC
if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) { if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) {
obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10);
} }
#endif
decrRefCount(auxkey); decrRefCount(auxkey);
decrRefCount(auxval); decrRefCount(auxval);
} }

View File

@ -91,7 +91,9 @@ static robj* lookupKey(redisDb *db, robj *key, int flags) {
robj *val = itr.val(); robj *val = itr.val();
lookupKeyUpdateObj(val, flags); lookupKeyUpdateObj(val, flags);
if (flags & LOOKUP_UPDATEMVCC) { if (flags & LOOKUP_UPDATEMVCC) {
#ifdef ENABLE_MVCC
val->mvcc_tstamp = getMvccTstamp(); val->mvcc_tstamp = getMvccTstamp();
#endif
db->trackkey(key, true /* fUpdate */); db->trackkey(key, true /* fUpdate */);
} }
return val; return val;
@ -218,8 +220,10 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
bool dbAddCore(redisDb *db, robj *key, robj *val, bool fAssumeNew = false) { bool dbAddCore(redisDb *db, robj *key, robj *val, bool fAssumeNew = false) {
serverAssert(!val->FExpires()); serverAssert(!val->FExpires());
sds copy = sdsdupshared(szFromObj(key)); sds copy = sdsdupshared(szFromObj(key));
#ifdef ENABLE_MVCC
if (g_pserver->fActiveReplica) if (g_pserver->fActiveReplica)
val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp(); val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp();
#endif
bool fInserted = db->insert(copy, val, fAssumeNew); bool fInserted = db->insert(copy, val, fAssumeNew);
@ -270,7 +274,9 @@ void redisDb::dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpd
if (fUpdateMvcc) { if (fUpdateMvcc) {
if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
val = dupStringObject(val); val = dupStringObject(val);
#ifdef ENABLE_MVCC
val->mvcc_tstamp = getMvccTstamp(); val->mvcc_tstamp = getMvccTstamp();
#endif
} }
if (g_pserver->lazyfree_lazy_server_del) if (g_pserver->lazyfree_lazy_server_del)
@ -303,12 +309,14 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
if (itr == nullptr) if (itr == nullptr)
return (dbAddCore(db, key, val) == true); return (dbAddCore(db, key, val) == true);
#ifdef ENABLE_MVCC
robj *old = itr.val(); robj *old = itr.val();
if (old->mvcc_tstamp <= val->mvcc_tstamp) if (old->mvcc_tstamp <= val->mvcc_tstamp)
{ {
db->dbOverwriteCore(itr, key, val, false, true); db->dbOverwriteCore(itr, key, val, false, true);
return true; return true;
} }
#endif
return false; return false;
} }
@ -2471,7 +2479,9 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde)
sdsfree(strT); sdsfree(strT);
dictAdd(m_pdict, keyNew, objNew); dictAdd(m_pdict, keyNew, objNew);
serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1); serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1);
#ifdef ENABLE_MVCC
serverAssert(objNew->mvcc_tstamp == itr.val()->mvcc_tstamp); serverAssert(objNew->mvcc_tstamp == itr.val()->mvcc_tstamp);
#endif
} }
} }
else else

View File

@ -46,7 +46,9 @@ robj *createObject(int type, void *ptr) {
o->encoding = OBJ_ENCODING_RAW; o->encoding = OBJ_ENCODING_RAW;
o->m_ptr = ptr; o->m_ptr = ptr;
o->setrefcount(1); o->setrefcount(1);
#ifdef ENABLE_MVCC
o->mvcc_tstamp = OBJ_MVCC_INVALID; o->mvcc_tstamp = OBJ_MVCC_INVALID;
#endif
/* Set the LRU to the current lruclock (minutes resolution), or /* Set the LRU to the current lruclock (minutes resolution), or
* alternatively the LFU counter. */ * alternatively the LFU counter. */
@ -101,7 +103,9 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
o->type = OBJ_STRING; o->type = OBJ_STRING;
o->encoding = OBJ_ENCODING_EMBSTR; o->encoding = OBJ_ENCODING_EMBSTR;
o->setrefcount(1); o->setrefcount(1);
#ifdef ENABLE_MVCC
o->mvcc_tstamp = OBJ_MVCC_INVALID; o->mvcc_tstamp = OBJ_MVCC_INVALID;
#endif
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL; o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
@ -129,8 +133,13 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
* *
* The current limit of 52 is chosen so that the biggest string object * The current limit of 52 is chosen so that the biggest string object
* we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */ * we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */
#ifdef ENABLE_MVCC
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48 #define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48
static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total"); #else
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 256
#endif
//static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total");
robj *createStringObject(const char *ptr, size_t len) { robj *createStringObject(const char *ptr, size_t len) {
if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT)
return createEmbeddedStringObject(ptr,len); return createEmbeddedStringObject(ptr,len);
@ -1316,10 +1325,12 @@ NULL
* because we update the access time only * because we update the access time only
* when the key is read or overwritten. */ * when the key is read or overwritten. */
addReplyLongLong(c,LFUDecrAndReturn(o.unsafe_robjcast())); addReplyLongLong(c,LFUDecrAndReturn(o.unsafe_robjcast()));
#ifdef ENABLE_MVCC
} else if (!strcasecmp(szFromObj(c->argv[1]), "lastmodified") && c->argc == 3) { } else if (!strcasecmp(szFromObj(c->argv[1]), "lastmodified") && c->argc == 3) {
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
== nullptr) return; == nullptr) return;
addReplyLongLong(c, (g_pserver->mstime - (o->mvcc_tstamp >> MVCC_MS_SHIFT)) / 1000); addReplyLongLong(c, (g_pserver->mstime - (o->mvcc_tstamp >> MVCC_MS_SHIFT)) / 1000);
#endif
} else { } else {
addReplySubcommandSyntaxError(c); addReplySubcommandSyntaxError(c);
} }
@ -1579,9 +1590,11 @@ robj *deserializeStoredObjectCore(const void *data, size_t cb)
decrRefCount(auxkey); decrRefCount(auxkey);
goto eoferr; goto eoferr;
} }
#ifdef ENABLE_MVCC
if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) { if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) {
obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10);
} }
#endif
decrRefCount(auxkey); decrRefCount(auxkey);
decrRefCount(auxval); decrRefCount(auxval);
} }

View File

@ -1092,8 +1092,10 @@ int rdbSaveKeyValuePair(rio *rdb, robj_roptr key, robj_roptr val, const expireEn
} }
char szT[32]; char szT[32];
#ifdef ENABLE_MVCC
snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp);
if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1;
#endif
/* Save type, key, value */ /* Save type, key, value */
if (rdbSaveObjectType(rdb,val) == -1) return -1; if (rdbSaveObjectType(rdb,val) == -1) return -1;
@ -2131,7 +2133,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) {
return NULL; return NULL;
} }
#ifdef ENABLE_MVCC
o->mvcc_tstamp = mvcc_tstamp; o->mvcc_tstamp = mvcc_tstamp;
#endif
serverAssert(!o->FExpires()); serverAssert(!o->FExpires());
return o; return o;
} }
@ -2489,7 +2493,11 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
key = nullptr; key = nullptr;
goto eoferr; goto eoferr;
} }
#ifdef ENABLE_MVCC
bool fStaleMvccKey = (rsi) ? val->mvcc_tstamp < rsi->mvccMinThreshold : false; bool fStaleMvccKey = (rsi) ? val->mvcc_tstamp < rsi->mvccMinThreshold : false;
#else
bool fStaleMvccKey = false;
#endif
/* Check if the key already expired. This function is used when loading /* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was * an RDB file from disk, either at startup, or when an RDB was

View File

@ -877,7 +877,9 @@ typedef struct redisObject {
private: private:
mutable std::atomic<unsigned> refcount {0}; mutable std::atomic<unsigned> refcount {0};
public: public:
#ifdef ENABLE_MVCC
uint64_t mvcc_tstamp; uint64_t mvcc_tstamp;
#endif
void *m_ptr; void *m_ptr;
inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; } inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; }
@ -888,7 +890,11 @@ public:
void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); } void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); }
unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); } unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); }
} robj; } robj;
#ifdef ENABLE_MVCC
static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase"); static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase");
#else
static_assert(sizeof(redisObject) == 16, "object size is critical, don't increase");
#endif
__attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o) __attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o)
{ {