Avoid memory overhead in situations it won't be used (active rep only)
Former-commit-id: 8f7d977f9068904eca783fefbb24df4d8abd4214
This commit is contained in:
parent
ced0413189
commit
bf6569afb3
@ -47,11 +47,6 @@ endif
|
|||||||
|
|
||||||
USEASM?=true
|
USEASM?=true
|
||||||
|
|
||||||
ifeq ($(NOMVCC),)
|
|
||||||
CFLAGS+= -DENABLE_MVCC
|
|
||||||
CXXFLAGS+= -DENABLE_MVCC
|
|
||||||
endif
|
|
||||||
|
|
||||||
ifneq ($(SANITIZE),)
|
ifneq ($(SANITIZE),)
|
||||||
CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE
|
CFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE
|
||||||
CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE
|
CXXFLAGS+= -fsanitize=$(SANITIZE) -DSANITIZE
|
||||||
|
@ -1426,7 +1426,8 @@ int rewriteAppendOnlyFileRio(rio *aof) {
|
|||||||
/* Iterate this DB writing every entry */
|
/* Iterate this DB writing every entry */
|
||||||
while((de = dictNext(di)) != NULL) {
|
while((de = dictNext(di)) != NULL) {
|
||||||
sds keystr;
|
sds keystr;
|
||||||
robj key, *o;
|
redisObjectStack key;
|
||||||
|
robj *o = nullptr;
|
||||||
|
|
||||||
keystr = (sds)dictGetKey(de);
|
keystr = (sds)dictGetKey(de);
|
||||||
o = (robj*)dictGetVal(de);
|
o = (robj*)dictGetVal(de);
|
||||||
|
19
src/db.cpp
19
src/db.cpp
@ -92,11 +92,9 @@ static robj *lookupKey(redisDb *db, robj *key, int flags) {
|
|||||||
|
|
||||||
updateDbValAccess(de, flags);
|
updateDbValAccess(de, flags);
|
||||||
|
|
||||||
#ifdef ENABLE_MVCC
|
|
||||||
if (flags & LOOKUP_UPDATEMVCC) {
|
if (flags & LOOKUP_UPDATEMVCC) {
|
||||||
val->mvcc_tstamp = getMvccTstamp();
|
setMvccTstamp(val, getMvccTstamp());
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
return val;
|
return val;
|
||||||
} else {
|
} else {
|
||||||
return NULL;
|
return NULL;
|
||||||
@ -208,9 +206,9 @@ int dbAddCore(redisDb *db, robj *key, robj *val) {
|
|||||||
serverAssert(!val->FExpires());
|
serverAssert(!val->FExpires());
|
||||||
sds copy = sdsdup(szFromObj(key));
|
sds copy = sdsdup(szFromObj(key));
|
||||||
int retval = dictAdd(db->pdict, copy, val);
|
int retval = dictAdd(db->pdict, copy, val);
|
||||||
#ifdef ENABLE_MVCC
|
uint64_t mvcc = getMvccTstamp();
|
||||||
val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp();
|
setMvccTstamp(key, mvcc);
|
||||||
#endif
|
setMvccTstamp(val, mvcc);
|
||||||
|
|
||||||
if (retval == DICT_OK)
|
if (retval == DICT_OK)
|
||||||
{
|
{
|
||||||
@ -260,9 +258,7 @@ void dbOverwriteCore(redisDb *db, dictEntry *de, robj *key, robj *val, bool fUpd
|
|||||||
if (fUpdateMvcc) {
|
if (fUpdateMvcc) {
|
||||||
if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
|
if (val->getrefcount(std::memory_order_relaxed) == OBJ_SHARED_REFCOUNT)
|
||||||
val = dupStringObject(val);
|
val = dupStringObject(val);
|
||||||
#ifdef ENABLE_MVCC
|
setMvccTstamp(val, getMvccTstamp());
|
||||||
val->mvcc_tstamp = getMvccTstamp();
|
|
||||||
#endif
|
|
||||||
}
|
}
|
||||||
|
|
||||||
dictSetVal(db->pdict, de, val);
|
dictSetVal(db->pdict, de, val);
|
||||||
@ -296,14 +292,12 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
|
|||||||
if (de == nullptr)
|
if (de == nullptr)
|
||||||
return (dbAddCore(db, key, val) == DICT_OK);
|
return (dbAddCore(db, key, val) == DICT_OK);
|
||||||
|
|
||||||
#ifdef ENABLE_MVCC
|
|
||||||
robj *old = (robj*)dictGetVal(de);
|
robj *old = (robj*)dictGetVal(de);
|
||||||
if (old->mvcc_tstamp <= val->mvcc_tstamp)
|
if (mvccFromObj(old) <= mvccFromObj(val))
|
||||||
{
|
{
|
||||||
dbOverwriteCore(db, de, key, val, false, true);
|
dbOverwriteCore(db, de, key, val, false, true);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -1494,7 +1488,6 @@ void propagateExpire(redisDb *db, robj *key, int lazy) {
|
|||||||
void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey)
|
void propagateSubkeyExpire(redisDb *db, int type, robj *key, robj *subkey)
|
||||||
{
|
{
|
||||||
robj *argv[3];
|
robj *argv[3];
|
||||||
robj objT;
|
|
||||||
redisCommand *cmd = nullptr;
|
redisCommand *cmd = nullptr;
|
||||||
switch (type)
|
switch (type)
|
||||||
{
|
{
|
||||||
|
@ -55,7 +55,8 @@ bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey);
|
|||||||
* returns NULL in case the allocatoin wasn't moved.
|
* returns NULL in case the allocatoin wasn't moved.
|
||||||
* when it returns a non-null value, the old pointer was already released
|
* when it returns a non-null value, the old pointer was already released
|
||||||
* and should NOT be accessed. */
|
* and should NOT be accessed. */
|
||||||
void* activeDefragAlloc(void *ptr) {
|
template<typename TPTR>
|
||||||
|
TPTR* activeDefragAlloc(TPTR *ptr) {
|
||||||
size_t size;
|
size_t size;
|
||||||
void *newptr;
|
void *newptr;
|
||||||
if(!je_get_defrag_hint(ptr)) {
|
if(!je_get_defrag_hint(ptr)) {
|
||||||
@ -70,7 +71,14 @@ void* activeDefragAlloc(void *ptr) {
|
|||||||
newptr = zmalloc_no_tcache(size);
|
newptr = zmalloc_no_tcache(size);
|
||||||
memcpy(newptr, ptr, size);
|
memcpy(newptr, ptr, size);
|
||||||
zfree_no_tcache(ptr);
|
zfree_no_tcache(ptr);
|
||||||
return newptr;
|
return (TPTR*)newptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
template<>
|
||||||
|
robj* activeDefragAlloc(robj *o) {
|
||||||
|
void *pvSrc = allocPtrFromObj(o);
|
||||||
|
void *pvDst = activeDefragAlloc(pvSrc);
|
||||||
|
return objFromAllocPtr(pvDst);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*Defrag helper for sds strings
|
/*Defrag helper for sds strings
|
||||||
|
@ -80,7 +80,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
|
|||||||
robj *val = (robj*)dictGetVal(de);
|
robj *val = (robj*)dictGetVal(de);
|
||||||
int deleted = 0;
|
int deleted = 0;
|
||||||
|
|
||||||
robj objKey;
|
redisObjectStack objKey;
|
||||||
initStaticStringObject(objKey, (char*)e.key());
|
initStaticStringObject(objKey, (char*)e.key());
|
||||||
bool fTtlChanged = false;
|
bool fTtlChanged = false;
|
||||||
|
|
||||||
@ -145,7 +145,7 @@ void activeExpireCycleExpire(redisDb *db, expireEntry &e, long long now) {
|
|||||||
serverAssert(false);
|
serverAssert(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
robj objSubkey;
|
redisObjectStack objSubkey;
|
||||||
initStaticStringObject(objSubkey, (char*)pfat->nextExpireEntry().spsubkey.get());
|
initStaticStringObject(objSubkey, (char*)pfat->nextExpireEntry().spsubkey.get());
|
||||||
propagateSubkeyExpire(db, val->type, &objKey, &objSubkey);
|
propagateSubkeyExpire(db, val->type, &objKey, &objSubkey);
|
||||||
|
|
||||||
|
@ -56,7 +56,7 @@ size_t getStringObjectSdsUsedMemory(robj *o) {
|
|||||||
serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
|
serverAssertWithInfo(NULL,o,o->type == OBJ_STRING);
|
||||||
switch(o->encoding) {
|
switch(o->encoding) {
|
||||||
case OBJ_ENCODING_RAW: return sdsZmallocSize((sds)ptrFromObj(o));
|
case OBJ_ENCODING_RAW: return sdsZmallocSize((sds)ptrFromObj(o));
|
||||||
case OBJ_ENCODING_EMBSTR: return zmalloc_size(o)-sizeof(robj);
|
case OBJ_ENCODING_EMBSTR: return zmalloc_size(allocPtrFromObj(o))-sizeof(robj);
|
||||||
default: return 0; /* Just integer encoding for now. */
|
default: return 0; /* Just integer encoding for now. */
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -41,14 +41,15 @@
|
|||||||
/* ===================== Creation and parsing of objects ==================== */
|
/* ===================== Creation and parsing of objects ==================== */
|
||||||
|
|
||||||
robj *createObject(int type, void *ptr) {
|
robj *createObject(int type, void *ptr) {
|
||||||
robj *o = (robj*)zcalloc(sizeof(*o), MALLOC_SHARED);
|
size_t mvccExtraBytes = g_pserver->fActiveReplica ? sizeof(redisObjectExtended) : 0;
|
||||||
|
char *oB = (char*)zcalloc(sizeof(robj)+mvccExtraBytes, MALLOC_SHARED);
|
||||||
|
robj *o = reinterpret_cast<robj*>(oB + mvccExtraBytes);
|
||||||
|
|
||||||
o->type = type;
|
o->type = type;
|
||||||
o->encoding = OBJ_ENCODING_RAW;
|
o->encoding = OBJ_ENCODING_RAW;
|
||||||
o->m_ptr = ptr;
|
o->m_ptr = ptr;
|
||||||
o->setrefcount(1);
|
o->setrefcount(1);
|
||||||
#ifdef ENABLE_MVCC
|
setMvccTstamp(o, OBJ_MVCC_INVALID);
|
||||||
o->mvcc_tstamp = OBJ_MVCC_INVALID;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
/* Set the LRU to the current lruclock (minutes resolution), or
|
/* Set the LRU to the current lruclock (minutes resolution), or
|
||||||
* alternatively the LFU counter. */
|
* alternatively the LFU counter. */
|
||||||
@ -97,15 +98,16 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
|
|||||||
size_t allocsize = sizeof(struct sdshdr8)+len+1;
|
size_t allocsize = sizeof(struct sdshdr8)+len+1;
|
||||||
if (allocsize < sizeof(void*))
|
if (allocsize < sizeof(void*))
|
||||||
allocsize = sizeof(void*);
|
allocsize = sizeof(void*);
|
||||||
robj *o = (robj*)zcalloc(sizeof(robj)+allocsize-sizeof(o->m_ptr), MALLOC_SHARED);
|
|
||||||
|
size_t mvccExtraBytes = g_pserver->fActiveReplica ? sizeof(redisObjectExtended) : 0;
|
||||||
|
char *oB = (char*)zcalloc(sizeof(robj)+allocsize-sizeof(redisObject::m_ptr)+mvccExtraBytes, MALLOC_SHARED);
|
||||||
|
robj *o = reinterpret_cast<robj*>(oB + mvccExtraBytes);
|
||||||
struct sdshdr8 *sh = (sdshdr8*)(&o->m_ptr);
|
struct sdshdr8 *sh = (sdshdr8*)(&o->m_ptr);
|
||||||
|
|
||||||
o->type = OBJ_STRING;
|
o->type = OBJ_STRING;
|
||||||
o->encoding = OBJ_ENCODING_EMBSTR;
|
o->encoding = OBJ_ENCODING_EMBSTR;
|
||||||
o->setrefcount(1);
|
o->setrefcount(1);
|
||||||
#ifdef ENABLE_MVCC
|
setMvccTstamp(o, OBJ_MVCC_INVALID);
|
||||||
o->mvcc_tstamp = OBJ_MVCC_INVALID;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
|
if (g_pserver->maxmemory_policy & MAXMEMORY_FLAG_LFU) {
|
||||||
o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
|
o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
|
||||||
@ -133,11 +135,7 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
|
|||||||
*
|
*
|
||||||
* The current limit of 52 is chosen so that the biggest string object
|
* The current limit of 52 is chosen so that the biggest string object
|
||||||
* we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */
|
* we allocate as EMBSTR will still fit into the 64 byte arena of jemalloc. */
|
||||||
#ifdef ENABLE_MVCC
|
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 52
|
||||||
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 48
|
|
||||||
#else
|
|
||||||
#define OBJ_ENCODING_EMBSTR_SIZE_LIMIT 256
|
|
||||||
#endif
|
|
||||||
|
|
||||||
//static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total");
|
//static_assert((sizeof(redisObject)+OBJ_ENCODING_EMBSTR_SIZE_LIMIT-8) == 64, "Max EMBSTR obj should be 64 bytes total");
|
||||||
robj *createStringObject(const char *ptr, size_t len) {
|
robj *createStringObject(const char *ptr, size_t len) {
|
||||||
@ -399,7 +397,11 @@ void decrRefCount(robj_roptr o) {
|
|||||||
case OBJ_CRON: freeCronObject(o); break;
|
case OBJ_CRON: freeCronObject(o); break;
|
||||||
default: serverPanic("Unknown object type"); break;
|
default: serverPanic("Unknown object type"); break;
|
||||||
}
|
}
|
||||||
|
if (g_pserver->fActiveReplica) {
|
||||||
|
zfree(reinterpret_cast<redisObjectExtended*>(o.unsafe_robjcast())-1);
|
||||||
|
} else {
|
||||||
zfree(o.unsafe_robjcast());
|
zfree(o.unsafe_robjcast());
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
if (prev <= 0) serverPanic("decrRefCount against refcount <= 0");
|
if (prev <= 0) serverPanic("decrRefCount against refcount <= 0");
|
||||||
}
|
}
|
||||||
@ -1326,12 +1328,11 @@ NULL
|
|||||||
* because we update the access time only
|
* because we update the access time only
|
||||||
* when the key is read or overwritten. */
|
* when the key is read or overwritten. */
|
||||||
addReplyLongLong(c,LFUDecrAndReturn(o));
|
addReplyLongLong(c,LFUDecrAndReturn(o));
|
||||||
#ifdef ENABLE_MVCC
|
|
||||||
} else if (!strcasecmp(szFromObj(c->argv[1]), "lastmodified") && c->argc == 3) {
|
} else if (!strcasecmp(szFromObj(c->argv[1]), "lastmodified") && c->argc == 3) {
|
||||||
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
|
if ((o = objectCommandLookupOrReply(c,c->argv[2],shared.null[c->resp]))
|
||||||
== NULL) return;
|
== NULL) return;
|
||||||
addReplyLongLong(c, (g_pserver->mstime - (o->mvcc_tstamp >> MVCC_MS_SHIFT)) / 1000);
|
uint64_t mvcc = mvccFromObj(o);
|
||||||
#endif
|
addReplyLongLong(c, (g_pserver->mstime - (mvcc >> MVCC_MS_SHIFT)) / 1000);
|
||||||
} else {
|
} else {
|
||||||
addReplySubcommandSyntaxError(c);
|
addReplySubcommandSyntaxError(c);
|
||||||
}
|
}
|
||||||
@ -1511,3 +1512,39 @@ void redisObject::setrefcount(unsigned ref)
|
|||||||
serverAssert(!FExpires());
|
serverAssert(!FExpires());
|
||||||
refcount.store(ref, std::memory_order_relaxed);
|
refcount.store(ref, std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
redisObjectStack::redisObjectStack()
|
||||||
|
{
|
||||||
|
// We need to ensure the Extended Object is first in the class layout
|
||||||
|
serverAssert(reinterpret_cast<ptrdiff_t>(static_cast<redisObject*>(this)) != reinterpret_cast<ptrdiff_t>(this));
|
||||||
|
}
|
||||||
|
|
||||||
|
void *allocPtrFromObj(robj_roptr o) {
|
||||||
|
if (g_pserver->fActiveReplica)
|
||||||
|
return reinterpret_cast<redisObjectExtended*>(o.unsafe_robjcast()) - 1;
|
||||||
|
return o.unsafe_robjcast();
|
||||||
|
}
|
||||||
|
|
||||||
|
robj *objFromAllocPtr(void *pv) {
|
||||||
|
if (g_pserver->fActiveReplica) {
|
||||||
|
return reinterpret_cast<robj*>(reinterpret_cast<redisObjectExtended*>(pv)+1);
|
||||||
|
}
|
||||||
|
return reinterpret_cast<robj*>(pv);
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t mvccFromObj(robj_roptr o)
|
||||||
|
{
|
||||||
|
if (g_pserver->fActiveReplica) {
|
||||||
|
redisObjectExtended *oe = reinterpret_cast<redisObjectExtended*>(o.unsafe_robjcast()) - 1;
|
||||||
|
return oe->mvcc_tstamp;
|
||||||
|
}
|
||||||
|
return OBJ_MVCC_INVALID;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setMvccTstamp(robj *o, uint64_t mvcc)
|
||||||
|
{
|
||||||
|
if (!g_pserver->fActiveReplica)
|
||||||
|
return;
|
||||||
|
redisObjectExtended *oe = reinterpret_cast<redisObjectExtended*>(o) - 1;
|
||||||
|
oe->mvcc_tstamp = mvcc;
|
||||||
|
}
|
24
src/rdb.cpp
24
src/rdb.cpp
@ -1089,10 +1089,10 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, expireEntry *pexpire) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
char szT[32];
|
char szT[32];
|
||||||
#ifdef ENABLE_MVCC
|
if (g_pserver->fActiveReplica) {
|
||||||
snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp);
|
snprintf(szT, 32, "%" PRIu64, mvccFromObj(val));
|
||||||
if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1;
|
if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1;
|
||||||
#endif
|
}
|
||||||
|
|
||||||
/* Save type, key, value */
|
/* Save type, key, value */
|
||||||
if (rdbSaveObjectType(rdb,val) == -1) return -1;
|
if (rdbSaveObjectType(rdb,val) == -1) return -1;
|
||||||
@ -1146,7 +1146,7 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
|||||||
|
|
||||||
int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *keystr, robj *o)
|
int saveKey(rio *rdb, redisDb *db, int flags, size_t *processed, const char *keystr, robj *o)
|
||||||
{
|
{
|
||||||
robj key;
|
redisObjectStack key;
|
||||||
|
|
||||||
initStaticStringObject(key,(char*)keystr);
|
initStaticStringObject(key,(char*)keystr);
|
||||||
expireEntry *pexpire = getExpire(db, &key);
|
expireEntry *pexpire = getExpire(db, &key);
|
||||||
@ -1999,7 +1999,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) {
|
|||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
RedisModuleIO io;
|
RedisModuleIO io;
|
||||||
robj keyobj;
|
redisObjectStack keyobj;
|
||||||
initStaticStringObject(keyobj,key);
|
initStaticStringObject(keyobj,key);
|
||||||
moduleInitIOContext(io,mt,rdb,&keyobj);
|
moduleInitIOContext(io,mt,rdb,&keyobj);
|
||||||
io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2;
|
io.ver = (rdbtype == RDB_TYPE_MODULE) ? 1 : 2;
|
||||||
@ -2048,9 +2048,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, uint64_t mvcc_tstamp) {
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
#ifdef ENABLE_MVCC
|
setMvccTstamp(o, mvcc_tstamp);
|
||||||
o->mvcc_tstamp = mvcc_tstamp;
|
|
||||||
#endif
|
|
||||||
serverAssert(!o->FExpires());
|
serverAssert(!o->FExpires());
|
||||||
return o;
|
return o;
|
||||||
}
|
}
|
||||||
@ -2318,7 +2316,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
redisObject keyobj;
|
redisObjectStack keyobj;
|
||||||
initStaticStringObject(keyobj,key);
|
initStaticStringObject(keyobj,key);
|
||||||
setExpire(NULL, db, &keyobj, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10));
|
setExpire(NULL, db, &keyobj, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10));
|
||||||
decrRefCount(subexpireKey);
|
decrRefCount(subexpireKey);
|
||||||
@ -2402,18 +2400,14 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
|
|||||||
key = nullptr;
|
key = nullptr;
|
||||||
goto eoferr;
|
goto eoferr;
|
||||||
}
|
}
|
||||||
#ifdef ENABLE_MVCC
|
bool fStaleMvccKey = (rsi) ? mvccFromObj(val) < rsi->mvccMinThreshold : false;
|
||||||
bool fStaleMvccKey = (rsi) ? val->mvcc_tstamp < rsi->mvccMinThreshold : false;
|
|
||||||
#else
|
|
||||||
bool fStaleMvccKey = false;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
/* Check if the key already expired. This function is used when loading
|
/* Check if the key already expired. This function is used when loading
|
||||||
* an RDB file from disk, either at startup, or when an RDB was
|
* an RDB file from disk, either at startup, or when an RDB was
|
||||||
* received from the master. In the latter case, the master is
|
* received from the master. In the latter case, the master is
|
||||||
* responsible for key expiry. If we would expire keys here, the
|
* responsible for key expiry. If we would expire keys here, the
|
||||||
* snapshot taken by the master may not be reflected on the replica. */
|
* snapshot taken by the master may not be reflected on the replica. */
|
||||||
robj keyobj;
|
redisObjectStack keyobj;
|
||||||
initStaticStringObject(keyobj,key);
|
initStaticStringObject(keyobj,key);
|
||||||
bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now;
|
bool fExpiredKey = iAmMaster() && !(rdbflags&RDBFLAGS_AOF_PREAMBLE) && expiretime != -1 && expiretime < now;
|
||||||
if (fStaleMvccKey || fExpiredKey) {
|
if (fStaleMvccKey || fExpiredKey) {
|
||||||
|
29
src/server.h
29
src/server.h
@ -799,7 +799,16 @@ typedef struct RedisModuleDigest {
|
|||||||
|
|
||||||
#define MVCC_MS_SHIFT 20
|
#define MVCC_MS_SHIFT 20
|
||||||
|
|
||||||
typedef struct redisObject {
|
// This struct will be allocated ahead of the ROBJ when needed
|
||||||
|
struct redisObjectExtended {
|
||||||
|
uint64_t mvcc_tstamp;
|
||||||
|
};
|
||||||
|
|
||||||
|
typedef class redisObject {
|
||||||
|
protected:
|
||||||
|
redisObject() {}
|
||||||
|
|
||||||
|
public:
|
||||||
unsigned type:4;
|
unsigned type:4;
|
||||||
unsigned encoding:4;
|
unsigned encoding:4;
|
||||||
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
|
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
|
||||||
@ -808,9 +817,6 @@ typedef struct redisObject {
|
|||||||
private:
|
private:
|
||||||
mutable std::atomic<unsigned> refcount {0};
|
mutable std::atomic<unsigned> refcount {0};
|
||||||
public:
|
public:
|
||||||
#ifdef ENABLE_MVCC
|
|
||||||
uint64_t mvcc_tstamp;
|
|
||||||
#endif
|
|
||||||
void *m_ptr;
|
void *m_ptr;
|
||||||
|
|
||||||
inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; }
|
inline bool FExpires() const { return refcount.load(std::memory_order_relaxed) >> 31; }
|
||||||
@ -821,11 +827,18 @@ public:
|
|||||||
void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); }
|
void addref() const { refcount.fetch_add(1, std::memory_order_relaxed); }
|
||||||
unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); }
|
unsigned release() const { return refcount.fetch_sub(1, std::memory_order_seq_cst) & ~(1U << 31); }
|
||||||
} robj;
|
} robj;
|
||||||
#ifdef ENABLE_MVCC
|
|
||||||
static_assert(sizeof(redisObject) == 24, "object size is critical, don't increase");
|
|
||||||
#else
|
|
||||||
static_assert(sizeof(redisObject) == 16, "object size is critical, don't increase");
|
static_assert(sizeof(redisObject) == 16, "object size is critical, don't increase");
|
||||||
#endif
|
|
||||||
|
class redisObjectStack : public redisObjectExtended, public redisObject
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
redisObjectStack();
|
||||||
|
};
|
||||||
|
|
||||||
|
uint64_t mvccFromObj(robj_roptr o);
|
||||||
|
void setMvccTstamp(redisObject *o, uint64_t mvcc);
|
||||||
|
void *allocPtrFromObj(robj_roptr o);
|
||||||
|
robj *objFromAllocPtr(void *pv);
|
||||||
|
|
||||||
__attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o)
|
__attribute__((always_inline)) inline const void *ptrFromObj(robj_roptr &o)
|
||||||
{
|
{
|
||||||
|
Loading…
x
Reference in New Issue
Block a user