diff --git a/README.md b/README.md index aa10a47dc..0c91bd76d 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,5 @@ ![Current Release](https://img.shields.io/github/release/JohnSully/KeyDB.svg) ![CI](https://github.com/JohnSully/KeyDB/workflows/CI/badge.svg?branch=unstable) -[![Join the chat at https://gitter.im/KeyDB/community](https://badges.gitter.im/KeyDB/community.svg)](https://gitter.im/KeyDB/community?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) [![StackShare](http://img.shields.io/badge/tech-stack-0690fa.svg?style=flat)](https://stackshare.io/eq-alpha-technology-inc/eq-alpha-technology-inc) ##### New! Want to extend KeyDB with Javascript? Try [ModJS](https://github.com/JohnSully/ModJS) diff --git a/src/Makefile b/src/Makefile index ce3ab9ab9..d21a2052f 100644 --- a/src/Makefile +++ b/src/Makefile @@ -47,6 +47,11 @@ endif USEASM?=true +ifeq ($(NOMVCC),) + CFLAGS+= -DENABLE_MVCC + CXXFLAGS+= -DENABLE_MVCC +endif + ifneq ($(SANITIZE),) CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE diff --git a/src/ae.cpp b/src/ae.cpp index cccf2a130..c67ff803e 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -261,9 +261,11 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, if (fSynchronous) { + { std::unique_lock ulock(cmd.pctl->mutexcv, std::adopt_lock); cmd.pctl->cv.wait(ulock); ret = cmd.pctl->rval; + } delete cmd.pctl; } @@ -315,9 +317,11 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch int ret = AE_OK; if (fSynchronous) { + { std::unique_lock ulock(cmd.pctl->mutexcv, std::adopt_lock); cmd.pctl->cv.wait(ulock); ret = cmd.pctl->rval; + } delete cmd.pctl; } return ret; diff --git a/src/cluster.cpp b/src/cluster.cpp index 3a20c9897..a9e1cc9ef 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -4920,9 +4920,11 @@ void createDumpPayload(rio *payload, robj_roptr o, robj *key) { rioInitWithBuffer(payload,sdsempty()); serverAssert(rdbSaveObjectType(payload,o)); serverAssert(rdbSaveObject(payload,o,key)); +#ifdef ENABLE_MVCC char szT[32]; snprintf(szT, 32, "%" PRIu64, o->mvcc_tstamp); serverAssert(rdbSaveAuxFieldStrStr(payload,"mvcc-tstamp", szT) != -1); +#endif /* Write the footer, this is how it looks like: * ----------------+---------------------+---------------+ @@ -5064,9 +5066,11 @@ void restoreCommand(client *c) { decrRefCount(auxkey); goto eoferr; } +#ifdef ENABLE_MVCC if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) { obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); } +#endif decrRefCount(auxkey); decrRefCount(auxval); } diff --git a/src/db.cpp b/src/db.cpp index b91041f66..12c23ca21 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -91,7 +91,9 @@ static robj* lookupKey(redisDb *db, robj *key, int flags) { robj *val = itr.val(); lookupKeyUpdateObj(val, flags); if (flags & LOOKUP_UPDATEMVCC) { +#ifdef ENABLE_MVCC val->mvcc_tstamp = getMvccTstamp(); +#endif db->trackkey(key, true /* fUpdate */); } return val; @@ -218,8 +220,10 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { bool dbAddCore(redisDb *db, robj *key, robj *val, bool fAssumeNew = false) { serverAssert(!val->FExpires()); sds copy = sdsdupshared(szFromObj(key)); +#ifdef ENABLE_MVCC if (g_pserver->fActiveReplica) val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp(); +#endif bool fInserted = db->insert(copy, val, fAssumeNew); @@ -270,7 +274,9 @@ void redisDb::dbOverwriteCore(redisDb::iter itr, robj *key, robj *val, bool fUpd if (fUpdateMvcc) { if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) val = dupStringObject(val); +#ifdef ENABLE_MVCC val->mvcc_tstamp = getMvccTstamp(); +#endif } if (g_pserver->lazyfree_lazy_server_del) @@ -303,13 +309,15 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) if (itr == nullptr) return (dbAddCore(db, key, val) == true); +#ifdef ENABLE_MVCC robj *old = itr.val(); if (old->mvcc_tstamp <= val->mvcc_tstamp) { db->dbOverwriteCore(itr, key, val, false, true); return true; } - +#endif + return false; } else @@ -2471,7 +2479,9 @@ void redisDbPersistentData::ensure(const char *sdsKey, dictEntry **pde) sdsfree(strT); dictAdd(m_pdict, keyNew, objNew); serverAssert(objNew->getrefcount(std::memory_order_relaxed) == 1); +#ifdef ENABLE_MVCC serverAssert(objNew->mvcc_tstamp == itr.val()->mvcc_tstamp); +#endif } } else diff --git a/src/object.cpp b/src/object.cpp index a45a91db8..382b40c02 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -46,7 +46,9 @@ robj *createObject(int type, void *ptr) { o->encoding = OBJ_ENCODING_RAW; o->m_ptr = ptr; o->setrefcount(1); +#ifdef ENABLE_MVCC o->mvcc_tstamp = OBJ_MVCC_INVALID; +#endif /* Set the LRU to the current lruclock (minutes resolution), or * alternatively the LFU counter. */ @@ -101,7 +103,9 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { o->type = OBJ_STRING; o->encoding = OBJ_ENCODING_EMBSTR; o->setrefcount(1); +#ifdef ENABLE_MVCC o->mvcc_tstamp = OBJ_MVCC_INVALID; +#endif if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL; @@ -129,8 +133,13 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { * * The current limit of 52 is chosen so that the biggest string object * we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */ +#ifdef ENABLE_MVCC #define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48 -static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total"); +#else +#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 256 +#endif + +//static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total"); robj *createStringObject(const char *ptr, size_t len) { if (len <= OBJ_ENCODING_EMBSTR_SIZE_LIMIT) return createEmbeddedStringObject(ptr,len); @@ -1316,10 +1325,12 @@ NULL * because we update the access time only * when the key is read or overwritten. */ addReplyLongLong(c,LFUDecrAndReturn(o.unsafe_robjcast())); +#ifdef ENABLE_MVCC } else if (!strcasecmp(szFromObj(c->argv[1]), "lastmodified") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) == nullptr) return; addReplyLongLong(c, (g_pserver->mstime - (o->mvcc_tstamp >> MVCC_MS_SHIFT)) / 1000); +#endif } else { addReplySubcommandSyntaxError(c); } @@ -1579,9 +1590,11 @@ robj *deserializeStoredObjectCore(const void *data, size_t cb) decrRefCount(auxkey); goto eoferr; } +#ifdef ENABLE_MVCC if (strcasecmp(szFromObj(auxkey), "mvcc-tstamp") == 0) { obj->mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); } +#endif decrRefCount(auxkey); decrRefCount(auxval); } diff --git a/src/rdb.cpp b/src/rdb.cpp index 729fd0f59..0ed4febf1 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1092,8 +1092,10 @@ int rdbSaveKeyValuePair(rio *rdb, robj_roptr key, robj_roptr val, const expireEn } char szT[32]; +#ifdef ENABLE_MVCC snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; +#endif /* Save type, key, value */ if (rdbSaveObjectType(rdb,val) == -1) return -1; @@ -2131,7 +2133,9 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) { return NULL; } +#ifdef ENABLE_MVCC o->mvcc_tstamp = mvcc_tstamp; +#endif serverAssert(!o->FExpires()); return o; } @@ -2489,7 +2493,11 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { key = nullptr; goto eoferr; } +#ifdef ENABLE_MVCC bool fStaleMvccKey = (rsi) ? val->mvcc_tstamp < rsi->mvccMinThreshold : false; +#else + bool fStaleMvccKey = false; +#endif /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was diff --git a/src/server.h b/src/server.h index 86dcb872f..597ac8310 100644 --- a/src/server.h +++ b/src/server.h @@ -877,7 +877,9 @@ typedef struct redisObject { private: mutable std::atomic refcount {0}; public: +#ifdef ENABLE_MVCC uint64_t mvcc_tstamp; +#endif void *m_ptr; inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; } @@ -888,7 +890,11 @@ public: void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); } unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); } } robj; +#ifdef ENABLE_MVCC static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase"); +#else +static_assert(sizeof(redisObject) == 16, "object size is critical, don't increase"); +#endif __attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o) {