diff --git a/src/Makefile b/src/Makefile index f3c72db27..038ac51f8 100644 --- a/src/Makefile +++ b/src/Makefile @@ -47,11 +47,6 @@ endif USEASM?=true -ifeq ($(NOMVCC),) - CFLAGS+= -DENABLE_MVCC - CXXFLAGS+= -DENABLE_MVCC -endif - ifneq ($(SANITIZE),) CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE diff --git a/src/aof.cpp b/src/aof.cpp index e8d4930b8..e37bc67d6 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -1426,7 +1426,8 @@ int rewriteAppendOnlyFileRio(rio *aof) { /* Iterate this DB writing every entry */ while((de = dictNext(di)) != NULL) { sds keystr; - robj key, *o; + redisObjectStack key; + robj *o = nullptr; keystr = (sds)dictGetKey(de); o = (robj*)dictGetVal(de); diff --git a/src/db.cpp b/src/db.cpp index 33eae9fc3..060e64b16 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -92,11 +92,9 @@ static robj *lookupKey(redisDb *db, robj *key, int flags) { updateDbValAccess(de, flags); -#ifdef ENABLE_MVCC if (flags & LOOKUP_UPDATEMVCC) { - val->mvcc_tstamp = getMvccTstamp(); + setMvccTstamp(val, getMvccTstamp()); } -#endif return val; } else { return NULL; @@ -208,9 +206,9 @@ int dbAddCore(redisDb *db, robj *key, robj *val) { serverAssert(!val->FExpires()); sds copy = sdsdup(szFromObj(key)); int retval = dictAdd(db->pdict, copy, val); -#ifdef ENABLE_MVCC - val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp(); -#endif + uint64_t mvcc = getMvccTstamp(); + setMvccTstamp(key, mvcc); + setMvccTstamp(val, mvcc); if (retval == DICT_OK) { @@ -260,9 +258,7 @@ void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpd if (fUpdateMvcc) { if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT) val = dupStringObject(val); -#ifdef ENABLE_MVCC - val->mvcc_tstamp = getMvccTstamp(); -#endif + setMvccTstamp(val, getMvccTstamp()); } dictSetVal(db->pdict, de, val); @@ -296,14 +292,12 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) if (de == nullptr) return (dbAddCore(db, key, val) == DICT_OK); -#ifdef ENABLE_MVCC robj *old = (robj*)dictGetVal(de); - if (old->mvcc_tstamp <= val->mvcc_tstamp) + if (mvccFromObj(old) <= mvccFromObj(val)) { dbOverwriteCore(db, de, key, val, false, true); return true; } -#endif return false; } @@ -1494,7 +1488,6 @@ void propagateExpire(redisDb *db, robj *key, int lazy) { void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey) { robj *argv[3]; - robj objT; redisCommand *cmd = nullptr; switch (type) { diff --git a/src/defrag.cpp b/src/defrag.cpp index 3547cc4da..7463a818e 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -55,7 +55,8 @@ bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey); * returns NULL in case the allocatoin wasn't moved. * when it returns a non-null value, the old pointer was already released * and should NOT be accessed. */ -void* activeDefragAlloc(void *ptr) { +template +TPTR* activeDefragAlloc(TPTR *ptr) { size_t size; void *newptr; if(!je_get_defrag_hint(ptr)) { @@ -70,7 +71,14 @@ void* activeDefragAlloc(void *ptr) { newptr = zmalloc_no_tcache(size); memcpy(newptr, ptr, size); zfree_no_tcache(ptr); - return newptr; + return (TPTR*)newptr; +} + +template<> +robj* activeDefragAlloc(robj *o) { + void *pvSrc = allocPtrFromObj(o); + void *pvDst = activeDefragAlloc(pvSrc); + return objFromAllocPtr(pvDst); } /*Defrag helper for sds strings diff --git a/src/expire.cpp b/src/expire.cpp index b7f648117..c7d12e3bc 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -80,7 +80,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { robj *val = (robj*)dictGetVal(de); int deleted = 0; - robj objKey; + redisObjectStack objKey; initStaticStringObject(objKey, (char*)e.key()); bool fTtlChanged = false; @@ -145,7 +145,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) { serverAssert(false); } - robj objSubkey; + redisObjectStack objSubkey; initStaticStringObject(objSubkey, (char*)pfat->nextExpireEntry().spsubkey.get()); propagateSubkeyExpire(db, val->type, &objKey, &objSubkey); diff --git a/src/networking.cpp b/src/networking.cpp index b5610763c..f881dd1a6 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -56,7 +56,7 @@ size_t getStringObjectSdsUsedMemory(robj *o) { serverAssertWithInfo(NULL,o,o->type == OBJ_STRING); switch(o->encoding) { case OBJ_ENCODING_RAW: return sdsZmallocSize((sds)ptrFromObj(o)); - case OBJ_ENCODING_EMBSTR: return zmalloc_size(o)-sizeof(robj); + case OBJ_ENCODING_EMBSTR: return zmalloc_size(allocPtrFromObj(o))-sizeof(robj); default: return 0; /* Just integer encoding for now. */ } } diff --git a/src/object.cpp b/src/object.cpp index 2c763d0b4..4b32c5a4d 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -41,14 +41,15 @@ /* ===================== Creation and parsing of objects ==================== */ robj *createObject(int type, void *ptr) { - robj *o = (robj*)zcalloc(sizeof(*o), MALLOC_SHARED); + size_t mvccExtraBytes = g_pserver->fActiveReplica ? sizeof(redisObjectExtended) : 0; + char *oB = (char*)zcalloc(sizeof(robj)+mvccExtraBytes, MALLOC_SHARED); + robj *o = reinterpret_cast(oB + mvccExtraBytes); + o->type = type; o->encoding = OBJ_ENCODING_RAW; o->m_ptr = ptr; o->setrefcount(1); -#ifdef ENABLE_MVCC - o->mvcc_tstamp = OBJ_MVCC_INVALID; -#endif + setMvccTstamp(o, OBJ_MVCC_INVALID); /* Set the LRU to the current lruclock (minutes resolution), or * alternatively the LFU counter. */ @@ -97,15 +98,16 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { size_t allocsize = sizeof(struct sdshdr8)+len+1; if (allocsize < sizeof(void*)) allocsize = sizeof(void*); - robj *o = (robj*)zcalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED); + + size_t mvccExtraBytes = g_pserver->fActiveReplica ? sizeof(redisObjectExtended) : 0; + char *oB = (char*)zcalloc(sizeof(robj)+allocsize-sizeof(redisObject::m_ptr)+mvccExtraBytes, MALLOC_SHARED); + robj *o = reinterpret_cast(oB + mvccExtraBytes); struct sdshdr8 *sh = (sdshdr8*)(&o->m_ptr); o->type = OBJ_STRING; o->encoding = OBJ_ENCODING_EMBSTR; o->setrefcount(1); -#ifdef ENABLE_MVCC - o->mvcc_tstamp = OBJ_MVCC_INVALID; -#endif + setMvccTstamp(o, OBJ_MVCC_INVALID); if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) { o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL; @@ -133,11 +135,7 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { * * The current limit of 52 is chosen so that the biggest string object * we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */ -#ifdef ENABLE_MVCC -#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48 -#else -#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 256 -#endif +#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 52 //static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total"); robj *createStringObject(const char *ptr, size_t len) { @@ -399,7 +397,11 @@ void decrRefCount(robj_roptr o) { case OBJ_CRON: freeCronObject(o); break; default: serverPanic("Unknown object type"); break; } - zfree(o.unsafe_robjcast()); + if (g_pserver->fActiveReplica) { + zfree(reinterpret_cast(o.unsafe_robjcast())-1); + } else { + zfree(o.unsafe_robjcast()); + } } else { if (prev <= 0) serverPanic("decrRefCount against refcount <= 0"); } @@ -1326,12 +1328,11 @@ NULL * because we update the access time only * when the key is read or overwritten. */ addReplyLongLong(c,LFUDecrAndReturn(o)); -#ifdef ENABLE_MVCC } else if (!strcasecmp(szFromObj(c->argv[1]), "lastmodified") && c->argc == 3) { if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp])) == NULL) return; - addReplyLongLong(c, (g_pserver->mstime - (o->mvcc_tstamp >> MVCC_MS_SHIFT)) / 1000); -#endif + uint64_t mvcc = mvccFromObj(o); + addReplyLongLong(c, (g_pserver->mstime - (mvcc >> MVCC_MS_SHIFT)) / 1000); } else { addReplySubcommandSyntaxError(c); } @@ -1511,3 +1512,39 @@ void redisObject::setrefcount(unsigned ref) serverAssert(!FExpires()); refcount.store(ref, std::memory_order_relaxed); } + +redisObjectStack::redisObjectStack() +{ + // We need to ensure the Extended Object is first in the class layout + serverAssert(reinterpret_cast(static_cast(this)) != reinterpret_cast(this)); +} + +void *allocPtrFromObj(robj_roptr o) { + if (g_pserver->fActiveReplica) + return reinterpret_cast(o.unsafe_robjcast()) - 1; + return o.unsafe_robjcast(); +} + +robj *objFromAllocPtr(void *pv) { + if (g_pserver->fActiveReplica) { + return reinterpret_cast(reinterpret_cast(pv)+1); + } + return reinterpret_cast(pv); +} + +uint64_t mvccFromObj(robj_roptr o) +{ + if (g_pserver->fActiveReplica) { + redisObjectExtended *oe = reinterpret_cast(o.unsafe_robjcast()) - 1; + return oe->mvcc_tstamp; + } + return OBJ_MVCC_INVALID; +} + +void setMvccTstamp(robj *o, uint64_t mvcc) +{ + if (!g_pserver->fActiveReplica) + return; + redisObjectExtended *oe = reinterpret_cast(o) - 1; + oe->mvcc_tstamp = mvcc; +} \ No newline at end of file diff --git a/src/rdb.cpp b/src/rdb.cpp index 7876ab00f..71ff6030c 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1089,10 +1089,10 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, expireEntry *pexpire) { } char szT[32]; -#ifdef ENABLE_MVCC - snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); - if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; -#endif + if (g_pserver->fActiveReplica) { + snprintf(szT, 32, "%" PRIu64, mvccFromObj(val)); + if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; + } /* Save type, key, value */ if (rdbSaveObjectType(rdb,val) == -1) return -1; @@ -1146,7 +1146,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *keystr, robj *o) { - robj key; + redisObjectStack key; initStaticStringObject(key,(char*)keystr); expireEntry *pexpire = getExpire(db, &key); @@ -1999,7 +1999,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) { exit(1); } RedisModuleIO io; - robj keyobj; + redisObjectStack keyobj; initStaticStringObject(keyobj,key); moduleInitIOContext(io,mt,rdb,&keyobj); io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2; @@ -2048,9 +2048,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) { return NULL; } -#ifdef ENABLE_MVCC - o->mvcc_tstamp = mvcc_tstamp; -#endif + setMvccTstamp(o, mvcc_tstamp); serverAssert(!o->FExpires()); return o; } @@ -2318,7 +2316,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } } else { - redisObject keyobj; + redisObjectStack keyobj; initStaticStringObject(keyobj,key); setExpire(NULL, db, &keyobj, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10)); decrRefCount(subexpireKey); @@ -2402,18 +2400,14 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { key = nullptr; goto eoferr; } -#ifdef ENABLE_MVCC - bool fStaleMvccKey = (rsi) ? val->mvcc_tstamp < rsi->mvccMinThreshold : false; -#else - bool fStaleMvccKey = false; -#endif + bool fStaleMvccKey = (rsi) ? mvccFromObj(val) < rsi->mvccMinThreshold : false; /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the replica. */ - robj keyobj; + redisObjectStack keyobj; initStaticStringObject(keyobj,key); bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now; if (fStaleMvccKey || fExpiredKey) { diff --git a/src/server.h b/src/server.h index 9527bab1c..caed71ee7 100644 --- a/src/server.h +++ b/src/server.h @@ -799,7 +799,16 @@ typedef struct RedisModuleDigest { #define MVCC_MS_SHIFT 20 -typedef struct redisObject { +// This struct will be allocated ahead of the ROBJ when needed +struct redisObjectExtended { + uint64_t mvcc_tstamp; +}; + +typedef class redisObject { +protected: + redisObject() {} + +public: unsigned type:4; unsigned encoding:4; unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or @@ -808,9 +817,6 @@ typedef struct redisObject { private: mutable std::atomic refcount {0}; public: -#ifdef ENABLE_MVCC - uint64_t mvcc_tstamp; -#endif void *m_ptr; inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; } @@ -821,11 +827,18 @@ public: void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); } unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); } } robj; -#ifdef ENABLE_MVCC -static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase"); -#else static_assert(sizeof(redisObject) == 16, "object size is critical, don't increase"); -#endif + +class redisObjectStack : public redisObjectExtended, public redisObject +{ +public: + redisObjectStack(); +}; + +uint64_t mvccFromObj(robj_roptr o); +void setMvccTstamp(redisObject *o, uint64_t mvcc); +void *allocPtrFromObj(robj_roptr o); +robj *objFromAllocPtr(void *pv); __attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o) {