Freeze time sampling during command execution, and scripts (#10300)

Freeze time during execution of scripts and all other commands.
This means that a key is either expired or not, and doesn't change
state during a script execution. resolves #10182

This PR try to add a new `commandTimeSnapshot` function.
The function logic is extracted from `keyIsExpired`, but the related
calls to `fixed_time_expire` and `mstime()` are removed, see below.

In commands, we will avoid calling `mstime()` multiple times
and just use the one that sampled in call. The background is,
e.g. using `PEXPIRE 1` with valgrind sometimes result in the key
being deleted rather than expired. The reason is that both `PEXPIRE`
command and `checkAlreadyExpired` call `mstime()` separately.

There are other more important changes in this PR:
1. Eliminate `fixed_time_expire`, it is no longer needed. 
   When we want to sample time we should always use a time snapshot. 
   We will use `in_nested_call` instead to update the cached time in `call`.
2. Move the call for `updateCachedTime` from `serverCron` to `afterSleep`.
    Now `commandTimeSnapshot` will always return the sample time, the
    `lookupKeyReadWithFlags` call in `getNodeByQuery` will get a outdated
    cached time (because `processCommand` is out of the `call` context).
    We put the call to `updateCachedTime` in `aftersleep`.
3. Cache the time each time the module lock Redis.
    Call `updateCachedTime` in `moduleGILAfterLock`, affecting `RM_ThreadSafeContextLock`
    and `RM_ThreadSafeContextTryLock`

Currently the commandTimeSnapshot change affects the following TTL commands:
- SET EX / SET PX
- EXPIRE / PEXPIRE
- SETEX / PSETEX
- GETEX EX / GETEX PX
- TTL / PTTL
- EXPIRETIME / PEXPIRETIME
- RESTORE key TTL

And other commands just use the cached mstime (including TIME).

This is considered to be a breaking change since it can break a script
that uses a loop to wait for a key to expire.
This commit is contained in:
Binbin 2022-10-09 13:18:34 +08:00 committed by GitHub
parent d2ad01ab3e
commit 35b3fbd90c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 173 additions and 72 deletions

View File

@ -2358,7 +2358,7 @@ int ACLSaveToFile(const char *filename) {
/* Create a temp file with the new content. */
tmpfilename = sdsnew(filename);
tmpfilename = sdscatfmt(tmpfilename,".tmp-%i-%I",
(int) getpid(),mstime());
(int) getpid(),commandTimeSnapshot());
if ((fd = open(tmpfilename,O_WRONLY|O_CREAT,0644)) == -1) {
serverLog(LL_WARNING,"Opening temp ACL file for ACL SAVE: %s",
strerror(errno));
@ -2518,7 +2518,7 @@ void addACLLogEntry(client *c, int reason, int context, int argpos, sds username
le->count = 1;
le->reason = reason;
le->username = sdsdup(username ? username : c->user->name);
le->ctime = mstime();
le->ctime = commandTimeSnapshot();
if (object) {
le->object = object;
@ -2895,7 +2895,7 @@ void aclCommand(client *c) {
listIter li;
listNode *ln;
listRewind(ACLLog,&li);
mstime_t now = mstime();
mstime_t now = commandTimeSnapshot();
while (count-- && (ln = listNext(&li)) != NULL) {
ACLLogEntry *le = listNodeValue(ln);
addReplyMapLen(c,7);

View File

@ -642,14 +642,6 @@ void handleClientsBlockedOnKeys(void) {
* we can safely call signalKeyAsReady() against this key. */
dictDelete(rl->db->ready_keys,rl->key);
/* Even if we are not inside call(), increment the call depth
* in order to make sure that keys are expired against a fixed
* reference time, and not against the wallclock time. This
* way we can lookup an object multiple times (BLMOVE does
* that) without the risk of it being freed in the second
* lookup, invalidating the first one.
* See https://github.com/redis/redis/pull/6554. */
server.fixed_time_expire++;
updateCachedTime(0);
/* Serve clients blocked on the key. */
@ -681,7 +673,6 @@ void handleClientsBlockedOnKeys(void) {
if (server.also_propagate.numops > 0)
propagatePendingCommands();
}
server.fixed_time_expire--;
/* Free this item. */
decrRefCount(rl->key);

View File

@ -6124,7 +6124,7 @@ void restoreCommand(client *c) {
if (replace)
deleted = dbDelete(c->db,key);
if (ttl && !absttl) ttl+=mstime();
if (ttl && !absttl) ttl+=commandTimeSnapshot();
if (ttl && checkAlreadyExpired(ttl)) {
if (deleted) {
rewriteClientCommandVector(c,2,shared.del,key);
@ -6405,7 +6405,7 @@ try_again:
long long expireat = getExpire(c->db,kv[j]);
if (expireat != -1) {
ttl = expireat-mstime();
ttl = expireat-commandTimeSnapshot();
if (ttl < 0) {
continue;
}

View File

@ -1620,28 +1620,7 @@ int keyIsExpired(redisDb *db, robj *key) {
/* Don't expire anything while loading. It will be done later. */
if (server.loading) return 0;
/* If we are in the context of a Lua script, we pretend that time is
* blocked to when the Lua script started. This way a key can expire
* only the first time it is accessed and not in the middle of the
* script execution, making propagation to slaves / AOF consistent.
* See issue #1525 on Github for more information. */
if (server.script_caller) {
now = scriptTimeSnapshot();
}
/* If we are in the middle of a command execution, we still want to use
* a reference time that does not change: in that case we just use the
* cached time, that we update before each call in the call() function.
* This way we avoid that commands such as RPOPLPUSH or similar, that
* may re-open the same key multiple times, can invalidate an already
* open object in a next call, if the next call will see the key expired,
* while the first did not. */
else if (server.fixed_time_expire > 0) {
now = server.mstime;
}
/* For the other cases, we want to use the most fresh time we have. */
else {
now = mstime();
}
now = commandTimeSnapshot();
/* The key expired if the current (virtual or real) time is greater
* than the expire time of the key. */

View File

@ -488,7 +488,7 @@ int checkAlreadyExpired(long long when) {
*
* Instead we add the already expired key to the database with expire time
* (possibly in the past) and wait for an explicit DEL from the master. */
return (when <= mstime() && !server.loading && !server.masterhost);
return (when <= commandTimeSnapshot() && !server.loading && !server.masterhost);
}
#define EXPIRE_NX (1<<0)
@ -665,7 +665,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
/* EXPIRE key seconds [ NX | XX | GT | LT] */
void expireCommand(client *c) {
expireGenericCommand(c,mstime(),UNIT_SECONDS);
expireGenericCommand(c,commandTimeSnapshot(),UNIT_SECONDS);
}
/* EXPIREAT key unix-time-seconds [ NX | XX | GT | LT] */
@ -675,7 +675,7 @@ void expireatCommand(client *c) {
/* PEXPIRE key milliseconds [ NX | XX | GT | LT] */
void pexpireCommand(client *c) {
expireGenericCommand(c,mstime(),UNIT_MILLISECONDS);
expireGenericCommand(c,commandTimeSnapshot(),UNIT_MILLISECONDS);
}
/* PEXPIREAT key unix-time-milliseconds [ NX | XX | GT | LT] */
@ -697,7 +697,7 @@ void ttlGenericCommand(client *c, int output_ms, int output_abs) {
* TTL value otherwise. */
expire = getExpire(c->db,c->argv[1]);
if (expire != -1) {
ttl = output_abs ? expire : expire-mstime();
ttl = output_abs ? expire : expire-commandTimeSnapshot();
if (ttl < 0) ttl = 0;
}
if (ttl == -1) {

View File

@ -3838,7 +3838,7 @@ mstime_t RM_GetExpire(RedisModuleKey *key) {
mstime_t expire = getExpire(key->db,key->key);
if (expire == -1 || key->value == NULL)
return REDISMODULE_NO_EXPIRE;
expire -= mstime();
expire -= commandTimeSnapshot();
return expire >= 0 ? expire : 0;
}
@ -3855,7 +3855,7 @@ int RM_SetExpire(RedisModuleKey *key, mstime_t expire) {
if (!(key->mode & REDISMODULE_WRITE) || key->value == NULL || (expire < 0 && expire != REDISMODULE_NO_EXPIRE))
return REDISMODULE_ERR;
if (expire != REDISMODULE_NO_EXPIRE) {
expire += mstime();
expire += commandTimeSnapshot();
setExpire(key->ctx->client,key->db,key->key,expire);
} else {
removeExpire(key->db,key->key);
@ -7752,6 +7752,7 @@ void moduleGILAfterLock() {
/* Bump up the nesting level to prevent immediate propagation
* of possible RM_Call from th thread */
server.module_ctx_nesting++;
updateCachedTime(0);
}
/* Acquire the server lock before executing a thread safe API call.

View File

@ -4006,7 +4006,7 @@ void failoverCommand(client *c) {
serverLog(LL_NOTICE,"FAILOVER requested to any replica.");
}
mstime_t now = mstime();
mstime_t now = commandTimeSnapshot();
if (timeout_in_ms) {
server.failover_end_time = now + timeout_in_ms;
}

View File

@ -201,6 +201,32 @@ mstime_t mstime(void) {
return ustime()/1000;
}
/* Return the command time snapshot in milliseconds.
* The time the command started is the logical time it runs,
* and all the time readings during the execution time should
* reflect the same time.
* More details can be found in the comments below. */
mstime_t commandTimeSnapshot(void) {
/* If we are in the context of a Lua script, we pretend that time is
* blocked to when the Lua script started. This way a key can expire
* only the first time it is accessed and not in the middle of the
* script execution, making propagation to slaves / AOF consistent.
* See issue #1525 on Github for more information. */
if (server.script_caller) {
return scriptTimeSnapshot();
}
/* If we are in the middle of a command execution, we still want to use
* a reference time that does not change: in that case we just use the
* cached time, that we update before each call in the call() function.
* This way we avoid that commands such as RPOPLPUSH or similar, that
* may re-open the same key multiple times, can invalidate an already
* open object in a next call, if the next call will see the key expired,
* while the first did not. */
else {
return server.mstime;
}
}
/* After an RDB dump or AOF rewrite we exit from children using _exit() instead of
* exit(), because the latter may interact with the same file objects used by
* the parent process. However if we are testing the coverage normal exit() is
@ -1171,9 +1197,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* handler if we don't return here fast enough. */
if (server.watchdog_period) watchdogScheduleSignal(server.watchdog_period);
/* Update the time cache. */
updateCachedTime(1);
server.hz = server.config_hz;
/* Adapt the server.hz value to the number of configured clients. If we have
* many clients, we want to call serverCron() with an higher frequency. */
@ -1656,6 +1679,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
void afterSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
/* Update the time cache. */
updateCachedTime(1);
/* Do NOT add anything above moduleAcquireGIL !!! */
/* Acquire the modules GIL so that their threads won't touch anything. */
@ -2408,7 +2434,6 @@ void initServer(void) {
server.main_thread_id = pthread_self();
server.current_client = NULL;
server.errors = raxNew();
server.fixed_time_expire = 0;
server.in_nested_call = 0;
server.clients = listCreate();
server.clients_index = raxNew();
@ -3333,7 +3358,7 @@ void call(client *c, int flags) {
/* Update cache time, in case we have nested calls we want to
* update only on the first call*/
if (server.fixed_time_expire++ == 0) {
if (server.in_nested_call++ == 0) {
updateCachedTimeWithUs(0,call_timer);
}
@ -3341,7 +3366,6 @@ void call(client *c, int flags) {
if (monotonicGetType() == MONOTONIC_CLOCK_HW)
monotonic_start = getMonotonicUs();
server.in_nested_call++;
c->cmd->proc(c);
server.in_nested_call--;
@ -3487,7 +3511,6 @@ void call(client *c, int flags) {
}
}
server.fixed_time_expire--;
server.stat_numcommands++;
/* Record peak memory after each command and before the eviction that runs
@ -4348,14 +4371,9 @@ void echoCommand(client *c) {
}
void timeCommand(client *c) {
struct timeval tv;
/* gettimeofday() can only fail if &tv is a bad address so we
* don't check for errors. */
gettimeofday(&tv,NULL);
addReplyArrayLen(c,2);
addReplyBulkLongLong(c,tv.tv_sec);
addReplyBulkLongLong(c,tv.tv_usec);
addReplyBulkLongLong(c, server.unixtime);
addReplyBulkLongLong(c, server.ustime-server.unixtime*1000000);
}
typedef struct replyFlagNames {
@ -5368,7 +5386,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
if (isShutdownInitiated()) {
info = sdscatfmt(info,
"shutdown_in_milliseconds:%I\r\n",
(int64_t)(server.shutdown_mstime - server.mstime));
(int64_t)(server.shutdown_mstime - commandTimeSnapshot()));
}
/* get all the listeners information */

View File

@ -1520,7 +1520,6 @@ struct redisServer {
clientMemUsageBucket client_mem_usage_buckets[CLIENT_MEM_USAGE_BUCKETS];
rax *clients_timeout_table; /* Radix tree for blocked clients timeouts. */
long fixed_time_expire; /* If > 0, expire keys against server.mstime. */
int in_nested_call; /* If > 0, in a nested call of a call */
rax *clients_index; /* Active clients dictionary by client ID. */
pause_type client_pause_type; /* True if clients are currently paused */
@ -2420,7 +2419,8 @@ int moduleIsModuleCommand(void *module_handle, struct redisCommand *cmd);
/* Utils */
long long ustime(void);
long long mstime(void);
mstime_t mstime(void);
mstime_t commandTimeSnapshot(void);
void getRandomHexChars(char *p, size_t len);
void getRandomBytes(unsigned char *p, size_t len);
uint64_t crc64(uint64_t crc, const unsigned char *s, uint64_t l);

View File

@ -138,7 +138,7 @@ int streamDecrID(streamID *id) {
* as time part and start with sequence part of zero. Otherwise we use the
* previous time (and never go backward) and increment the sequence. */
void streamNextID(streamID *last_id, streamID *new_id) {
uint64_t ms = mstime();
uint64_t ms = commandTimeSnapshot();
if (ms > last_id->ms) {
new_id->ms = ms;
new_id->seq = 0;
@ -1760,7 +1760,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end
raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL);
/* Update the consumer and NACK metadata. */
nack->consumer = consumer;
nack->delivery_time = mstime();
nack->delivery_time = commandTimeSnapshot();
nack->delivery_count = 1;
/* Add the entry in the new consumer local PEL. */
raxInsert(consumer->pel,buf,sizeof(buf),nack,NULL);
@ -1828,7 +1828,7 @@ size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start
addReplyNullArray(c);
} else {
streamNACK *nack = ri.data;
nack->delivery_time = mstime();
nack->delivery_time = commandTimeSnapshot();
nack->delivery_count++;
}
arraylen++;
@ -2450,7 +2450,7 @@ cleanup: /* Cleanup. */
* specified as argument of the function. */
streamNACK *streamCreateNACK(streamConsumer *consumer) {
streamNACK *nack = zmalloc(sizeof(*nack));
nack->delivery_time = mstime();
nack->delivery_time = commandTimeSnapshot();
nack->delivery_count = 1;
nack->consumer = consumer;
return nack;
@ -2524,7 +2524,7 @@ streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid
}
consumer->name = sdsdup(name);
consumer->pel = raxNew();
consumer->seen_time = mstime();
consumer->seen_time = commandTimeSnapshot();
if (dirty) server.dirty++;
if (notify) notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-createconsumer",key,dbid);
return consumer;
@ -2538,7 +2538,7 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
sdslen(name));
if (consumer == raxNotFound) return NULL;
if (refresh) consumer->seen_time = mstime();
if (refresh) consumer->seen_time = commandTimeSnapshot();
return consumer;
}
@ -3002,7 +3002,7 @@ void xpendingCommand(client *c) {
unsigned char startkey[sizeof(streamID)];
unsigned char endkey[sizeof(streamID)];
raxIterator ri;
mstime_t now = mstime();
mstime_t now = commandTimeSnapshot();
streamEncodeID(startkey,&startid);
streamEncodeID(endkey,&endid);
@ -3156,7 +3156,7 @@ void xclaimCommand(client *c) {
/* If we stopped because some IDs cannot be parsed, perhaps they
* are trailing options. */
mstime_t now = mstime();
mstime_t now = commandTimeSnapshot();
streamID last_id = {0,0};
int propagate_last_id = 0;
for (; j < c->argc; j++) {
@ -3405,7 +3405,7 @@ void xautoclaimCommand(client *c) {
raxStart(&ri,group->pel);
raxSeek(&ri,">=",startkey,sizeof(startkey));
size_t arraylen = 0;
mstime_t now = mstime();
mstime_t now = commandTimeSnapshot();
sds name = c->argv[3]->ptr;
int deleted_id_num = 0;
while (attempts-- && count && raxNext(&ri)) {
@ -3876,7 +3876,7 @@ NULL
raxIterator ri;
raxStart(&ri,cg->consumers);
raxSeek(&ri,"^",NULL,0);
mstime_t now = mstime();
mstime_t now = commandTimeSnapshot();
while(raxNext(&ri)) {
streamConsumer *consumer = ri.data;
mstime_t idle = now - consumer->seen_time;

View File

@ -168,7 +168,7 @@ static int getExpireMillisecondsOrReply(client *c, robj *expire, int flags, int
if (unit == UNIT_SECONDS) *milliseconds *= 1000;
if ((flags & OBJ_PX) || (flags & OBJ_EX)) {
*milliseconds += mstime();
*milliseconds += commandTimeSnapshot();
}
if (*milliseconds <= 0) {

View File

@ -164,7 +164,7 @@ void handleBlockedClientsTimeout(void) {
int getTimeoutFromObjectOrReply(client *c, robj *object, mstime_t *timeout, int unit) {
long long tval;
long double ftval;
mstime_t now = mstime();
mstime_t now = commandTimeSnapshot();
if (unit == UNIT_SECONDS) {
if (getLongDoubleFromObjectOrReply(c,object,&ftval,

View File

@ -1376,6 +1376,118 @@ start_server {tags {"scripting needs:debug"}} {
r DEBUG set-active-expire 1
}
test "TIME command using cached time" {
set res [run_script {
local result1 = {redis.call("TIME")}
redis.call("DEBUG", "SLEEP", 0.01)
local result2 = {redis.call("TIME")}
return {result1, result2}
} 0]
assert_equal [lindex $res 0] [lindex $res 1]
}
test "Script block the time in some expiration related commands" {
# The test uses different commands to set the "same" expiration time for different keys,
# and interspersed with "DEBUG SLEEP", to verify that time is frozen in script.
# The commands involved are [P]TTL / SET EX[PX] / [P]EXPIRE / GETEX / [P]SETEX / [P]EXPIRETIME
set res [run_script {
redis.call("SET", "key1{t}", "value", "EX", 1)
redis.call("DEBUG", "SLEEP", 0.01)
redis.call("SET", "key2{t}", "value", "PX", 1000)
redis.call("DEBUG", "SLEEP", 0.01)
redis.call("SET", "key3{t}", "value")
redis.call("EXPIRE", "key3{t}", 1)
redis.call("DEBUG", "SLEEP", 0.01)
redis.call("SET", "key4{t}", "value")
redis.call("PEXPIRE", "key4{t}", 1000)
redis.call("DEBUG", "SLEEP", 0.01)
redis.call("SETEX", "key5{t}", 1, "value")
redis.call("DEBUG", "SLEEP", 0.01)
redis.call("PSETEX", "key6{t}", 1000, "value")
redis.call("DEBUG", "SLEEP", 0.01)
redis.call("SET", "key7{t}", "value")
redis.call("GETEX", "key7{t}", "EX", 1)
redis.call("DEBUG", "SLEEP", 0.01)
redis.call("SET", "key8{t}", "value")
redis.call("GETEX", "key8{t}", "PX", 1000)
redis.call("DEBUG", "SLEEP", 0.01)
local ttl_results = {redis.call("TTL", "key1{t}"),
redis.call("TTL", "key2{t}"),
redis.call("TTL", "key3{t}"),
redis.call("TTL", "key4{t}"),
redis.call("TTL", "key5{t}"),
redis.call("TTL", "key6{t}"),
redis.call("TTL", "key7{t}"),
redis.call("TTL", "key8{t}")}
local pttl_results = {redis.call("PTTL", "key1{t}"),
redis.call("PTTL", "key2{t}"),
redis.call("PTTL", "key3{t}"),
redis.call("PTTL", "key4{t}"),
redis.call("PTTL", "key5{t}"),
redis.call("PTTL", "key6{t}"),
redis.call("PTTL", "key7{t}"),
redis.call("PTTL", "key8{t}")}
local expiretime_results = {redis.call("EXPIRETIME", "key1{t}"),
redis.call("EXPIRETIME", "key2{t}"),
redis.call("EXPIRETIME", "key3{t}"),
redis.call("EXPIRETIME", "key4{t}"),
redis.call("EXPIRETIME", "key5{t}"),
redis.call("EXPIRETIME", "key6{t}"),
redis.call("EXPIRETIME", "key7{t}"),
redis.call("EXPIRETIME", "key8{t}")}
local pexpiretime_results = {redis.call("PEXPIRETIME", "key1{t}"),
redis.call("PEXPIRETIME", "key2{t}"),
redis.call("PEXPIRETIME", "key3{t}"),
redis.call("PEXPIRETIME", "key4{t}"),
redis.call("PEXPIRETIME", "key5{t}"),
redis.call("PEXPIRETIME", "key6{t}"),
redis.call("PEXPIRETIME", "key7{t}"),
redis.call("PEXPIRETIME", "key8{t}")}
return {ttl_results, pttl_results, expiretime_results, pexpiretime_results}
} 8 key1{t} key2{t} key3{t} key4{t} key5{t} key6{t} key7{t} key8{t}]
# The elements in each list are equal.
assert_equal 1 [llength [lsort -unique [lindex $res 0]]]
assert_equal 1 [llength [lsort -unique [lindex $res 1]]]
assert_equal 1 [llength [lsort -unique [lindex $res 2]]]
assert_equal 1 [llength [lsort -unique [lindex $res 3]]]
# Then we check that the expiration time is set successfully.
assert_morethan [lindex $res 0] 0
assert_morethan [lindex $res 1] 0
assert_morethan [lindex $res 2] 0
assert_morethan [lindex $res 3] 0
}
test "RESTORE expired keys with expiration time" {
set res [run_script {
redis.call("SET", "key1{t}", "value")
local encoded = redis.call("DUMP", "key1{t}")
redis.call("RESTORE", "key2{t}", 1, encoded, "REPLACE")
redis.call("DEBUG", "SLEEP", 0.01)
redis.call("RESTORE", "key3{t}", 1, encoded, "REPLACE")
return {redis.call("PEXPIRETIME", "key2{t}"), redis.call("PEXPIRETIME", "key3{t}")}
} 3 key1{t} key2{t} key3{t}]
# Can get the expiration time and they are all equal.
assert_morethan [lindex $res 0] 0
assert_equal [lindex $res 0] [lindex $res 1]
}
r debug set-disable-deny-scripts 0
}
} ;# foreach is_eval