Timestamp writes to objects so we can be smarter about merging databases
Former-commit-id: 5dc9f70386617b02fab7eee1194f321f6b4b25c5
This commit is contained in:
parent
53253d993f
commit
a28120cb9e
@ -44,8 +44,6 @@ endif
|
||||
endif
|
||||
endif
|
||||
|
||||
# CXXFLAGS+= -DENABLE_MVCC
|
||||
|
||||
USEASM?=true
|
||||
# Do we use our assembly spinlock? X64 only
|
||||
ifeq ($(uname_S),Linux)
|
||||
|
17
src/db.cpp
17
src/db.cpp
@ -70,6 +70,10 @@ static robj *lookupKey(redisDb *db, robj *key, int flags) {
|
||||
val->lru = LRU_CLOCK();
|
||||
}
|
||||
}
|
||||
|
||||
if (flags & LOOKUP_UPDATEMVCC) {
|
||||
val->mvcc_tstamp = getMvccTstamp();
|
||||
}
|
||||
return val;
|
||||
} else {
|
||||
return NULL;
|
||||
@ -157,7 +161,7 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key) {
|
||||
* does not exist in the specified DB. */
|
||||
robj *lookupKeyWrite(redisDb *db, robj *key) {
|
||||
expireIfNeeded(db,key);
|
||||
return lookupKey(db,key,LOOKUP_NONE);
|
||||
return lookupKey(db,key,LOOKUP_UPDATEMVCC);
|
||||
}
|
||||
|
||||
robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply) {
|
||||
@ -175,9 +179,7 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) {
|
||||
int dbAddCore(redisDb *db, robj *key, robj *val) {
|
||||
sds copy = sdsdup(szFromObj(key));
|
||||
int retval = dictAdd(db->pdict, copy, val);
|
||||
#ifdef ENABLE_MVCC
|
||||
val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp();
|
||||
#endif
|
||||
|
||||
if (retval == DICT_OK)
|
||||
{
|
||||
@ -211,12 +213,9 @@ void dbOverwriteCore(redisDb *db, dictEntry *de, robj *val, bool fUpdateMvcc)
|
||||
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
|
||||
val->lru = old->lru;
|
||||
}
|
||||
#ifdef ENABLE_MVCC
|
||||
if (fUpdateMvcc)
|
||||
val->mvcc_tstamp = getMvccTstamp();
|
||||
#else
|
||||
UNUSED(fUpdateMvcc);
|
||||
#endif
|
||||
|
||||
dictSetVal(db->pdict, de, val);
|
||||
|
||||
if (server.lazyfree_lazy_server_del) {
|
||||
@ -247,14 +246,14 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
|
||||
dictEntry *de = dictFind(db->pdict, ptrFromObj(key));
|
||||
if (de == nullptr)
|
||||
return (dbAddCore(db, key, val) == DICT_OK);
|
||||
#ifdef ENABLE_MVCC
|
||||
|
||||
robj *old = (robj*)dictGetVal(de);
|
||||
if (old->mvcc_tstamp <= val->mvcc_tstamp)
|
||||
#endif
|
||||
{
|
||||
dbOverwriteCore(db, de, val, false);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
else
|
||||
|
@ -44,9 +44,7 @@ robj *createObject(int type, void *ptr) {
|
||||
o->encoding = OBJ_ENCODING_RAW;
|
||||
o->m_ptr = ptr;
|
||||
o->refcount = 1;
|
||||
#ifdef ENABLE_MVCC
|
||||
o->mvcc_tstamp = OBJ_MVCC_INVALID;
|
||||
#endif
|
||||
|
||||
/* Set the LRU to the current lruclock (minutes resolution), or
|
||||
* alternatively the LFU counter. */
|
||||
@ -94,9 +92,8 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) {
|
||||
o->type = OBJ_STRING;
|
||||
o->encoding = OBJ_ENCODING_EMBSTR;
|
||||
o->refcount = 1;
|
||||
#ifdef ENABLE_MVCC
|
||||
o->mvcc_tstamp = OBJ_MVCC_INVALID;
|
||||
#endif
|
||||
|
||||
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
|
||||
o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL;
|
||||
} else {
|
||||
|
@ -1061,11 +1061,9 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
|
||||
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
|
||||
}
|
||||
|
||||
#ifdef ENABLE_MVCC
|
||||
char szMvcc[32];
|
||||
snprintf(szMvcc, 32, "%" PRIu64, val->mvcc_tstamp);
|
||||
if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szMvcc) == -1) return -1;
|
||||
#endif
|
||||
|
||||
/* Save type, key, value */
|
||||
if (rdbSaveObjectType(rdb,val) == -1) return -1;
|
||||
@ -1822,11 +1820,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) {
|
||||
} else {
|
||||
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
|
||||
}
|
||||
#ifdef ENABLE_MVCC
|
||||
|
||||
o->mvcc_tstamp = mvcc_tstamp;
|
||||
#else
|
||||
UNUSED(mvcc_tstamp);
|
||||
#endif
|
||||
return o;
|
||||
}
|
||||
|
||||
|
@ -2933,6 +2933,8 @@ void initServer(void) {
|
||||
server.aof_last_write_errno = 0;
|
||||
server.repl_good_slaves_count = 0;
|
||||
|
||||
server.mvcc_tstamp = 0;
|
||||
|
||||
/* Create the timer callback, this is our way to process many background
|
||||
* operations incrementally, like clients timeout, eviction of unaccessed
|
||||
* expired keys and so forth. */
|
||||
@ -3423,6 +3425,7 @@ int processCommand(client *c, int callFlags) {
|
||||
|
||||
AssertCorrectThread(c);
|
||||
serverAssert(GlobalLocksAcquired());
|
||||
incrementMvccTstamp();
|
||||
|
||||
/* Now lookup the command and check ASAP about trivial error conditions
|
||||
* such as wrong arity, bad command name and so forth. */
|
||||
@ -4879,7 +4882,20 @@ int redisIsSupervised(int mode) {
|
||||
|
||||
uint64_t getMvccTstamp()
|
||||
{
|
||||
return (server.mstime << 16);
|
||||
return server.mvcc_tstamp;
|
||||
}
|
||||
|
||||
void incrementMvccTstamp()
|
||||
{
|
||||
uint64_t msPrev = server.mvcc_tstamp >> 22;
|
||||
if (msPrev >= (uint64_t)server.mstime) // we can be greater if the count overflows
|
||||
{
|
||||
atomicIncr(server.mvcc_tstamp, 1);
|
||||
}
|
||||
else
|
||||
{
|
||||
server.mvcc_tstamp = ((uint64_t)server.mstime) << 22;
|
||||
}
|
||||
}
|
||||
|
||||
void *workerThreadMain(void *parg)
|
||||
|
@ -709,10 +709,8 @@ typedef struct redisObject {
|
||||
unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or
|
||||
* LFU data (least significant 8 bits frequency
|
||||
* and most significant 16 bits access time). */
|
||||
#ifdef ENABLE_MVCC
|
||||
uint64_t mvcc_tstamp;
|
||||
#endif
|
||||
mutable int refcount;
|
||||
uint64_t mvcc_tstamp;
|
||||
void *m_ptr;
|
||||
} robj;
|
||||
|
||||
@ -1526,6 +1524,7 @@ struct redisServer {
|
||||
unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */
|
||||
|
||||
struct fastlock flock;
|
||||
uint64_t mvcc_tstamp;
|
||||
};
|
||||
|
||||
typedef struct pubsubPattern {
|
||||
@ -2136,6 +2135,7 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
|
||||
long long lru_clock);
|
||||
#define LOOKUP_NONE 0
|
||||
#define LOOKUP_NOTOUCH (1<<0)
|
||||
#define LOOKUP_UPDATEMVCC (1<<1)
|
||||
void dbAdd(redisDb *db, robj *key, robj *val);
|
||||
void dbOverwrite(redisDb *db, robj *key, robj *val);
|
||||
int dbMerge(redisDb *db, robj *key, robj *val, int fReplace);
|
||||
@ -2447,6 +2447,7 @@ struct redisMaster *MasterInfoFromClient(client *c);
|
||||
|
||||
/* MVCC */
|
||||
uint64_t getMvccTstamp();
|
||||
void incrementMvccTstamp();
|
||||
|
||||
#if defined(__GNUC__)
|
||||
#ifndef __cplusplus
|
||||
|
Loading…
x
Reference in New Issue
Block a user