From 9ba5270bdac9ec87ca42b46d12c0e18c9f98909b Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 18 Jul 2019 22:31:20 -0400 Subject: [PATCH] Subexpire entries should load/save Former-commit-id: a55d98043655473ecdd53db2927381635eefc0b8 --- src/rdb.cpp | 47 ++++++++++++++++++++++++++++++++++++----------- 1 file changed, 36 insertions(+), 11 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index bed797305..97ade6d1f 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1066,6 +1066,13 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, expireEntry *pexpire) { snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; + /* Save type, key, value */ + if (rdbSaveObjectType(rdb,val) == -1) return -1; + if (rdbSaveStringObject(rdb,key) == -1) return -1; + if (rdbSaveObject(rdb,val,key) == -1) return -1; + + /* Save expire entry after as it will apply to the previously loaded key */ + /* This is because we update the expire datastructure directly without buffering */ if (pexpire != nullptr) { for (auto itr : *pexpire) @@ -1078,10 +1085,6 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, expireEntry *pexpire) { } } - /* Save type, key, value */ - if (rdbSaveObjectType(rdb,val) == -1) return -1; - if (rdbSaveStringObject(rdb,key) == -1) return -1; - if (rdbSaveObject(rdb,val,key) == -1) return -1; return 1; } @@ -1919,6 +1922,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); long long lru_clock = 0; uint64_t mvcc_tstamp = OBJ_MVCC_INVALID; + robj *subexpireKey = nullptr; + robj *key = nullptr; rdb->update_cksum = rdbLoadProgressCallback; rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes; @@ -1940,7 +1945,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { lru_clock = LRU_CLOCK(); while(1) { - robj *key, *val; + robj *val; /* Read type. */ if ((type = rdbLoadType(rdb)) == -1) goto eoferr; @@ -2048,6 +2053,18 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { } else if (!strcasecmp(szFromObj(auxkey),"mvcc-tstamp")) { static_assert(sizeof(unsigned long long) == sizeof(uint64_t), "Ensure long long is 64-bits"); mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); + } else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-key")) { + subexpireKey = auxval; + incrRefCount(subexpireKey); + } else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-when")) { + if (key == nullptr || subexpireKey == nullptr) { + serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping."); + } + else { + setExpire(NULL, db, key, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10)); + decrRefCount(subexpireKey); + subexpireKey = nullptr; + } } else { /* We ignore fields we don't understand, as by AUX field * contract. */ @@ -2089,6 +2106,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { } /* Read key */ + if (key != nullptr) + decrRefCount(key); + if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr; /* Read value */ if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) goto eoferr; @@ -2102,7 +2122,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { decrRefCount(val); } else { /* Add the new object in the hash table */ - int fInserted = dbMerge(db, key, val, rsi->fForceSetKey); + int fInserted = dbMerge(db, key, val, rsi->fForceSetKey); // Note: dbMerge will incrRef if (fInserted) { @@ -2112,14 +2132,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { /* Set usage information (for eviction). */ objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); - - /* Decrement the key refcount since dbMerge() will take its - * own reference. */ - decrRefCount(key); } else { - decrRefCount(key); decrRefCount(val); } } @@ -2130,6 +2145,16 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { lfu_freq = -1; lru_idle = -1; } + + if (key != nullptr) + decrRefCount(key); + + if (subexpireKey != nullptr) + { + serverLog(LL_WARNING, "Corrupt subexpire entry in RDB."); + decrRefCount(subexpireKey); + subexpireKey = nullptr; + } /* Verify the checksum if RDB version is >= 5 */ if (rdbver >= 5) {