From a28120cb9ebe4501bf7d3e2a7db86a01bd85fd11 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 20 Apr 2019 00:52:07 -0400 Subject: [PATCH] Timestamp writes to objects so we can be smarter about merging databases Former-commit-id: 5dc9f70386617b02fab7eee1194f321f6b4b25c5 --- src/Makefile | 2 -- src/db.cpp | 17 ++++++++--------- src/object.cpp | 5 +---- src/rdb.cpp | 7 +------ src/server.cpp | 18 +++++++++++++++++- src/server.h | 7 ++++--- 6 files changed, 31 insertions(+), 25 deletions(-) diff --git a/src/Makefile b/src/Makefile index f5ee6cae8..345e1c04b 100644 --- a/src/Makefile +++ b/src/Makefile @@ -44,8 +44,6 @@ endif endif endif -# CXXFLAGS+= -DENABLE_MVCC - USEASM?=true # Do we use our assembly spinlock? X64 only ifeq ($(uname_S),Linux) diff --git a/src/db.cpp b/src/db.cpp index 845635305..4a731e6c7 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -70,6 +70,10 @@ static robj *lookupKey(redisDb *db, robj *key, int flags) { val->lru = LRU_CLOCK(); } } + + if (flags & LOOKUP_UPDATEMVCC) { + val->mvcc_tstamp = getMvccTstamp(); + } return val; } else { return NULL; @@ -157,7 +161,7 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key) { * does not exist in the specified DB. */ robj *lookupKeyWrite(redisDb *db, robj *key) { expireIfNeeded(db,key); - return lookupKey(db,key,LOOKUP_NONE); + return lookupKey(db,key,LOOKUP_UPDATEMVCC); } robj_roptr lookupKeyReadOrReply(client *c, robj *key, robj *reply) { @@ -175,9 +179,7 @@ robj *lookupKeyWriteOrReply(client *c, robj *key, robj *reply) { int dbAddCore(redisDb *db, robj *key, robj *val) { sds copy = sdsdup(szFromObj(key)); int retval = dictAdd(db->pdict, copy, val); -#ifdef ENABLE_MVCC val->mvcc_tstamp = key->mvcc_tstamp = getMvccTstamp(); -#endif if (retval == DICT_OK) { @@ -211,12 +213,9 @@ void dbOverwriteCore(redisDb *db, dictEntry *de, robj *val, bool fUpdateMvcc) if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { val->lru = old->lru; } -#ifdef ENABLE_MVCC if (fUpdateMvcc) val->mvcc_tstamp = getMvccTstamp(); -#else - UNUSED(fUpdateMvcc); -#endif + dictSetVal(db->pdict, de, val); if (server.lazyfree_lazy_server_del) { @@ -247,14 +246,14 @@ int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) dictEntry *de = dictFind(db->pdict, ptrFromObj(key)); if (de == nullptr) return (dbAddCore(db, key, val) == DICT_OK); -#ifdef ENABLE_MVCC + robj *old = (robj*)dictGetVal(de); if (old->mvcc_tstamp <= val->mvcc_tstamp) -#endif { dbOverwriteCore(db, de, val, false); return true; } + return false; } else diff --git a/src/object.cpp b/src/object.cpp index 169a6e08d..9bd2559e7 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -44,9 +44,7 @@ robj *createObject(int type, void *ptr) { o->encoding = OBJ_ENCODING_RAW; o->m_ptr = ptr; o->refcount = 1; -#ifdef ENABLE_MVCC o->mvcc_tstamp = OBJ_MVCC_INVALID; -#endif /* Set the LRU to the current lruclock (minutes resolution), or * alternatively the LFU counter. */ @@ -94,9 +92,8 @@ robj *createEmbeddedStringObject(const char *ptr, size_t len) { o->type = OBJ_STRING; o->encoding = OBJ_ENCODING_EMBSTR; o->refcount = 1; -#ifdef ENABLE_MVCC o->mvcc_tstamp = OBJ_MVCC_INVALID; -#endif + if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { o->lru = (LFUGetTimeInMinutes()<<8) | LFU_INIT_VAL; } else { diff --git a/src/rdb.cpp b/src/rdb.cpp index 835672668..412bb1cec 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1061,11 +1061,9 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { if (rdbWriteRaw(rdb,buf,1) == -1) return -1; } -#ifdef ENABLE_MVCC char szMvcc[32]; snprintf(szMvcc, 32, "%" PRIu64, val->mvcc_tstamp); if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szMvcc) == -1) return -1; -#endif /* Save type, key, value */ if (rdbSaveObjectType(rdb,val) == -1) return -1; @@ -1822,11 +1820,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) { } else { rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); } -#ifdef ENABLE_MVCC + o->mvcc_tstamp = mvcc_tstamp; -#else - UNUSED(mvcc_tstamp); -#endif return o; } diff --git a/src/server.cpp b/src/server.cpp index 7e8275d2d..dda218cf4 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2933,6 +2933,8 @@ void initServer(void) { server.aof_last_write_errno = 0; server.repl_good_slaves_count = 0; + server.mvcc_tstamp = 0; + /* Create the timer callback, this is our way to process many background * operations incrementally, like clients timeout, eviction of unaccessed * expired keys and so forth. */ @@ -3423,6 +3425,7 @@ int processCommand(client *c, int callFlags) { AssertCorrectThread(c); serverAssert(GlobalLocksAcquired()); + incrementMvccTstamp(); /* Now lookup the command and check ASAP about trivial error conditions * such as wrong arity, bad command name and so forth. */ @@ -4879,7 +4882,20 @@ int redisIsSupervised(int mode) { uint64_t getMvccTstamp() { - return (server.mstime << 16); + return server.mvcc_tstamp; +} + +void incrementMvccTstamp() +{ + uint64_t msPrev = server.mvcc_tstamp >> 22; + if (msPrev >= (uint64_t)server.mstime) // we can be greater if the count overflows + { + atomicIncr(server.mvcc_tstamp, 1); + } + else + { + server.mvcc_tstamp = ((uint64_t)server.mstime) << 22; + } } void *workerThreadMain(void *parg) diff --git a/src/server.h b/src/server.h index 653563b09..0f2fc3783 100644 --- a/src/server.h +++ b/src/server.h @@ -709,10 +709,8 @@ typedef struct redisObject { unsigned lru:LRU_BITS; /* LRU time (relative to global lru_clock) or * LFU data (least significant 8 bits frequency * and most significant 16 bits access time). */ -#ifdef ENABLE_MVCC - uint64_t mvcc_tstamp; -#endif mutable int refcount; + uint64_t mvcc_tstamp; void *m_ptr; } robj; @@ -1526,6 +1524,7 @@ struct redisServer { unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */ struct fastlock flock; + uint64_t mvcc_tstamp; }; typedef struct pubsubPattern { @@ -2136,6 +2135,7 @@ void objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle, long long lru_clock); #define LOOKUP_NONE 0 #define LOOKUP_NOTOUCH (1<<0) +#define LOOKUP_UPDATEMVCC (1<<1) void dbAdd(redisDb *db, robj *key, robj *val); void dbOverwrite(redisDb *db, robj *key, robj *val); int dbMerge(redisDb *db, robj *key, robj *val, int fReplace); @@ -2447,6 +2447,7 @@ struct redisMaster *MasterInfoFromClient(client *c); /* MVCC */ uint64_t getMvccTstamp(); +void incrementMvccTstamp(); #if defined(__GNUC__) #ifndef __cplusplus