Subexpire entries should load/save

Former-commit-id: a55d98043655473ecdd53db2927381635eefc0b8
This commit is contained in:
John Sully 2019-07-18 22:31:20 -04:00
parent 9f42bb5d91
commit 9ba5270bda

View File

@ -1066,6 +1066,13 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, expireEntry *pexpire) {
snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp); snprintf(szT, 32, "%" PRIu64, val->mvcc_tstamp);
if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1; if (rdbSaveAuxFieldStrStr(rdb,"mvcc-tstamp", szT) == -1) return -1;
/* Save type, key, value */
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val,key) == -1) return -1;
/* Save expire entry after as it will apply to the previously loaded key */
/* This is because we update the expire datastructure directly without buffering */
if (pexpire != nullptr) if (pexpire != nullptr)
{ {
for (auto itr : *pexpire) for (auto itr : *pexpire)
@ -1078,10 +1085,6 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, expireEntry *pexpire) {
} }
} }
/* Save type, key, value */
if (rdbSaveObjectType(rdb,val) == -1) return -1;
if (rdbSaveStringObject(rdb,key) == -1) return -1;
if (rdbSaveObject(rdb,val,key) == -1) return -1;
return 1; return 1;
} }
@ -1919,6 +1922,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime(); long long lru_idle = -1, lfu_freq = -1, expiretime = -1, now = mstime();
long long lru_clock = 0; long long lru_clock = 0;
uint64_t mvcc_tstamp = OBJ_MVCC_INVALID; uint64_t mvcc_tstamp = OBJ_MVCC_INVALID;
robj *subexpireKey = nullptr;
robj *key = nullptr;
rdb->update_cksum = rdbLoadProgressCallback; rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes; rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes;
@ -1940,7 +1945,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
lru_clock = LRU_CLOCK(); lru_clock = LRU_CLOCK();
while(1) { while(1) {
robj *key, *val; robj *val;
/* Read type. */ /* Read type. */
if ((type = rdbLoadType(rdb)) == -1) goto eoferr; if ((type = rdbLoadType(rdb)) == -1) goto eoferr;
@ -2048,6 +2053,18 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
} else if (!strcasecmp(szFromObj(auxkey),"mvcc-tstamp")) { } else if (!strcasecmp(szFromObj(auxkey),"mvcc-tstamp")) {
static_assert(sizeof(unsigned long long) == sizeof(uint64_t), "Ensure long long is 64-bits"); static_assert(sizeof(unsigned long long) == sizeof(uint64_t), "Ensure long long is 64-bits");
mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10);
} else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-key")) {
subexpireKey = auxval;
incrRefCount(subexpireKey);
} else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-when")) {
if (key == nullptr || subexpireKey == nullptr) {
serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping.");
}
else {
setExpire(NULL, db, key, subexpireKey, strtoll(szFromObj(auxval), nullptr, 10));
decrRefCount(subexpireKey);
subexpireKey = nullptr;
}
} else { } else {
/* We ignore fields we don't understand, as by AUX field /* We ignore fields we don't understand, as by AUX field
* contract. */ * contract. */
@ -2089,6 +2106,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
} }
/* Read key */ /* Read key */
if (key != nullptr)
decrRefCount(key);
if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr; if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr;
/* Read value */ /* Read value */
if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) goto eoferr; if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) goto eoferr;
@ -2102,7 +2122,7 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
decrRefCount(val); decrRefCount(val);
} else { } else {
/* Add the new object in the hash table */ /* Add the new object in the hash table */
int fInserted = dbMerge(db, key, val, rsi->fForceSetKey); int fInserted = dbMerge(db, key, val, rsi->fForceSetKey); // Note: dbMerge will incrRef
if (fInserted) if (fInserted)
{ {
@ -2112,14 +2132,9 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
/* Set usage information (for eviction). */ /* Set usage information (for eviction). */
objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock); objectSetLRUOrLFU(val,lfu_freq,lru_idle,lru_clock);
/* Decrement the key refcount since dbMerge() will take its
* own reference. */
decrRefCount(key);
} }
else else
{ {
decrRefCount(key);
decrRefCount(val); decrRefCount(val);
} }
} }
@ -2131,6 +2146,16 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) {
lru_idle = -1; lru_idle = -1;
} }
if (key != nullptr)
decrRefCount(key);
if (subexpireKey != nullptr)
{
serverLog(LL_WARNING, "Corrupt subexpire entry in RDB.");
decrRefCount(subexpireKey);
subexpireKey = nullptr;
}
/* Verify the checksum if RDB version is >= 5 */ /* Verify the checksum if RDB version is >= 5 */
if (rdbver >= 5) { if (rdbver >= 5) {
uint64_t cksum, expected = rdb->cksum; uint64_t cksum, expected = rdb->cksum;