diff --git a/src/aof.c b/src/aof.c index 928e1c4d1..d3191277f 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1425,6 +1425,8 @@ int rewriteAppendOnlyFileRio(rio *aof) { dictEntry *de; size_t processed = 0; int j; + long key_count = 0; + long long cow_updated_time = 0; for (j = 0; j < server.dbnum; j++) { char selectcmd[] = "*2\r\n$6\r\nSELECT\r\n"; @@ -1484,6 +1486,19 @@ int rewriteAppendOnlyFileRio(rio *aof) { processed = aof->processed_bytes; aofReadDiffFromParent(); } + + /* Update COW info every 1 second (approximately). + * in order to avoid calling mstime() on each iteration, we will + * check the diff every 1024 keys */ + if ((key_count & 1023) == 0) { + key_count = 0; + long long now = mstime(); + if (now - cow_updated_time >= 1000) { + sendChildCOWInfo(CHILD_TYPE_AOF, 0, "AOF rewrite"); + cow_updated_time = now; + } + } + key_count++; } dictReleaseIterator(di); di = NULL; @@ -1577,8 +1592,31 @@ int rewriteAppendOnlyFile(char *filename) { serverLog(LL_NOTICE, "Concatenating %.2f MB of AOF diff received from parent.", (double) sdslen(server.aof_child_diff) / (1024*1024)); - if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0) - goto werr; + + /* Now we write the entire AOF buffer we received from the parent + * via the pipe during the life of this fork child. + * once a second, we'll take a break and send updated COW info to the parent */ + size_t bytes_to_write = sdslen(server.aof_child_diff); + const char *buf = server.aof_child_diff; + long long cow_updated_time = mstime(); + + while (bytes_to_write) { + /* We write the AOF buffer in chunk of 8MB so that we can check the time in between them */ + size_t chunk_size = bytes_to_write < (8<<20) ? bytes_to_write : (8<<20); + + if (rioWrite(&aof,buf,chunk_size) == 0) + goto werr; + + bytes_to_write -= chunk_size; + buf += chunk_size; + + /* Update COW info */ + long long now = mstime(); + if (now - cow_updated_time >= 1000) { + sendChildCOWInfo(CHILD_TYPE_AOF, 0, "AOF rewrite"); + cow_updated_time = now; + } + } /* Make sure data will not remain on the OS's output buffers */ if (fflush(fp)) goto werr; @@ -1709,7 +1747,7 @@ int rewriteAppendOnlyFileBackground(void) { redisSetCpuAffinity(server.aof_rewrite_cpulist); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == C_OK) { - sendChildCOWInfo(CHILD_TYPE_AOF, "AOF rewrite"); + sendChildCOWInfo(CHILD_TYPE_AOF, 1, "AOF rewrite"); exitFromChild(0); } else { exitFromChild(1); diff --git a/src/childinfo.c b/src/childinfo.c index d11aa7bcf..85adfe2f6 100644 --- a/src/childinfo.c +++ b/src/childinfo.c @@ -30,6 +30,12 @@ #include "server.h" #include +typedef struct { + int process_type; /* AOF or RDB child? */ + int on_exit; /* COW size of active or exited child */ + size_t cow_size; /* Copy on write size. */ +} child_info_data; + /* Open a child-parent channel used in order to move information about the * RDB / AOF saving process from the child to the parent (for instance * the amount of copy on write memory used) */ @@ -41,7 +47,7 @@ void openChildInfoPipe(void) { } else if (anetNonBlock(NULL,server.child_info_pipe[0]) != ANET_OK) { closeChildInfoPipe(); } else { - memset(&server.child_info_data,0,sizeof(server.child_info_data)); + server.child_info_nread = 0; } } @@ -54,34 +60,88 @@ void closeChildInfoPipe(void) { close(server.child_info_pipe[1]); server.child_info_pipe[0] = -1; server.child_info_pipe[1] = -1; + server.child_info_nread = 0; } } -/* Send COW data to parent. The child should call this function after populating - * the corresponding fields it want to sent (according to the process type). */ -void sendChildInfo(int ptype) { +/* Send COW data to parent. */ +void sendChildInfo(int process_type, int on_exit, size_t cow_size) { if (server.child_info_pipe[1] == -1) return; - server.child_info_data.magic = CHILD_INFO_MAGIC; - server.child_info_data.process_type = ptype; - ssize_t wlen = sizeof(server.child_info_data); - if (write(server.child_info_pipe[1],&server.child_info_data,wlen) != wlen) { + + child_info_data buffer = {.process_type = process_type, .on_exit = on_exit, .cow_size = cow_size}; + ssize_t wlen = sizeof(buffer); + + if (write(server.child_info_pipe[1],&buffer,wlen) != wlen) { /* Nothing to do on error, this will be detected by the other side. */ } } -/* Receive COW data from parent. */ -void receiveChildInfo(void) { - if (server.child_info_pipe[0] == -1) return; - ssize_t wlen = sizeof(server.child_info_data); - if (read(server.child_info_pipe[0],&server.child_info_data,wlen) == wlen && - server.child_info_data.magic == CHILD_INFO_MAGIC) - { - if (server.child_info_data.process_type == CHILD_TYPE_RDB) { - server.stat_rdb_cow_bytes = server.child_info_data.cow_size; - } else if (server.child_info_data.process_type == CHILD_TYPE_AOF) { - server.stat_aof_cow_bytes = server.child_info_data.cow_size; - } else if (server.child_info_data.process_type == CHILD_TYPE_MODULE) { - server.stat_module_cow_bytes = server.child_info_data.cow_size; - } +/* Update COW data. */ +void updateChildInfo(int process_type, int on_exit, size_t cow_size) { + if (!on_exit) { + server.stat_current_cow_bytes = cow_size; + return; + } + + if (process_type == CHILD_TYPE_RDB) { + server.stat_rdb_cow_bytes = cow_size; + } else if (process_type == CHILD_TYPE_AOF) { + server.stat_aof_cow_bytes = cow_size; + } else if (process_type == CHILD_TYPE_MODULE) { + server.stat_module_cow_bytes = cow_size; + } +} + +/* Read COW info data from the pipe. + * if complete data read into the buffer, process type, copy-on-write type and copy-on-write size + * are stored into *process_type, *on_exit and *cow_size respectively and returns 1. + * otherwise, the partial data is left in the buffer, waiting for the next read, and returns 0. */ +int readChildInfo(int *process_type, int *on_exit, size_t *cow_size) { + /* We are using here a static buffer in combination with the server.child_info_nread to handle short reads */ + static child_info_data buffer; + ssize_t wlen = sizeof(buffer); + + /* Do not overlap */ + if (server.child_info_nread == wlen) server.child_info_nread = 0; + + int nread = read(server.child_info_pipe[0], (char *)&buffer + server.child_info_nread, wlen - server.child_info_nread); + if (nread > 0) { + server.child_info_nread += nread; + } + + /* We have complete child info */ + if (server.child_info_nread == wlen) { + *process_type = buffer.process_type; + *on_exit = buffer.on_exit; + *cow_size = buffer.cow_size; + return 1; + } else { + return 0; + } +} + +/* Receive COW data from child. */ +void receiveChildInfo(void) { + if (server.child_info_pipe[0] == -1) return; + + int process_type; + int on_exit; + size_t cow_size; + if (readChildInfo(&process_type, &on_exit, &cow_size)) { + updateChildInfo(process_type, on_exit, cow_size); + } +} + +/* Receive last COW data from child. */ +void receiveLastChildInfo(void) { + if (server.child_info_pipe[0] == -1) return; + + /* Drain the pipe and update child info */ + int process_type; + int on_exit; + size_t cow_size; + + while (readChildInfo(&process_type, &on_exit, &cow_size) > 0) { + updateChildInfo(process_type, on_exit, cow_size); } } diff --git a/src/module.c b/src/module.c index 2de6bee2f..c9e0b8812 100644 --- a/src/module.c +++ b/src/module.c @@ -7080,11 +7080,17 @@ int RM_Fork(RedisModuleForkDoneHandler cb, void *user_data) { return childpid; } +/* The module is advised to call this function from the fork child once in a while, + * so that it can report COW memory to the parent which will be reported in INFO */ +void RM_SendChildCOWInfo(void) { + sendChildCOWInfo(CHILD_TYPE_MODULE, 0, "Module fork"); +} + /* Call from the child process when you want to terminate it. * retcode will be provided to the done handler executed on the parent process. */ int RM_ExitFromChild(int retcode) { - sendChildCOWInfo(CHILD_TYPE_MODULE, "Module fork"); + sendChildCOWInfo(CHILD_TYPE_MODULE, 1, "Module fork"); exitFromChild(retcode); return REDISMODULE_OK; } @@ -8590,6 +8596,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(CommandFilterArgReplace); REGISTER_API(CommandFilterArgDelete); REGISTER_API(Fork); + REGISTER_API(SendChildCOWInfo); REGISTER_API(ExitFromChild); REGISTER_API(KillForkChild); REGISTER_API(RegisterInfoFunc); diff --git a/src/rdb.c b/src/rdb.c index 1281948d1..25e900689 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1219,9 +1219,11 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { dictIterator *di = NULL; dictEntry *de; char magic[10]; - int j; uint64_t cksum; size_t processed = 0; + int j; + long key_count = 0; + long long cow_updated_time = 0; if (server.rdb_checksum) rdb->update_cksum = rioGenericUpdateChecksum; @@ -1267,6 +1269,23 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { processed = rdb->processed_bytes; aofReadDiffFromParent(); } + + /* Update COW info every 1 second (approximately). + * in order to avoid calling mstime() on each iteration, we will + * check the diff every 1024 keys */ + if ((key_count & 1023) == 0) { + key_count = 0; + long long now = mstime(); + if (now - cow_updated_time >= 1000) { + if (rdbflags & RDBFLAGS_AOF_PREAMBLE) { + sendChildCOWInfo(CHILD_TYPE_AOF, 0, "AOF rewrite"); + } else { + sendChildCOWInfo(CHILD_TYPE_RDB, 0, "RDB"); + } + cow_updated_time = now; + } + } + key_count++; } dictReleaseIterator(di); di = NULL; /* So that we don't release it again on error. */ @@ -1419,7 +1438,7 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { redisSetCpuAffinity(server.bgsave_cpulist); retval = rdbSave(filename,rsi); if (retval == C_OK) { - sendChildCOWInfo(CHILD_TYPE_RDB, "RDB"); + sendChildCOWInfo(CHILD_TYPE_RDB, 1, "RDB"); } exitFromChild((retval == C_OK) ? 0 : 1); } else { @@ -2786,7 +2805,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { retval = C_ERR; if (retval == C_OK) { - sendChildCOWInfo(CHILD_TYPE_RDB, "RDB"); + sendChildCOWInfo(CHILD_TYPE_RDB, 1, "RDB"); } rioFreeFd(&rdb); diff --git a/src/redismodule.h b/src/redismodule.h index d2afa1f21..36c566bb3 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -778,6 +778,7 @@ REDISMODULE_API int (*RedisModule_CommandFilterArgInsert)(RedisModuleCommandFilt REDISMODULE_API int (*RedisModule_CommandFilterArgReplace)(RedisModuleCommandFilterCtx *fctx, int pos, RedisModuleString *arg) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_CommandFilterArgDelete)(RedisModuleCommandFilterCtx *fctx, int pos) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_Fork)(RedisModuleForkDoneHandler cb, void *user_data) REDISMODULE_ATTR; +REDISMODULE_API void (*RedisModule_SendChildCOWInfo)(void) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_ExitFromChild)(int retcode) REDISMODULE_ATTR; REDISMODULE_API int (*RedisModule_KillForkChild)(int child_pid) REDISMODULE_ATTR; REDISMODULE_API float (*RedisModule_GetUsedMemoryRatio)() REDISMODULE_ATTR; @@ -1033,6 +1034,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(CommandFilterArgReplace); REDISMODULE_GET_API(CommandFilterArgDelete); REDISMODULE_GET_API(Fork); + REDISMODULE_GET_API(SendChildCOWInfo); REDISMODULE_GET_API(ExitFromChild); REDISMODULE_GET_API(KillForkChild); REDISMODULE_GET_API(GetUsedMemoryRatio); diff --git a/src/server.c b/src/server.c index ee2a00986..7d2688b66 100644 --- a/src/server.c +++ b/src/server.c @@ -1578,6 +1578,7 @@ int hasActiveChildProcess() { void resetChildState() { server.child_type = CHILD_TYPE_NONE; server.child_pid = -1; + server.stat_current_cow_bytes = 0; updateDictResizePolicy(); closeChildInfoPipe(); } @@ -2098,6 +2099,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Check if a background saving or AOF rewrite in progress terminated. */ if (hasActiveChildProcess() || ldbPendingChildren()) { + run_with_period(1000) receiveChildInfo(); checkChildrenDone(); } else { /* If there is not a background saving/rewrite in progress check if @@ -3119,7 +3121,7 @@ void initServer(void) { server.rdb_bgsave_scheduled = 0; server.child_info_pipe[0] = -1; server.child_info_pipe[1] = -1; - server.child_info_data.magic = 0; + server.child_info_nread = 0; aofRewriteBufferReset(); server.aof_buf = sdsempty(); server.lastsave = time(NULL); /* At startup we consider the DB saved. */ @@ -3131,6 +3133,7 @@ void initServer(void) { /* A few stats we don't want to reset: server startup time, and peak mem. */ server.stat_starttime = time(NULL); server.stat_peak_memory = 0; + server.stat_current_cow_bytes = 0; server.stat_rdb_cow_bytes = 0; server.stat_aof_cow_bytes = 0; server.stat_module_cow_bytes = 0; @@ -4617,6 +4620,7 @@ sds genRedisInfoString(const char *section) { info = sdscatprintf(info, "# Persistence\r\n" "loading:%d\r\n" + "current_cow_size:%zu\r\n" "rdb_changes_since_last_save:%lld\r\n" "rdb_bgsave_in_progress:%d\r\n" "rdb_last_save_time:%jd\r\n" @@ -4635,6 +4639,7 @@ sds genRedisInfoString(const char *section) { "module_fork_in_progress:%d\r\n" "module_fork_last_cow_size:%zu\r\n", server.loading, + server.stat_current_cow_bytes, server.dirty, server.child_type == CHILD_TYPE_RDB, (intmax_t)server.lastsave, @@ -5342,12 +5347,13 @@ int redisFork(int purpose) { * other child types should handle and store their pid's in dedicated variables. * * Today, we allows CHILD_TYPE_LDB to run in parallel with the other fork types: - * - it isn't used for production, so it will not make the server to be less efficient + * - it isn't used for production, so it will not make the server be less efficient * - used for debugging, and we don't want to block it from running while other * forks are running (like RDB and AOF) */ if (isMutuallyExclusiveChildType(purpose)) { server.child_pid = childpid; server.child_type = purpose; + server.stat_current_cow_bytes = 0; } updateDictResizePolicy(); @@ -5355,17 +5361,16 @@ int redisFork(int purpose) { return childpid; } -void sendChildCOWInfo(int ptype, char *pname) { +void sendChildCOWInfo(int ptype, int on_exit, char *pname) { size_t private_dirty = zmalloc_get_private_dirty(-1); if (private_dirty) { - serverLog(LL_NOTICE, + serverLog(on_exit ? LL_NOTICE : LL_VERBOSE, "%s: %zu MB of memory used by copy-on-write", - pname, private_dirty/(1024*1024)); + pname, private_dirty); } - server.child_info_data.cow_size = private_dirty; - sendChildInfo(ptype); + sendChildInfo(ptype, on_exit, private_dirty); } void memtest(size_t megabytes, int passes); diff --git a/src/server.h b/src/server.h index 9d383cc37..62281a63c 100644 --- a/src/server.h +++ b/src/server.h @@ -1098,7 +1098,6 @@ struct clusterState; #undef hz #endif -#define CHILD_INFO_MAGIC 0xC17DDA7A12345678LL #define CHILD_TYPE_NONE 0 #define CHILD_TYPE_RDB 1 #define CHILD_TYPE_AOF 2 @@ -1228,6 +1227,7 @@ struct redisServer { struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */ redisAtomic long long stat_net_input_bytes; /* Bytes read from network. */ redisAtomic long long stat_net_output_bytes; /* Bytes written to network. */ + size_t stat_current_cow_bytes; /* Copy on write bytes while child is active. */ size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */ size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */ @@ -1342,11 +1342,7 @@ struct redisServer { * value means fractions of microsecons (on average). */ /* Pipe and data structures for child -> parent info sharing. */ int child_info_pipe[2]; /* Pipe used to write the child_info_data. */ - struct { - int process_type; /* AOF or RDB child? */ - size_t cow_size; /* Copy on write size. */ - unsigned long long magic; /* Magic value to make sure data is valid. */ - } child_info_data; + int child_info_nread; /* Num of bytes of the last read from pipe */ /* Propagation of commands in AOF / replication */ redisOpArray also_propagate; /* Additional command to propagate. */ /* Logging */ @@ -2000,15 +1996,16 @@ void restartAOFAfterSYNC(); /* Child info */ void openChildInfoPipe(void); void closeChildInfoPipe(void); -void sendChildInfo(int process_type); +void sendChildInfo(int process_type, int on_exit, size_t cow_size); void receiveChildInfo(void); +void receiveLastChildInfo(void); /* Fork helpers */ int redisFork(int type); int hasActiveChildProcess(); void resetChildState(); int isMutuallyExclusiveChildType(int type); -void sendChildCOWInfo(int ptype, char *pname); +void sendChildCOWInfo(int ptype, int on_exit, char *pname); /* acl.c -- Authentication related prototypes. */ extern rax *Users; diff --git a/tests/integration/rdb.tcl b/tests/integration/rdb.tcl index aadfe281f..cd4dfbadb 100644 --- a/tests/integration/rdb.tcl +++ b/tests/integration/rdb.tcl @@ -198,3 +198,78 @@ test {client freed during loading} { exec kill [srv 0 pid] } } + +start_server {overrides {save ""}} { + test {Test child sending COW info} { + # make sure that rdb_last_cow_size and current_cow_size are zero (the test using new server), + # so that the comparisons during the test will be valid + assert {[s current_cow_size] == 0} + assert {[s rdb_last_cow_size] == 0} + + # using a 200us delay, the bgsave is empirically taking about 10 seconds. + # we need it to take more than some 5 seconds, since redis only report COW once a second. + r config set rdb-key-save-delay 200 + + # populate the db with 10k keys of 4k each + set rd [redis_deferring_client 0] + set size 4096 + set cmd_count 10000 + for {set k 0} {$k < $cmd_count} {incr k} { + $rd set key$k [string repeat A $size] + } + + for {set k 0} {$k < $cmd_count} {incr k} { + catch { $rd read } + } + + $rd close + + # start background rdb save + r bgsave + + # on each iteration, we will write some key to the server to trigger copy-on-write, and + # wait to see that it reflected in INFO. + set iteration 1 + while 1 { + # take a sample before writing new data to the server + set cow_size [s current_cow_size] + if {$::verbose} { + puts "COW info before copy-on-write: $cow_size" + } + + # trigger copy-on-write + r setrange key$iteration 0 [string repeat B $size] + + # wait to see that current_cow_size value updated (as long as the child is in progress) + wait_for_condition 80 100 { + [s rdb_bgsave_in_progress] == 0 || + [s current_cow_size] >= $cow_size + $size + } else { + if {$::verbose} { + puts "COW info on fail: [s current_cow_size]" + } + fail "COW info didn't report" + } + + # for no accurate, stop after 2 iterations + if {!$::accurate && $iteration == 2} { + break + } + + # stop iterating if the bgsave completed + if { [s rdb_bgsave_in_progress] == 0 } { + break + } + + incr iteration 1 + } + + # make sure we saw report of current_cow_size + assert_morethan_equal $iteration 2 + + # if bgsave completed, check that rdb_last_cow_size value is at least as last rdb_active_cow_size. + if { [s rdb_bgsave_in_progress] == 0 } { + assert_morethan_equal [s rdb_last_cow_size] $cow_size + } + } +} \ No newline at end of file diff --git a/tests/support/test.tcl b/tests/support/test.tcl index 23015b3a7..39aebe156 100644 --- a/tests/support/test.tcl +++ b/tests/support/test.tcl @@ -31,36 +31,48 @@ proc assert_match {pattern value} { } } +proc assert_failed {expected_err detail} { + if {$detail ne ""} { + set detail "(detail: $detail)" + } else { + set detail "(context: [info frame -2])" + } + error "assertion:$expected_err $detail" +} + proc assert_equal {value expected {detail ""}} { if {$expected ne $value} { - if {$detail ne ""} { - set detail "(detail: $detail)" - } else { - set detail "(context: [info frame -1])" - } - error "assertion:Expected '$value' to be equal to '$expected' $detail" + assert_failed "Expected '$value' to be equal to '$expected'" $detail } } proc assert_lessthan {value expected {detail ""}} { if {!($value < $expected)} { - if {$detail ne ""} { - set detail "(detail: $detail)" - } else { - set detail "(context: [info frame -1])" - } - error "assertion:Expected '$value' to be lessthan to '$expected' $detail" + assert_failed "Expected '$value' to be less than '$expected'" $detail + } +} + +proc assert_lessthan_equal {value expected {detail ""}} { + if {!($value <= $expected)} { + assert_failed "Expected '$value' to be less than or equal to '$expected'" $detail + } +} + +proc assert_morethan {value expected {detail ""}} { + if {!($value > $expected)} { + assert_failed "Expected '$value' to be more than '$expected'" $detail + } +} + +proc assert_morethan_equal {value expected {detail ""}} { + if {!($value >= $expected)} { + assert_failed "Expected '$value' to be more than or equal to '$expected'" $detail } } proc assert_range {value min max {detail ""}} { if {!($value <= $max && $value >= $min)} { - if {$detail ne ""} { - set detail "(detail: $detail)" - } else { - set detail "(context: [info frame -1])" - } - error "assertion:Expected '$value' to be between to '$min' and '$max' $detail" + assert_failed "Expected '$value' to be between to '$min' and '$max'" $detail } }