From d22f1d948124ea3b69ef9ae10b6995c1077fbb0f Mon Sep 17 00:00:00 2001 From: John Sully Date: Fri, 19 Apr 2019 22:54:42 -0400 Subject: [PATCH] Additional MVCC work and fix memory leak loading objects from rdb Former-commit-id: efde2e6be6dc2fc3425a17e2dc146c5b8823730a --- src/Makefile | 2 ++ src/cluster.cpp | 2 +- src/db.cpp | 62 ++++++++++++++++++++++------------- src/rdb.cpp | 71 +++++++++++++++++++++++++---------------- src/rdb.h | 2 +- src/redis-check-rdb.cpp | 2 +- 6 files changed, 88 insertions(+), 53 deletions(-) diff --git a/src/Makefile b/src/Makefile index 345e1c04b..f5ee6cae8 100644 --- a/src/Makefile +++ b/src/Makefile @@ -44,6 +44,8 @@ endif endif endif +# CXXFLAGS+= -DENABLE_MVCC + USEASM?=true # Do we use our assembly spinlock? X64 only ifeq ($(uname_S),Linux) diff --git a/src/cluster.cpp b/src/cluster.cpp index 5aca6237d..57ff37f9e 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -4936,7 +4936,7 @@ void restoreCommand(client *c) { rioInitWithBuffer(&payload,szFromObj(c->argv[3])); if (((type = rdbLoadObjectType(&payload)) == -1) || - ((obj = rdbLoadObject(type,&payload,c->argv[1])) == NULL)) + ((obj = rdbLoadObject(type,&payload,c->argv[1], OBJ_MVCC_INVALID)) == NULL)) { addReplyError(c,"Bad data format"); return; diff --git a/src/db.cpp b/src/db.cpp index 685d56bc8..845635305 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -204,18 +204,27 @@ void dbAdd(redisDb *db, robj *key, robj *val) serverAssertWithInfo(NULL,key,retval == DICT_OK); } -/* Insert a key, handling duplicate keys according to fReplace */ -int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) +void dbOverwriteCore(redisDb *db, dictEntry *de, robj *val, bool fUpdateMvcc) { - if (fReplace) - { - setKey(db, key, val); - return TRUE; + dictEntry auxentry = *de; + robj *old = (robj*)dictGetVal(de); + if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { + val->lru = old->lru; } - else - { - return (dbAddCore(db, key, val) == DICT_OK); +#ifdef ENABLE_MVCC + if (fUpdateMvcc) + val->mvcc_tstamp = getMvccTstamp(); +#else + UNUSED(fUpdateMvcc); +#endif + dictSetVal(db->pdict, de, val); + + if (server.lazyfree_lazy_server_del) { + freeObjAsync(old); + dictSetVal(db->pdict, &auxentry, NULL); } + + dictFreeVal(db->pdict, &auxentry); } /* Overwrite an existing key with a new value. Incrementing the reference @@ -227,22 +236,31 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) { dictEntry *de = dictFind(db->pdict,ptrFromObj(key)); serverAssertWithInfo(NULL,key,de != NULL); - dictEntry auxentry = *de; - robj *old = (robj*)dictGetVal(de); - if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) { - val->lru = old->lru; - } + dbOverwriteCore(db, de, val, true); +} + +/* Insert a key, handling duplicate keys according to fReplace */ +int dbMerge(redisDb *db, robj *key, robj *val, int fReplace) +{ + if (fReplace) + { + dictEntry *de = dictFind(db->pdict, ptrFromObj(key)); + if (de == nullptr) + return (dbAddCore(db, key, val) == DICT_OK); #ifdef ENABLE_MVCC - val->mvcc_tstamp = getMvccTstamp(); + robj *old = (robj*)dictGetVal(de); + if (old->mvcc_tstamp <= val->mvcc_tstamp) #endif - dictSetVal(db->pdict, de, val); - - if (server.lazyfree_lazy_server_del) { - freeObjAsync(old); - dictSetVal(db->pdict, &auxentry, NULL); + { + dbOverwriteCore(db, de, val, false); + return true; + } + return false; + } + else + { + return (dbAddCore(db, key, val) == DICT_OK); } - - dictFreeVal(db->pdict, &auxentry); } /* High level Set operation. This function can be used in order to set diff --git a/src/rdb.cpp b/src/rdb.cpp index be8b382dc..835672668 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -992,6 +992,31 @@ ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj *key) { return nwritten; } +/* Save an AUX field. */ +ssize_t rdbSaveAuxField(rio *rdb, const void *key, size_t keylen, const void *val, size_t vallen) { + ssize_t ret, len = 0; + if ((ret = rdbSaveType(rdb,RDB_OPCODE_AUX)) == -1) return -1; + len += ret; + if ((ret = rdbSaveRawString(rdb,(const unsigned char*)key,keylen)) == -1) return -1; + len += ret; + if ((ret = rdbSaveRawString(rdb,(const unsigned char*)val,vallen)) == -1) return -1; + len += ret; + return len; +} + +/* Wrapper for rdbSaveAuxField() used when key/val length can be obtained + * with strlen(). */ +ssize_t rdbSaveAuxFieldStrStr(rio *rdb, const char *key, const char *val) { + return rdbSaveAuxField(rdb,key,strlen(key),val,strlen(val)); +} + +/* Wrapper for strlen(key) + integer type (up to long long range). */ +ssize_t rdbSaveAuxFieldStrInt(rio *rdb, const char *key, long long val) { + char buf[LONG_STR_SIZE]; + int vlen = ll2string(buf,sizeof(buf),val); + return rdbSaveAuxField(rdb,key,strlen(key),buf,vlen); +} + /* Return the length the object will have on disk if saved with * the rdbSaveObject() function. Currently we use a trick to get * this length with very little changes to the code. In the future @@ -1036,6 +1061,12 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { if (rdbWriteRaw(rdb,buf,1) == -1) return -1; } +#ifdef ENABLE_MVCC + char szMvcc[32]; + snprintf(szMvcc, 32, "%" PRIu64, val->mvcc_tstamp); + if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szMvcc) == -1) return -1; +#endif + /* Save type, key, value */ if (rdbSaveObjectType(rdb,val) == -1) return -1; if (rdbSaveStringObject(rdb,key) == -1) return -1; @@ -1043,31 +1074,6 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { return 1; } -/* Save an AUX field. */ -ssize_t rdbSaveAuxField(rio *rdb, const void *key, size_t keylen, const void *val, size_t vallen) { - ssize_t ret, len = 0; - if ((ret = rdbSaveType(rdb,RDB_OPCODE_AUX)) == -1) return -1; - len += ret; - if ((ret = rdbSaveRawString(rdb,(const unsigned char*)key,keylen)) == -1) return -1; - len += ret; - if ((ret = rdbSaveRawString(rdb,(const unsigned char*)val,vallen)) == -1) return -1; - len += ret; - return len; -} - -/* Wrapper for rdbSaveAuxField() used when key/val length can be obtained - * with strlen(). */ -ssize_t rdbSaveAuxFieldStrStr(rio *rdb, const char *key, const char *val) { - return rdbSaveAuxField(rdb,key,strlen(key),val,strlen(val)); -} - -/* Wrapper for strlen(key) + integer type (up to long long range). */ -ssize_t rdbSaveAuxFieldStrInt(rio *rdb, const char *key, long long val) { - char buf[LONG_STR_SIZE]; - int vlen = ll2string(buf,sizeof(buf),val); - return rdbSaveAuxField(rdb,key,strlen(key),buf,vlen); -} - /* Save a few default AUX fields with information about the RDB generated. */ int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) { int redis_bits = (sizeof(void*) == 8) ? 64 : 32; @@ -1401,7 +1407,7 @@ robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) { /* Load a Redis object of the specified type from the specified file. * On success a newly allocated object is returned, otherwise NULL. */ -robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { +robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) { robj *o = NULL, *ele, *dec; uint64_t len; unsigned int i; @@ -1816,6 +1822,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { } else { rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype); } +#ifdef ENABLE_MVCC + o->mvcc_tstamp = mvcc_tstamp; +#else + UNUSED(mvcc_tstamp); +#endif return o; } @@ -1882,7 +1893,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { char buf[1024]; /* Key-specific attributes, set by opcodes before the key type. */ long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); - long long lru_clock = LRU_CLOCK(); + long long lru_clock = 0; + uint64_t mvcc_tstamp = OBJ_MVCC_INVALID; rdb->update_cksum = rdbLoadProgressCallback; rdb->max_processing_chunk = server.loading_process_events_interval_bytes; @@ -2010,6 +2022,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { if (haspreamble) serverLog(LL_NOTICE,"RDB has an AOF tail"); } else if (!strcasecmp(szFromObj(auxkey),"redis-bits")) { /* Just ignored. */ + } else if (!strcasecmp(szFromObj(auxkey),"mvcc-tstamp")) { + static_assert(sizeof(unsigned long long) == sizeof(uint64_t), "Ensure long long is 64-bits"); + mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); } else { /* We ignore fields we don't understand, as by AUX field * contract. */ @@ -2053,7 +2068,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { /* Read key */ if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr; /* Read value */ - if ((val = rdbLoadObject(type,rdb,key)) == NULL) goto eoferr; + if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) goto eoferr; /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was * received from the master. In the latter case, the master is diff --git a/src/rdb.h b/src/rdb.h index 18cd3f3d4..45cfa475a 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -146,7 +146,7 @@ int rdbSaveS3(char *path, rdbSaveInfo *rsi); int rdbLoadS3(char *path, rdbSaveInfo *rsi); ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj *key); size_t rdbSavedObjectLen(robj *o); -robj *rdbLoadObject(int type, rio *rdb, robj *key); +robj *rdbLoadObject(int type, rio *rdb, robj *key, uint64_t mvcc_tstamp); void backgroundSaveDoneHandler(int exitcode, int bysignal); int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime); robj *rdbLoadStringObject(rio *rdb); diff --git a/src/redis-check-rdb.cpp b/src/redis-check-rdb.cpp index a5570095f..204f40663 100644 --- a/src/redis-check-rdb.cpp +++ b/src/redis-check-rdb.cpp @@ -285,7 +285,7 @@ int redis_check_rdb(const char *rdbfilename, FILE *fp) { rdbstate.keys++; /* Read value */ rdbstate.doing = RDB_CHECK_DOING_READ_OBJECT_VALUE; - if ((val = rdbLoadObject(type,&rdb,key)) == NULL) goto eoferr; + if ((val = rdbLoadObject(type,&rdb,key,OBJ_MVCC_INVALID)) == NULL) goto eoferr; /* Check if the key already expired. */ if (expiretime != -1 && expiretime < now) rdbstate.already_expired++;