From 9ec3294b97211fca206ade1c2d99ccfc762f5a73 Mon Sep 17 00:00:00 2001 From: Wang Yuan Date: Mon, 25 Oct 2021 18:08:34 +0800 Subject: [PATCH] Add timestamp annotations in AOF (#9326) Add timestamp annotation in AOF, one part of #9325. Enabled with the new `aof-timestamp-enabled` config option. Timestamp annotation format is "#TS:${timestamp}\r\n"." TS" is short of timestamp and this method could save extra bytes in AOF. We can use timestamp annotation for some special functions. - know the executing time of commands - restore data to a specific point-in-time (by using redis-check-rdb to truncate the file) --- redis.conf | 5 +++ src/aof.c | 39 ++++++++++++++++++- src/config.c | 1 + src/redis-check-aof.c | 79 ++++++++++++++++++++++++++++++++++----- src/server.c | 1 + src/server.h | 3 ++ tests/integration/aof.tcl | 67 +++++++++++++++++++++++++++++++++ 7 files changed, 185 insertions(+), 10 deletions(-) diff --git a/redis.conf b/redis.conf index 368cf3a73..bcb5fa022 100644 --- a/redis.conf +++ b/redis.conf @@ -1412,6 +1412,11 @@ aof-load-truncated yes # tail. aof-use-rdb-preamble yes +# Redis supports recording timestamp annotations in the AOF to support restoring +# the data from a specific point-in-time. However, using this capability changes +# the AOF format in a way that may not be compatible with existing AOF parsers. +aof-timestamp-enabled no + ################################ LUA SCRIPTING ############################### # Max execution time of a Lua script in milliseconds. diff --git a/src/aof.c b/src/aof.c index 0090c9472..3621167e1 100644 --- a/src/aof.c +++ b/src/aof.c @@ -600,8 +600,37 @@ sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) { return dst; } +/* Generate a piece of timestamp annotation for AOF if current record timestamp + * in AOF is not equal server unix time. If we specify 'force' argument to 1, + * we would generate one without check, currently, it is useful in AOF rewriting + * child process which always needs to record one timestamp at the beginning of + * rewriting AOF. + * + * Timestamp annotation format is "#TS:${timestamp}\r\n". "TS" is short of + * timestamp and this method could save extra bytes in AOF. */ +sds genAofTimestampAnnotationIfNeeded(int force) { + sds ts = NULL; + + if (force || server.aof_cur_timestamp < server.unixtime) { + server.aof_cur_timestamp = force ? time(NULL) : server.unixtime; + ts = sdscatfmt(sdsempty(), "#TS:%I\r\n", server.aof_cur_timestamp); + serverAssert(sdslen(ts) <= AOF_ANNOTATION_LINE_MAX_LEN); + } + return ts; +} + void feedAppendOnlyFile(int dictid, robj **argv, int argc) { sds buf = sdsempty(); + + /* Feed timestamp if needed */ + if (server.aof_timestamp_enabled) { + sds ts = genAofTimestampAnnotationIfNeeded(0); + if (ts != NULL) { + buf = sdscatsds(buf, ts); + sdsfree(ts); + } + } + /* The DB this command was targeting is not the same as the last command * we appended. To issue a SELECT command is needed. */ if (dictid != server.aof_selected_db) { @@ -732,7 +761,7 @@ int loadAppendOnlyFile(char *filename) { int argc, j; unsigned long len; robj **argv; - char buf[128]; + char buf[AOF_ANNOTATION_LINE_MAX_LEN]; sds argsds; struct redisCommand *cmd; @@ -749,6 +778,7 @@ int loadAppendOnlyFile(char *filename) { else goto readerr; } + if (buf[0] == '#') continue; /* Skip annotations */ if (buf[0] != '*') goto fmterr; if (buf[1] == '\0') goto readerr; argc = atoi(buf+1); @@ -1383,6 +1413,13 @@ int rewriteAppendOnlyFileRio(rio *aof) { long key_count = 0; long long updated_time = 0; + /* Record timestamp at the beginning of rewriting AOF. */ + if (server.aof_timestamp_enabled) { + sds ts = genAofTimestampAnnotationIfNeeded(1); + if (rioWrite(aof,ts,sdslen(ts)) == 0) { sdsfree(ts); goto werr; } + sdsfree(ts); + } + for (j = 0; j < server.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; redisDb *db = server.db+j; diff --git a/src/config.c b/src/config.c index 909929147..1b31ed944 100644 --- a/src/config.c +++ b/src/config.c @@ -2579,6 +2579,7 @@ standardConfig configs[] = { createBoolConfig("rdb-save-incremental-fsync", NULL, MODIFIABLE_CONFIG, server.rdb_save_incremental_fsync, 1, NULL, NULL), createBoolConfig("aof-load-truncated", NULL, MODIFIABLE_CONFIG, server.aof_load_truncated, 1, NULL, NULL), createBoolConfig("aof-use-rdb-preamble", NULL, MODIFIABLE_CONFIG, server.aof_use_rdb_preamble, 1, NULL, NULL), + createBoolConfig("aof-timestamp-enabled", NULL, MODIFIABLE_CONFIG, server.aof_timestamp_enabled, 0, NULL, NULL), createBoolConfig("cluster-replica-no-failover", "cluster-slave-no-failover", MODIFIABLE_CONFIG, server.cluster_slave_no_failover, 0, NULL, NULL), /* Failover by default. */ createBoolConfig("replica-lazy-flush", "slave-lazy-flush", MODIFIABLE_CONFIG, server.repl_slave_lazy_flush, 0, NULL, NULL), createBoolConfig("replica-serve-stale-data", "slave-serve-stale-data", MODIFIABLE_CONFIG, server.repl_serve_stale_data, 1, NULL, NULL), diff --git a/src/redis-check-aof.c b/src/redis-check-aof.c index 1507e0a06..8cbe84896 100644 --- a/src/redis-check-aof.c +++ b/src/redis-check-aof.c @@ -40,6 +40,7 @@ static char error[1044]; static off_t epos; static long long line = 1; +static time_t to_timestamp = 0; int consumeNewline(char *buf) { if (strncmp(buf,"\r\n",2) != 0) { @@ -50,6 +51,47 @@ int consumeNewline(char *buf) { return 1; } +int readAnnotations(FILE *fp) { + char buf[AOF_ANNOTATION_LINE_MAX_LEN]; + while (1) { + epos = ftello(fp); + if (fgets(buf, sizeof(buf), fp) == NULL) { + return 0; + } + if (buf[0] == '#') { + if (to_timestamp && strncmp(buf, "#TS:", 4) == 0) { + time_t ts = strtol(buf+4, NULL, 10); + if (ts <= to_timestamp) continue; + if (epos == 0) { + printf("AOF has nothing before timestamp %ld, " + "aborting...\n", to_timestamp); + fclose(fp); + exit(1); + } + /* Truncate remaining AOF if exceeding 'to_timestamp' */ + if (ftruncate(fileno(fp), epos) == -1) { + printf("Failed to truncate AOF to timestamp %ld\n", + to_timestamp); + exit(1); + } else { + printf("Successfully truncated AOF to timestamp %ld\n", + to_timestamp); + fclose(fp); + exit(0); + } + } + continue; + } else { + if (fseek(fp, -(ftello(fp)-epos), SEEK_CUR) == -1) { + ERROR("Fseek error: %s", strerror(errno)); + return 0; + } + return 1; + } + } + return 1; +} + int readLong(FILE *fp, char prefix, long *target) { char buf[128], *eptr; epos = ftello(fp); @@ -107,6 +149,7 @@ off_t process(FILE *fp) { while(1) { if (!multi) pos = ftello(fp); + if (!readAnnotations(fp)) break; if (!readArgc(fp, &argc)) break; for (i = 0; i < argc; i++) { @@ -148,20 +191,25 @@ int redis_check_aof_main(int argc, char **argv) { int fix = 0; if (argc < 2) { - printf("Usage: %s [--fix] \n", argv[0]); - exit(1); + goto invalid_args; } else if (argc == 2) { filename = argv[1]; } else if (argc == 3) { - if (strcmp(argv[1],"--fix") != 0) { - printf("Invalid argument: %s\n", argv[1]); - exit(1); + if (!strcmp(argv[1],"--fix")) { + filename = argv[2]; + fix = 1; + } else { + goto invalid_args; + } + } else if (argc == 4) { + if (!strcmp(argv[1], "--truncate-to-timestamp")) { + to_timestamp = strtol(argv[2],NULL,10); + filename = argv[3]; + } else { + goto invalid_args; } - filename = argv[2]; - fix = 1; } else { - printf("Invalid arguments\n"); - exit(1); + goto invalid_args; } FILE *fp = fopen(filename,"r+"); @@ -203,6 +251,14 @@ int redis_check_aof_main(int argc, char **argv) { off_t pos = process(fp); off_t diff = size-pos; + + /* In truncate-to-timestamp mode, just exit if there is nothing to truncate. */ + if (diff == 0 && to_timestamp) { + printf("Truncate nothing in AOF to timestamp %ld\n", to_timestamp); + fclose(fp); + exit(0); + } + printf("AOF analyzed: size=%lld, ok_up_to=%lld, ok_up_to_line=%lld, diff=%lld\n", (long long) size, (long long) pos, line, (long long) diff); if (diff > 0) { @@ -232,4 +288,9 @@ int redis_check_aof_main(int argc, char **argv) { fclose(fp); exit(0); + +invalid_args: + printf("Usage: %s [--fix|--truncate-to-timestamp $timestamp] \n", + argv[0]); + exit(1); } diff --git a/src/server.c b/src/server.c index c07154ca2..bf779a19b 100644 --- a/src/server.c +++ b/src/server.c @@ -3678,6 +3678,7 @@ void initServerConfig(void) { server.aof_rewrite_scheduled = 0; server.aof_flush_sleep = 0; server.aof_last_fsync = time(NULL); + server.aof_cur_timestamp = 0; atomicSet(server.aof_bio_fsync_status,C_OK); server.aof_rewrite_time_last = -1; server.aof_rewrite_time_start = -1; diff --git a/src/server.h b/src/server.h index 569702c52..9619e10a6 100644 --- a/src/server.h +++ b/src/server.h @@ -106,6 +106,7 @@ typedef long long ustime_t; /* microsecond time type. */ #define LOG_MAX_LEN 1024 /* Default maximum length of syslog messages.*/ #define AOF_REWRITE_ITEMS_PER_CMD 64 #define AOF_READ_DIFF_INTERVAL_BYTES (1024*10) +#define AOF_ANNOTATION_LINE_MAX_LEN 1024 #define CONFIG_AUTHPASS_MAX_LEN 512 #define CONFIG_RUN_ID_SIZE 40 #define RDB_EOF_MARK_SIZE 40 @@ -1504,6 +1505,8 @@ struct redisServer { time_t aof_last_fsync; /* UNIX time of last fsync() */ time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */ time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */ + time_t aof_cur_timestamp; /* Current record timestamp in AOF */ + int aof_timestamp_enabled; /* Enable record timestamp in AOF */ int aof_lastbgrewrite_status; /* C_OK or C_ERR */ unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */ int aof_rewrite_incremental_fsync;/* fsync incrementally while aof rewriting? */ diff --git a/tests/integration/aof.tcl b/tests/integration/aof.tcl index a803eab51..e697bbed7 100644 --- a/tests/integration/aof.tcl +++ b/tests/integration/aof.tcl @@ -414,4 +414,71 @@ tags {"aof external:skip"} { assert_equal 2 [$client zcard myzset3] } } + + test {Generate timestamp annotations in AOF} { + start_server {overrides {appendonly {yes} appendfilename {appendonly.aof}}} { + r config set aof-timestamp-enabled yes + r config set aof-use-rdb-preamble no + set aof [file join [lindex [r config get dir] 1] appendonly.aof] + + r set foo bar + assert_match "#TS:*" [exec head -n 1 $aof] + + r bgrewriteaof + waitForBgrewriteaof r + assert_match "#TS:*" [exec head -n 1 $aof] + } + } + + # redis could load AOF which has timestamp annotations inside + create_aof { + append_to_aof "#TS:1628217470\r\n" + append_to_aof [formatCommand set foo1 bar1] + append_to_aof "#TS:1628217471\r\n" + append_to_aof [formatCommand set foo2 bar2] + append_to_aof "#TS:1628217472\r\n" + append_to_aof "#TS:1628217473\r\n" + append_to_aof [formatCommand set foo3 bar3] + append_to_aof "#TS:1628217474\r\n" + } + start_server_aof [list dir $server_path] { + test {Successfully load AOF which has timestamp annotations inside} { + set c [redis [dict get $srv host] [dict get $srv port] 0 $::tls] + assert_equal "bar1" [$c get foo1] + assert_equal "bar2" [$c get foo2] + assert_equal "bar3" [$c get foo3] + } + } + + test {Truncate AOF to specific timestamp} { + # truncate to timestamp 1628217473 + exec src/redis-check-aof --truncate-to-timestamp 1628217473 $aof_path + start_server_aof [list dir $server_path] { + set c [redis [dict get $srv host] [dict get $srv port] 0 $::tls] + assert_equal "bar1" [$c get foo1] + assert_equal "bar2" [$c get foo2] + assert_equal "bar3" [$c get foo3] + } + + # truncate to timestamp 1628217471 + exec src/redis-check-aof --truncate-to-timestamp 1628217471 $aof_path + start_server_aof [list dir $server_path] { + set c [redis [dict get $srv host] [dict get $srv port] 0 $::tls] + assert_equal "bar1" [$c get foo1] + assert_equal "bar2" [$c get foo2] + assert_equal "" [$c get foo3] + } + + # truncate to timestamp 1628217470 + exec src/redis-check-aof --truncate-to-timestamp 1628217470 $aof_path + start_server_aof [list dir $server_path] { + set c [redis [dict get $srv host] [dict get $srv port] 0 $::tls] + assert_equal "bar1" [$c get foo1] + assert_equal "" [$c get foo2] + } + + # truncate to timestamp 1628217469 + catch {exec src/redis-check-aof --truncate-to-timestamp 1628217469 $aof_path} e + assert_match {*aborting*} $e + } }