Add timestamp annotations in AOF (#9326)
Add timestamp annotation in AOF, one part of #9325. Enabled with the new `aof-timestamp-enabled` config option. Timestamp annotation format is "#TS:${timestamp}\r\n"." TS" is short of timestamp and this method could save extra bytes in AOF. We can use timestamp annotation for some special functions. - know the executing time of commands - restore data to a specific point-in-time (by using redis-check-rdb to truncate the file)
This commit is contained in:
parent
085615af97
commit
9ec3294b97
@ -1412,6 +1412,11 @@ aof-load-truncated yes
|
||||
# tail.
|
||||
aof-use-rdb-preamble yes
|
||||
|
||||
# Redis supports recording timestamp annotations in the AOF to support restoring
|
||||
# the data from a specific point-in-time. However, using this capability changes
|
||||
# the AOF format in a way that may not be compatible with existing AOF parsers.
|
||||
aof-timestamp-enabled no
|
||||
|
||||
################################ LUA SCRIPTING ###############################
|
||||
|
||||
# Max execution time of a Lua script in milliseconds.
|
||||
|
39
src/aof.c
39
src/aof.c
@ -600,8 +600,37 @@ sds catAppendOnlyGenericCommand(sds dst, int argc, robj **argv) {
|
||||
return dst;
|
||||
}
|
||||
|
||||
/* Generate a piece of timestamp annotation for AOF if current record timestamp
|
||||
* in AOF is not equal server unix time. If we specify 'force' argument to 1,
|
||||
* we would generate one without check, currently, it is useful in AOF rewriting
|
||||
* child process which always needs to record one timestamp at the beginning of
|
||||
* rewriting AOF.
|
||||
*
|
||||
* Timestamp annotation format is "#TS:${timestamp}\r\n". "TS" is short of
|
||||
* timestamp and this method could save extra bytes in AOF. */
|
||||
sds genAofTimestampAnnotationIfNeeded(int force) {
|
||||
sds ts = NULL;
|
||||
|
||||
if (force || server.aof_cur_timestamp < server.unixtime) {
|
||||
server.aof_cur_timestamp = force ? time(NULL) : server.unixtime;
|
||||
ts = sdscatfmt(sdsempty(), "#TS:%I\r\n", server.aof_cur_timestamp);
|
||||
serverAssert(sdslen(ts) <= AOF_ANNOTATION_LINE_MAX_LEN);
|
||||
}
|
||||
return ts;
|
||||
}
|
||||
|
||||
void feedAppendOnlyFile(int dictid, robj **argv, int argc) {
|
||||
sds buf = sdsempty();
|
||||
|
||||
/* Feed timestamp if needed */
|
||||
if (server.aof_timestamp_enabled) {
|
||||
sds ts = genAofTimestampAnnotationIfNeeded(0);
|
||||
if (ts != NULL) {
|
||||
buf = sdscatsds(buf, ts);
|
||||
sdsfree(ts);
|
||||
}
|
||||
}
|
||||
|
||||
/* The DB this command was targeting is not the same as the last command
|
||||
* we appended. To issue a SELECT command is needed. */
|
||||
if (dictid != server.aof_selected_db) {
|
||||
@ -732,7 +761,7 @@ int loadAppendOnlyFile(char *filename) {
|
||||
int argc, j;
|
||||
unsigned long len;
|
||||
robj **argv;
|
||||
char buf[128];
|
||||
char buf[AOF_ANNOTATION_LINE_MAX_LEN];
|
||||
sds argsds;
|
||||
struct redisCommand *cmd;
|
||||
|
||||
@ -749,6 +778,7 @@ int loadAppendOnlyFile(char *filename) {
|
||||
else
|
||||
goto readerr;
|
||||
}
|
||||
if (buf[0] == '#') continue; /* Skip annotations */
|
||||
if (buf[0] != '*') goto fmterr;
|
||||
if (buf[1] == '\0') goto readerr;
|
||||
argc = atoi(buf+1);
|
||||
@ -1383,6 +1413,13 @@ int rewriteAppendOnlyFileRio(rio *aof) {
|
||||
long key_count = 0;
|
||||
long long updated_time = 0;
|
||||
|
||||
/* Record timestamp at the beginning of rewriting AOF. */
|
||||
if (server.aof_timestamp_enabled) {
|
||||
sds ts = genAofTimestampAnnotationIfNeeded(1);
|
||||
if (rioWrite(aof,ts,sdslen(ts)) == 0) { sdsfree(ts); goto werr; }
|
||||
sdsfree(ts);
|
||||
}
|
||||
|
||||
for (j = 0; j < server.dbnum; j++) {
|
||||
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
|
||||
redisDb *db = server.db+j;
|
||||
|
@ -2579,6 +2579,7 @@ standardConfig configs[] = {
|
||||
createBoolConfig("rdb-save-incremental-fsync", NULL, MODIFIABLE_CONFIG, server.rdb_save_incremental_fsync, 1, NULL, NULL),
|
||||
createBoolConfig("aof-load-truncated", NULL, MODIFIABLE_CONFIG, server.aof_load_truncated, 1, NULL, NULL),
|
||||
createBoolConfig("aof-use-rdb-preamble", NULL, MODIFIABLE_CONFIG, server.aof_use_rdb_preamble, 1, NULL, NULL),
|
||||
createBoolConfig("aof-timestamp-enabled", NULL, MODIFIABLE_CONFIG, server.aof_timestamp_enabled, 0, NULL, NULL),
|
||||
createBoolConfig("cluster-replica-no-failover", "cluster-slave-no-failover", MODIFIABLE_CONFIG, server.cluster_slave_no_failover, 0, NULL, NULL), /* Failover by default. */
|
||||
createBoolConfig("replica-lazy-flush", "slave-lazy-flush", MODIFIABLE_CONFIG, server.repl_slave_lazy_flush, 0, NULL, NULL),
|
||||
createBoolConfig("replica-serve-stale-data", "slave-serve-stale-data", MODIFIABLE_CONFIG, server.repl_serve_stale_data, 1, NULL, NULL),
|
||||
|
@ -40,6 +40,7 @@
|
||||
static char error[1044];
|
||||
static off_t epos;
|
||||
static long long line = 1;
|
||||
static time_t to_timestamp = 0;
|
||||
|
||||
int consumeNewline(char *buf) {
|
||||
if (strncmp(buf,"\r\n",2) != 0) {
|
||||
@ -50,6 +51,47 @@ int consumeNewline(char *buf) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
int readAnnotations(FILE *fp) {
|
||||
char buf[AOF_ANNOTATION_LINE_MAX_LEN];
|
||||
while (1) {
|
||||
epos = ftello(fp);
|
||||
if (fgets(buf, sizeof(buf), fp) == NULL) {
|
||||
return 0;
|
||||
}
|
||||
if (buf[0] == '#') {
|
||||
if (to_timestamp && strncmp(buf, "#TS:", 4) == 0) {
|
||||
time_t ts = strtol(buf+4, NULL, 10);
|
||||
if (ts <= to_timestamp) continue;
|
||||
if (epos == 0) {
|
||||
printf("AOF has nothing before timestamp %ld, "
|
||||
"aborting...\n", to_timestamp);
|
||||
fclose(fp);
|
||||
exit(1);
|
||||
}
|
||||
/* Truncate remaining AOF if exceeding 'to_timestamp' */
|
||||
if (ftruncate(fileno(fp), epos) == -1) {
|
||||
printf("Failed to truncate AOF to timestamp %ld\n",
|
||||
to_timestamp);
|
||||
exit(1);
|
||||
} else {
|
||||
printf("Successfully truncated AOF to timestamp %ld\n",
|
||||
to_timestamp);
|
||||
fclose(fp);
|
||||
exit(0);
|
||||
}
|
||||
}
|
||||
continue;
|
||||
} else {
|
||||
if (fseek(fp, -(ftello(fp)-epos), SEEK_CUR) == -1) {
|
||||
ERROR("Fseek error: %s", strerror(errno));
|
||||
return 0;
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
}
|
||||
return 1;
|
||||
}
|
||||
|
||||
int readLong(FILE *fp, char prefix, long *target) {
|
||||
char buf[128], *eptr;
|
||||
epos = ftello(fp);
|
||||
@ -107,6 +149,7 @@ off_t process(FILE *fp) {
|
||||
|
||||
while(1) {
|
||||
if (!multi) pos = ftello(fp);
|
||||
if (!readAnnotations(fp)) break;
|
||||
if (!readArgc(fp, &argc)) break;
|
||||
|
||||
for (i = 0; i < argc; i++) {
|
||||
@ -148,20 +191,25 @@ int redis_check_aof_main(int argc, char **argv) {
|
||||
int fix = 0;
|
||||
|
||||
if (argc < 2) {
|
||||
printf("Usage: %s [--fix] <file.aof>\n", argv[0]);
|
||||
exit(1);
|
||||
goto invalid_args;
|
||||
} else if (argc == 2) {
|
||||
filename = argv[1];
|
||||
} else if (argc == 3) {
|
||||
if (strcmp(argv[1],"--fix") != 0) {
|
||||
printf("Invalid argument: %s\n", argv[1]);
|
||||
exit(1);
|
||||
if (!strcmp(argv[1],"--fix")) {
|
||||
filename = argv[2];
|
||||
fix = 1;
|
||||
} else {
|
||||
goto invalid_args;
|
||||
}
|
||||
} else if (argc == 4) {
|
||||
if (!strcmp(argv[1], "--truncate-to-timestamp")) {
|
||||
to_timestamp = strtol(argv[2],NULL,10);
|
||||
filename = argv[3];
|
||||
} else {
|
||||
goto invalid_args;
|
||||
}
|
||||
filename = argv[2];
|
||||
fix = 1;
|
||||
} else {
|
||||
printf("Invalid arguments\n");
|
||||
exit(1);
|
||||
goto invalid_args;
|
||||
}
|
||||
|
||||
FILE *fp = fopen(filename,"r+");
|
||||
@ -203,6 +251,14 @@ int redis_check_aof_main(int argc, char **argv) {
|
||||
|
||||
off_t pos = process(fp);
|
||||
off_t diff = size-pos;
|
||||
|
||||
/* In truncate-to-timestamp mode, just exit if there is nothing to truncate. */
|
||||
if (diff == 0 && to_timestamp) {
|
||||
printf("Truncate nothing in AOF to timestamp %ld\n", to_timestamp);
|
||||
fclose(fp);
|
||||
exit(0);
|
||||
}
|
||||
|
||||
printf("AOF analyzed: size=%lld, ok_up_to=%lld, ok_up_to_line=%lld, diff=%lld\n",
|
||||
(long long) size, (long long) pos, line, (long long) diff);
|
||||
if (diff > 0) {
|
||||
@ -232,4 +288,9 @@ int redis_check_aof_main(int argc, char **argv) {
|
||||
|
||||
fclose(fp);
|
||||
exit(0);
|
||||
|
||||
invalid_args:
|
||||
printf("Usage: %s [--fix|--truncate-to-timestamp $timestamp] <file.aof>\n",
|
||||
argv[0]);
|
||||
exit(1);
|
||||
}
|
||||
|
@ -3678,6 +3678,7 @@ void initServerConfig(void) {
|
||||
server.aof_rewrite_scheduled = 0;
|
||||
server.aof_flush_sleep = 0;
|
||||
server.aof_last_fsync = time(NULL);
|
||||
server.aof_cur_timestamp = 0;
|
||||
atomicSet(server.aof_bio_fsync_status,C_OK);
|
||||
server.aof_rewrite_time_last = -1;
|
||||
server.aof_rewrite_time_start = -1;
|
||||
|
@ -106,6 +106,7 @@ typedef long long ustime_t; /* microsecond time type. */
|
||||
#define LOG_MAX_LEN 1024 /* Default maximum length of syslog messages.*/
|
||||
#define AOF_REWRITE_ITEMS_PER_CMD 64
|
||||
#define AOF_READ_DIFF_INTERVAL_BYTES (1024*10)
|
||||
#define AOF_ANNOTATION_LINE_MAX_LEN 1024
|
||||
#define CONFIG_AUTHPASS_MAX_LEN 512
|
||||
#define CONFIG_RUN_ID_SIZE 40
|
||||
#define RDB_EOF_MARK_SIZE 40
|
||||
@ -1504,6 +1505,8 @@ struct redisServer {
|
||||
time_t aof_last_fsync; /* UNIX time of last fsync() */
|
||||
time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */
|
||||
time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */
|
||||
time_t aof_cur_timestamp; /* Current record timestamp in AOF */
|
||||
int aof_timestamp_enabled; /* Enable record timestamp in AOF */
|
||||
int aof_lastbgrewrite_status; /* C_OK or C_ERR */
|
||||
unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */
|
||||
int aof_rewrite_incremental_fsync;/* fsync incrementally while aof rewriting? */
|
||||
|
@ -414,4 +414,71 @@ tags {"aof external:skip"} {
|
||||
assert_equal 2 [$client zcard myzset3]
|
||||
}
|
||||
}
|
||||
|
||||
test {Generate timestamp annotations in AOF} {
|
||||
start_server {overrides {appendonly {yes} appendfilename {appendonly.aof}}} {
|
||||
r config set aof-timestamp-enabled yes
|
||||
r config set aof-use-rdb-preamble no
|
||||
set aof [file join [lindex [r config get dir] 1] appendonly.aof]
|
||||
|
||||
r set foo bar
|
||||
assert_match "#TS:*" [exec head -n 1 $aof]
|
||||
|
||||
r bgrewriteaof
|
||||
waitForBgrewriteaof r
|
||||
assert_match "#TS:*" [exec head -n 1 $aof]
|
||||
}
|
||||
}
|
||||
|
||||
# redis could load AOF which has timestamp annotations inside
|
||||
create_aof {
|
||||
append_to_aof "#TS:1628217470\r\n"
|
||||
append_to_aof [formatCommand set foo1 bar1]
|
||||
append_to_aof "#TS:1628217471\r\n"
|
||||
append_to_aof [formatCommand set foo2 bar2]
|
||||
append_to_aof "#TS:1628217472\r\n"
|
||||
append_to_aof "#TS:1628217473\r\n"
|
||||
append_to_aof [formatCommand set foo3 bar3]
|
||||
append_to_aof "#TS:1628217474\r\n"
|
||||
}
|
||||
start_server_aof [list dir $server_path] {
|
||||
test {Successfully load AOF which has timestamp annotations inside} {
|
||||
set c [redis [dict get $srv host] [dict get $srv port] 0 $::tls]
|
||||
assert_equal "bar1" [$c get foo1]
|
||||
assert_equal "bar2" [$c get foo2]
|
||||
assert_equal "bar3" [$c get foo3]
|
||||
}
|
||||
}
|
||||
|
||||
test {Truncate AOF to specific timestamp} {
|
||||
# truncate to timestamp 1628217473
|
||||
exec src/redis-check-aof --truncate-to-timestamp 1628217473 $aof_path
|
||||
start_server_aof [list dir $server_path] {
|
||||
set c [redis [dict get $srv host] [dict get $srv port] 0 $::tls]
|
||||
assert_equal "bar1" [$c get foo1]
|
||||
assert_equal "bar2" [$c get foo2]
|
||||
assert_equal "bar3" [$c get foo3]
|
||||
}
|
||||
|
||||
# truncate to timestamp 1628217471
|
||||
exec src/redis-check-aof --truncate-to-timestamp 1628217471 $aof_path
|
||||
start_server_aof [list dir $server_path] {
|
||||
set c [redis [dict get $srv host] [dict get $srv port] 0 $::tls]
|
||||
assert_equal "bar1" [$c get foo1]
|
||||
assert_equal "bar2" [$c get foo2]
|
||||
assert_equal "" [$c get foo3]
|
||||
}
|
||||
|
||||
# truncate to timestamp 1628217470
|
||||
exec src/redis-check-aof --truncate-to-timestamp 1628217470 $aof_path
|
||||
start_server_aof [list dir $server_path] {
|
||||
set c [redis [dict get $srv host] [dict get $srv port] 0 $::tls]
|
||||
assert_equal "bar1" [$c get foo1]
|
||||
assert_equal "" [$c get foo2]
|
||||
}
|
||||
|
||||
# truncate to timestamp 1628217469
|
||||
catch {exec src/redis-check-aof --truncate-to-timestamp 1628217469 $aof_path} e
|
||||
assert_match {*aborting*} $e
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user