Fix for issue #187 we need to properly handle the case where a key with a subkey expirey itself expires during load

Former-commit-id: e6a9a6b428b91b6108df24ae6285ea9b582b7b23
This commit is contained in:
John Sully 2020-06-01 15:33:05 -04:00
parent bd4f1e2a16
commit 4f7102f46c
4 changed files with 41 additions and 8 deletions

View File

@ -2163,6 +2163,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
uint64_t mvcc_tstamp = OBJ_MVCC_INVALID; uint64_t mvcc_tstamp = OBJ_MVCC_INVALID;
robj *subexpireKey = nullptr; robj *subexpireKey = nullptr;
sds key = nullptr; sds key = nullptr;
bool fLastKeyExpired = false;
rdb->update_cksum = rdbLoadProgressCallback; rdb->update_cksum = rdbLoadProgressCallback;
rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes; rdb->max_processing_chunk = g_pserver->loading_process_events_interval_bytes;
@ -2295,12 +2296,23 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
static_assert(sizeof(unsigned long long) == sizeof(uint64_t), "Ensure long long is 64-bits"); static_assert(sizeof(unsigned long long) == sizeof(uint64_t), "Ensure long long is 64-bits");
mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10);
} else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-key")) { } else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-key")) {
if (subexpireKey != nullptr) {
serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping. key: %s subkey: %s", key != nullptr ? key : "(null)", subexpireKey != nullptr ? szFromObj(subexpireKey) : "(null)");
decrRefCount(subexpireKey);
subexpireKey = nullptr;
}
subexpireKey = auxval; subexpireKey = auxval;
incrRefCount(subexpireKey); incrRefCount(subexpireKey);
} else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-when")) { } else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-when")) {
if (key == nullptr || subexpireKey == nullptr) { if (key == nullptr || subexpireKey == nullptr) {
if (!fLastKeyExpired) { // This is not an error if we just expired the key associated with this subexpire
serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping. key: %s subkey: %s", key != nullptr ? key : "(null)", subexpireKey != nullptr ? szFromObj(subexpireKey) : "(null)"); serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping. key: %s subkey: %s", key != nullptr ? key : "(null)", subexpireKey != nullptr ? szFromObj(subexpireKey) : "(null)");
} }
if (subexpireKey) {
decrRefCount(subexpireKey);
subexpireKey = nullptr;
}
}
else { else {
redisObject keyobj; redisObject keyobj;
initStaticStringObject(keyobj,key); initStaticStringObject(keyobj,key);
@ -2404,6 +2416,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
rsi->mi->staleKeyMap->operator[](db - g_pserver->db).push_back(objKeyDup); rsi->mi->staleKeyMap->operator[](db - g_pserver->db).push_back(objKeyDup);
decrRefCount(objKeyDup); decrRefCount(objKeyDup);
} }
serverLog(LL_WARNING, "Loaded expired key");
fLastKeyExpired = true;
sdsfree(key); sdsfree(key);
key = nullptr; key = nullptr;
decrRefCount(val); decrRefCount(val);
@ -2411,6 +2425,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
} else { } else {
/* Add the new object in the hash table */ /* Add the new object in the hash table */
int fInserted = dbMerge(db, &keyobj, val, (rsi && rsi->fForceSetKey) || (rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef int fInserted = dbMerge(db, &keyobj, val, (rsi && rsi->fForceSetKey) || (rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef
serverLog(LL_WARNING, "Loaded: %s", key);
fLastKeyExpired = false;
if (fInserted) if (fInserted)
{ {

View File

@ -1,10 +1,3 @@
proc log_file_matches {log pattern} {
set fp [open $log r]
set content [read $fp]
close $fp
string match $pattern $content
}
start_server {tags {"repl"} overrides {hz 100}} { start_server {tags {"repl"} overrides {hz 100}} {
set slave [srv 0 client] set slave [srv 0 client]
set slave_host [srv 0 host] set slave_host [srv 0 host]

View File

@ -1,3 +1,10 @@
proc log_file_matches {log pattern} {
set fp [open $log r]
set content [read $fp]
close $fp
string match $pattern $content
}
proc randstring {min max {type binary}} { proc randstring {min max {type binary}} {
set len [expr {$min+int(rand()*($max-$min+1))}] set len [expr {$min+int(rand()*($max-$min+1))}]
set output {} set output {}

View File

@ -272,6 +272,23 @@ start_server {tags {"expire"}} {
r expiremember testkey foo 10000 r expiremember testkey foo 10000
r save r save
r debug reload r debug reload
if {[log_file_matches [srv 0 stdout] "*Corrupt subexpire*"]} {
fail "Server reported corrupt subexpire"
}
assert [expr [r ttl testkey foo] > 0] assert [expr [r ttl testkey foo] > 0]
} }
test {Load subkey for an expired key works} {
# Note test inherits keys from previous tests, we want more traffic in the RDB
r multi
r sadd testset val1
r expiremember testset val1 300
r pexpire testset 1
r debug reload
r exec
set logerr [log_file_matches [srv 0 stdout] "*Corrupt subexpire*"]
if {$logerr} {
fail "Server reported corrupt subexpire"
}
}
} }