diff --git a/src/aof.c b/src/aof.c index 3f33bda3a..07f9f8d08 100644 --- a/src/aof.c +++ b/src/aof.c @@ -773,7 +773,6 @@ int loadAppendOnlyFile(char *filename) { if (!(loops++ % 1000)) { loadingProgress(ftello(fp)); processEventsWhileBlocked(); - loadingCron(); processModuleLoadingProgressEvent(1); } @@ -859,7 +858,7 @@ int loadAppendOnlyFile(char *filename) { fakeClient->cmd = NULL; if (server.aof_load_truncated) valid_up_to = ftello(fp); if (server.key_load_delay) - usleep(server.key_load_delay); + debugDelay(server.key_load_delay); } /* This point can only be reached when EOF is reached without errors. diff --git a/src/config.c b/src/config.c index 5b73d9e28..0e1aa01e9 100644 --- a/src/config.c +++ b/src/config.c @@ -2319,8 +2319,8 @@ standardConfig configs[] = { createIntConfig("repl-timeout", NULL, MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_timeout, 60, INTEGER_CONFIG, NULL, NULL), createIntConfig("repl-ping-replica-period", "repl-ping-slave-period", MODIFIABLE_CONFIG, 1, INT_MAX, server.repl_ping_slave_period, 10, INTEGER_CONFIG, NULL, NULL), createIntConfig("list-compress-depth", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.list_compress_depth, 0, INTEGER_CONFIG, NULL, NULL), - createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL), - createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.key_load_delay, 0, INTEGER_CONFIG, NULL, NULL), + createIntConfig("rdb-key-save-delay", NULL, MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.rdb_key_save_delay, 0, INTEGER_CONFIG, NULL, NULL), + createIntConfig("key-load-delay", NULL, MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.key_load_delay, 0, INTEGER_CONFIG, NULL, NULL), createIntConfig("active-expire-effort", NULL, MODIFIABLE_CONFIG, 1, 10, server.active_expire_effort, 1, INTEGER_CONFIG, NULL, NULL), /* From 1 to 10. */ createIntConfig("hz", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.config_hz, CONFIG_DEFAULT_HZ, INTEGER_CONFIG, NULL, updateHZ), createIntConfig("min-replicas-to-write", "min-slaves-to-write", MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_min_slaves_to_write, 0, INTEGER_CONFIG, NULL, updateGoodSlaves), diff --git a/src/debug.c b/src/debug.c index 373938e6e..5dd9ec849 100644 --- a/src/debug.c +++ b/src/debug.c @@ -1804,3 +1804,12 @@ void disableWatchdog(void) { sigaction(SIGALRM, &act, NULL); server.watchdog_period = 0; } + +/* Positive input is sleep time in microseconds. Negative input is fractions + * of microseconds, i.e. -10 means 100 nanoseconds. */ +void debugDelay(int usec) { + /* Since even the shortest sleep results in context switch and system call, + * the way we achive short sleeps is by statistically sleeping less often. */ + if (usec < 0) usec = (rand() % -usec) == 0 ? 1: 0; + if (usec) usleep(usec); +} diff --git a/src/networking.c b/src/networking.c index 4a58cec40..71e30cfa1 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2920,6 +2920,9 @@ void processEventsWhileBlocked(void) { long long events = server.events_processed_while_blocked - startval; if (!events) break; } + + whileBlockedCron(); + ProcessingEventsWhileBlocked = 0; } diff --git a/src/rdb.c b/src/rdb.c index eab77fb9e..d4e44be5d 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1084,7 +1084,7 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { /* Delay return if required (for testing) */ if (server.rdb_key_save_delay) - usleep(server.rdb_key_save_delay); + debugDelay(server.rdb_key_save_delay); return 1; } @@ -1997,6 +1997,7 @@ void startLoading(size_t size, int rdbflags) { server.loading_start_time = time(NULL); server.loading_loaded_bytes = 0; server.loading_total_bytes = size; + blockingOperationStarts(); /* Fire the loading modules start event. */ int subevent; @@ -2030,6 +2031,7 @@ void loadingProgress(off_t pos) { /* Loading finished */ void stopLoading(int success) { server.loading = 0; + blockingOperationEnds(); rdbFileBeingLoaded = NULL; /* Fire the loading modules end event. */ @@ -2073,7 +2075,6 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { replicationSendNewlineToMaster(); loadingProgress(r->processed_bytes); processEventsWhileBlocked(); - loadingCron(); processModuleLoadingProgressEvent(0); } } @@ -2336,7 +2337,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { /* Loading the database more slowly is useful in order to test * certain edge cases. */ - if (server.key_load_delay) usleep(server.key_load_delay); + if (server.key_load_delay) + debugDelay(server.key_load_delay); /* Reset the state that is key-specified and is populated by * opcodes before the key, so that we start from scratch again. */ diff --git a/src/scripting.c b/src/scripting.c index bccbcf637..be44ec67f 100644 --- a/src/scripting.c +++ b/src/scripting.c @@ -1424,6 +1424,7 @@ void luaMaskCountHook(lua_State *lua, lua_Debug *ar) { "Script SHA1 is: %s", elapsed, server.lua_cur_script); server.lua_timedout = 1; + blockingOperationStarts(); /* Once the script timeouts we reenter the event loop to permit others * to call SCRIPT KILL or SHUTDOWN NOSAVE if needed. For this reason * we need to mask the client executing the script from the event loop. @@ -1575,6 +1576,7 @@ void evalGenericCommand(client *c, int evalsha) { if (delhook) lua_sethook(lua,NULL,0,0); /* Disable hook */ if (server.lua_timedout) { server.lua_timedout = 0; + blockingOperationEnds(); /* Restore the client that was protected when the script timeout * was detected. */ unprotectClient(c); diff --git a/src/server.c b/src/server.c index 94c3040f5..5e9c85585 100644 --- a/src/server.c +++ b/src/server.c @@ -2109,22 +2109,62 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { return 1000/server.hz; } -/* This function fill in the role of serverCron during RDB or AOF loading. + +void blockingOperationStarts() { + updateCachedTime(0); + server.blocked_last_cron = server.mstime; +} + +void blockingOperationEnds() { + server.blocked_last_cron = 0; +} + +/* This function fill in the role of serverCron during RDB or AOF loading, and + * also during blocked scripts. * It attempts to do its duties at a similar rate as the configured server.hz, * and updates cronloops variable so that similarly to serverCron, the * run_with_period can be used. */ -void loadingCron() { - long long now = server.ustime; - static long long next_event = 0; - if (now >= next_event) { - cronUpdateMemoryStats(); - +void whileBlockedCron() { + /* Here we may want to perform some cron jobs (normally done server.hz times + * per second). */ + + /* Since this function depends on a call to blockingOperationStarts, let's + * make sure it was done. */ + serverAssert(server.blocked_last_cron); + + /* In case we where called too soon, leave right away. This way one time + * jobs after the loop below don't need an if. and we don't bother to start + * latency monitor if this function is called too often. */ + if (server.blocked_last_cron >= server.mstime) + return; + + mstime_t latency; + latencyStartMonitor(latency); + + /* In some cases we may be called with big intervals, so we may need to do + * extra work here. This is because some of the functions in serverCron rely + * on the fact that it is performed every 10 ms or so. For instance, if + * activeDefragCycle needs to utilize 25% cpu, it will utilize 2.5ms, so we + * need to call it multiple times. */ + long hz_ms = 1000/server.hz; + while (server.blocked_last_cron < server.mstime) { + + /* Defrag keys gradually. */ + activeDefragCycle(); + + server.blocked_last_cron += hz_ms; /* Increment cronloop so that run_with_period works. */ server.cronloops++; - - /* Decide when the next event should fire. */ - next_event = now + 1000000 / server.hz; } + + /* Other cron jobs do not need to be done in a loop. No need to check + * server.blocked_last_cron since we have an early exit at the top. */ + + /* Update memory stats during loading (excluding blocked scripts) */ + if (server.loading) cronUpdateMemoryStats(); + + latencyEndMonitor(latency); + latencyAddSampleIfNeeded("while-blocked-cron",latency); } extern int ProcessingEventsWhileBlocked; @@ -2830,6 +2870,7 @@ void resetServerStats(void) { server.stat_net_output_bytes = 0; server.stat_unexpected_error_replies = 0; server.aof_delayed_fsync = 0; + server.blocked_last_cron = 0; } void initServer(void) { diff --git a/src/server.h b/src/server.h index 53b4fcce3..6dac914c6 100644 --- a/src/server.h +++ b/src/server.h @@ -1265,9 +1265,11 @@ struct redisServer { char *rdb_pipe_buff; /* In diskless replication, this buffer holds data */ int rdb_pipe_bufflen; /* that was read from the the rdb pipe. */ int rdb_key_save_delay; /* Delay in microseconds between keys while - * writing the RDB. (for testings) */ + * writing the RDB. (for testings). negative + * value means fractions of microsecons (on average). */ int key_load_delay; /* Delay in microseconds between keys while - * loading aof or rdb. (for testings) */ + * loading aof or rdb. (for testings). negative + * value means fractions of microsecons (on average). */ /* Pipe and data structures for child -> parent info sharing. */ int child_info_pipe[2]; /* Pipe used to write the child_info_data. */ struct { @@ -1393,6 +1395,7 @@ struct redisServer { int daylight_active; /* Currently in daylight saving time. */ mstime_t mstime; /* 'unixtime' in milliseconds. */ ustime_t ustime; /* 'unixtime' in microseconds. */ + long long blocked_last_cron; /* Indicate the mstime of the last time we did cron jobs from a blocking operation */ /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ @@ -1698,6 +1701,9 @@ void pauseClients(mstime_t duration); int clientsArePaused(void); void processEventsWhileBlocked(void); void loadingCron(void); +void whileBlockedCron(); +void blockingOperationStarts(); +void blockingOperationEnds(); int handleClientsWithPendingWrites(void); int handleClientsWithPendingWritesUsingThreads(void); int handleClientsWithPendingReadsUsingThreads(void); @@ -2464,6 +2470,7 @@ int memtest_preserving_test(unsigned long *m, size_t bytes, int passes); void mixDigest(unsigned char *digest, void *ptr, size_t len); void xorDigest(unsigned char *digest, void *ptr, size_t len); int populateCommandTableParseFlags(struct redisCommand *c, char *strflags); +void debugDelay(int usec); /* TLS stuff */ void tlsInit(void); diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 8340ad207..14b17f4e7 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -455,3 +455,14 @@ proc start_bg_complex_data {host port db ops} { proc stop_bg_complex_data {handle} { catch {exec /bin/kill -9 $handle} } + +proc populate {num prefix size} { + set rd [redis_deferring_client] + for {set j 0} {$j < $num} {incr j} { + $rd set $prefix$j [string repeat A $size] + } + for {set j 0} {$j < $num} {incr j} { + $rd read + } + $rd close +} diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 390bad192..fe4e44d3e 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -37,10 +37,9 @@ start_server {tags {"memefficiency"}} { } run_solo {defrag} { -start_server {tags {"defrag"}} { +start_server {tags {"defrag"} overrides {appendonly yes auto-aof-rewrite-percentage 0 save ""}} { if {[string match {*jemalloc*} [s mem_allocator]]} { test "Active defrag" { - r config set save "" ;# prevent bgsave from interfereing with save below r config set hz 100 r config set activedefrag no r config set active-defrag-threshold-lower 5 @@ -49,9 +48,9 @@ start_server {tags {"defrag"}} { r config set active-defrag-ignore-bytes 2mb r config set maxmemory 100mb r config set maxmemory-policy allkeys-lru - r debug populate 700000 asdf1 150 - r debug populate 170000 asdf2 300 - r ping ;# trigger eviction following the previous population + + populate 700000 asdf1 150 + populate 170000 asdf2 300 after 120 ;# serverCron only updates the info once in 100ms set frag [s allocator_frag_ratio] if {$::verbose} { @@ -107,19 +106,57 @@ start_server {tags {"defrag"}} { # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher assert {$max_latency <= 30} - } else { - set _ "" } # verify the data isn't corrupted or changed set newdigest [r debug digest] assert {$digest eq $newdigest} r save ;# saving an rdb iterates over all the data / pointers - } {OK} + + # if defrag is supported, test AOF loading too + if {[r config get activedefrag] eq "activedefrag yes"} { + # reset stats and load the AOF file + r config resetstat + r config set key-load-delay -50 ;# sleep on average 1/50 usec + r debug loadaof + r config set activedefrag no + # measure hits and misses right after aof loading + set misses [s active_defrag_misses] + set hits [s active_defrag_hits] + + after 120 ;# serverCron only updates the info once in 100ms + set frag [s allocator_frag_ratio] + set max_latency 0 + foreach event [r latency latest] { + lassign $event eventname time latency max + if {$eventname == "loading-cron"} { + set max_latency $max + } + } + if {$::verbose} { + puts "AOF loading:" + puts "frag $frag" + puts "hits: $hits" + puts "misses: $misses" + puts "max latency $max_latency" + puts [r latency latest] + puts [r latency history loading-cron] + } + # make sure we had defrag hits during AOF loading + assert {$hits > 100000} + # make sure the defragger did enough work to keep the fragmentation low during loading. + # we cannot check that it went all the way down, since we don't wait for full defrag cycle to complete. + assert {$frag < 1.4} + # since the AOF contains simple (fast) SET commands (and the cron during loading runs every 1000 commands), + # it'll still not block the loading for long periods of time. + assert {$max_latency <= 30} + } + } + r config set appendonly no + r config set key-load-delay 0 test "Active defrag big keys" { r flushdb r config resetstat - r config set save "" ;# prevent bgsave from interfereing with save below r config set hz 100 r config set activedefrag no r config set active-defrag-max-scan-fields 1000 @@ -247,7 +284,6 @@ start_server {tags {"defrag"}} { test "Active defrag big list" { r flushdb r config resetstat - r config set save "" ;# prevent bgsave from interfereing with save below r config set hz 100 r config set activedefrag no r config set active-defrag-max-scan-fields 1000 @@ -354,7 +390,6 @@ start_server {tags {"defrag"}} { start_server {tags {"defrag"}} { r flushdb r config resetstat - r config set save "" ;# prevent bgsave from interfereing with save below r config set hz 100 r config set activedefrag no r config set active-defrag-max-scan-fields 1000