RESTORE ABSTTL won't store expired keys into the db (#7472)

Similarly to EXPIREAT with TTL in the past, which implicitly deletes the
key and return success, RESTORE should not store key that are already
expired into the db.
When used together with REPLACE it should emit a DEL to keyspace
notification and replication stream.
This commit is contained in:
Oran Agra 2020-07-10 10:02:37 +03:00 committed by GitHub
parent d6180c8c86
commit 5977a94842
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 46 additions and 16 deletions

View File

@ -4988,7 +4988,8 @@ void restoreCommand(client *c) {
}
/* Make sure this key does not already exist here... */
if (!replace && lookupKeyWrite(c->db,c->argv[1]) != NULL) {
robj *key = c->argv[1];
if (!replace && lookupKeyWrite(c->db,key) != NULL) {
addReply(c,shared.busykeyerr);
return;
}
@ -5010,24 +5011,37 @@ void restoreCommand(client *c) {
rioInitWithBuffer(&payload,c->argv[3]->ptr);
if (((type = rdbLoadObjectType(&payload)) == -1) ||
((obj = rdbLoadObject(type,&payload,c->argv[1]->ptr)) == NULL))
((obj = rdbLoadObject(type,&payload,key->ptr)) == NULL))
{
addReplyError(c,"Bad data format");
return;
}
/* Remove the old key if needed. */
if (replace) dbDelete(c->db,c->argv[1]);
int deleted = 0;
if (replace)
deleted = dbDelete(c->db,key);
if (ttl && !absttl) ttl+=mstime();
if (ttl && checkAlreadyExpired(ttl)) {
if (deleted) {
rewriteClientCommandVector(c,2,shared.del,key);
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"del",key,c->db->id);
server.dirty++;
}
addReply(c, shared.ok);
return;
}
/* Create the key and set the TTL if any */
dbAdd(c->db,c->argv[1],obj);
dbAdd(c->db,key,obj);
if (ttl) {
if (!absttl) ttl+=mstime();
setExpire(c,c->db,c->argv[1],ttl);
setExpire(c,c->db,key,ttl);
}
objectSetLRUOrLFU(obj,lfu_freq,lru_idle,lru_clock,1000);
signalModifiedKey(c,c->db,c->argv[1]);
notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",c->argv[1],c->db->id);
signalModifiedKey(c,c->db,key);
notifyKeyspaceEvent(NOTIFY_GENERIC,"restore",key,c->db->id);
addReply(c,shared.ok);
server.dirty++;
}

View File

@ -475,6 +475,16 @@ void flushSlaveKeysWithExpireList(void) {
}
}
int checkAlreadyExpired(long long when) {
/* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
* should never be executed as a DEL when load the AOF or in the context
* of a slave instance.
*
* Instead we add the already expired key to the database with expire time
* (possibly in the past) and wait for an explicit DEL from the master. */
return (when <= mstime() && !server.loading && !server.masterhost);
}
/*-----------------------------------------------------------------------------
* Expires Commands
*----------------------------------------------------------------------------*/
@ -502,13 +512,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
return;
}
/* EXPIRE with negative TTL, or EXPIREAT with a timestamp into the past
* should never be executed as a DEL when load the AOF or in the context
* of a slave instance.
*
* Instead we take the other branch of the IF statement setting an expire
* (possibly in the past) and wait for an explicit DEL from the master. */
if (when <= mstime() && !server.loading && !server.masterhost) {
if (checkAlreadyExpired(when)) {
robj *aux;
int deleted = server.lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :

View File

@ -2070,6 +2070,7 @@ void propagateExpire(redisDb *db, robj *key, int lazy);
int expireIfNeeded(redisDb *db, robj *key);
long long getExpire(redisDb *db, robj *key);
void setExpire(client *c, redisDb *db, robj *key, long long when);
int checkAlreadyExpired(long long when);
robj *lookupKey(redisDb *db, robj *key, int flags);
robj *lookupKeyRead(redisDb *db, robj *key);
robj *lookupKeyWrite(redisDb *db, robj *key);

View File

@ -37,6 +37,17 @@ start_server {tags {"dump"}} {
r get foo
} {bar}
test {RESTORE with ABSTTL in the past} {
r set foo bar
set encoded [r dump foo]
set now [clock milliseconds]
r debug set-active-expire 0
r restore foo [expr $now-3000] $encoded absttl REPLACE
catch {r debug object foo} e
r debug set-active-expire 1
set e
} {ERR no such key}
test {RESTORE can set LRU} {
r set foo bar
set encoded [r dump foo]