Fix bug when AOF enabled after startup. put the new incr file in the manifest only when AOFRW is done. (#10616)

Changes:

- When AOF is enabled **after** startup, the data accumulated during `AOF_WAIT_REWRITE`
  will only be stored in a temp INCR AOF file. Only after the first AOFRW is successful, we will
  add it to manifest file.
  Before this fix, the manifest referred to the temp file which could cause a restart during that
  time to load it without it's base.
- Add `aof_rewrites_consecutive_failures` info field for  aofrw limiting implementation.

Now we can guarantee that these behaviors of MP-AOF are the same as before (past redis releases):
- When AOF is enabled after startup, the data accumulated during `AOF_WAIT_REWRITE` will only
  be stored in a visible place. Only after the first AOFRW is successful, we will add it to manifest file.
- When disable AOF, we did not delete the AOF file in the past so there's no need to change that
  behavior now (yet).
- When toggling AOF off and then on (could be as part of a full-sync), a crash or restart before the
  first rewrite is completed, would result with the previous version being loaded (might not be right thing,
  but that's what we always had).
This commit is contained in:
chenyang8094 2022-04-26 21:31:19 +08:00 committed by GitHub
parent 3a1d14259d
commit 46ec6ad98e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 270 additions and 32 deletions

123
src/aof.c
View File

@ -87,7 +87,7 @@ void aofManifestFreeAndUpdate(aofManifest *am);
#define RDB_FORMAT_SUFFIX ".rdb"
#define AOF_FORMAT_SUFFIX ".aof"
#define MANIFEST_NAME_SUFFIX ".manifest"
#define MANIFEST_TEMP_NAME_PREFIX "temp_"
#define TEMP_FILE_NAME_PREFIX "temp-"
/* AOF manifest key. */
#define AOF_MANIFEST_KEY_FILE_NAME "file"
@ -169,7 +169,7 @@ sds getAofManifestFileName() {
}
sds getTempAofManifestFileName() {
return sdscatprintf(sdsempty(), "%s%s%s", MANIFEST_TEMP_NAME_PREFIX,
return sdscatprintf(sdsempty(), "%s%s%s", TEMP_FILE_NAME_PREFIX,
server.aof_filename, MANIFEST_NAME_SUFFIX);
}
@ -462,6 +462,12 @@ sds getNewIncrAofName(aofManifest *am) {
return ai->file_name;
}
/* Get temp INCR type AOF name. */
sds getTempIncrAofName() {
return sdscatprintf(sdsempty(), "%s%s%s", TEMP_FILE_NAME_PREFIX, server.aof_filename,
INCR_FILE_SUFFIX);
}
/* Get the last INCR AOF name or create a new one. */
sds getLastIncrAofName(aofManifest *am) {
serverAssert(am != NULL);
@ -674,6 +680,17 @@ int aofDelHistoryFiles(void) {
return persistAofManifest(server.aof_manifest);
}
/* Used to clean up temp INCR AOF when AOFRW fails. */
void aofDelTempIncrAofFile() {
sds aof_filename = getTempIncrAofName();
sds aof_filepath = makePath(server.aof_dirname, aof_filename);
serverLog(LL_NOTICE, "Removing the temp incr aof file %s in the background", aof_filename);
bg_unlink(aof_filepath);
sdsfree(aof_filepath);
sdsfree(aof_filename);
return;
}
/* Called after `loadDataFromDisk` when redis start. If `server.aof_state` is
* 'AOF_ON', It will do three things:
* 1. Force create a BASE file when redis starts with an empty dataset
@ -739,44 +756,52 @@ int aofFileExist(char *filename) {
}
/* Called in `rewriteAppendOnlyFileBackground`. If `server.aof_state`
* is 'AOF_ON' or 'AOF_WAIT_REWRITE', It will do two things:
* is 'AOF_ON', It will do two things:
* 1. Open a new INCR type AOF for writing
* 2. Synchronously update the manifest file to the disk
*
* The above two steps of modification are atomic, that is, if
* any step fails, the entire operation will rollback and returns
* C_ERR, and if all succeeds, it returns C_OK.
*
* If `server.aof_state` is 'AOF_WAIT_REWRITE', It will open a temporary INCR AOF
* file to accumulate data during AOF_WAIT_REWRITE, and it will eventually be
* renamed in the `backgroundRewriteDoneHandler` and written to the manifest file.
* */
int openNewIncrAofForAppend(void) {
serverAssert(server.aof_manifest != NULL);
int newfd;
int newfd = -1;
aofManifest *temp_am = NULL;
sds new_aof_name = NULL;
/* Only open new INCR AOF when AOF enabled. */
if (server.aof_state == AOF_OFF) return C_OK;
/* Dup a temp aof_manifest to modify. */
aofManifest *temp_am = aofManifestDup(server.aof_manifest);
/* Open new AOF. */
sds new_aof_name = getNewIncrAofName(temp_am);
if (server.aof_state == AOF_WAIT_REWRITE) {
/* Use a temporary INCR AOF file to accumulate data during AOF_WAIT_REWRITE. */
new_aof_name = getTempIncrAofName();
} else {
/* Dup a temp aof_manifest to modify. */
temp_am = aofManifestDup(server.aof_manifest);
new_aof_name = sdsdup(getNewIncrAofName(temp_am));
}
sds new_aof_filepath = makePath(server.aof_dirname, new_aof_name);
newfd = open(new_aof_filepath, O_WRONLY|O_TRUNC|O_CREAT, 0644);
sdsfree(new_aof_filepath);
if (newfd == -1) {
serverLog(LL_WARNING, "Can't open the append-only file %s: %s",
new_aof_name, strerror(errno));
aofManifestFree(temp_am);
return C_ERR;
goto cleanup;
}
/* Persist AOF Manifest. */
int ret = persistAofManifest(temp_am);
if (ret == C_ERR) {
close(newfd);
aofManifestFree(temp_am);
return C_ERR;
if (temp_am) {
/* Persist AOF Manifest. */
if (persistAofManifest(temp_am) == C_ERR) {
goto cleanup;
}
}
sdsfree(new_aof_name);
/* If reaches here, we can safely modify the `server.aof_manifest`
* and `server.aof_fd`. */
@ -788,8 +813,14 @@ int openNewIncrAofForAppend(void) {
/* Reset the aof_last_incr_size. */
server.aof_last_incr_size = 0;
/* Update `server.aof_manifest`. */
aofManifestFreeAndUpdate(temp_am);
if (temp_am) aofManifestFreeAndUpdate(temp_am);
return C_OK;
cleanup:
if (new_aof_name) sdsfree(new_aof_name);
if (newfd != -1) close(newfd);
if (temp_am) aofManifestFree(temp_am);
return C_ERR;
}
/* Whether to limit the execution of Background AOF rewrite.
@ -818,17 +849,13 @@ int aofRewriteLimited(void) {
static int next_delay_minutes = 0;
static time_t next_rewrite_time = 0;
/* If the number of incr AOFs exceeds the threshold but server.aof_lastbgrewrite_status is OK, it
* means that redis may have just loaded a dataset containing many incr AOFs. At this time, we
* will not limit the AOFRW. */
unsigned long incr_aof_num = listLength(server.aof_manifest->incr_aof_list);
if (incr_aof_num < AOF_REWRITE_LIMITE_THRESHOLD || server.aof_lastbgrewrite_status == C_OK) {
if (server.stat_aofrw_consecutive_failures < AOF_REWRITE_LIMITE_THRESHOLD) {
/* We may be recovering from limited state, so reset all states. */
next_delay_minutes = 0;
next_rewrite_time = 0;
return 0;
}
/* if it is in the limiting state, then check if the next_rewrite_time is reached */
if (next_rewrite_time != 0) {
if (server.unixtime < next_rewrite_time) {
@ -1266,7 +1293,7 @@ void feedAppendOnlyFile(int dictid, robj **argv, int argc) {
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
if (server.aof_state == AOF_ON ||
server.child_type == CHILD_TYPE_AOF)
(server.aof_state == AOF_WAIT_REWRITE && server.child_type == CHILD_TYPE_AOF))
{
server.aof_buf = sdscatlen(server.aof_buf, buf, sdslen(buf));
}
@ -2404,6 +2431,9 @@ void bgrewriteaofCommand(client *c) {
addReplyError(c,"Background append only file rewriting already in progress");
} else if (hasActiveChildProcess() || server.in_exec) {
server.aof_rewrite_scheduled = 1;
/* When manually triggering AOFRW we reset the count
* so that it can be executed immediately. */
server.stat_aofrw_consecutive_failures = 0;
addReplyStatus(c,"Background append only file rewriting scheduled");
} else if (rewriteAppendOnlyFileBackground() == C_OK) {
addReplyStatus(c,"Background append only file rewriting started");
@ -2514,7 +2544,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
serverLog(LL_WARNING,
"Error trying to rename the temporary AOF file %s into %s: %s",
tmpfile,
new_base_filename,
new_base_filepath,
strerror(errno));
aofManifestFree(temp_am);
sdsfree(new_base_filepath);
@ -2523,6 +2553,35 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-rename", latency);
/* Rename the temporary incr aof file to 'new_incr_filename'. */
if (server.aof_state == AOF_WAIT_REWRITE) {
/* Get temporary incr aof name. */
sds temp_incr_aof_name = getTempIncrAofName();
sds temp_incr_filepath = makePath(server.aof_dirname, temp_incr_aof_name);
sdsfree(temp_incr_aof_name);
/* Get next new incr aof name. */
sds new_incr_filename = getNewIncrAofName(temp_am);
sds new_incr_filepath = makePath(server.aof_dirname, new_incr_filename);
latencyStartMonitor(latency);
if (rename(temp_incr_filepath, new_incr_filepath) == -1) {
serverLog(LL_WARNING,
"Error trying to rename the temporary incr AOF file %s into %s: %s",
temp_incr_filepath,
new_incr_filepath,
strerror(errno));
bg_unlink(new_base_filepath);
sdsfree(new_base_filepath);
aofManifestFree(temp_am);
sdsfree(temp_incr_filepath);
sdsfree(new_incr_filepath);
goto cleanup;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-rename", latency);
sdsfree(temp_incr_filepath);
sdsfree(new_incr_filepath);
}
/* Change the AOF file type in 'incr_aof_list' from AOF_FILE_TYPE_INCR
* to AOF_FILE_TYPE_HIST, and move them to the 'history_aof_list'. */
markRewrittenIncrAofAsHistory(temp_am);
@ -2553,6 +2612,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
aofDelHistoryFiles();
server.aof_lastbgrewrite_status = C_OK;
server.stat_aofrw_consecutive_failures = 0;
serverLog(LL_NOTICE, "Background AOF rewrite finished successfully");
/* Change state from WAIT_REWRITE to ON if needed */
@ -2563,14 +2623,17 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
"Background AOF rewrite signal handler took %lldus", ustime()-now);
} else if (!bysignal && exitcode != 0) {
server.aof_lastbgrewrite_status = C_ERR;
server.stat_aofrw_consecutive_failures++;
serverLog(LL_WARNING,
"Background AOF rewrite terminated with error");
} else {
/* SIGUSR1 is whitelisted, so we have a way to kill a child without
* triggering an error condition. */
if (bysignal != SIGUSR1)
if (bysignal != SIGUSR1) {
server.aof_lastbgrewrite_status = C_ERR;
server.stat_aofrw_consecutive_failures++;
}
serverLog(LL_WARNING,
"Background AOF rewrite terminated by signal %d", bysignal);
@ -2578,6 +2641,12 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) {
cleanup:
aofRemoveTempFile(server.child_pid);
/* Clear AOF buffer and delete temp incr aof for next rewrite. */
if (server.aof_state == AOF_WAIT_REWRITE) {
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
aofDelTempIncrAofFile();
}
server.aof_rewrite_time_last = time(NULL)-server.aof_rewrite_time_start;
server.aof_rewrite_time_start = -1;
/* Schedule a new rewrite if we are waiting for it to switch the AOF ON. */

View File

@ -1335,8 +1335,11 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
* however to try every second is enough in case of 'hz' is set to
* a higher frequency. */
run_with_period(1000) {
if (server.aof_state == AOF_ON && server.aof_last_write_status == C_ERR)
flushAppendOnlyFile(0);
if ((server.aof_state == AOF_ON || server.aof_state == AOF_WAIT_REWRITE) &&
server.aof_last_write_status == C_ERR)
{
flushAppendOnlyFile(0);
}
}
/* Clear the paused clients state if needed. */
@ -2348,6 +2351,7 @@ void resetServerStats(void) {
}
server.stat_aof_rewrites = 0;
server.stat_rdb_saves = 0;
server.stat_aofrw_consecutive_failures = 0;
atomicSet(server.stat_net_input_bytes, 0);
atomicSet(server.stat_net_output_bytes, 0);
server.stat_unexpected_error_replies = 0;
@ -5467,6 +5471,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
"aof_current_rewrite_time_sec:%jd\r\n"
"aof_last_bgrewrite_status:%s\r\n"
"aof_rewrites:%lld\r\n"
"aof_rewrites_consecutive_failures:%lld\r\n"
"aof_last_write_status:%s\r\n"
"aof_last_cow_size:%zu\r\n"
"module_fork_in_progress:%d\r\n"
@ -5498,6 +5503,7 @@ sds genRedisInfoString(dict *section_dict, int all_sections, int everything) {
-1 : time(NULL)-server.aof_rewrite_time_start),
(server.aof_lastbgrewrite_status == C_OK) ? "ok" : "err",
server.stat_aof_rewrites,
server.stat_aofrw_consecutive_failures,
(server.aof_last_write_status == C_OK &&
aof_bio_fsync_status == C_OK) ? "ok" : "err",
server.stat_aof_cow_bytes,

View File

@ -1568,6 +1568,7 @@ struct redisServer {
monotime stat_last_active_defrag_time; /* Timestamp of current active defrag start */
size_t stat_peak_memory; /* Max used memory record */
long long stat_aof_rewrites; /* number of aof file rewrites performed */
long long stat_aofrw_consecutive_failures; /* The number of consecutive failures of aofrw */
long long stat_rdb_saves; /* number of rdb saves performed */
long long stat_fork_time; /* Time needed to perform latest fork() */
double stat_fork_rate; /* Fork rate in GB/sec. */

View File

@ -1104,7 +1104,11 @@ tags {"external:skip"} {
# Set a key so that AOFRW can be delayed
r set k v
# Let AOFRW fail two times, this will trigger AOFRW limit
# Let AOFRW fail 3 times, this will trigger AOFRW limit
r bgrewriteaof
catch {exec kill -9 [get_child_pid 0]}
waitForBgrewriteaof r
r bgrewriteaof
catch {exec kill -9 [get_child_pid 0]}
waitForBgrewriteaof r
@ -1118,6 +1122,7 @@ tags {"external:skip"} {
{file appendonly.aof.6.incr.aof seq 6 type i}
{file appendonly.aof.7.incr.aof seq 7 type i}
{file appendonly.aof.8.incr.aof seq 8 type i}
{file appendonly.aof.9.incr.aof seq 9 type i}
}
# Write 1KB data to trigger AOFRW
@ -1137,6 +1142,7 @@ tags {"external:skip"} {
{file appendonly.aof.6.incr.aof seq 6 type i}
{file appendonly.aof.7.incr.aof seq 7 type i}
{file appendonly.aof.8.incr.aof seq 8 type i}
{file appendonly.aof.9.incr.aof seq 9 type i}
}
# Turn off auto rewrite
@ -1154,11 +1160,11 @@ tags {"external:skip"} {
waitForBgrewriteaof r
# Can create New INCR AOF
assert_equal 1 [check_file_exist $aof_dirpath "${aof_basename}.9${::incr_aof_sufix}${::aof_format_suffix}"]
assert_equal 1 [check_file_exist $aof_dirpath "${aof_basename}.10${::incr_aof_sufix}${::aof_format_suffix}"]
assert_aof_manifest_content $aof_manifest_file {
{file appendonly.aof.11.base.rdb seq 11 type b}
{file appendonly.aof.9.incr.aof seq 9 type i}
{file appendonly.aof.10.incr.aof seq 10 type i}
}
set d1 [r debug digest]
@ -1166,5 +1172,161 @@ tags {"external:skip"} {
set d2 [r debug digest]
assert {$d1 eq $d2}
}
start_server {overrides {aof-use-rdb-preamble {yes} appendonly {no}}} {
set dir [get_redis_dir]
set aof_basename "appendonly.aof"
set aof_dirname "appendonlydir"
set aof_dirpath "$dir/$aof_dirname"
set aof_manifest_name "$aof_basename$::manifest_suffix"
set aof_manifest_file "$dir/$aof_dirname/$aof_manifest_name"
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
test "AOF will open a temporary INCR AOF to accumulate data until the first AOFRW success when AOF is dynamically enabled" {
r config set save ""
# Increase AOFRW execution time to give us enough time to kill it
r config set rdb-key-save-delay 10000000
# Start write load
set load_handle0 [start_write_load $master_host $master_port 10]
wait_for_condition 50 100 {
[r dbsize] > 0
} else {
fail "No write load detected."
}
# Enable AOF will trigger an initialized AOFRW
r config set appendonly yes
# Let AOFRW fail
assert_equal 1 [s aof_rewrite_in_progress]
set pid1 [get_child_pid 0]
catch {exec kill -9 $pid1}
# Wait for AOFRW to exit and delete temp incr aof
wait_for_condition 1000 100 {
[count_log_message 0 "Removing the temp incr aof file"] == 1
} else {
fail "temp aof did not delete"
}
# Make sure manifest file is not created
assert_equal 0 [check_file_exist $aof_dirpath $aof_manifest_name]
# Make sure BASE AOF is not created
assert_equal 0 [check_file_exist $aof_dirpath "${aof_basename}.1${::base_aof_sufix}${::rdb_format_suffix}"]
# Make sure the next AOFRW has started
wait_for_condition 1000 50 {
[s aof_rewrite_in_progress] == 1
} else {
fail "aof rewrite did not scheduled"
}
# Do a successful AOFRW
set total_forks [s total_forks]
r config set rdb-key-save-delay 0
catch {exec kill -9 [get_child_pid 0]}
# Make sure the next AOFRW has started
wait_for_condition 1000 10 {
[s total_forks] == [expr $total_forks + 1]
} else {
fail "aof rewrite did not scheduled"
}
waitForBgrewriteaof r
assert_equal 2 [count_log_message 0 "Removing the temp incr aof file"]
# BASE and INCR AOF are successfully created
assert_aof_manifest_content $aof_manifest_file {
{file appendonly.aof.1.base.rdb seq 1 type b}
{file appendonly.aof.1.incr.aof seq 1 type i}
}
stop_write_load $load_handle0
wait_load_handlers_disconnected
set d1 [r debug digest]
r debug loadaof
set d2 [r debug digest]
assert {$d1 eq $d2}
# Dynamic disable AOF again
r config set appendonly no
# Disabling AOF does not delete previous AOF files
r debug loadaof
set d2 [r debug digest]
assert {$d1 eq $d2}
assert_equal 0 [s rdb_changes_since_last_save]
r config set rdb-key-save-delay 10000000
set load_handle0 [start_write_load $master_host $master_port 10]
wait_for_condition 50 100 {
[s rdb_changes_since_last_save] > 0
} else {
fail "No write load detected."
}
# Re-enable AOF
r config set appendonly yes
# Let AOFRW fail
assert_equal 1 [s aof_rewrite_in_progress]
set pid1 [get_child_pid 0]
catch {exec kill -9 $pid1}
# Wait for AOFRW to exit and delete temp incr aof
wait_for_condition 1000 100 {
[count_log_message 0 "Removing the temp incr aof file"] == 3
} else {
fail "temp aof did not delete 3 times"
}
# Make sure no new incr AOF was created
assert_aof_manifest_content $aof_manifest_file {
{file appendonly.aof.1.base.rdb seq 1 type b}
{file appendonly.aof.1.incr.aof seq 1 type i}
}
# Make sure the next AOFRW has started
wait_for_condition 1000 50 {
[s aof_rewrite_in_progress] == 1
} else {
fail "aof rewrite did not scheduled"
}
# Do a successful AOFRW
set total_forks [s total_forks]
r config set rdb-key-save-delay 0
catch {exec kill -9 [get_child_pid 0]}
wait_for_condition 1000 10 {
[s total_forks] == [expr $total_forks + 1]
} else {
fail "aof rewrite did not scheduled"
}
waitForBgrewriteaof r
assert_equal 4 [count_log_message 0 "Removing the temp incr aof file"]
# New BASE and INCR AOF are successfully created
assert_aof_manifest_content $aof_manifest_file {
{file appendonly.aof.2.base.rdb seq 2 type b}
{file appendonly.aof.2.incr.aof seq 2 type i}
}
stop_write_load $load_handle0
wait_load_handlers_disconnected
set d1 [r debug digest]
r debug loadaof
set d2 [r debug digest]
assert {$d1 eq $d2}
}
}
}
}