Additional MVCC work and fix memory leak loading objects from rdb

Former-commit-id: efde2e6be6dc2fc3425a17e2dc146c5b8823730a
This commit is contained in:
John Sully 2019-04-19 22:54:42 -04:00
parent c32ae00cb8
commit d22f1d9481
6 changed files with 88 additions and 53 deletions

View File

@ -44,6 +44,8 @@ endif
endif
endif
# CXXFLAGS+= -DENABLE_MVCC
USEASM?=true
# Do we use our assembly spinlock? X64 only
ifeq ($(uname_S),Linux)

View File

@ -4936,7 +4936,7 @@ void restoreCommand(client *c) {
rioInitWithBuffer(&payload,szFromObj(c->argv[3]));
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload,c->argv[1])) == NULL))
((obj = rdbLoadObject(type,&payload,c->argv[1], OBJ_MVCC_INVALID)) == NULL))
{
addReplyError(c,"Bad data format");
return;

View File

@ -204,18 +204,27 @@ void dbAdd(redisDb *db, robj *key, robj *val)
serverAssertWithInfo(NULL,key,retval == DICT_OK);
}
/* Insert a key, handling duplicate keys according to fReplace */
int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
void dbOverwriteCore(redisDb *db, dictEntry *de, robj *val, bool fUpdateMvcc)
{
if (fReplace)
{
setKey(db, key, val);
return TRUE;
dictEntry auxentry = *de;
robj *old = (robj*)dictGetVal(de);
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
val->lru = old->lru;
}
else
{
return (dbAddCore(db, key, val) == DICT_OK);
#ifdef ENABLE_MVCC
if (fUpdateMvcc)
val->mvcc_tstamp = getMvccTstamp();
#else
UNUSED(fUpdateMvcc);
#endif
dictSetVal(db->pdict, de, val);
if (server.lazyfree_lazy_server_del) {
freeObjAsync(old);
dictSetVal(db->pdict, &auxentry, NULL);
}
dictFreeVal(db->pdict, &auxentry);
}
/* Overwrite an existing key with a new value. Incrementing the reference
@ -227,22 +236,31 @@ void dbOverwrite(redisDb *db, robj *key, robj *val) {
dictEntry *de = dictFind(db->pdict,ptrFromObj(key));
serverAssertWithInfo(NULL,key,de != NULL);
dictEntry auxentry = *de;
robj *old = (robj*)dictGetVal(de);
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU) {
val->lru = old->lru;
dbOverwriteCore(db, de, val, true);
}
/* Insert a key, handling duplicate keys according to fReplace */
int dbMerge(redisDb *db, robj *key, robj *val, int fReplace)
{
if (fReplace)
{
dictEntry *de = dictFind(db->pdict, ptrFromObj(key));
if (de == nullptr)
return (dbAddCore(db, key, val) == DICT_OK);
#ifdef ENABLE_MVCC
val->mvcc_tstamp = getMvccTstamp();
robj *old = (robj*)dictGetVal(de);
if (old->mvcc_tstamp <= val->mvcc_tstamp)
#endif
dictSetVal(db->pdict, de, val);
if (server.lazyfree_lazy_server_del) {
freeObjAsync(old);
dictSetVal(db->pdict, &auxentry, NULL);
{
dbOverwriteCore(db, de, val, false);
return true;
}
return false;
}
else
{
return (dbAddCore(db, key, val) == DICT_OK);
}
dictFreeVal(db->pdict, &auxentry);
}
/* High level Set operation. This function can be used in order to set

View File

@ -992,6 +992,31 @@ ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj *key) {
return nwritten;
}
/* Save an AUX field. */
ssize_t rdbSaveAuxField(rio *rdb, const void *key, size_t keylen, const void *val, size_t vallen) {
ssize_t ret, len = 0;
if ((ret = rdbSaveType(rdb,RDB_OPCODE_AUX)) == -1) return -1;
len += ret;
if ((ret = rdbSaveRawString(rdb,(const unsigned char*)key,keylen)) == -1) return -1;
len += ret;
if ((ret = rdbSaveRawString(rdb,(const unsigned char*)val,vallen)) == -1) return -1;
len += ret;
return len;
}
/* Wrapper for rdbSaveAuxField() used when key/val length can be obtained
* with strlen(). */
ssize_t rdbSaveAuxFieldStrStr(rio *rdb, const char *key, const char *val) {
return rdbSaveAuxField(rdb,key,strlen(key),val,strlen(val));
}
/* Wrapper for strlen(key) + integer type (up to long long range). */
ssize_t rdbSaveAuxFieldStrInt(rio *rdb, const char *key, long long val) {
char buf[LONG_STR_SIZE];
int vlen = ll2string(buf,sizeof(buf),val);
return rdbSaveAuxField(rdb,key,strlen(key),buf,vlen);
}
/* Return the length the object will have on disk if saved with
* the rdbSaveObject() function. Currently we use a trick to get
* this length with very little changes to the code. In the future
@ -1036,6 +1061,12 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
}
#ifdef ENABLE_MVCC
char szMvcc[32];
snprintf(szMvcc, 32, "%" PRIu64, val->mvcc_tstamp);
if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szMvcc) == -1) return -1;
#endif
/* Save type, key, value */
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
@ -1043,31 +1074,6 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
return 1;
}
/* Save an AUX field. */
ssize_t rdbSaveAuxField(rio *rdb, const void *key, size_t keylen, const void *val, size_t vallen) {
ssize_t ret, len = 0;
if ((ret = rdbSaveType(rdb,RDB_OPCODE_AUX)) == -1) return -1;
len += ret;
if ((ret = rdbSaveRawString(rdb,(const unsigned char*)key,keylen)) == -1) return -1;
len += ret;
if ((ret = rdbSaveRawString(rdb,(const unsigned char*)val,vallen)) == -1) return -1;
len += ret;
return len;
}
/* Wrapper for rdbSaveAuxField() used when key/val length can be obtained
* with strlen(). */
ssize_t rdbSaveAuxFieldStrStr(rio *rdb, const char *key, const char *val) {
return rdbSaveAuxField(rdb,key,strlen(key),val,strlen(val));
}
/* Wrapper for strlen(key) + integer type (up to long long range). */
ssize_t rdbSaveAuxFieldStrInt(rio *rdb, const char *key, long long val) {
char buf[LONG_STR_SIZE];
int vlen = ll2string(buf,sizeof(buf),val);
return rdbSaveAuxField(rdb,key,strlen(key),buf,vlen);
}
/* Save a few default AUX fields with information about the RDB generated. */
int rdbSaveInfoAuxFields(rio *rdb, int flags, rdbSaveInfo *rsi) {
int redis_bits = (sizeof(void*) == 8) ? 64 : 32;
@ -1401,7 +1407,7 @@ robj *rdbLoadCheckModuleValue(rio *rdb, char *modulename) {
/* Load a Redis object of the specified type from the specified file.
* On success a newly allocated object is returned, otherwise NULL. */
robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) {
robj *o = NULL, *ele, *dec;
uint64_t len;
unsigned int i;
@ -1816,6 +1822,11 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) {
} else {
rdbExitReportCorruptRDB("Unknown RDB encoding type %d",rdbtype);
}
#ifdef ENABLE_MVCC
o->mvcc_tstamp = mvcc_tstamp;
#else
UNUSED(mvcc_tstamp);
#endif
return o;
}
@ -1882,7 +1893,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
char buf[1024];
/* Key-specific attributes, set by opcodes before the key type. */
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
long long lru_clock = LRU_CLOCK();
long long lru_clock = 0;
uint64_t mvcc_tstamp = OBJ_MVCC_INVALID;
rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = server.loading_process_events_interval_bytes;
@ -2010,6 +2022,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
if (haspreamble) serverLog(LL_NOTICE,"RDB has an AOF tail");
} else if (!strcasecmp(szFromObj(auxkey),"redis-bits")) {
/* Just ignored. */
} else if (!strcasecmp(szFromObj(auxkey),"mvcc-tstamp")) {
static_assert(sizeof(unsigned long long) == sizeof(uint64_t), "Ensure long long is 64-bits");
mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10);
} else {
/* We ignore fields we don't understand, as by AUX field
* contract. */
@ -2053,7 +2068,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
/* Read key */
if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
/* Read value */
if ((val = rdbLoadObject(type,rdb,key)) == NULL) goto eoferr;
if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) goto eoferr;
/* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is

View File

@ -146,7 +146,7 @@ int rdbSaveS3(char *path, rdbSaveInfo *rsi);
int rdbLoadS3(char *path, rdbSaveInfo *rsi);
ssize_t rdbSaveObject(rio *rdb, robj_roptr o, robj *key);
size_t rdbSavedObjectLen(robj *o);
robj *rdbLoadObject(int type, rio *rdb, robj *key);
robj *rdbLoadObject(int type, rio *rdb, robj *key, uint64_t mvcc_tstamp);
void backgroundSaveDoneHandler(int exitcode, int bysignal);
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime);
robj *rdbLoadStringObject(rio *rdb);

View File

@ -285,7 +285,7 @@ int redis_check_rdb(const char *rdbfilename, FILE *fp) {
rdbstate.keys++;
/* Read value */
rdbstate.doing = RDB_CHECK_DOING_READ_OBJECT_VALUE;
if ((val = rdbLoadObject(type,&rdb,key)) == NULL) goto eoferr;
if ((val = rdbLoadObject(type,&rdb,key,OBJ_MVCC_INVALID)) == NULL) goto eoferr;
/* Check if the key already expired. */
if (expiretime != -1 && expiretime < now)
rdbstate.already_expired++;