From 58735ed74ecc5ac0ad4290e5e09f65320a352584 Mon Sep 17 00:00:00 2001 From: Wander Hillen Date: Fri, 16 Mar 2018 09:59:14 +0100 Subject: [PATCH 001/411] Fix typos, add some periods --- src/replication.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/replication.c b/src/replication.c index 8c01bfb51..c5c3618a5 100644 --- a/src/replication.c +++ b/src/replication.c @@ -255,7 +255,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { while((ln = listNext(&li))) { client *slave = ln->value; - /* Don't feed slaves that are still waiting for BGSAVE to start */ + /* Don't feed slaves that are still waiting for BGSAVE to start. */ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; /* Feed slaves that are waiting for the initial SYNC (so these commands @@ -294,7 +294,7 @@ void replicationFeedSlavesFromMasterStream(list *slaves, char *buf, size_t bufle while((ln = listNext(&li))) { client *slave = ln->value; - /* Don't feed slaves that are still waiting for BGSAVE to start */ + /* Don't feed slaves that are still waiting for BGSAVE to start. */ if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; addReplyString(slave,buf,buflen); } @@ -584,7 +584,7 @@ int startBgsaveForReplication(int mincapa) { } /* If we failed to BGSAVE, remove the slaves waiting for a full - * resynchorinization from the list of salves, inform them with + * resynchronization from the list of slaves, inform them with * an error about what happened, close the connection ASAP. */ if (retval == C_ERR) { serverLog(LL_WARNING,"BGSAVE for replication failed"); @@ -604,7 +604,7 @@ int startBgsaveForReplication(int mincapa) { } /* If the target is socket, rdbSaveToSlavesSockets() already setup - * the salves for a full resync. Otherwise for disk target do it now.*/ + * the slaves for a full resync. 
Otherwise for disk target do it now.*/ if (!socket_target) { listRewind(server.slaves,&li); while((ln = listNext(&li))) { From 48bc22ec4c2597467865388dad536606fd820347 Mon Sep 17 00:00:00 2001 From: Wander Hillen Date: Fri, 16 Mar 2018 09:59:17 +0100 Subject: [PATCH 002/411] More typos --- src/rdb.c | 2 +- src/server.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/rdb.c b/src/rdb.c index 8f896983b..35448b624 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1800,7 +1800,7 @@ void backgroundSaveDoneHandlerDisk(int exitcode, int bysignal) { } /* A background saving child (BGSAVE) terminated its work. Handle this. - * This function covers the case of RDB -> Salves socket transfers for + * This function covers the case of RDB -> Slaves socket transfers for * diskless replication. */ void backgroundSaveDoneHandlerSocket(int exitcode, int bysignal) { uint64_t *ok_slaves; diff --git a/src/server.c b/src/server.c index 85f05f1f9..94850e2ea 100644 --- a/src/server.c +++ b/src/server.c @@ -2193,7 +2193,7 @@ void preventCommandReplication(client *c) { * CMD_CALL_STATS Populate command stats. * CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset * or if the client flags are forcing propagation. - * CMD_CALL_PROPAGATE_REPL Send command to salves if it modified the dataset + * CMD_CALL_PROPAGATE_REPL Send command to slaves if it modified the dataset * or if the client flags are forcing propagation. * CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF|PROPAGATE_REPL. * CMD_CALL_FULL Alias for SLOWLOG|STATS|PROPAGATE. 
From 2eb05fe3de7b82e8f56f185ad8ab1471fe61053b Mon Sep 17 00:00:00 2001 From: jem Date: Tue, 18 Sep 2018 17:04:00 +0800 Subject: [PATCH 003/411] update leap year comment when div by 400 --- src/localtime.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/localtime.c b/src/localtime.c index 3f59a3331..e2ac81f98 100644 --- a/src/localtime.c +++ b/src/localtime.c @@ -52,8 +52,8 @@ static int is_leap_year(time_t year) { if (year % 4) return 0; /* A year not divisible by 4 is not leap. */ else if (year % 100) return 1; /* If div by 4 and not 100 is surely leap. */ - else if (year % 400) return 0; /* If div by 100 *and* 400 is not leap. */ - else return 1; /* If div by 100 and not by 400 is leap. */ + else if (year % 400) return 0; /* If div by 100 *and* not by 400 is not leap. */ + else return 1; /* If div by 100 and 400 is leap. */ } void nolocks_localtime(struct tm *tmp, time_t t, time_t tz, int dst) { From 7c244182742d822ed909f47d4b4a534c49f3f862 Mon Sep 17 00:00:00 2001 From: jem Date: Tue, 18 Sep 2018 20:42:09 +0800 Subject: [PATCH 004/411] ignore vscode conf dir --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index a188cfc82..717bf3c7c 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,4 @@ deps/lua/src/liblua.a .prerequisites *.dSYM Makefile.dep +.vscode/* From 26b109ae23b4ad50b0952c6239558877697eb1ce Mon Sep 17 00:00:00 2001 From: Bruce Merry Date: Mon, 31 Dec 2018 11:51:03 +0200 Subject: [PATCH 005/411] Make dbSwapDatabases take args as long This prevents an integer overflow bug. Closes #5737. --- src/db.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/db.c b/src/db.c index 62c8aa131..bac15a6bb 100644 --- a/src/db.c +++ b/src/db.c @@ -997,7 +997,7 @@ void scanDatabaseForReadyLists(redisDb *db) { * * Returns C_ERR if at least one of the DB ids are out of range, otherwise * C_OK is returned. 
*/ -int dbSwapDatabases(int id1, int id2) { +int dbSwapDatabases(long id1, long id2) { if (id1 < 0 || id1 >= server.dbnum || id2 < 0 || id2 >= server.dbnum) return C_ERR; if (id1 == id2) return C_OK; From d8f237a761967ea742babc812f2ebb5b049f871e Mon Sep 17 00:00:00 2001 From: Madelyn Olson Date: Tue, 15 Jan 2019 07:26:19 +0000 Subject: [PATCH 006/411] Fixed a rounding bug in geo.tcl --- tests/unit/geo.tcl | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/tests/unit/geo.tcl b/tests/unit/geo.tcl index 604697be4..49e421ee9 100644 --- a/tests/unit/geo.tcl +++ b/tests/unit/geo.tcl @@ -61,6 +61,7 @@ set regression_vectors { {939895 151 59.149620271823181 65.204186651485145} {1412 156 149.29737817929004 15.95807862745508} {564862 149 84.062063109158544 -65.685403922426232} + {1546032440391 16751 -1.8175081637769495 20.665668878082954} } set rv_idx 0 @@ -274,8 +275,19 @@ start_server {tags {"geo"}} { foreach place $diff { set mydist [geo_distance $lon $lat $search_lon $search_lat] set mydist [expr $mydist/1000] - if {($mydist / $radius_km) > 0.999} {incr rounding_errors} + if {($mydist / $radius_km) > 0.999} { + incr rounding_errors + continue + } + if {$mydist < $radius_m} { + # This is a false positive for redis since given the + # same points the higher precision calculation provided + # by TCL shows the point within range + incr rounding_errors + continue + } } + # Make sure this is a real error and not a rounidng issue. 
if {[llength $diff] == $rounding_errors} { set res $res2; # Error silenced From b4f77cc43aefb0710fc11d3103c2fa094d10e029 Mon Sep 17 00:00:00 2001 From: Jim Brunner Date: Wed, 13 Mar 2019 16:31:24 +0000 Subject: [PATCH 007/411] Addition of OnUnload function --- src/module.c | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/src/module.c b/src/module.c index 5ad999751..ba38ed8d4 100644 --- a/src/module.c +++ b/src/module.c @@ -4799,6 +4799,23 @@ int moduleUnload(sds name) { errno = EBUSY; return REDISMODULE_ERR; } + + /* Give module a chance to clean up. */ + int (*onunload)(void *); + onunload = (int (*)(void *))(unsigned long) dlsym(module->handle, "RedisModule_OnUnload"); + if (onunload) { + RedisModuleCtx ctx = REDISMODULE_CTX_INIT; + ctx.module = module; + ctx.client = moduleFreeContextReusedClient; + int unload_status = onunload((void*)&ctx); + moduleFreeContext(&ctx); + + if (unload_status == REDISMODULE_ERR) { + serverLog(LL_WARNING, "Module %s OnUnload failed. Unload canceled.", name); + errno = ECANCELED; + return REDISMODULE_ERR; + } + } moduleUnregisterCommands(module); From 3137f26d4c6fe0ca6e9ea4dc9e31eac89ec548e9 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Sun, 3 Jun 2018 15:37:48 +0300 Subject: [PATCH 008/411] Add RedisModule_Assert() API call. --- src/module.c | 10 ++++++++++ src/redismodule.h | 4 ++++ 2 files changed, 14 insertions(+) diff --git a/src/module.c b/src/module.c index 8954fcdf0..c6e6e5989 100644 --- a/src/module.c +++ b/src/module.c @@ -3461,6 +3461,15 @@ void RM_LogIOError(RedisModuleIO *io, const char *levelstr, const char *fmt, ... va_end(ap); } +/* Redis-like assert function. + * + * A failed assertion will shut down the server and produce logging information + * that looks identical to information generated by Redis itself. 
+ */ +void RM__Assert(const char *estr, const char *file, int line) { + _serverAssert(estr, file, line); +} + /* -------------------------------------------------------------------------- * Blocking clients from modules * -------------------------------------------------------------------------- */ @@ -4993,6 +5002,7 @@ void moduleRegisterCoreAPI(void) { REGISTER_API(EmitAOF); REGISTER_API(Log); REGISTER_API(LogIOError); + REGISTER_API(_Assert); REGISTER_API(StringAppendBuffer); REGISTER_API(RetainString); REGISTER_API(StringCompare); diff --git a/src/redismodule.h b/src/redismodule.h index d18c38881..db32df049 100644 --- a/src/redismodule.h +++ b/src/redismodule.h @@ -271,6 +271,7 @@ void REDISMODULE_API_FUNC(RedisModule_SaveFloat)(RedisModuleIO *io, float value) float REDISMODULE_API_FUNC(RedisModule_LoadFloat)(RedisModuleIO *io); void REDISMODULE_API_FUNC(RedisModule_Log)(RedisModuleCtx *ctx, const char *level, const char *fmt, ...); void REDISMODULE_API_FUNC(RedisModule_LogIOError)(RedisModuleIO *io, const char *levelstr, const char *fmt, ...); +void REDISMODULE_API_FUNC(RedisModule__Assert)(const char *estr, const char *file, int line); int REDISMODULE_API_FUNC(RedisModule_StringAppendBuffer)(RedisModuleCtx *ctx, RedisModuleString *str, const char *buf, size_t len); void REDISMODULE_API_FUNC(RedisModule_RetainString)(RedisModuleCtx *ctx, RedisModuleString *str); int REDISMODULE_API_FUNC(RedisModule_StringCompare)(RedisModuleString *a, RedisModuleString *b); @@ -433,6 +434,7 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int REDISMODULE_GET_API(EmitAOF); REDISMODULE_GET_API(Log); REDISMODULE_GET_API(LogIOError); + REDISMODULE_GET_API(_Assert); REDISMODULE_GET_API(StringAppendBuffer); REDISMODULE_GET_API(RetainString); REDISMODULE_GET_API(StringCompare); @@ -499,6 +501,8 @@ static int RedisModule_Init(RedisModuleCtx *ctx, const char *name, int ver, int return REDISMODULE_OK; } +#define RedisModule_Assert(_e) ((_e)?(void)0 : 
(RedisModule__Assert(#_e,__FILE__,__LINE__),exit(1))) + #else /* Things only defined for the modules core, not exported to modules From 81d3e1d354e1f6cc37f880c862969f407aebebb6 Mon Sep 17 00:00:00 2001 From: Guy Korland Date: Fri, 15 Mar 2019 21:09:59 +0200 Subject: [PATCH 009/411] add CI --- .circleci/config.yml | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 .circleci/config.yml diff --git a/.circleci/config.yml b/.circleci/config.yml new file mode 100644 index 000000000..52ff69721 --- /dev/null +++ b/.circleci/config.yml @@ -0,0 +1,21 @@ +version: 2 +jobs: + build: + docker: + - image: circleci/buildpack-deps + steps: + - checkout + - run: + name: dep + command: sudo apt-get install -y tcl + - run: + name: Build + command: make + - run: + name: Test + command: make test +workflows: + version: 2 + workflow: + jobs: + - build From 366fe793358940b9e8b92dd5e778c73891989906 Mon Sep 17 00:00:00 2001 From: Guy Korland Date: Fri, 15 Mar 2019 21:14:15 +0200 Subject: [PATCH 010/411] add pull app --- .github/pull.yml | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .github/pull.yml diff --git a/.github/pull.yml b/.github/pull.yml new file mode 100644 index 000000000..4cc55b0f5 --- /dev/null +++ b/.github/pull.yml @@ -0,0 +1,5 @@ +version: "1" +rules: + - base: unstable + upstream: antirez:unstable + mergeMethod: merge From c9cdf67d91022cec714de34b0d0e2f60506c41dd Mon Sep 17 00:00:00 2001 From: Guy Korland Date: Fri, 15 Mar 2019 21:30:09 +0200 Subject: [PATCH 011/411] Update config.yml --- .circleci/config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 52ff69721..445031a35 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -6,7 +6,7 @@ jobs: steps: - checkout - run: - name: dep + name: Install dependency command: sudo apt-get install -y tcl - run: name: Build From ebdcb1618fe37a06e551cb117f93f410c854678d Mon Sep 17 00:00:00 2001 From: Guy Korland 
Date: Fri, 15 Mar 2019 21:35:24 +0200 Subject: [PATCH 012/411] Update pull.yml --- .github/pull.yml | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/.github/pull.yml b/.github/pull.yml index 4cc55b0f5..6ccf19ca9 100644 --- a/.github/pull.yml +++ b/.github/pull.yml @@ -3,3 +3,12 @@ rules: - base: unstable upstream: antirez:unstable mergeMethod: merge + - base: 5.0 + upstream: antirez:5.0 + mergeMethod: merge + - base: 4.0 + upstream: antirez:4.0 + mergeMethod: merge + - base: 3.2 + upstream: antirez:3.2 + mergeMethod: merge From 661b5097e90edc5fe2c4d08810c8190541139769 Mon Sep 17 00:00:00 2001 From: Guy Korland Date: Fri, 15 Mar 2019 22:22:06 +0200 Subject: [PATCH 013/411] Update config.yml --- .circleci/config.yml | 5 ----- 1 file changed, 5 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 445031a35..fe2a6ea07 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -14,8 +14,3 @@ jobs: - run: name: Test command: make test -workflows: - version: 2 - workflow: - jobs: - - build From 971d9ee0aa7eb6463f2d95d10c8ecde9a7e3221c Mon Sep 17 00:00:00 2001 From: Itamar Haber Date: Tue, 16 Apr 2019 22:16:12 +0300 Subject: [PATCH 014/411] Adds a "Modules" section to `INFO` Fixes #6012. As long as "INFO is broken", this should be adequate IMO. Once we rework `INFO`, perhaps into RESP3, this implementation should be revisited. --- src/module.c | 19 +++++++++++++++++++ src/server.c | 7 +++++++ src/server.h | 1 + 3 files changed, 27 insertions(+) diff --git a/src/module.c b/src/module.c index c29521670..60c9a0464 100644 --- a/src/module.c +++ b/src/module.c @@ -5244,6 +5244,25 @@ void addReplyLoadedModules(client *c) { dictReleaseIterator(di); } +/* Helper function for the INFO command: adds loaded modules as to info's + * output. + * + * After the call, the passed sds info string is no longer valid and all the + * references must be substituted with the new pointer returned by the call. 
*/ +sds genModulesInfoString(sds info) { + dictIterator *di = dictGetIterator(modules); + dictEntry *de; + + while ((de = dictNext(di)) != NULL) { + sds name = dictGetKey(de); + struct RedisModule *module = dictGetVal(de); + + info = sdscatprintf(info, "module:name=%s,ver=%d\r\n", name, module->ver); + } + dictReleaseIterator(di); + return info; +} + /* Redis MODULE command. * * MODULE LOAD [args...] */ diff --git a/src/server.c b/src/server.c index fb5d679cd..49a65ef57 100644 --- a/src/server.c +++ b/src/server.c @@ -4291,6 +4291,13 @@ sds genRedisInfoString(char *section) { (long)c_ru.ru_utime.tv_sec, (long)c_ru.ru_utime.tv_usec); } + /* Modules */ + if (allsections || defsections || !strcasecmp(section,"modules")) { + if (sections++) info = sdscat(info,"\r\n"); + info = sdscatprintf(info,"# Modules\r\n"); + info = genModulesInfoString(info); + } + /* Command statistics */ if (allsections || !strcasecmp(section,"commandstats")) { if (sections++) info = sdscat(info,"\r\n"); diff --git a/src/server.h b/src/server.h index dfd9f7698..d832c6465 100644 --- a/src/server.h +++ b/src/server.h @@ -2268,6 +2268,7 @@ void bugReportStart(void); void serverLogObjectDebugInfo(const robj *o); void sigsegvHandler(int sig, siginfo_t *info, void *secret); sds genRedisInfoString(char *section); +sds genModulesInfoString(sds info); void enableWatchdog(int period); void disableWatchdog(void); void watchdogScheduleSignal(int period); From 4620bfb47ad68de4b9a89c48c3978b6ad538b9ed Mon Sep 17 00:00:00 2001 From: Yuan Zhou Date: Wed, 29 May 2019 14:21:47 +0800 Subject: [PATCH 015/411] aof: fix assignment for aof_fsync_offset Signed-off-by: Yuan Zhou --- src/aof.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/aof.c b/src/aof.c index 4744847d2..c8fb8e8f6 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1768,7 +1768,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { server.aof_selected_db = -1; /* Make sure SELECT is re-issued */ aofUpdateCurrentSize(); 
server.aof_rewrite_base_size = server.aof_current_size; - server.aof_current_size = server.aof_current_size; + server.aof_fsync_offset = server.aof_current_size; /* Clear regular AOF buffer since its contents was just written to * the new AOF from the background rewrite buffer. */ From 0e0756659128102952c3213a9b4e7161dcc9ca3d Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Thu, 30 May 2019 11:51:58 +0300 Subject: [PATCH 016/411] Jemalloc: Avoid blocking on background thread lock for stats. Background threads may run for a long time, especially when the # of dirty pages is high. Avoid blocking stats calls because of this (which may cause latency spikes). see https://github.com/jemalloc/jemalloc/issues/1502 cherry picked from commit 1a71533511027dbe3f9d989659efeec446915d6b --- deps/jemalloc/src/background_thread.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/deps/jemalloc/src/background_thread.c b/deps/jemalloc/src/background_thread.c index 3517a3bb8..457669c9e 100644 --- a/deps/jemalloc/src/background_thread.c +++ b/deps/jemalloc/src/background_thread.c @@ -787,7 +787,13 @@ background_thread_stats_read(tsdn_t *tsdn, background_thread_stats_t *stats) { nstime_init(&stats->run_interval, 0); for (unsigned i = 0; i < max_background_threads; i++) { background_thread_info_t *info = &background_thread_info[i]; - malloc_mutex_lock(tsdn, &info->mtx); + if (malloc_mutex_trylock(tsdn, &info->mtx)) { + /* + * Each background thread run may take a long time; + * avoid waiting on the stats if the thread is active. 
+ */ + continue; + } if (info->state != background_thread_stopped) { num_runs += info->tot_n_runs; nstime_add(&stats->run_interval, &info->tot_sleep_time); From f7833d560d80604445ad34ad2ef9d829d0aef7d6 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Thu, 30 May 2019 12:51:32 +0300 Subject: [PATCH 017/411] make redis purge jemalloc after flush, and enable background purging thread jemalloc 5 doesn't immediately release memory back to the OS, instead there's a decaying mechanism, which doesn't work when there's no traffic (no allocations). this is most evident if there's no traffic after flushdb, the RSS will remain high. 1) enable jemalloc background purging 2) explicitly purge in flushdb --- src/config.c | 9 ++++++++ src/db.c | 14 ++++++++++++ src/debug.c | 62 +++++++++++++++++++++++++++++++++++++++++++++++++++ src/server.c | 2 ++ src/server.h | 1 + src/zmalloc.c | 34 ++++++++++++++++++++++++++++ src/zmalloc.h | 2 ++ 7 files changed, 124 insertions(+) diff --git a/src/config.c b/src/config.c index 7f0e9af89..16850a1fc 100644 --- a/src/config.c +++ b/src/config.c @@ -474,6 +474,10 @@ void loadServerConfigFromString(char *config) { err = "active defrag can't be enabled without proper jemalloc support"; goto loaderr; #endif } + } else if (!strcasecmp(argv[0],"jemalloc-bg-thread") && argc == 2) { + if ((server.jemalloc_bg_thread = yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcasecmp(argv[0],"daemonize") && argc == 2) { if ((server.daemonize = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; @@ -1152,6 +1156,9 @@ void configSetCommand(client *c) { return; } #endif + } config_set_bool_field( + "jemalloc-bg-thread",server.jemalloc_bg_thread) { + set_jemalloc_bg_thread(server.jemalloc_bg_thread); } config_set_bool_field( "protected-mode",server.protected_mode) { } config_set_bool_field( @@ -1487,6 +1494,7 @@ void configGetCommand(client *c) { config_get_bool_field("rdbchecksum", 
server.rdb_checksum); config_get_bool_field("activerehashing", server.activerehashing); config_get_bool_field("activedefrag", server.active_defrag_enabled); + config_get_bool_field("jemalloc-bg-thread", server.jemalloc_bg_thread); config_get_bool_field("protected-mode", server.protected_mode); config_get_bool_field("gopher-enabled", server.gopher_enabled); config_get_bool_field("io-threads-do-reads", server.io_threads_do_reads); @@ -2318,6 +2326,7 @@ int rewriteConfig(char *path) { rewriteConfigNumericalOption(state,"hll-sparse-max-bytes",server.hll_sparse_max_bytes,CONFIG_DEFAULT_HLL_SPARSE_MAX_BYTES); rewriteConfigYesNoOption(state,"activerehashing",server.activerehashing,CONFIG_DEFAULT_ACTIVE_REHASHING); rewriteConfigYesNoOption(state,"activedefrag",server.active_defrag_enabled,CONFIG_DEFAULT_ACTIVE_DEFRAG); + rewriteConfigYesNoOption(state,"jemalloc-bg-thread",server.jemalloc_bg_thread,1); rewriteConfigYesNoOption(state,"protected-mode",server.protected_mode,CONFIG_DEFAULT_PROTECTED_MODE); rewriteConfigYesNoOption(state,"gopher-enabled",server.gopher_enabled,CONFIG_DEFAULT_GOPHER_ENABLED); rewriteConfigYesNoOption(state,"io-threads-do-reads",server.io_threads_do_reads,CONFIG_DEFAULT_IO_THREADS_DO_READS); diff --git a/src/db.c b/src/db.c index b537a29a4..50e23d6b2 100644 --- a/src/db.c +++ b/src/db.c @@ -441,6 +441,13 @@ void flushdbCommand(client *c) { signalFlushedDb(c->db->id); server.dirty += emptyDb(c->db->id,flags,NULL); addReply(c,shared.ok); +#if defined(USE_JEMALLOC) + /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. + * for large databases, flushdb blocks for long anyway, so a bit more won't + * harm and this way the flush and purge will be synchroneus. 
*/ + if (!(flags & EMPTYDB_ASYNC)) + jemalloc_purge(); +#endif } /* FLUSHALL [ASYNC] @@ -464,6 +471,13 @@ void flushallCommand(client *c) { server.dirty = saved_dirty; } server.dirty++; +#if defined(USE_JEMALLOC) + /* jemalloc 5 doesn't release pages back to the OS when there's no traffic. + * for large databases, flushdb blocks for long anyway, so a bit more won't + * harm and this way the flush and purge will be synchroneus. */ + if (!(flags & EMPTYDB_ASYNC)) + jemalloc_purge(); +#endif } /* This command implements DEL and LAZYDEL. */ diff --git a/src/debug.c b/src/debug.c index 0c6b5630c..c82c99b1f 100644 --- a/src/debug.c +++ b/src/debug.c @@ -297,6 +297,56 @@ void computeDatasetDigest(unsigned char *final) { } } +#ifdef USE_JEMALLOC +void mallctl_int(client *c, robj **argv, int argc) { + int ret; + /* start with the biggest size (int64), and if that fails, try smaller sizes (int32, bool) */ + int64_t old = 0, val; + if (argc > 1) { + long long ll; + if (getLongLongFromObjectOrReply(c, argv[1], &ll, NULL) != C_OK) + return; + val = ll; + } + size_t sz = sizeof(old); + while (sz > 0) { + if ((ret=je_mallctl(argv[0]->ptr, &old, &sz, argc > 1? &val: NULL, argc > 1?sz: 0))) { + if (ret==EINVAL) { + /* size might be wrong, try a smaller one */ + sz /= 2; +#if BYTE_ORDER == BIG_ENDIAN + val <<= 8*sz; +#endif + continue; + } + addReplyErrorFormat(c,"%s", strerror(ret)); + return; + } else { +#if BYTE_ORDER == BIG_ENDIAN + old >>= 64 - 8*sz; +#endif + addReplyLongLong(c, old); + return; + } + } + addReplyErrorFormat(c,"%s", strerror(EINVAL)); +} + +void mallctl_string(client *c, robj **argv, int argc) { + int ret; + char *old; + size_t sz = sizeof(old); + /* for strings, it seems we need to first get the old value, before overriding it. 
*/ + if ((ret=je_mallctl(argv[0]->ptr, &old, &sz, NULL, 0))) { + addReplyErrorFormat(c,"%s", strerror(ret)); + return; + } + addReplyBulkCString(c, old); + if(argc > 1) + je_mallctl(argv[0]->ptr, NULL, 0, &argv[1]->ptr, sizeof(char*)); +} +#endif + void debugCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { @@ -323,6 +373,10 @@ void debugCommand(client *c) { "STRUCTSIZE -- Return the size of different Redis core C structures.", "ZIPLIST -- Show low level info about the ziplist encoding.", "STRINGMATCH-TEST -- Run a fuzz tester against the stringmatchlen() function.", +#ifdef USE_JEMALLOC +"MALLCTL [] -- Get or set a malloc tunning integer.", +"MALLCTL-STR [] -- Get or set a malloc tunning string.", +#endif NULL }; addReplyHelp(c, help); @@ -676,6 +730,14 @@ NULL { stringmatchlen_fuzz_test(); addReplyStatus(c,"Apparently Redis did not crash: test passed"); +#ifdef USE_JEMALLOC + } else if(!strcasecmp(c->argv[1]->ptr,"mallctl") && c->argc >= 3) { + mallctl_int(c, c->argv+2, c->argc-2); + return; + } else if(!strcasecmp(c->argv[1]->ptr,"mallctl-str") && c->argc >= 3) { + mallctl_string(c, c->argv+2, c->argc-2); + return; +#endif } else { addReplySubcommandSyntaxError(c); return; diff --git a/src/server.c b/src/server.c index 4b87b6ac2..fa2c7b1ee 100644 --- a/src/server.c +++ b/src/server.c @@ -2230,6 +2230,7 @@ void initServerConfig(void) { server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT; server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE; server.active_expire_enabled = 1; + server.jemalloc_bg_thread = 1; server.active_defrag_enabled = CONFIG_DEFAULT_ACTIVE_DEFRAG; server.active_defrag_ignore_bytes = CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES; server.active_defrag_threshold_lower = CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER; @@ -2866,6 +2867,7 @@ void initServer(void) { latencyMonitorInit(); bioInit(); initThreadedIO(); + set_jemalloc_bg_thread(server.jemalloc_bg_thread); server.initial_memory_usage = zmalloc_used_memory(); } 
diff --git a/src/server.h b/src/server.h index 0813f8bd1..4ae079ff2 100644 --- a/src/server.h +++ b/src/server.h @@ -1129,6 +1129,7 @@ struct redisServer { int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */ int active_expire_enabled; /* Can be disabled for testing purposes. */ int active_defrag_enabled; + int jemalloc_bg_thread; /* Enable jemalloc background thread */ size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */ int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */ int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */ diff --git a/src/zmalloc.c b/src/zmalloc.c index 5e6010278..58896a727 100644 --- a/src/zmalloc.c +++ b/src/zmalloc.c @@ -306,6 +306,7 @@ size_t zmalloc_get_rss(void) { #endif #if defined(USE_JEMALLOC) + int zmalloc_get_allocator_info(size_t *allocated, size_t *active, size_t *resident) { @@ -327,13 +328,46 @@ int zmalloc_get_allocator_info(size_t *allocated, je_mallctl("stats.allocated", allocated, &sz, NULL, 0); return 1; } + +void set_jemalloc_bg_thread(int enable) { + /* let jemalloc do purging asynchronously, required when there's no traffic + * after flushdb */ + if (enable) { + char val = 1; + je_mallctl("background_thread", NULL, 0, &val, 1); + } +} + +int jemalloc_purge() { + /* return all unused (reserved) pages to the OS */ + char tmp[32]; + unsigned narenas = 0; + size_t sz = sizeof(unsigned); + if (!je_mallctl("arenas.narenas", &narenas, &sz, NULL, 0)) { + sprintf(tmp, "arena.%d.purge", narenas); + if (!je_mallctl(tmp, NULL, 0, NULL, 0)) + return 0; + } + return -1; +} + #else + int zmalloc_get_allocator_info(size_t *allocated, size_t *active, size_t *resident) { *allocated = *resident = *active = 0; return 1; } + +void set_jemalloc_bg_thread(int enable) { + ((void)(enable)); +} + +int jemalloc_purge() { + return 0; +} + #endif /* Get the sum of the specified field (converted form 
kb to bytes) in diff --git a/src/zmalloc.h b/src/zmalloc.h index 6fb19b046..b136a910d 100644 --- a/src/zmalloc.h +++ b/src/zmalloc.h @@ -86,6 +86,8 @@ size_t zmalloc_used_memory(void); void zmalloc_set_oom_handler(void (*oom_handler)(size_t)); size_t zmalloc_get_rss(void); int zmalloc_get_allocator_info(size_t *allocated, size_t *active, size_t *resident); +void set_jemalloc_bg_thread(int enable); +int jemalloc_purge(); size_t zmalloc_get_private_dirty(long pid); size_t zmalloc_get_smap_bytes_by_field(char *field, long pid); size_t zmalloc_get_memory_size(void); From 6441bc04756a178c07e900d9dc82da5ab8c5118e Mon Sep 17 00:00:00 2001 From: "zheng.ren01@mljr.com" Date: Tue, 25 Jun 2019 18:34:35 +0800 Subject: [PATCH 018/411] =?UTF-8?q?fix=20readme.md=EF=BC=8CRedis=20data=20?= =?UTF-8?q?types=20should=20add=20`t=5Fstream.c`.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 6c9435b53..3442659e6 100644 --- a/README.md +++ b/README.md @@ -406,7 +406,7 @@ replicas, or to continue the replication after a disconnection. Other C files --- -* `t_hash.c`, `t_list.c`, `t_set.c`, `t_string.c` and `t_zset.c` contains the implementation of the Redis data types. They implement both an API to access a given data type, and the client commands implementations for these data types. +* `t_hash.c`, `t_list.c`, `t_set.c`, `t_string.c`, `t_zset.c` and `t_stream.c` contains the implementation of the Redis data types. They implement both an API to access a given data type, and the client commands implementations for these data types. * `ae.c` implements the Redis event loop, it's a self contained library which is simple to read and understand. * `sds.c` is the Redis string library, check http://github.com/antirez/sds for more information. 
* `anet.c` is a library to use POSIX networking in a simpler way compared to the raw interface exposed by the kernel. From 17e98d88516d72ca9a8716e411e5055750b9b71e Mon Sep 17 00:00:00 2001 From: Angus Pearson Date: Tue, 2 Jul 2019 14:28:48 +0100 Subject: [PATCH 019/411] RESP3 double representation for -infinity is `,-inf\r\n`, not `-inf\r\n` --- src/networking.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index 4bc22120a..6225229ce 100644 --- a/src/networking.c +++ b/src/networking.c @@ -506,7 +506,7 @@ void addReplyDouble(client *c, double d) { if (c->resp == 2) { addReplyBulkCString(c, d > 0 ? "inf" : "-inf"); } else { - addReplyProto(c, d > 0 ? ",inf\r\n" : "-inf\r\n", + addReplyProto(c, d > 0 ? ",inf\r\n" : ",-inf\r\n", d > 0 ? 6 : 7); } } else { From 527c297efb16694f0239ea221a226afecaf310d2 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Thu, 4 Jul 2019 10:02:26 +0300 Subject: [PATCH 020/411] missing per-skiplist overheads in MEMORY USAGE these had severe impact for small zsets, for instance ones with just one element that is longer than 64 (causing it not to be ziplist encoded) --- src/object.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/object.c b/src/object.c index 234e11f8a..10209a6c8 100644 --- a/src/object.c +++ b/src/object.c @@ -834,7 +834,9 @@ size_t objectComputeSize(robj *o, size_t sample_size) { d = ((zset*)o->ptr)->dict; zskiplist *zsl = ((zset*)o->ptr)->zsl; zskiplistNode *znode = zsl->header->level[0].forward; - asize = sizeof(*o)+sizeof(zset)+(sizeof(struct dictEntry*)*dictSlots(d)); + asize = sizeof(*o)+sizeof(zset)+sizeof(zskiplist)+sizeof(dict)+ + (sizeof(struct dictEntry*)*dictSlots(d))+ + zmalloc_size(zsl->header); while(znode != NULL && samples < sample_size) { elesize += sdsAllocSize(znode->ele); elesize += sizeof(struct dictEntry) + zmalloc_size(znode); From 29754ebe22178f3014227fb38e2e7b4ab4546731 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Mon, 1 
Jul 2019 15:22:29 +0300 Subject: [PATCH 021/411] diskless replication on slave side (don't store rdb to file), plus some other related fixes The implementation of the diskless replication was currently diskless only on the master side. The slave side was still storing the received rdb file to the disk before loading it back in and parsing it. This commit adds two modes to load rdb directly from socket: 1) when-empty 2) using "swapdb" the third mode of using diskless slave by flushdb is risky and currently not included. other changes: -------------- distinguish between aof configuration and state so that we can re-enable aof only when sync eventually succeeds (and not when exiting from readSyncBulkPayload after a failed attempt) also a CONFIG GET and INFO during rdb loading would have lied When loading rdb from the network, don't kill the server on short read (that can be a network error) Fix rdb check when performed on preamble AOF tests: run replication tests for diskless slave too make replication test a bit more aggressive Add test for diskless load swapdb --- redis.conf | 16 ++ src/anet.c | 14 + src/anet.h | 1 + src/aof.c | 2 +- src/config.c | 38 ++- src/db.c | 23 +- src/rdb.c | 40 ++- src/redis-check-rdb.c | 4 +- src/replication.c | 340 +++++++++++++++--------- src/rio.c | 109 +++++++- src/rio.h | 10 + src/server.c | 7 +- src/server.h | 19 +- tests/integration/replication-4.tcl | 9 - tests/integration/replication-psync.tcl | 40 ++- tests/integration/replication.tcl | 216 ++++++++++----- tests/support/util.tcl | 12 + 17 files changed, 648 insertions(+), 252 deletions(-) diff --git a/redis.conf b/redis.conf index 060510768..74b6c018f 100644 --- a/redis.conf +++ b/redis.conf @@ -377,6 +377,22 @@ repl-diskless-sync no # it entirely just set it to 0 seconds and the transfer will start ASAP. 
repl-diskless-sync-delay 5 +# Replica can load the rdb it reads from the replication link directly from the +# socket, or store the rdb to a file and read that file after it was completely +# received from the master. +# In many cases the disk is slower than the network, and storing and loading +# the rdb file may increase replication time (and even increase the master's +# Copy on Write memory and slave buffers). +# However, parsing the rdb file directly from the socket may mean that we have +# to flush the contents of the current database before the full rdb was received. +# For this reason we have the following options: +# "disabled" - Don't use diskless load (store the rdb file to the disk first) +# "on-empty-db" - Use diskless load only when it is completely safe. +# "swapdb" - Keep a copy of the current db contents in RAM while parsing +# the data directly from the socket. Note that this requires +# sufficient memory, if you don't have it, you risk an OOM kill. +repl-diskless-load disabled + # Replicas send PINGs to server in a predefined interval. It's possible to change # this interval with the repl_ping_replica_period option. The default value is 10 # seconds. diff --git a/src/anet.c b/src/anet.c index 2981fca13..2088f4fb1 100644 --- a/src/anet.c +++ b/src/anet.c @@ -193,6 +193,20 @@ int anetSendTimeout(char *err, int fd, long long ms) { return ANET_OK; } +/* Set the socket receive timeout (SO_RCVTIMEO socket option) to the specified + * number of milliseconds, or disable it if the 'ms' argument is zero. */ +int anetRecvTimeout(char *err, int fd, long long ms) { + struct timeval tv; + + tv.tv_sec = ms/1000; + tv.tv_usec = (ms%1000)*1000; + if (setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == -1) { + anetSetError(err, "setsockopt SO_RCVTIMEO: %s", strerror(errno)); + return ANET_ERR; + } + return ANET_OK; +} + /* anetGenericResolve() is called by anetResolve() and anetResolveIP() to * do the actual work.
It resolves the hostname "host" and set the string * representation of the IP address into the buffer pointed by "ipbuf". diff --git a/src/anet.h b/src/anet.h index 7142f78d2..dd735240d 100644 --- a/src/anet.h +++ b/src/anet.h @@ -70,6 +70,7 @@ int anetEnableTcpNoDelay(char *err, int fd); int anetDisableTcpNoDelay(char *err, int fd); int anetTcpKeepAlive(char *err, int fd); int anetSendTimeout(char *err, int fd, long long ms); +int anetRecvTimeout(char *err, int fd, long long ms); int anetPeerToString(int fd, char *ip, size_t ip_len, int *port); int anetKeepAlive(char *err, int fd, int interval); int anetSockName(int fd, char *ip, size_t ip_len, int *port); diff --git a/src/aof.c b/src/aof.c index 4744847d2..565ee8073 100644 --- a/src/aof.c +++ b/src/aof.c @@ -729,7 +729,7 @@ int loadAppendOnlyFile(char *filename) { server.aof_state = AOF_OFF; fakeClient = createFakeClient(); - startLoading(fp); + startLoadingFile(fp, filename); /* Check if this AOF file has an RDB preamble. In that case we need to * load the RDB file and later continue loading the AOF tail. */ diff --git a/src/config.c b/src/config.c index 2e6e9a6b7..fde00ddf5 100644 --- a/src/config.c +++ b/src/config.c @@ -91,6 +91,13 @@ configEnum aof_fsync_enum[] = { {NULL, 0} }; +configEnum repl_diskless_load_enum[] = { + {"disabled", REPL_DISKLESS_LOAD_DISABLED}, + {"on-empty-db", REPL_DISKLESS_LOAD_WHEN_DB_EMPTY}, + {"swapdb", REPL_DISKLESS_LOAD_SWAPDB}, + {NULL, 0} +}; + /* Output buffer limits presets. 
*/ clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = { {0, 0, 0}, /* normal */ @@ -427,6 +434,11 @@ void loadServerConfigFromString(char *config) { err = "repl-timeout must be 1 or greater"; goto loaderr; } + } else if (!strcasecmp(argv[0],"repl-diskless-load") && argc==2) { + server.repl_diskless_load = configEnumGetValue(repl_diskless_load_enum,argv[1]); + if (server.repl_diskless_load == INT_MIN) { + err = "argument must be 'disabled', 'on-empty-db', 'swapdb' or 'flushdb'"; + } } else if (!strcasecmp(argv[0],"repl-diskless-sync-delay") && argc==2) { server.repl_diskless_sync_delay = atoi(argv[1]); if (server.repl_diskless_sync_delay < 0) { @@ -466,12 +478,10 @@ void loadServerConfigFromString(char *config) { if (server.config_hz < CONFIG_MIN_HZ) server.config_hz = CONFIG_MIN_HZ; if (server.config_hz > CONFIG_MAX_HZ) server.config_hz = CONFIG_MAX_HZ; } else if (!strcasecmp(argv[0],"appendonly") && argc == 2) { - int yes; - - if ((yes = yesnotoi(argv[1])) == -1) { + if ((server.aof_enabled = yesnotoi(argv[1])) == -1) { err = "argument must be 'yes' or 'no'"; goto loaderr; } - server.aof_state = yes ? AOF_ON : AOF_OFF; + server.aof_state = server.aof_enabled ? 
AOF_ON : AOF_OFF; } else if (!strcasecmp(argv[0],"appendfilename") && argc == 2) { if (!pathIsBaseName(argv[1])) { err = "appendfilename can't be a path, just a filename"; @@ -497,6 +507,12 @@ void loadServerConfigFromString(char *config) { argc == 2) { server.aof_rewrite_min_size = memtoll(argv[1],NULL); + } else if (!strcasecmp(argv[0],"rdb-key-save-delay") && argc==2) { + server.rdb_key_save_delay = atoi(argv[1]); + if (server.rdb_key_save_delay < 0) { + err = "rdb-key-save-delay can't be negative"; + goto loaderr; + } } else if (!strcasecmp(argv[0],"requirepass") && argc == 2) { if (strlen(argv[1]) > CONFIG_AUTHPASS_MAX_LEN) { err = "Password is longer than CONFIG_AUTHPASS_MAX_LEN"; @@ -942,6 +958,7 @@ void configSetCommand(client *c) { int enable = yesnotoi(o->ptr); if (enable == -1) goto badfmt; + server.aof_enabled = enable; if (enable == 0 && server.aof_state != AOF_OFF) { stopAppendOnly(); } else if (enable && server.aof_state == AOF_OFF) { @@ -1132,6 +1149,8 @@ void configSetCommand(client *c) { "slave-priority",server.slave_priority,0,INT_MAX) { } config_set_numerical_field( "replica-priority",server.slave_priority,0,INT_MAX) { + } config_set_numerical_field( + "rdb-key-save-delay",server.rdb_key_save_delay,0,LLONG_MAX) { } config_set_numerical_field( "slave-announce-port",server.slave_announce_port,0,65535) { } config_set_numerical_field( @@ -1199,6 +1218,8 @@ void configSetCommand(client *c) { "maxmemory-policy",server.maxmemory_policy,maxmemory_policy_enum) { } config_set_enum_field( "appendfsync",server.aof_fsync,aof_fsync_enum) { + } config_set_enum_field( + "repl-diskless-load",server.repl_diskless_load,repl_diskless_load_enum) { /* Everyhing else is an error... 
*/ } config_set_else { @@ -1346,6 +1367,7 @@ void configGetCommand(client *c) { config_get_numerical_field("cluster-slave-validity-factor",server.cluster_slave_validity_factor); config_get_numerical_field("cluster-replica-validity-factor",server.cluster_slave_validity_factor); config_get_numerical_field("repl-diskless-sync-delay",server.repl_diskless_sync_delay); + config_get_numerical_field("rdb-key-save-delay",server.rdb_key_save_delay); config_get_numerical_field("tcp-keepalive",server.tcpkeepalive); /* Bool (yes/no) values */ @@ -1370,12 +1392,14 @@ void configGetCommand(client *c) { server.aof_fsync,aof_fsync_enum); config_get_enum_field("syslog-facility", server.syslog_facility,syslog_facility_enum); + config_get_enum_field("repl-diskless-load", + server.repl_diskless_load,repl_diskless_load_enum); /* Everything we can't handle with macros follows. */ if (stringmatch(pattern,"appendonly",1)) { addReplyBulkCString(c,"appendonly"); - addReplyBulkCString(c,server.aof_state == AOF_OFF ? "no" : "yes"); + addReplyBulkCString(c,server.aof_enabled ? 
"yes" : "no"); matches++; } if (stringmatch(pattern,"dir",1)) { @@ -2109,6 +2133,7 @@ int rewriteConfig(char *path) { rewriteConfigNumericalOption(state,"repl-timeout",server.repl_timeout,CONFIG_DEFAULT_REPL_TIMEOUT); rewriteConfigBytesOption(state,"repl-backlog-size",server.repl_backlog_size,CONFIG_DEFAULT_REPL_BACKLOG_SIZE); rewriteConfigBytesOption(state,"repl-backlog-ttl",server.repl_backlog_time_limit,CONFIG_DEFAULT_REPL_BACKLOG_TIME_LIMIT); + rewriteConfigEnumOption(state,"repl-diskless-load",server.repl_diskless_load,repl_diskless_load_enum,CONFIG_DEFAULT_REPL_DISKLESS_LOAD); rewriteConfigNumericalOption(state,"repl-diskless-sync-delay",server.repl_diskless_sync_delay,CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY); rewriteConfigNumericalOption(state,"replica-priority",server.slave_priority,CONFIG_DEFAULT_SLAVE_PRIORITY); rewriteConfigNumericalOption(state,"min-replicas-to-write",server.repl_min_slaves_to_write,CONFIG_DEFAULT_MIN_SLAVES_TO_WRITE); @@ -2128,7 +2153,7 @@ int rewriteConfig(char *path) { rewriteConfigNumericalOption(state,"active-defrag-cycle-min",server.active_defrag_cycle_min,CONFIG_DEFAULT_DEFRAG_CYCLE_MIN); rewriteConfigNumericalOption(state,"active-defrag-cycle-max",server.active_defrag_cycle_max,CONFIG_DEFAULT_DEFRAG_CYCLE_MAX); rewriteConfigNumericalOption(state,"active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS); - rewriteConfigYesNoOption(state,"appendonly",server.aof_state != AOF_OFF,0); + rewriteConfigYesNoOption(state,"appendonly",server.aof_enabled,0); rewriteConfigStringOption(state,"appendfilename",server.aof_filename,CONFIG_DEFAULT_AOF_FILENAME); rewriteConfigEnumOption(state,"appendfsync",server.aof_fsync,aof_fsync_enum,CONFIG_DEFAULT_AOF_FSYNC); rewriteConfigNumericalOption(state,"auto-aof-rewrite-percentage",server.aof_rewrite_perc,AOF_REWRITE_PERC); @@ -2157,6 +2182,7 @@ int rewriteConfig(char *path) { rewriteConfigClientoutputbufferlimitOption(state); 
rewriteConfigNumericalOption(state,"hz",server.config_hz,CONFIG_DEFAULT_HZ); rewriteConfigEnumOption(state,"supervised",server.supervised_mode,supervised_mode_enum,SUPERVISED_NONE); + rewriteConfigNumericalOption(state,"rdb-key-save-delay",server.rdb_key_save_delay,CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY); /* Rewrite Sentinel config if in Sentinel mode. */ if (server.sentinel_mode) rewriteConfigSentinelOption(state); diff --git a/src/db.c b/src/db.c index 07051dad4..8b7656802 100644 --- a/src/db.c +++ b/src/db.c @@ -344,7 +344,7 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o) { * On success the fuction returns the number of keys removed from the * database(s). Otherwise -1 is returned in the specific case the * DB number is out of range, and errno is set to EINVAL. */ -long long emptyDb(int dbnum, int flags, void(callback)(void*)) { +long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)) { int async = (flags & EMPTYDB_ASYNC); long long removed = 0; @@ -362,12 +362,12 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { } for (int j = startdb; j <= enddb; j++) { - removed += dictSize(server.db[j].dict); + removed += dictSize(dbarray[j].dict); if (async) { - emptyDbAsync(&server.db[j]); + emptyDbAsync(&dbarray[j]); } else { - dictEmpty(server.db[j].dict,callback); - dictEmpty(server.db[j].expires,callback); + dictEmpty(dbarray[j].dict,callback); + dictEmpty(dbarray[j].expires,callback); } } if (server.cluster_enabled) { @@ -381,6 +381,10 @@ long long emptyDb(int dbnum, int flags, void(callback)(void*)) { return removed; } +long long emptyDb(int dbnum, int flags, void(callback)(void*)) { + return emptyDbGeneric(server.db, dbnum, flags, callback); +} + int selectDb(client *c, int id) { if (id < 0 || id >= server.dbnum) return C_ERR; @@ -388,6 +392,15 @@ int selectDb(client *c, int id) { return C_OK; } +long long dbTotalServerKeyCount() { + long long total = 0; + int j; + for (j = 0; j < server.dbnum; j++) { + 
total += dictSize(server.db[j].dict); + } + return total; +} + /*----------------------------------------------------------------------------- * Hooks for key space changes. * diff --git a/src/rdb.c b/src/rdb.c index 95e4766ea..c566378fb 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -44,6 +44,7 @@ #define rdbExitReportCorruptRDB(...) rdbCheckThenExit(__LINE__,__VA_ARGS__) +char* rdbFileBeingLoaded = NULL; /* used for rdb checking on read error */ extern int rdbCheckMode; void rdbCheckError(const char *fmt, ...); void rdbCheckSetError(const char *fmt, ...); @@ -61,11 +62,17 @@ void rdbCheckThenExit(int linenum, char *reason, ...) { if (!rdbCheckMode) { serverLog(LL_WARNING, "%s", msg); - char *argv[2] = {"",server.rdb_filename}; - redis_check_rdb_main(2,argv,NULL); + if (rdbFileBeingLoaded) { + char *argv[2] = {"",rdbFileBeingLoaded}; + redis_check_rdb_main(2,argv,NULL); + } else { + serverLog(LL_WARNING, "Failure loading rdb format from socket, assuming connection error, resuming operation."); + return; + } } else { rdbCheckError("%s",msg); } + serverLog(LL_WARNING, "Terminating server after rdb file reading failure."); exit(1); } @@ -1039,6 +1046,11 @@ int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { if (rdbSaveObjectType(rdb,val) == -1) return -1; if (rdbSaveStringObject(rdb,key) == -1) return -1; if (rdbSaveObject(rdb,val,key) == -1) return -1; + + /* Delay return if required (for testing) */ + if (server.rdb_key_save_delay) + usleep(server.rdb_key_save_delay); + return 1; } @@ -1800,18 +1812,23 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key) { /* Mark that we are loading in the global state and setup the fields * needed to provide loading stats. 
*/ -void startLoading(FILE *fp) { - struct stat sb; - +void startLoading(size_t size) { /* Load the DB */ server.loading = 1; server.loading_start_time = time(NULL); server.loading_loaded_bytes = 0; - if (fstat(fileno(fp), &sb) == -1) { - server.loading_total_bytes = 0; - } else { - server.loading_total_bytes = sb.st_size; - } + server.loading_total_bytes = size; +} + +/* Mark that we are loading in the global state and setup the fields + * needed to provide loading stats. + * 'filename' is optional and used for rdb-check on error */ +void startLoadingFile(FILE *fp, char* filename) { + struct stat sb; + if (fstat(fileno(fp), &sb) == -1) + sb.st_size = 0; + rdbFileBeingLoaded = filename; + startLoading(sb.st_size); } /* Refresh the loading progress info */ @@ -1824,6 +1841,7 @@ void loadingProgress(off_t pos) { /* Loading finished */ void stopLoading(void) { server.loading = 0; + rdbFileBeingLoaded = NULL; } /* Track loading progress in order to serve client's from time to time @@ -2089,7 +2107,7 @@ int rdbLoad(char *filename, rdbSaveInfo *rsi) { int retval; if ((fp = fopen(filename,"r")) == NULL) return C_ERR; - startLoading(fp); + startLoadingFile(fp, filename); rioInitWithFile(&rdb,fp); retval = rdbLoadRio(&rdb,rsi,0); fclose(fp); diff --git a/src/redis-check-rdb.c b/src/redis-check-rdb.c index ec00ee71c..e2d71b5a5 100644 --- a/src/redis-check-rdb.c +++ b/src/redis-check-rdb.c @@ -202,7 +202,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { } expiretime = -1; - startLoading(fp); + startLoadingFile(fp, rdbfilename); while(1) { robj *key, *val; @@ -314,6 +314,7 @@ int redis_check_rdb(char *rdbfilename, FILE *fp) { } if (closefile) fclose(fp); + stopLoading(); return 0; eoferr: /* unexpected end of file is handled here with a fatal exit */ @@ -324,6 +325,7 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */ } err: if (closefile) fclose(fp); + stopLoading(); return 1; } diff --git a/src/replication.c b/src/replication.c index 
63a67a06a..e2bac08bd 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1113,11 +1113,22 @@ void restartAOFAfterSYNC() { } } +static int useDisklessLoad() { + /* compute boolean decision to use diskless load */ + return server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB || + (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0); +} + + /* Asynchronously read the SYNC payload we receive from a master */ #define REPL_MAX_WRITTEN_BEFORE_FSYNC (1024*1024*8) /* 8 MB */ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { char buf[4096]; ssize_t nread, readlen, nwritten; + int use_diskless_load; + redisDb *diskless_load_backup = NULL; + int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; + int i; off_t left; UNUSED(el); UNUSED(privdata); @@ -1173,90 +1184,177 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { * at the next call. */ server.repl_transfer_size = 0; serverLog(LL_NOTICE, - "MASTER <-> REPLICA sync: receiving streamed RDB from master"); + "MASTER <-> REPLICA sync: receiving streamed RDB from master with EOF %s", + useDisklessLoad()? "to parser":"to disk"); } else { usemark = 0; server.repl_transfer_size = strtol(buf+1,NULL,10); serverLog(LL_NOTICE, - "MASTER <-> REPLICA sync: receiving %lld bytes from master", - (long long) server.repl_transfer_size); + "MASTER <-> REPLICA sync: receiving %lld bytes from master %s", + (long long) server.repl_transfer_size, + useDisklessLoad()? "to parser":"to disk"); } return; } - /* Read bulk data */ - if (usemark) { - readlen = sizeof(buf); - } else { - left = server.repl_transfer_size - server.repl_transfer_read; - readlen = (left < (signed)sizeof(buf)) ? 
left : (signed)sizeof(buf); - } + use_diskless_load = useDisklessLoad(); + if (!use_diskless_load) { - nread = read(fd,buf,readlen); - if (nread <= 0) { - serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", - (nread == -1) ? strerror(errno) : "connection lost"); - cancelReplicationHandshake(); - return; - } - server.stat_net_input_bytes += nread; - - /* When a mark is used, we want to detect EOF asap in order to avoid - * writing the EOF mark into the file... */ - int eof_reached = 0; - - if (usemark) { - /* Update the last bytes array, and check if it matches our delimiter.*/ - if (nread >= CONFIG_RUN_ID_SIZE) { - memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE); + /* read the data from the socket, store it to a file and search for the EOF */ + if (usemark) { + readlen = sizeof(buf); } else { - int rem = CONFIG_RUN_ID_SIZE-nread; - memmove(lastbytes,lastbytes+nread,rem); - memcpy(lastbytes+rem,buf,nread); + left = server.repl_transfer_size - server.repl_transfer_read; + readlen = (left < (signed)sizeof(buf)) ? left : (signed)sizeof(buf); } - if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1; - } - server.repl_transfer_lastio = server.unixtime; - if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) { - serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s", - (nwritten == -1) ? strerror(errno) : "short write"); - goto error; - } - server.repl_transfer_read += nread; + nread = read(fd,buf,readlen); + if (nread <= 0) { + serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", + (nread == -1) ? strerror(errno) : "connection lost"); + cancelReplicationHandshake(); + return; + } + server.stat_net_input_bytes += nread; - /* Delete the last 40 bytes from the file if we reached EOF. 
*/ - if (usemark && eof_reached) { - if (ftruncate(server.repl_transfer_fd, - server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) - { - serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); + /* When a mark is used, we want to detect EOF asap in order to avoid + * writing the EOF mark into the file... */ + int eof_reached = 0; + + if (usemark) { + /* Update the last bytes array, and check if it matches our delimiter.*/ + if (nread >= CONFIG_RUN_ID_SIZE) { + memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE); + } else { + int rem = CONFIG_RUN_ID_SIZE-nread; + memmove(lastbytes,lastbytes+nread,rem); + memcpy(lastbytes+rem,buf,nread); + } + if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1; + } + + server.repl_transfer_lastio = server.unixtime; + if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) { + serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s", + (nwritten == -1) ? strerror(errno) : "short write"); goto error; } + server.repl_transfer_read += nread; + + /* Delete the last 40 bytes from the file if we reached EOF. */ + if (usemark && eof_reached) { + if (ftruncate(server.repl_transfer_fd, + server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) + { + serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); + goto error; + } + } + + /* Sync data on disk from time to time, otherwise at the end of the transfer + * we may suffer a big delay as the memory buffers are copied into the + * actual disk. 
*/ + if (server.repl_transfer_read >= + server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) + { + off_t sync_size = server.repl_transfer_read - + server.repl_transfer_last_fsync_off; + rdb_fsync_range(server.repl_transfer_fd, + server.repl_transfer_last_fsync_off, sync_size); + server.repl_transfer_last_fsync_off += sync_size; + } + + /* Check if the transfer is now complete */ + if (!usemark) { + if (server.repl_transfer_read == server.repl_transfer_size) + eof_reached = 1; + } + if (!eof_reached) + return; } - /* Sync data on disk from time to time, otherwise at the end of the transfer - * we may suffer a big delay as the memory buffers are copied into the - * actual disk. */ - if (server.repl_transfer_read >= - server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) - { - off_t sync_size = server.repl_transfer_read - - server.repl_transfer_last_fsync_off; - rdb_fsync_range(server.repl_transfer_fd, - server.repl_transfer_last_fsync_off, sync_size); - server.repl_transfer_last_fsync_off += sync_size; + /* We reach here when the slave is using diskless replication, + * or when we are done reading from the socket to the rdb file. */ + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); + /* We need to stop any AOFRW fork before flusing and parsing + * RDB, otherwise we'll create a copy-on-write disaster. */ + if (server.aof_state != AOF_OFF) stopAppendOnly(); + signalFlushedDb(-1); + if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + /* create a backup of the current db */ + diskless_load_backup = zmalloc(sizeof(redisDb)*server.dbnum); + for (i=0; i REPLICA sync: Loading DB in memory"); + rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + if (use_diskless_load) { + rio rdb; + rioInitWithFd(&rdb,fd,server.repl_transfer_size); + /* Put the socket in blocking mode to simplify RDB transfer. + * We'll restore it when the RDB is received. 
*/ + anetBlock(NULL,fd); + anetRecvTimeout(NULL,fd,server.repl_timeout*1000); - /* Check if the transfer is now complete */ - if (!usemark) { - if (server.repl_transfer_read == server.repl_transfer_size) - eof_reached = 1; - } - - if (eof_reached) { - int aof_is_enabled = server.aof_state != AOF_OFF; - + startLoading(server.repl_transfer_size); + if (rdbLoadRio(&rdb,&rsi,0) != C_OK) { + /* rdbloading failed */ + stopLoading(); + serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from socket"); + cancelReplicationHandshake(); + rioFreeFd(&rdb, NULL); + if (server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { + /* restore the backed up db */ + emptyDbGeneric(server.db,-1,empty_db_flags,replicationEmptyDbCallback); + for (i=0; i REPLICA synchronization: %s", - server.rdb_filename, strerror(errno)); + server.rdb_filename, strerror(errno)); cancelReplicationHandshake(); return; } - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); - /* We need to stop any AOFRW fork before flusing and parsing - * RDB, otherwise we'll create a copy-on-write disaster. */ - if(aof_is_enabled) stopAppendOnly(); - signalFlushedDb(-1); - emptyDb( - -1, - server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS, - replicationEmptyDbCallback); - /* Before loading the DB into memory we need to delete the readable - * handler, otherwise it will get called recursively since - * rdbLoad() will call the event loop to process events from time to - * time for non blocking loading. */ - aeDeleteFileEvent(server.el,server.repl_transfer_s,AE_READABLE); - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory"); - rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; if (rdbLoad(server.rdb_filename,&rsi) != C_OK) { serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); cancelReplicationHandshake(); - /* Re-enable the AOF if we disabled it earlier, in order to restore - * the original configuration. 
*/ - if (aof_is_enabled) restartAOFAfterSYNC(); + /* Note that there's no point in restarting the AOF on sync failure, + it'll be restarted when sync succeeds or slave promoted. */ return; } - /* Final setup of the connected slave <- master link */ zfree(server.repl_transfer_tmpfile); close(server.repl_transfer_fd); - replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db); - server.repl_state = REPL_STATE_CONNECTED; - server.repl_down_since = 0; - /* After a full resynchroniziation we use the replication ID and - * offset of the master. The secondary ID / offset are cleared since - * we are starting a new history. */ - memcpy(server.replid,server.master->replid,sizeof(server.replid)); - server.master_repl_offset = server.master->reploff; - clearReplicationId2(); - /* Let's create the replication backlog if needed. Slaves need to - * accumulate the backlog regardless of the fact they have sub-slaves - * or not, in order to behave correctly if they are promoted to - * masters after a failover. */ - if (server.repl_backlog == NULL) createReplicationBacklog(); - - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success"); - /* Restart the AOF subsystem now that we finished the sync. This - * will trigger an AOF rewrite, and when done will start appending - * to the new file. */ - if (aof_is_enabled) restartAOFAfterSYNC(); + server.repl_transfer_fd = -1; + server.repl_transfer_tmpfile = NULL; } + /* Final setup of the connected slave <- master link */ + replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db); + server.repl_state = REPL_STATE_CONNECTED; + server.repl_down_since = 0; + /* After a full resynchroniziation we use the replication ID and + * offset of the master. The secondary ID / offset are cleared since + * we are starting a new history. 
*/ + memcpy(server.replid,server.master->replid,sizeof(server.replid)); + server.master_repl_offset = server.master->reploff; + clearReplicationId2(); + /* Let's create the replication backlog if needed. Slaves need to + * accumulate the backlog regardless of the fact they have sub-slaves + * or not, in order to behave correctly if they are promoted to + * masters after a failover. */ + if (server.repl_backlog == NULL) createReplicationBacklog(); + + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success"); + /* Restart the AOF subsystem now that we finished the sync. This + * will trigger an AOF rewrite, and when done will start appending + * to the new file. */ + if (server.aof_enabled) restartAOFAfterSYNC(); return; error: @@ -1845,16 +1928,20 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { } /* Prepare a suitable temp file for bulk transfer */ - while(maxtries--) { - snprintf(tmpfile,256, - "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); - dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); - if (dfd != -1) break; - sleep(1); - } - if (dfd == -1) { - serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno)); - goto error; + if (!useDisklessLoad()) { + while(maxtries--) { + snprintf(tmpfile,256, + "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); + dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); + if (dfd != -1) break; + sleep(1); + } + if (dfd == -1) { + serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno)); + goto error; + } + server.repl_transfer_tmpfile = zstrdup(tmpfile); + server.repl_transfer_fd = dfd; } /* Setup the non blocking download of the bulk file. 
*/ @@ -1871,15 +1958,19 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { server.repl_transfer_size = -1; server.repl_transfer_read = 0; server.repl_transfer_last_fsync_off = 0; - server.repl_transfer_fd = dfd; server.repl_transfer_lastio = server.unixtime; - server.repl_transfer_tmpfile = zstrdup(tmpfile); return; error: aeDeleteFileEvent(server.el,fd,AE_READABLE|AE_WRITABLE); if (dfd != -1) close(dfd); close(fd); + if (server.repl_transfer_fd != -1) + close(server.repl_transfer_fd); + if (server.repl_transfer_tmpfile) + zfree(server.repl_transfer_tmpfile); + server.repl_transfer_tmpfile = NULL; + server.repl_transfer_fd = -1; server.repl_transfer_s = -1; server.repl_state = REPL_STATE_CONNECT; return; @@ -1933,9 +2024,13 @@ void undoConnectWithMaster(void) { void replicationAbortSyncTransfer(void) { serverAssert(server.repl_state == REPL_STATE_TRANSFER); undoConnectWithMaster(); - close(server.repl_transfer_fd); - unlink(server.repl_transfer_tmpfile); - zfree(server.repl_transfer_tmpfile); + if (server.repl_transfer_fd!=-1) { + close(server.repl_transfer_fd); + unlink(server.repl_transfer_tmpfile); + zfree(server.repl_transfer_tmpfile); + server.repl_transfer_tmpfile = NULL; + server.repl_transfer_fd = -1; + } } /* This function aborts a non blocking replication attempt if there is one @@ -2045,6 +2140,9 @@ void replicaofCommand(client *c) { serverLog(LL_NOTICE,"MASTER MODE enabled (user request from '%s')", client); sdsfree(client); + /* Restart the AOF subsystem in case we shut it down during a sync when + * we were still a slave. 
*/ + if (server.aof_enabled && server.aof_state == AOF_OFF) restartAOFAfterSYNC(); } } else { long port; diff --git a/src/rio.c b/src/rio.c index c9c76b8f2..993768b56 100644 --- a/src/rio.c +++ b/src/rio.c @@ -157,6 +157,113 @@ void rioInitWithFile(rio *r, FILE *fp) { r->io.file.autosync = 0; } +/* ------------------- File descriptor implementation ------------------- */ + +static size_t rioFdWrite(rio *r, const void *buf, size_t len) { + UNUSED(r); + UNUSED(buf); + UNUSED(len); + return 0; /* Error, this target does not yet support writing. */ +} + +/* Returns 1 or 0 for success/failure. */ +static size_t rioFdRead(rio *r, void *buf, size_t len) { + size_t avail = sdslen(r->io.fd.buf)-r->io.fd.pos; + + /* if the buffer is too small for the entire request: realloc */ + if (sdslen(r->io.fd.buf) + sdsavail(r->io.fd.buf) < len) + r->io.fd.buf = sdsMakeRoomFor(r->io.fd.buf, len - sdslen(r->io.fd.buf)); + + /* if the remaining unused buffer is not large enough: memmove so that we can read the rest */ + if (len > avail && sdsavail(r->io.fd.buf) < len - avail) { + sdsrange(r->io.fd.buf, r->io.fd.pos, -1); + r->io.fd.pos = 0; + } + + /* if we don't already have all the data in the sds, read more */ + while (len > sdslen(r->io.fd.buf) - r->io.fd.pos) { + size_t buffered = sdslen(r->io.fd.buf) - r->io.fd.pos; + size_t toread = len - buffered; + /* read either what's missing, or PROTO_IOBUF_LEN, the bigger of the two */ + if (toread < PROTO_IOBUF_LEN) + toread = PROTO_IOBUF_LEN; + if (toread > sdsavail(r->io.fd.buf)) + toread = sdsavail(r->io.fd.buf); + if (r->io.fd.read_limit != 0 && + r->io.fd.read_so_far + buffered + toread > r->io.fd.read_limit) { + if (r->io.fd.read_limit >= r->io.fd.read_so_far - buffered) + toread = r->io.fd.read_limit - r->io.fd.read_so_far - buffered; + else { + errno = EOVERFLOW; + return 0; + } + } + int retval = read(r->io.fd.fd, (char*)r->io.fd.buf + sdslen(r->io.fd.buf), toread); + if (retval <= 0) { + if (errno == EWOULDBLOCK) errno = 
ETIMEDOUT; + return 0; + } + sdsIncrLen(r->io.fd.buf, retval); + } + + memcpy(buf, (char*)r->io.fd.buf + r->io.fd.pos, len); + r->io.fd.read_so_far += len; + r->io.fd.pos += len; + return len; +} + +/* Returns read/write position in file. */ +static off_t rioFdTell(rio *r) { + return r->io.fd.read_so_far; +} + +/* Flushes any buffer to target device if applicable. Returns 1 on success + * and 0 on failures. */ +static int rioFdFlush(rio *r) { + /* Our flush is implemented by the write method, that recognizes a + * buffer set to NULL with a count of zero as a flush request. */ + return rioFdWrite(r,NULL,0); +} + +static const rio rioFdIO = { + rioFdRead, + rioFdWrite, + rioFdTell, + rioFdFlush, + NULL, /* update_checksum */ + 0, /* current checksum */ + 0, /* bytes read or written */ + 0, /* read/write chunk size */ + { { NULL, 0 } } /* union for io-specific vars */ +}; + +/* create an rio that implements a buffered read from an fd + * read_limit argument stops buffering when the reaching the limit */ +void rioInitWithFd(rio *r, int fd, size_t read_limit) { + *r = rioFdIO; + r->io.fd.fd = fd; + r->io.fd.pos = 0; + r->io.fd.read_limit = read_limit; + r->io.fd.read_so_far = 0; + r->io.fd.buf = sdsnewlen(NULL, PROTO_IOBUF_LEN); + sdsclear(r->io.fd.buf); +} + +/* release the rio stream. + * optionally returns the unread buffered data. */ +void rioFreeFd(rio *r, sds* out_remainingBufferedData) { + if(out_remainingBufferedData && (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) { + if (r->io.fd.pos > 0) + sdsrange(r->io.fd.buf, r->io.fd.pos, -1); + *out_remainingBufferedData = r->io.fd.buf; + } else { + sdsfree(r->io.fd.buf); + if (out_remainingBufferedData) + *out_remainingBufferedData = NULL; + } + r->io.fd.buf = NULL; +} + /* ------------------- File descriptors set implementation ------------------- */ /* Returns 1 or 0 for success/failure. @@ -300,7 +407,7 @@ void rioGenericUpdateChecksum(rio *r, const void *buf, size_t len) { * disk I/O concentrated in very little time. 
When we fsync in an explicit * way instead the I/O pressure is more distributed across time. */ void rioSetAutoSync(rio *r, off_t bytes) { - serverAssert(r->read == rioFileIO.read); + if(r->write != rioFileIO.write) return; r->io.file.autosync = bytes; } diff --git a/src/rio.h b/src/rio.h index c996c54f6..beea06888 100644 --- a/src/rio.h +++ b/src/rio.h @@ -73,6 +73,14 @@ struct _rio { off_t buffered; /* Bytes written since last fsync. */ off_t autosync; /* fsync after 'autosync' bytes written. */ } file; + /* file descriptor */ + struct { + int fd; /* File descriptor. */ + off_t pos; /* pos in buf that was returned */ + sds buf; /* buffered data */ + size_t read_limit; /* don't allow to buffer/read more than that */ + size_t read_so_far; /* amount of data read from the rio (not buffered) */ + } fd; /* Multiple FDs target (used to write to N sockets). */ struct { int *fds; /* File descriptors. */ @@ -126,9 +134,11 @@ static inline int rioFlush(rio *r) { void rioInitWithFile(rio *r, FILE *fp); void rioInitWithBuffer(rio *r, sds s); +void rioInitWithFd(rio *r, int fd, size_t read_limit); void rioInitWithFdset(rio *r, int *fds, int numfds); void rioFreeFdset(rio *r); +void rioFreeFd(rio *r, sds* out_remainingBufferedData); size_t rioWriteBulkCount(rio *r, char prefix, long count); size_t rioWriteBulkString(rio *r, const char *buf, size_t len); diff --git a/src/server.c b/src/server.c index 78b8d8f1b..8ed5b591c 100644 --- a/src/server.c +++ b/src/server.c @@ -2265,6 +2265,7 @@ void initServerConfig(void) { server.aof_flush_postponed_start = 0; server.aof_rewrite_incremental_fsync = CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC; server.rdb_save_incremental_fsync = CONFIG_DEFAULT_RDB_SAVE_INCREMENTAL_FSYNC; + server.rdb_key_save_delay = CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY; server.aof_load_truncated = CONFIG_DEFAULT_AOF_LOAD_TRUNCATED; server.aof_use_rdb_preamble = CONFIG_DEFAULT_AOF_USE_RDB_PREAMBLE; server.pidfile = NULL; @@ -2334,6 +2335,9 @@ void initServerConfig(void) { 
server.cached_master = NULL; server.master_initial_offset = -1; server.repl_state = REPL_STATE_NONE; + server.repl_transfer_tmpfile = NULL; + server.repl_transfer_fd = -1; + server.repl_transfer_s = -1; server.repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; server.repl_serve_stale_data = CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA; server.repl_slave_ro = CONFIG_DEFAULT_SLAVE_READ_ONLY; @@ -2342,6 +2346,7 @@ void initServerConfig(void) { server.repl_down_since = 0; /* Never connected, repl is down since EVER. */ server.repl_disable_tcp_nodelay = CONFIG_DEFAULT_REPL_DISABLE_TCP_NODELAY; server.repl_diskless_sync = CONFIG_DEFAULT_REPL_DISKLESS_SYNC; + server.repl_diskless_load = CONFIG_DEFAULT_REPL_DISKLESS_LOAD; server.repl_diskless_sync_delay = CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY; server.repl_ping_slave_period = CONFIG_DEFAULT_REPL_PING_SLAVE_PERIOD; server.repl_timeout = CONFIG_DEFAULT_REPL_TIMEOUT; @@ -4053,7 +4058,7 @@ sds genRedisInfoString(char *section) { (server.aof_last_write_status == C_OK) ? "ok" : "err", server.stat_aof_cow_bytes); - if (server.aof_state != AOF_OFF) { + if (server.aof_enabled) { info = sdscatprintf(info, "aof_current_size:%lld\r\n" "aof_base_size:%lld\r\n" diff --git a/src/server.h b/src/server.h index 8686994f6..f81b1010e 100644 --- a/src/server.h +++ b/src/server.h @@ -132,6 +132,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define CONFIG_DEFAULT_RDB_FILENAME "dump.rdb" #define CONFIG_DEFAULT_REPL_DISKLESS_SYNC 0 #define CONFIG_DEFAULT_REPL_DISKLESS_SYNC_DELAY 5 +#define CONFIG_DEFAULT_RDB_KEY_SAVE_DELAY 0 #define CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA 1 #define CONFIG_DEFAULT_SLAVE_READ_ONLY 1 #define CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY 1 @@ -394,6 +395,12 @@ typedef long long mstime_t; /* millisecond time type. 
*/ #define AOF_FSYNC_EVERYSEC 2 #define CONFIG_DEFAULT_AOF_FSYNC AOF_FSYNC_EVERYSEC +/* Replication diskless load defines */ +#define REPL_DISKLESS_LOAD_DISABLED 0 +#define REPL_DISKLESS_LOAD_WHEN_DB_EMPTY 1 +#define REPL_DISKLESS_LOAD_SWAPDB 2 +#define CONFIG_DEFAULT_REPL_DISKLESS_LOAD REPL_DISKLESS_LOAD_DISABLED + /* Zipped structures related defaults */ #define OBJ_HASH_MAX_ZIPLIST_ENTRIES 512 #define OBJ_HASH_MAX_ZIPLIST_VALUE 64 @@ -1158,6 +1165,7 @@ struct redisServer { int daemonize; /* True if running as a daemon */ clientBufferLimitsConfig client_obuf_limits[CLIENT_TYPE_OBUF_COUNT]; /* AOF persistence */ + int aof_enabled; /* AOF configuration */ int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */ int aof_fsync; /* Kind of fsync() policy */ char *aof_filename; /* Name of the AOF file */ @@ -1214,6 +1222,8 @@ struct redisServer { int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */ int rdb_pipe_write_result_to_parent; /* RDB pipes used to return the state */ int rdb_pipe_read_result_from_child; /* of each slave in diskless SYNC. */ + int rdb_key_save_delay; /* Delay in microseconds between keys while + * writing the RDB. (for testings) */ /* Pipe and data structures for child -> parent info sharing. */ int child_info_pipe[2]; /* Pipe used to write the child_info_data. */ struct { @@ -1249,7 +1259,9 @@ struct redisServer { int repl_min_slaves_to_write; /* Min number of slaves to write. */ int repl_min_slaves_max_lag; /* Max lag of slaves to write. */ int repl_good_slaves_count; /* Number of slaves with lag <= max_lag. */ - int repl_diskless_sync; /* Send RDB to slaves sockets directly. */ + int repl_diskless_sync; /* Master send RDB to slaves sockets directly. */ + int repl_diskless_load; /* Slave parse RDB directly from the socket. + * see REPL_DISKLESS_LOAD_* enum */ int repl_diskless_sync_delay; /* Delay to start a diskless repl BGSAVE. 
*/ /* Replication (slave) */ char *masteruser; /* AUTH with this user and masterauth with master */ @@ -1739,7 +1751,8 @@ void replicationCacheMasterUsingMyself(void); void feedReplicationBacklog(void *ptr, size_t len); /* Generic persistence functions */ -void startLoading(FILE *fp); +void startLoadingFile(FILE* fp, char* filename); +void startLoading(size_t size); void loadingProgress(off_t pos); void stopLoading(void); @@ -1996,6 +2009,8 @@ robj *dbUnshareStringValue(redisDb *db, robj *key, robj *o); #define EMPTYDB_NO_FLAGS 0 /* No flags. */ #define EMPTYDB_ASYNC (1<<0) /* Reclaim memory in another thread. */ long long emptyDb(int dbnum, int flags, void(callback)(void*)); +long long emptyDbGeneric(redisDb *dbarray, int dbnum, int flags, void(callback)(void*)); +long long dbTotalServerKeyCount(); int selectDb(client *c, int id); void signalModifiedKey(redisDb *db, robj *key); diff --git a/tests/integration/replication-4.tcl b/tests/integration/replication-4.tcl index 3c6df52a8..54891151b 100644 --- a/tests/integration/replication-4.tcl +++ b/tests/integration/replication-4.tcl @@ -1,12 +1,3 @@ -proc start_bg_complex_data {host port db ops} { - set tclsh [info nameofexecutable] - exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops & -} - -proc stop_bg_complex_data {handle} { - catch {exec /bin/kill -9 $handle} -} - start_server {tags {"repl"}} { start_server {} { diff --git a/tests/integration/replication-psync.tcl b/tests/integration/replication-psync.tcl index bf8682446..3c98723af 100644 --- a/tests/integration/replication-psync.tcl +++ b/tests/integration/replication-psync.tcl @@ -1,12 +1,3 @@ -proc start_bg_complex_data {host port db ops} { - set tclsh [info nameofexecutable] - exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops & -} - -proc stop_bg_complex_data {handle} { - catch {exec /bin/kill -9 $handle} -} - # Creates a master-slave pair and breaks the link continuously to force # partial resyncs attempts, all this while 
flooding the master with # write queries. @@ -17,7 +8,7 @@ proc stop_bg_complex_data {handle} { # If reconnect is > 0, the test actually try to break the connection and # reconnect with the master, otherwise just the initial synchronization is # checked for consistency. -proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless reconnect} { +proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} { start_server {tags {"repl"}} { start_server {} { @@ -28,8 +19,9 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec $master config set repl-backlog-size $backlog_size $master config set repl-backlog-ttl $backlog_ttl - $master config set repl-diskless-sync $diskless + $master config set repl-diskless-sync $mdl $master config set repl-diskless-sync-delay 1 + $slave config set repl-diskless-load $sdl set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000] set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000] @@ -54,7 +46,7 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec } } - test "Test replication partial resync: $descr (diskless: $diskless, reconnect: $reconnect)" { + test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect)" { # Now while the clients are writing data, break the maste-slave # link multiple times. 
if ($reconnect) { @@ -132,23 +124,25 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec } } -foreach diskless {no yes} { - test_psync {no reconnection, just sync} 6 1000000 3600 0 { - } $diskless 0 +foreach mdl {no yes} { + foreach sdl {disabled swapdb} { + test_psync {no reconnection, just sync} 6 1000000 3600 0 { + } $mdl $sdl 0 - test_psync {ok psync} 6 100000000 3600 0 { + test_psync {ok psync} 6 100000000 3600 0 { assert {[s -1 sync_partial_ok] > 0} - } $diskless 1 + } $mdl $sdl 1 - test_psync {no backlog} 6 100 3600 0.5 { + test_psync {no backlog} 6 100 3600 0.5 { assert {[s -1 sync_partial_err] > 0} - } $diskless 1 + } $mdl $sdl 1 - test_psync {ok after delay} 3 100000000 3600 3 { + test_psync {ok after delay} 3 100000000 3600 3 { assert {[s -1 sync_partial_ok] > 0} - } $diskless 1 + } $mdl $sdl 1 - test_psync {backlog expired} 3 100000000 1 3 { + test_psync {backlog expired} 3 100000000 1 3 { assert {[s -1 sync_partial_err] > 0} - } $diskless 1 + } $mdl $sdl 1 + } } diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 0e50c20a9..d69a1761a 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -183,85 +183,92 @@ start_server {tags {"repl"}} { } } -foreach dl {no yes} { - start_server {tags {"repl"}} { - set master [srv 0 client] - $master config set repl-diskless-sync $dl - set master_host [srv 0 host] - set master_port [srv 0 port] - set slaves {} - set load_handle0 [start_write_load $master_host $master_port 3] - set load_handle1 [start_write_load $master_host $master_port 5] - set load_handle2 [start_write_load $master_host $master_port 20] - set load_handle3 [start_write_load $master_host $master_port 8] - set load_handle4 [start_write_load $master_host $master_port 4] - start_server {} { - lappend slaves [srv 0 client] +foreach mdl {no yes} { + foreach sdl {disabled swapdb} { + start_server {tags {"repl"}} { + set master [srv 0 client] + $master 
config set repl-diskless-sync $mdl + $master config set repl-diskless-sync-delay 1 + set master_host [srv 0 host] + set master_port [srv 0 port] + set slaves {} + set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000000] + set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000000] + set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000000] + set load_handle3 [start_write_load $master_host $master_port 8] + set load_handle4 [start_write_load $master_host $master_port 4] + after 5000 ;# wait for some data to accumulate so that we have RDB part for the fork start_server {} { lappend slaves [srv 0 client] start_server {} { lappend slaves [srv 0 client] - test "Connect multiple replicas at the same time (issue #141), diskless=$dl" { - # Send SLAVEOF commands to slaves - [lindex $slaves 0] slaveof $master_host $master_port - [lindex $slaves 1] slaveof $master_host $master_port - [lindex $slaves 2] slaveof $master_host $master_port + start_server {} { + lappend slaves [srv 0 client] + test "Connect multiple replicas at the same time (issue #141), master diskless=$mdl, replica diskless=$sdl" { + # Send SLAVEOF commands to slaves + [lindex $slaves 0] config set repl-diskless-load $sdl + [lindex $slaves 1] config set repl-diskless-load $sdl + [lindex $slaves 2] config set repl-diskless-load $sdl + [lindex $slaves 0] slaveof $master_host $master_port + [lindex $slaves 1] slaveof $master_host $master_port + [lindex $slaves 2] slaveof $master_host $master_port - # Wait for all the three slaves to reach the "online" - # state from the POV of the master. - set retry 500 - while {$retry} { - set info [r -3 info] - if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} { - break - } else { - incr retry -1 - after 100 + # Wait for all the three slaves to reach the "online" + # state from the POV of the master. 
+ set retry 500 + while {$retry} { + set info [r -3 info] + if {[string match {*slave0:*state=online*slave1:*state=online*slave2:*state=online*} $info]} { + break + } else { + incr retry -1 + after 100 + } + } + if {$retry == 0} { + error "assertion:Slaves not correctly synchronized" } - } - if {$retry == 0} { - error "assertion:Replicas not correctly synchronized" - } - # Wait that slaves acknowledge they are online so - # we are sure that DBSIZE and DEBUG DIGEST will not - # fail because of timing issues. - wait_for_condition 500 100 { - [lindex [[lindex $slaves 0] role] 3] eq {connected} && - [lindex [[lindex $slaves 1] role] 3] eq {connected} && - [lindex [[lindex $slaves 2] role] 3] eq {connected} - } else { - fail "Replicas still not connected after some time" + # Wait that slaves acknowledge they are online so + # we are sure that DBSIZE and DEBUG DIGEST will not + # fail because of timing issues. + wait_for_condition 500 100 { + [lindex [[lindex $slaves 0] role] 3] eq {connected} && + [lindex [[lindex $slaves 1] role] 3] eq {connected} && + [lindex [[lindex $slaves 2] role] 3] eq {connected} + } else { + fail "Slaves still not connected after some time" + } + + # Stop the write load + stop_bg_complex_data $load_handle0 + stop_bg_complex_data $load_handle1 + stop_bg_complex_data $load_handle2 + stop_write_load $load_handle3 + stop_write_load $load_handle4 + + # Make sure that slaves and master have same + # number of keys + wait_for_condition 500 100 { + [$master dbsize] == [[lindex $slaves 0] dbsize] && + [$master dbsize] == [[lindex $slaves 1] dbsize] && + [$master dbsize] == [[lindex $slaves 2] dbsize] + } else { + fail "Different number of keys between master and replica after too long time." 
+ } + + # Check digests + set digest [$master debug digest] + set digest0 [[lindex $slaves 0] debug digest] + set digest1 [[lindex $slaves 1] debug digest] + set digest2 [[lindex $slaves 2] debug digest] + assert {$digest ne 0000000000000000000000000000000000000000} + assert {$digest eq $digest0} + assert {$digest eq $digest1} + assert {$digest eq $digest2} } - - # Stop the write load - stop_write_load $load_handle0 - stop_write_load $load_handle1 - stop_write_load $load_handle2 - stop_write_load $load_handle3 - stop_write_load $load_handle4 - - # Make sure that slaves and master have same - # number of keys - wait_for_condition 500 100 { - [$master dbsize] == [[lindex $slaves 0] dbsize] && - [$master dbsize] == [[lindex $slaves 1] dbsize] && - [$master dbsize] == [[lindex $slaves 2] dbsize] - } else { - fail "Different number of keys between masted and replica after too long time." - } - - # Check digests - set digest [$master debug digest] - set digest0 [[lindex $slaves 0] debug digest] - set digest1 [[lindex $slaves 1] debug digest] - set digest2 [[lindex $slaves 2] debug digest] - assert {$digest ne 0000000000000000000000000000000000000000} - assert {$digest eq $digest0} - assert {$digest eq $digest1} - assert {$digest eq $digest2} - } - } + } + } } } } @@ -309,3 +316,70 @@ start_server {tags {"repl"}} { } } } + +test {slave fails full sync and diskless load swapdb recoveres it} { + start_server {tags {"repl"}} { + set slave [srv 0 client] + set slave_host [srv 0 host] + set slave_port [srv 0 port] + set slave_log [srv 0 stdout] + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + + # Put different data sets on the master and slave + # we need to put large keys on the master since the slave replies to info only once in 2mb + $slave debug populate 2000 slave 10 + $master debug populate 200 master 100000 + $master config set rdbcompression no + + # Set master and slave to use diskless replication + 
$master config set repl-diskless-sync yes + $master config set repl-diskless-sync-delay 0 + $slave config set repl-diskless-load swapdb + + # Set master with a slow rdb generation, so that we can easily disconnect it mid sync + # 10ms per key, with 200 keys is 2 seconds + $master config set rdb-key-save-delay 10000 + + # Start the replication process... + $slave slaveof $master_host $master_port + + # wait for the slave to start reading the rdb + wait_for_condition 50 100 { + [s -1 loading] eq 1 + } else { + fail "Replica didn't get into loading mode" + } + + # make sure that next sync will not start immediately so that we can catch the slave in betweeen syncs + $master config set repl-diskless-sync-delay 5 + # for faster server shutdown, make rdb saving fast again (the fork is already uses the slow one) + $master config set rdb-key-save-delay 0 + + # waiting slave to do flushdb (key count drop) + wait_for_condition 50 100 { + 2000 != [scan [regexp -inline {keys\=([\d]*)} [$slave info keyspace]] keys=%d] + } else { + fail "Replica didn't flush" + } + + # make sure we're still loading + assert_equal [s -1 loading] 1 + + # kill the slave connection on the master + set killed [$master client kill type slave] + + # wait for loading to stop (fail) + wait_for_condition 50 100 { + [s -1 loading] eq 0 + } else { + fail "Replica didn't disconnect" + } + + # make sure the original keys were restored + assert_equal [$slave dbsize] 2000 + } + } +} diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 74f491e48..41cc5612a 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -399,3 +399,15 @@ proc lshuffle {list} { } return $slist } + +# Execute a background process writing complex data for the specified number +# of ops to the specified Redis instance. 
+proc start_bg_complex_data {host port db ops} { + set tclsh [info nameofexecutable] + exec $tclsh tests/helpers/bg_complex_data.tcl $host $port $db $ops & +} + +# Stop a process generating write load executed with start_bg_complex_data. +proc stop_bg_complex_data {handle} { + catch {exec /bin/kill -9 $handle} +} From 6e9ffa93ebdb8da02401c72dde0c5dd0e3cdb48d Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 8 Jul 2019 18:32:47 +0200 Subject: [PATCH 022/411] Diskless replica: a few aesthetic changes to replication.c. --- src/replication.c | 126 ++++++++++++++++++++++++++++++++-------------- src/rio.c | 12 +++-- 2 files changed, 96 insertions(+), 42 deletions(-) diff --git a/src/replication.c b/src/replication.c index e2bac08bd..a7c1c0d6a 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1127,7 +1127,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { ssize_t nread, readlen, nwritten; int use_diskless_load; redisDb *diskless_load_backup = NULL; - int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS; + int empty_db_flags = server.repl_slave_lazy_flush ? EMPTYDB_ASYNC : + EMPTYDB_NO_FLAGS; int i; off_t left; UNUSED(el); @@ -1199,8 +1200,8 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { use_diskless_load = useDisklessLoad(); if (!use_diskless_load) { - - /* read the data from the socket, store it to a file and search for the EOF */ + /* Read the data from the socket, store it to a file and search + * for the EOF. */ if (usemark) { readlen = sizeof(buf); } else { @@ -1222,20 +1223,28 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { int eof_reached = 0; if (usemark) { - /* Update the last bytes array, and check if it matches our delimiter.*/ + /* Update the last bytes array, and check if it matches our + * delimiter. 
*/ if (nread >= CONFIG_RUN_ID_SIZE) { - memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE,CONFIG_RUN_ID_SIZE); + memcpy(lastbytes,buf+nread-CONFIG_RUN_ID_SIZE, + CONFIG_RUN_ID_SIZE); } else { int rem = CONFIG_RUN_ID_SIZE-nread; memmove(lastbytes,lastbytes+nread,rem); memcpy(lastbytes+rem,buf,nread); } - if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) eof_reached = 1; + if (memcmp(lastbytes,eofmark,CONFIG_RUN_ID_SIZE) == 0) + eof_reached = 1; } + /* Update the last I/O time for the replication transfer (used in + * order to detect timeouts during replication), and write what we + * got from the socket to the dump file on disk. */ server.repl_transfer_lastio = server.unixtime; if ((nwritten = write(server.repl_transfer_fd,buf,nread)) != nread) { - serverLog(LL_WARNING,"Write error or short write writing to the DB dump file needed for MASTER <-> REPLICA synchronization: %s", + serverLog(LL_WARNING, + "Write error or short write writing to the DB dump file " + "needed for MASTER <-> REPLICA synchronization: %s", (nwritten == -1) ? strerror(errno) : "short write"); goto error; } @@ -1246,14 +1255,16 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { if (ftruncate(server.repl_transfer_fd, server.repl_transfer_read - CONFIG_RUN_ID_SIZE) == -1) { - serverLog(LL_WARNING,"Error truncating the RDB file received from the master for SYNC: %s", strerror(errno)); + serverLog(LL_WARNING, + "Error truncating the RDB file received from the master " + "for SYNC: %s", strerror(errno)); goto error; } } - /* Sync data on disk from time to time, otherwise at the end of the transfer - * we may suffer a big delay as the memory buffers are copied into the - * actual disk. */ + /* Sync data on disk from time to time, otherwise at the end of the + * transfer we may suffer a big delay as the memory buffers are copied + * into the actual disk. 
*/ if (server.repl_transfer_read >= server.repl_transfer_last_fsync_off + REPL_MAX_WRITTEN_BEFORE_FSYNC) { @@ -1269,19 +1280,34 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { if (server.repl_transfer_read == server.repl_transfer_size) eof_reached = 1; } - if (!eof_reached) - return; + + /* If the transfer is yet not complete, we need to read more, so + * return ASAP and wait for the handler to be called again. */ + if (!eof_reached) return; } - /* We reach here when the slave is using diskless replication, - * or when we are done reading from the socket to the rdb file. */ + /* We reach this point in one of the following cases: + * + * 1. The replica is using diskless replication, that is, it reads data + * directly from the socket to the Redis memory, without using + * a temporary RDB file on disk. In that case we just block and + * read everything from the socket. + * + * 2. Or when we are done reading from the socket to the RDB file, in + * such case we want just to read the RDB file in memory. */ serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Flushing old data"); - /* We need to stop any AOFRW fork before flusing and parsing - * RDB, otherwise we'll create a copy-on-write disaster. */ + + /* We need to stop any AOF rewriting child before flusing and parsing + * the RDB, otherwise we'll create a copy-on-write disaster. */ if (server.aof_state != AOF_OFF) stopAppendOnly(); signalFlushedDb(-1); - if (use_diskless_load && server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { - /* create a backup of the current db */ + + /* When diskless RDB loading is used by replicas, it may be configured + * in order to save the current DB instead of throwing it away, + * so that we can restore it in case of failed transfer. 
*/ + if (use_diskless_load && + server.repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) + { diskless_load_backup = zmalloc(sizeof(redisDb)*server.dbnum); for (i=0; i REPLICA synchronization: %s", + serverLog(LL_WARNING, + "Failed trying to rename the temp DB into %s in " + "MASTER <-> REPLICA synchronization: %s", server.rdb_filename, strerror(errno)); cancelReplicationHandshake(); return; } if (rdbLoad(server.rdb_filename,&rsi) != C_OK) { - serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); + serverLog(LL_WARNING, + "Failed trying to load the MASTER synchronization " + "DB from disk"); cancelReplicationHandshake(); /* Note that there's no point in restarting the AOF on sync failure, - it'll be restarted when sync succeeds or slave promoted. */ + it'll be restarted when sync succeeds or replica promoted. */ return; } + + /* Cleanup. */ zfree(server.repl_transfer_tmpfile); close(server.repl_transfer_fd); server.repl_transfer_fd = -1; server.repl_transfer_tmpfile = NULL; } + /* Final setup of the connected slave <- master link */ replicationCreateMasterClient(server.repl_transfer_s,rsi.repl_stream_db); server.repl_state = REPL_STATE_CONNECTED; server.repl_down_since = 0; + /* After a full resynchroniziation we use the replication ID and * offset of the master. The secondary ID / offset are cleared since * we are starting a new history. */ memcpy(server.replid,server.master->replid,sizeof(server.replid)); server.master_repl_offset = server.master->reploff; clearReplicationId2(); + /* Let's create the replication backlog if needed. Slaves need to * accumulate the backlog regardless of the fact they have sub-slaves * or not, in order to behave correctly if they are promoted to * masters after a failover. */ if (server.repl_backlog == NULL) createReplicationBacklog(); - serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Finished with success"); + /* Restart the AOF subsystem now that we finished the sync. 
This * will trigger an AOF rewrite, and when done will start appending * to the new file. */ diff --git a/src/rio.c b/src/rio.c index 993768b56..9327c17a8 100644 --- a/src/rio.c +++ b/src/rio.c @@ -173,13 +173,13 @@ static size_t rioFdRead(rio *r, void *buf, size_t len) { /* if the buffer is too small for the entire request: realloc */ if (sdslen(r->io.fd.buf) + sdsavail(r->io.fd.buf) < len) r->io.fd.buf = sdsMakeRoomFor(r->io.fd.buf, len - sdslen(r->io.fd.buf)); - + /* if the remaining unused buffer is not large enough: memmove so that we can read the rest */ if (len > avail && sdsavail(r->io.fd.buf) < len - avail) { sdsrange(r->io.fd.buf, r->io.fd.pos, -1); r->io.fd.pos = 0; } - + /* if we don't already have all the data in the sds, read more */ while (len > sdslen(r->io.fd.buf) - r->io.fd.pos) { size_t buffered = sdslen(r->io.fd.buf) - r->io.fd.pos; @@ -251,8 +251,10 @@ void rioInitWithFd(rio *r, int fd, size_t read_limit) { /* release the rio stream. * optionally returns the unread buffered data. */ -void rioFreeFd(rio *r, sds* out_remainingBufferedData) { - if(out_remainingBufferedData && (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) { +void rioFreeFd(rio *r, sds *out_remainingBufferedData) { + if (out_remainingBufferedData && + (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) + { if (r->io.fd.pos > 0) sdsrange(r->io.fd.buf, r->io.fd.pos, -1); *out_remainingBufferedData = r->io.fd.buf; @@ -264,7 +266,7 @@ void rioFreeFd(rio *r, sds* out_remainingBufferedData) { r->io.fd.buf = NULL; } -/* ------------------- File descriptors set implementation ------------------- */ +/* ------------------- File descriptors set implementation ------------------ */ /* Returns 1 or 0 for success/failure. 
* The function returns success as long as we are able to correctly write From 65e463d7a15fd0e7bddc357323a99b55f7b786fc Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 8 Jul 2019 18:39:59 +0200 Subject: [PATCH 023/411] Diskless replica: a few aesthetic changes to rio.c --- src/rio.c | 57 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 32 insertions(+), 25 deletions(-) diff --git a/src/rio.c b/src/rio.c index 9327c17a8..1a53992a2 100644 --- a/src/rio.c +++ b/src/rio.c @@ -157,7 +157,11 @@ void rioInitWithFile(rio *r, FILE *fp) { r->io.file.autosync = 0; } -/* ------------------- File descriptor implementation ------------------- */ +/* ------------------- File descriptor implementation ------------------- + * We use this RIO implemetnation when reading an RDB file directly from + * the socket to the memory via rdbLoadRio(), thus this implementation + * only implements reading from a file descriptor that is, normally, + * just a socket. */ static size_t rioFdWrite(rio *r, const void *buf, size_t len) { UNUSED(r); @@ -170,27 +174,28 @@ static size_t rioFdWrite(rio *r, const void *buf, size_t len) { static size_t rioFdRead(rio *r, void *buf, size_t len) { size_t avail = sdslen(r->io.fd.buf)-r->io.fd.pos; - /* if the buffer is too small for the entire request: realloc */ + /* If the buffer is too small for the entire request: realloc. */ if (sdslen(r->io.fd.buf) + sdsavail(r->io.fd.buf) < len) r->io.fd.buf = sdsMakeRoomFor(r->io.fd.buf, len - sdslen(r->io.fd.buf)); - /* if the remaining unused buffer is not large enough: memmove so that we can read the rest */ + /* If the remaining unused buffer is not large enough: memmove so that we + * can read the rest. 
*/ if (len > avail && sdsavail(r->io.fd.buf) < len - avail) { sdsrange(r->io.fd.buf, r->io.fd.pos, -1); r->io.fd.pos = 0; } - /* if we don't already have all the data in the sds, read more */ + /* If we don't already have all the data in the sds, read more */ while (len > sdslen(r->io.fd.buf) - r->io.fd.pos) { size_t buffered = sdslen(r->io.fd.buf) - r->io.fd.pos; size_t toread = len - buffered; - /* read either what's missing, or PROTO_IOBUF_LEN, the bigger of the two */ - if (toread < PROTO_IOBUF_LEN) - toread = PROTO_IOBUF_LEN; - if (toread > sdsavail(r->io.fd.buf)) - toread = sdsavail(r->io.fd.buf); + /* Read either what's missing, or PROTO_IOBUF_LEN, the bigger of + * the two. */ + if (toread < PROTO_IOBUF_LEN) toread = PROTO_IOBUF_LEN; + if (toread > sdsavail(r->io.fd.buf)) toread = sdsavail(r->io.fd.buf); if (r->io.fd.read_limit != 0 && - r->io.fd.read_so_far + buffered + toread > r->io.fd.read_limit) { + r->io.fd.read_so_far + buffered + toread > r->io.fd.read_limit) + { if (r->io.fd.read_limit >= r->io.fd.read_so_far - buffered) toread = r->io.fd.read_limit - r->io.fd.read_so_far - buffered; else { @@ -198,7 +203,9 @@ static size_t rioFdRead(rio *r, void *buf, size_t len) { return 0; } } - int retval = read(r->io.fd.fd, (char*)r->io.fd.buf + sdslen(r->io.fd.buf), toread); + int retval = read(r->io.fd.fd, + (char*)r->io.fd.buf + sdslen(r->io.fd.buf), + toread); if (retval <= 0) { if (errno == EWOULDBLOCK) errno = ETIMEDOUT; return 0; @@ -237,8 +244,8 @@ static const rio rioFdIO = { { { NULL, 0 } } /* union for io-specific vars */ }; -/* create an rio that implements a buffered read from an fd - * read_limit argument stops buffering when the reaching the limit */ +/* Create an RIO that implements a buffered read from an fd + * read_limit argument stops buffering when the reaching the limit. 
*/ void rioInitWithFd(rio *r, int fd, size_t read_limit) { *r = rioFdIO; r->io.fd.fd = fd; @@ -249,24 +256,24 @@ void rioInitWithFd(rio *r, int fd, size_t read_limit) { sdsclear(r->io.fd.buf); } -/* release the rio stream. - * optionally returns the unread buffered data. */ -void rioFreeFd(rio *r, sds *out_remainingBufferedData) { - if (out_remainingBufferedData && - (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) - { - if (r->io.fd.pos > 0) - sdsrange(r->io.fd.buf, r->io.fd.pos, -1); - *out_remainingBufferedData = r->io.fd.buf; +/* Release the RIO tream. Optionally returns the unread buffered data + * when the SDS pointer 'remaining' is passed. */ +void rioFreeFd(rio *r, sds *remaining) { + if (remaining && (size_t)r->io.fd.pos < sdslen(r->io.fd.buf)) { + if (r->io.fd.pos > 0) sdsrange(r->io.fd.buf, r->io.fd.pos, -1); + *remaining = r->io.fd.buf; } else { sdsfree(r->io.fd.buf); - if (out_remainingBufferedData) - *out_remainingBufferedData = NULL; + if (out_remainingBufferedData) *remaining = NULL; } r->io.fd.buf = NULL; } -/* ------------------- File descriptors set implementation ------------------ */ +/* ------------------- File descriptors set implementation ------------------ + * This target is used to write the RDB file to N different replicas via + * sockets, when the master just streams the data to the replicas without + * creating an RDB on-disk image (diskless replication option). + * It only implements writes. */ /* Returns 1 or 0 for success/failure. * The function returns success as long as we are able to correctly write From 23063bf23e3206bc947d7048b513a453a8277c6f Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 10 Jul 2019 09:34:21 +0200 Subject: [PATCH 024/411] Diskless replica: fix mispelled var name. 
--- src/rio.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rio.c b/src/rio.c index 1a53992a2..5359bc3d6 100644 --- a/src/rio.c +++ b/src/rio.c @@ -264,7 +264,7 @@ void rioFreeFd(rio *r, sds *remaining) { *remaining = r->io.fd.buf; } else { sdsfree(r->io.fd.buf); - if (out_remainingBufferedData) *remaining = NULL; + if (remaining) *remaining = NULL; } r->io.fd.buf = NULL; } From 26c4e8a9656a9aac1ed502f02334b627e59f1a4b Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 10 Jul 2019 11:42:26 +0200 Subject: [PATCH 025/411] Diskless replica: refactoring of DBs backups. --- src/replication.c | 69 ++++++++++++++++++++++++++++++++--------------- 1 file changed, 47 insertions(+), 22 deletions(-) diff --git a/src/replication.c b/src/replication.c index a7c1c0d6a..a89552a8d 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1119,6 +1119,49 @@ static int useDisklessLoad() { (server.repl_diskless_load == REPL_DISKLESS_LOAD_WHEN_DB_EMPTY && dbTotalServerKeyCount()==0); } +/* Helper function for readSyncBulkPayload() to make backups of the current + * DBs before socket-loading the new ones. The backups may be restored later + * or freed by disklessLoadRestoreBackups(). */ +redisDb *disklessLoadMakeBackups(void) { + redisDb *backups = zmalloc(sizeof(redisDb)*server.dbnum); + for (int i=0; i Date: Wed, 10 Jul 2019 12:36:14 +0200 Subject: [PATCH 026/411] Diskless replica: fix disklessLoadRestoreBackups() bug. --- src/replication.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/replication.c b/src/replication.c index a89552a8d..26e7cf8f0 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1138,16 +1138,16 @@ redisDb *disklessLoadMakeBackups(void) { * * If the socket loading went wrong, we want to restore the old backups * into the server databases. This function does just that in the case - * the 'count' argument (the number of DBs to replace) is non-zero. 
+ * the 'restore' argument (the number of DBs to replace) is non-zero. * * When instead the loading succeeded we want just to free our old backups, - * in that case the funciton will do just that when 'count' is 0. */ -void disklessLoadRestoreBackups(redisDb *backup, int count, int empty_db_flags) + * in that case the funciton will do just that when 'restore' is 0. */ +void disklessLoadRestoreBackups(redisDb *backup, int restore, int empty_db_flags) { - if (count) { + if (restore) { /* Restore. */ emptyDbGeneric(server.db,-1,empty_db_flags,replicationEmptyDbCallback); - for (int i=0; i Date: Wed, 10 Jul 2019 18:08:31 +0200 Subject: [PATCH 027/411] Client side caching: add tracking clients in INFO. --- src/server.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/server.c b/src/server.c index 8ed5b591c..4337b8f01 100644 --- a/src/server.c +++ b/src/server.c @@ -3895,10 +3895,12 @@ sds genRedisInfoString(char *section) { "connected_clients:%lu\r\n" "client_recent_max_input_buffer:%zu\r\n" "client_recent_max_output_buffer:%zu\r\n" - "blocked_clients:%d\r\n", + "blocked_clients:%d\r\n" + "tracking_clients:%d\r\n", listLength(server.clients)-listLength(server.slaves), maxin, maxout, - server.blocked_clients); + server.blocked_clients, + server.tracking_clients); } /* Memory */ From c3d91f5d8b9e5207f984080507b146435fdd172a Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 10 Jul 2019 18:17:07 +0200 Subject: [PATCH 028/411] Client side caching: implement CLIENT GETREDIR. This subcommand may simplify the writing of Redis client libraries using the tracking feature and/or improve observability and debugging capabilities. 
--- src/networking.c | 36 ++++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/src/networking.c b/src/networking.c index 716b35859..1a8e3530a 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1954,20 +1954,21 @@ void clientCommand(client *c) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { const char *help[] = { -"id -- Return the ID of the current connection.", -"getname -- Return the name of the current connection.", -"kill -- Kill connection made from .", -"kill