Report child copy-on-write info continuously

Add INFO field, rdb_active_cow_size, to report COW of a live fork child while
it's active.
- once in 1024 keys check the time, and if there's more than one second since
  the last report send a report to the parent via the pipe.
- refactor the child_info_data struct, it's an implementation detail that
  shouldn't be in the server struct, and not used to communicate data between
  caller and callee
- remove the magic value from that struct (not sure what it was good for), and
  instead add handling of short reads.
- add another value to the structure, cow_type, to indicate if the report is
  for the new rdb_active_cow_size field, or it's the last report of a
  successful operation
- add new Module API to report the active COW
- add more asserts variants to test.tcl
This commit is contained in:
YaacovHazan 2020-12-20 20:23:20 +02:00 committed by Oran Agra
parent f9dacf8aac
commit ea930a352c
9 changed files with 277 additions and 62 deletions

View File

@ -1425,6 +1425,8 @@ int rewriteAppendOnlyFileRio(rio *aof) {
dictEntry *de;
size_t processed = 0;
int j;
long key_count = 0;
long long cow_updated_time = 0;
for (j = 0; j < server.dbnum; j++) {
char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n";
@ -1484,6 +1486,19 @@ int rewriteAppendOnlyFileRio(rio *aof) {
processed = aof->processed_bytes;
aofReadDiffFromParent();
}
/* Update COW info every 1 second (approximately).
* in order to avoid calling mstime() on each iteration, we will
* check the diff every 1024 keys */
if ((key_count & 1023) == 0) {
key_count = 0;
long long now = mstime();
if (now - cow_updated_time >= 1000) {
sendChildCOWInfo(CHILD_TYPE_AOF, 0, "AOF rewrite");
cow_updated_time = now;
}
}
key_count++;
}
dictReleaseIterator(di);
di = NULL;
@ -1577,8 +1592,31 @@ int rewriteAppendOnlyFile(char *filename) {
serverLog(LL_NOTICE,
"Concatenating %.2f MB of AOF diff received from parent.",
(double) sdslen(server.aof_child_diff) / (1024*1024));
if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
goto werr;
/* Now we write the entire AOF buffer we received from the parent
* via the pipe during the life of this fork child.
* once a second, we'll take a break and send updated COW info to the parent */
size_t bytes_to_write = sdslen(server.aof_child_diff);
const char *buf = server.aof_child_diff;
long long cow_updated_time = mstime();
while (bytes_to_write) {
/* We write the AOF buffer in chunk of 8MB so that we can check the time in between them */
size_t chunk_size = bytes_to_write < (8<<20) ? bytes_to_write : (8<<20);
if (rioWrite(&aof,buf,chunk_size) == 0)
goto werr;
bytes_to_write -= chunk_size;
buf += chunk_size;
/* Update COW info */
long long now = mstime();
if (now - cow_updated_time >= 1000) {
sendChildCOWInfo(CHILD_TYPE_AOF, 0, "AOF rewrite");
cow_updated_time = now;
}
}
/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp)) goto werr;
@ -1709,7 +1747,7 @@ int rewriteAppendOnlyFileBackground(void) {
redisSetCpuAffinity(server.aof_rewrite_cpulist);
snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid());
if (rewriteAppendOnlyFile(tmpfile) == C_OK) {
sendChildCOWInfo(CHILD_TYPE_AOF, "AOF rewrite");
sendChildCOWInfo(CHILD_TYPE_AOF, 1, "AOF rewrite");
exitFromChild(0);
} else {
exitFromChild(1);

View File

@ -30,6 +30,12 @@
#include "server.h"
#include <unistd.h>
typedef struct {
int process_type; /* AOF or RDB child? */
int on_exit; /* COW size of active or exited child */
size_t cow_size; /* Copy on write size. */
} child_info_data;
/* Open a child-parent channel used in order to move information about the
* RDB / AOF saving process from the child to the parent (for instance
* the amount of copy on write memory used) */
@ -41,7 +47,7 @@ void openChildInfoPipe(void) {
} else if (anetNonBlock(NULL,server.child_info_pipe[0]) != ANET_OK) {
closeChildInfoPipe();
} else {
memset(&server.child_info_data,0,sizeof(server.child_info_data));
server.child_info_nread = 0;
}
}
@ -54,34 +60,88 @@ void closeChildInfoPipe(void) {
close(server.child_info_pipe[1]);
server.child_info_pipe[0] = -1;
server.child_info_pipe[1] = -1;
server.child_info_nread = 0;
}
}
/* Send COW data to parent. The child should call this function after populating
* the corresponding fields it want to sent (according to the process type). */
void sendChildInfo(int ptype) {
/* Send COW data to parent. */
void sendChildInfo(int process_type, int on_exit, size_t cow_size) {
if (server.child_info_pipe[1] == -1) return;
server.child_info_data.magic = CHILD_INFO_MAGIC;
server.child_info_data.process_type = ptype;
ssize_t wlen = sizeof(server.child_info_data);
if (write(server.child_info_pipe[1],&server.child_info_data,wlen) != wlen) {
child_info_data buffer = {.process_type = process_type, .on_exit = on_exit, .cow_size = cow_size};
ssize_t wlen = sizeof(buffer);
if (write(server.child_info_pipe[1],&buffer,wlen) != wlen) {
/* Nothing to do on error, this will be detected by the other side. */
}
}
/* Receive COW data from parent. */
void receiveChildInfo(void) {
if (server.child_info_pipe[0] == -1) return;
ssize_t wlen = sizeof(server.child_info_data);
if (read(server.child_info_pipe[0],&server.child_info_data,wlen) == wlen &&
server.child_info_data.magic == CHILD_INFO_MAGIC)
{
if (server.child_info_data.process_type == CHILD_TYPE_RDB) {
server.stat_rdb_cow_bytes = server.child_info_data.cow_size;
} else if (server.child_info_data.process_type == CHILD_TYPE_AOF) {
server.stat_aof_cow_bytes = server.child_info_data.cow_size;
} else if (server.child_info_data.process_type == CHILD_TYPE_MODULE) {
server.stat_module_cow_bytes = server.child_info_data.cow_size;
}
/* Update COW data. */
void updateChildInfo(int process_type, int on_exit, size_t cow_size) {
if (!on_exit) {
server.stat_current_cow_bytes = cow_size;
return;
}
if (process_type == CHILD_TYPE_RDB) {
server.stat_rdb_cow_bytes = cow_size;
} else if (process_type == CHILD_TYPE_AOF) {
server.stat_aof_cow_bytes = cow_size;
} else if (process_type == CHILD_TYPE_MODULE) {
server.stat_module_cow_bytes = cow_size;
}
}
/* Read COW info data from the pipe.
* if complete data read into the buffer, process type, copy-on-write type and copy-on-write size
* are stored into *process_type, *on_exit and *cow_size respectively and returns 1.
* otherwise, the partial data is left in the buffer, waiting for the next read, and returns 0. */
int readChildInfo(int *process_type, int *on_exit, size_t *cow_size) {
/* We are using here a static buffer in combination with the server.child_info_nread to handle short reads */
static child_info_data buffer;
ssize_t wlen = sizeof(buffer);
/* Do not overlap */
if (server.child_info_nread == wlen) server.child_info_nread = 0;
int nread = read(server.child_info_pipe[0], (char *)&buffer + server.child_info_nread, wlen - server.child_info_nread);
if (nread > 0) {
server.child_info_nread += nread;
}
/* We have complete child info */
if (server.child_info_nread == wlen) {
*process_type = buffer.process_type;
*on_exit = buffer.on_exit;
*cow_size = buffer.cow_size;
return 1;
} else {
return 0;
}
}
/* Receive COW data from child. */
void receiveChildInfo(void) {
if (server.child_info_pipe[0] == -1) return;
int process_type;
int on_exit;
size_t cow_size;
if (readChildInfo(&process_type, &on_exit, &cow_size)) {
updateChildInfo(process_type, on_exit, cow_size);
}
}
/* Receive last COW data from child. */
void receiveLastChildInfo(void) {
if (server.child_info_pipe[0] == -1) return;
/* Drain the pipe and update child info */
int process_type;
int on_exit;
size_t cow_size;
while (readChildInfo(&process_type, &on_exit, &cow_size) > 0) {
updateChildInfo(process_type, on_exit, cow_size);
}
}

View File

@ -7080,11 +7080,17 @@ int RM_Fork(RedisModuleForkDoneHandler cb, void *user_data) {
return childpid;
}
/* The module is advised to call this function from the fork child once in a while,
* so that it can report COW memory to the parent which will be reported in INFO */
void RM_SendChildCOWInfo(void) {
sendChildCOWInfo(CHILD_TYPE_MODULE, 0, "Module fork");
}
/* Call from the child process when you want to terminate it.
* retcode will be provided to the done handler executed on the parent process.
*/
int RM_ExitFromChild(int retcode) {
sendChildCOWInfo(CHILD_TYPE_MODULE, "Module fork");
sendChildCOWInfo(CHILD_TYPE_MODULE, 1, "Module fork");
exitFromChild(retcode);
return REDISMODULE_OK;
}
@ -8590,6 +8596,7 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(CommandFilterArgReplace);
REGISTER_API(CommandFilterArgDelete);
REGISTER_API(Fork);
REGISTER_API(SendChildCOWInfo);
REGISTER_API(ExitFromChild);
REGISTER_API(KillForkChild);
REGISTER_API(RegisterInfoFunc);

View File

@ -1219,9 +1219,11 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
int j;
uint64_t cksum;
size_t processed = 0;
int j;
long key_count = 0;
long long cow_updated_time = 0;
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
@ -1267,6 +1269,23 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
/* Update COW info every 1 second (approximately).
* in order to avoid calling mstime() on each iteration, we will
* check the diff every 1024 keys */
if ((key_count & 1023) == 0) {
key_count = 0;
long long now = mstime();
if (now - cow_updated_time >= 1000) {
if (rdbflags & RDBFLAGS_AOF_PREAMBLE) {
sendChildCOWInfo(CHILD_TYPE_AOF, 0, "AOF rewrite");
} else {
sendChildCOWInfo(CHILD_TYPE_RDB, 0, "RDB");
}
cow_updated_time = now;
}
}
key_count++;
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
@ -1419,7 +1438,7 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) {
redisSetCpuAffinity(server.bgsave_cpulist);
retval = rdbSave(filename,rsi);
if (retval == C_OK) {
sendChildCOWInfo(CHILD_TYPE_RDB, "RDB");
sendChildCOWInfo(CHILD_TYPE_RDB, 1, "RDB");
}
exitFromChild((retval == C_OK) ? 0 : 1);
} else {
@ -2786,7 +2805,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) {
retval = C_ERR;
if (retval == C_OK) {
sendChildCOWInfo(CHILD_TYPE_RDB, "RDB");
sendChildCOWInfo(CHILD_TYPE_RDB, 1, "RDB");
}
rioFreeFd(&rdb);

View File

@ -778,6 +778,7 @@ REDISMODULE_API int (*RedisModule_CommandFilterArgInsert)(RedisModuleCommandFilt
REDISMODULE_API int (*RedisModule_CommandFilterArgReplace)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_CommandFilterArgDelete)(RedisModuleCommandFilterCtx *fctx, int pos) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data) REDISMODULE_ATTR;
REDISMODULE_API void (*RedisModule_SendChildCOWInfo)(void) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_ExitFromChild)(int retcode) REDISMODULE_ATTR;
REDISMODULE_API int (*RedisModule_KillForkChild)(int child_pid) REDISMODULE_ATTR;
REDISMODULE_API float (*RedisModule_GetUsedMemoryRatio)() REDISMODULE_ATTR;
@ -1033,6 +1034,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int
REDISMODULE_GET_API(CommandFilterArgReplace);
REDISMODULE_GET_API(CommandFilterArgDelete);
REDISMODULE_GET_API(Fork);
REDISMODULE_GET_API(SendChildCOWInfo);
REDISMODULE_GET_API(ExitFromChild);
REDISMODULE_GET_API(KillForkChild);
REDISMODULE_GET_API(GetUsedMemoryRatio);

View File

@ -1578,6 +1578,7 @@ int hasActiveChildProcess() {
void resetChildState() {
server.child_type = CHILD_TYPE_NONE;
server.child_pid = -1;
server.stat_current_cow_bytes = 0;
updateDictResizePolicy();
closeChildInfoPipe();
}
@ -2098,6 +2099,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* Check if a background saving or AOF rewrite in progress terminated. */
if (hasActiveChildProcess() || ldbPendingChildren())
{
run_with_period(1000) receiveChildInfo();
checkChildrenDone();
} else {
/* If there is not a background saving/rewrite in progress check if
@ -3119,7 +3121,7 @@ void initServer(void) {
server.rdb_bgsave_scheduled = 0;
server.child_info_pipe[0] = -1;
server.child_info_pipe[1] = -1;
server.child_info_data.magic = 0;
server.child_info_nread = 0;
aofRewriteBufferReset();
server.aof_buf = sdsempty();
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
@ -3131,6 +3133,7 @@ void initServer(void) {
/* A few stats we don't want to reset: server startup time, and peak mem. */
server.stat_starttime = time(NULL);
server.stat_peak_memory = 0;
server.stat_current_cow_bytes = 0;
server.stat_rdb_cow_bytes = 0;
server.stat_aof_cow_bytes = 0;
server.stat_module_cow_bytes = 0;
@ -4617,6 +4620,7 @@ sds genRedisInfoString(const char *section) {
info = sdscatprintf(info,
"# Persistence\r\n"
"loading:%d\r\n"
"current_cow_size:%zu\r\n"
"rdb_changes_since_last_save:%lld\r\n"
"rdb_bgsave_in_progress:%d\r\n"
"rdb_last_save_time:%jd\r\n"
@ -4635,6 +4639,7 @@ sds genRedisInfoString(const char *section) {
"module_fork_in_progress:%d\r\n"
"module_fork_last_cow_size:%zu\r\n",
server.loading,
server.stat_current_cow_bytes,
server.dirty,
server.child_type == CHILD_TYPE_RDB,
(intmax_t)server.lastsave,
@ -5342,12 +5347,13 @@ int redisFork(int purpose) {
* other child types should handle and store their pid's in dedicated variables.
*
* Today, we allows CHILD_TYPE_LDB to run in parallel with the other fork types:
* - it isn't used for production, so it will not make the server to be less efficient
* - it isn't used for production, so it will not make the server be less efficient
* - used for debugging, and we don't want to block it from running while other
* forks are running (like RDB and AOF) */
if (isMutuallyExclusiveChildType(purpose)) {
server.child_pid = childpid;
server.child_type = purpose;
server.stat_current_cow_bytes = 0;
}
updateDictResizePolicy();
@ -5355,17 +5361,16 @@ int redisFork(int purpose) {
return childpid;
}
void sendChildCOWInfo(int ptype, char *pname) {
void sendChildCOWInfo(int ptype, int on_exit, char *pname) {
size_t private_dirty = zmalloc_get_private_dirty(-1);
if (private_dirty) {
serverLog(LL_NOTICE,
serverLog(on_exit ? LL_NOTICE : LL_VERBOSE,
"%s: %zu MB of memory used by copy-on-write",
pname, private_dirty/(1024*1024));
pname, private_dirty);
}
server.child_info_data.cow_size = private_dirty;
sendChildInfo(ptype);
sendChildInfo(ptype, on_exit, private_dirty);
}
void memtest(size_t megabytes, int passes);

View File

@ -1098,7 +1098,6 @@ struct clusterState;
#undef hz
#endif
#define CHILD_INFO_MAGIC 0xC17DDA7A12345678LL
#define CHILD_TYPE_NONE 0
#define CHILD_TYPE_RDB 1
#define CHILD_TYPE_AOF 2
@ -1228,6 +1227,7 @@ struct redisServer {
struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */
redisAtomic long long stat_net_input_bytes; /* Bytes read from network. */
redisAtomic long long stat_net_output_bytes; /* Bytes written to network. */
size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */
size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */
size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */
size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */
@ -1342,11 +1342,7 @@ struct redisServer {
* value means fractions of microsecons (on average). */
/* Pipe and data structures for child -> parent info sharing. */
int child_info_pipe[2]; /* Pipe used to write the child_info_data. */
struct {
int process_type; /* AOF or RDB child? */
size_t cow_size; /* Copy on write size. */
unsigned long long magic; /* Magic value to make sure data is valid. */
} child_info_data;
int child_info_nread; /* Num of bytes of the last read from pipe */
/* Propagation of commands in AOF / replication */
redisOpArray also_propagate; /* Additional command to propagate. */
/* Logging */
@ -2000,15 +1996,16 @@ void restartAOFAfterSYNC();
/* Child info */
void openChildInfoPipe(void);
void closeChildInfoPipe(void);
void sendChildInfo(int process_type);
void sendChildInfo(int process_type, int on_exit, size_t cow_size);
void receiveChildInfo(void);
void receiveLastChildInfo(void);
/* Fork helpers */
int redisFork(int type);
int hasActiveChildProcess();
void resetChildState();
int isMutuallyExclusiveChildType(int type);
void sendChildCOWInfo(int ptype, char *pname);
void sendChildCOWInfo(int ptype, int on_exit, char *pname);
/* acl.c -- Authentication related prototypes. */
extern rax *Users;

View File

@ -198,3 +198,78 @@ test {client freed during loading} {
exec kill [srv 0 pid]
}
}
start_server {overrides {save ""}} {
test {Test child sending COW info} {
# make sure that rdb_last_cow_size and current_cow_size are zero (the test using new server),
# so that the comparisons during the test will be valid
assert {[s current_cow_size] == 0}
assert {[s rdb_last_cow_size] == 0}
# using a 200us delay, the bgsave is empirically taking about 10 seconds.
# we need it to take more than some 5 seconds, since redis only report COW once a second.
r config set rdb-key-save-delay 200
# populate the db with 10k keys of 4k each
set rd [redis_deferring_client 0]
set size 4096
set cmd_count 10000
for {set k 0} {$k < $cmd_count} {incr k} {
$rd set key$k [string repeat A $size]
}
for {set k 0} {$k < $cmd_count} {incr k} {
catch { $rd read }
}
$rd close
# start background rdb save
r bgsave
# on each iteration, we will write some key to the server to trigger copy-on-write, and
# wait to see that it reflected in INFO.
set iteration 1
while 1 {
# take a sample before writing new data to the server
set cow_size [s current_cow_size]
if {$::verbose} {
puts "COW info before copy-on-write: $cow_size"
}
# trigger copy-on-write
r setrange key$iteration 0 [string repeat B $size]
# wait to see that current_cow_size value updated (as long as the child is in progress)
wait_for_condition 80 100 {
[s rdb_bgsave_in_progress] == 0 ||
[s current_cow_size] >= $cow_size + $size
} else {
if {$::verbose} {
puts "COW info on fail: [s current_cow_size]"
}
fail "COW info didn't report"
}
# for no accurate, stop after 2 iterations
if {!$::accurate && $iteration == 2} {
break
}
# stop iterating if the bgsave completed
if { [s rdb_bgsave_in_progress] == 0 } {
break
}
incr iteration 1
}
# make sure we saw report of current_cow_size
assert_morethan_equal $iteration 2
# if bgsave completed, check that rdb_last_cow_size value is at least as last rdb_active_cow_size.
if { [s rdb_bgsave_in_progress] == 0 } {
assert_morethan_equal [s rdb_last_cow_size] $cow_size
}
}
}

View File

@ -31,36 +31,48 @@ proc assert_match {pattern value} {
}
}
proc assert_failed {expected_err detail} {
if {$detail ne ""} {
set detail "(detail: $detail)"
} else {
set detail "(context: [info frame -2])"
}
error "assertion:$expected_err $detail"
}
proc assert_equal {value expected {detail ""}} {
if {$expected ne $value} {
if {$detail ne ""} {
set detail "(detail: $detail)"
} else {
set detail "(context: [info frame -1])"
}
error "assertion:Expected '$value' to be equal to '$expected' $detail"
assert_failed "Expected '$value' to be equal to '$expected'" $detail
}
}
proc assert_lessthan {value expected {detail ""}} {
if {!($value < $expected)} {
if {$detail ne ""} {
set detail "(detail: $detail)"
} else {
set detail "(context: [info frame -1])"
}
error "assertion:Expected '$value' to be lessthan to '$expected' $detail"
assert_failed "Expected '$value' to be less than '$expected'" $detail
}
}
proc assert_lessthan_equal {value expected {detail ""}} {
if {!($value <= $expected)} {
assert_failed "Expected '$value' to be less than or equal to '$expected'" $detail
}
}
proc assert_morethan {value expected {detail ""}} {
if {!($value > $expected)} {
assert_failed "Expected '$value' to be more than '$expected'" $detail
}
}
proc assert_morethan_equal {value expected {detail ""}} {
if {!($value >= $expected)} {
assert_failed "Expected '$value' to be more than or equal to '$expected'" $detail
}
}
proc assert_range {value min max {detail ""}} {
if {!($value <= $max && $value >= $min)} {
if {$detail ne ""} {
set detail "(detail: $detail)"
} else {
set detail "(context: [info frame -1])"
}
error "assertion:Expected '$value' to be between to '$min' and '$max' $detail"
assert_failed "Expected '$value' to be between to '$min' and '$max'" $detail
}
}