From 41f26be34a9df652f8d84d1ed5fe7a28b7ff0cba Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 4 Feb 2020 00:30:13 -0500 Subject: [PATCH 01/33] rename to pro binary Former-commit-id: a8854bfd83de72d6aa418ee6d9b44fae1f622787 --- .gitignore | 2 +- README.md | 12 +++++------ deps/hiredis/Makefile | 4 ++-- keydb.conf | 2 +- src/Makefile | 2 +- src/debug.cpp | 2 +- src/server.cpp | 24 +++++++++++----------- tests/cluster/tests/06-slave-stop-cond.tcl | 2 +- utils/install_server.sh | 4 ++-- utils/redis_init_script | 2 +- utils/speed-regression.tcl | 4 ++-- 11 files changed, 30 insertions(+), 30 deletions(-) diff --git a/.gitignore b/.gitignore index c54671d59..a2d8033da 100644 --- a/.gitignore +++ b/.gitignore @@ -19,7 +19,7 @@ keydb-cli redis-sentinel keydb-sentinel redis-server -keydb-server +keydb-pro-server doc-tools release misc/* diff --git a/README.md b/README.md index c06a32d6c..e081ed024 100644 --- a/README.md +++ b/README.md @@ -158,19 +158,19 @@ Running KeyDB To run KeyDB with the default configuration just type: % cd src - % ./keydb-server + % ./keydb-pro-server If you want to provide your keydb.conf, you have to run it using an additional parameter (the path of the configuration file): % cd src - % ./keydb-server /path/to/keydb.conf + % ./keydb-pro-server /path/to/keydb.conf It is possible to alter the KeyDB configuration by passing parameters directly as options using the command line. Examples: - % ./keydb-server --port 9999 --replicaof 127.0.0.1 6379 - % ./keydb-server /etc/keydb/6379.conf --loglevel debug + % ./keydb-pro-server --port 9999 --replicaof 127.0.0.1 6379 + % ./keydb-pro-server /etc/keydb/6379.conf --loglevel debug All the options in keydb.conf are also supported as options using the command line, with exactly the same name. @@ -178,7 +178,7 @@ line, with exactly the same name. Playing with KeyDB ------------------ -You can use keydb-cli to play with KeyDB. Start a keydb-server instance, +You can use keydb-cli to play with KeyDB. Start a keydb-pro-server instance, then in another terminal try the following: % cd src @@ -243,7 +243,7 @@ Simply make a directory you would like to have the latest binaries dumped in, th ``` $ docker run -it --rm -v /path-to-dump-binaries:/keydb_bin eqalpha/keydb-build-bin ``` -You should receive the following files: keydb-benchmark, keydb-check-aof, keydb-check-rdb, keydb-cli, keydb-sentinel, keydb-server +You should receive the following files: keydb-benchmark, keydb-check-aof, keydb-check-rdb, keydb-cli, keydb-sentinel, keydb-pro-server If you are looking to enable flash support with the build (make MALLOC=memkind) then use the following command: ``` diff --git a/deps/hiredis/Makefile b/deps/hiredis/Makefile index 841990d5c..c231a9b93 100644 --- a/deps/hiredis/Makefile +++ b/deps/hiredis/Makefile @@ -29,9 +29,9 @@ INSTALL_INCLUDE_PATH= $(DESTDIR)$(PREFIX)/$(INCLUDE_PATH) INSTALL_LIBRARY_PATH= $(DESTDIR)$(PREFIX)/$(LIBRARY_PATH) INSTALL_PKGCONF_PATH= $(INSTALL_LIBRARY_PATH)/$(PKGCONF_PATH) -# keydb-server configuration used for testing +# keydb-pro-server configuration used for testing REDIS_PORT=56379 -REDIS_SERVER=keydb-server +REDIS_SERVER=keydb-pro-server define REDIS_TEST_CONFIG daemonize yes pidfile /tmp/hiredis-test-redis.pid diff --git a/keydb.conf b/keydb.conf index 896528241..32442744c 100644 --- a/keydb.conf +++ b/keydb.conf @@ -3,7 +3,7 @@ # Note that in order to read the configuration file, KeyDB must be # started with the file path as first argument: # -# ./keydb-server /path/to/keydb.conf +# ./keydb-pro-server /path/to/keydb.conf # Note on units: when memory size is needed, it is possible to specify # it in the usual form of 1k 5GB 4M and so forth: diff --git a/src/Makefile b/src/Makefile index 7548c6a1d..45c70f6d6 100644 --- a/src/Makefile +++ b/src/Makefile @@ -319,7 +319,7 @@ else endif @touch $@ -# keydb-server +# keydb-pro-server $(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ) $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/rocksdb/librocksdb.a $(FINAL_LIBS) diff --git a/src/debug.cpp b/src/debug.cpp index 99e7a0950..b054ed403 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -1542,7 +1542,7 @@ void sigsegvHandler(int sig, siginfo_t *info, void *secret) { "\n=== KEYDB BUG REPORT END. Make sure to include from START to END. ===\n\n" " Please report the crash by opening an issue on github:\n\n" " https://github.com/JohnSully/KeyDB/issues\n\n" -" Suspect RAM error? Use keydb-server --test-memory to verify it.\n\n" +" Suspect RAM error? Use keydb-pro-server --test-memory to verify it.\n\n" ); /* free(messages); Don't call free() with possibly corrupted memory. */ diff --git a/src/server.cpp b/src/server.cpp index 5c6a2873f..382d98c08 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4969,19 +4969,19 @@ void version(void) { } void usage(void) { - fprintf(stderr,"Usage: ./keydb-server [/path/to/keydb.conf] [options]\n"); - fprintf(stderr," ./keydb-server - (read config from stdin)\n"); - fprintf(stderr," ./keydb-server -v or --version\n"); - fprintf(stderr," ./keydb-server -h or --help\n"); - fprintf(stderr," ./keydb-server --test-memory \n\n"); + fprintf(stderr,"Usage: ./keydb-pro-server [/path/to/keydb.conf] [options]\n"); + fprintf(stderr," ./keydb-pro-server - (read config from stdin)\n"); + fprintf(stderr," ./keydb-pro-server -v or --version\n"); + fprintf(stderr," ./keydb-pro-server -h or --help\n"); + fprintf(stderr," ./keydb-pro-server --test-memory \n\n"); fprintf(stderr,"Examples:\n"); - fprintf(stderr," ./keydb-server (run the server with default conf)\n"); - fprintf(stderr," ./keydb-server /etc/redis/6379.conf\n"); - fprintf(stderr," ./keydb-server --port 7777\n"); - fprintf(stderr," ./keydb-server --port 7777 --replicaof 127.0.0.1 8888\n"); - fprintf(stderr," ./keydb-server /etc/mykeydb.conf --loglevel verbose\n\n"); + fprintf(stderr," ./keydb-pro-server (run the server with default conf)\n"); + fprintf(stderr," ./keydb-pro-server /etc/redis/6379.conf\n"); + fprintf(stderr," ./keydb-pro-server --port 7777\n"); + fprintf(stderr," ./keydb-pro-server --port 7777 --replicaof 127.0.0.1 8888\n"); + fprintf(stderr," ./keydb-pro-server /etc/mykeydb.conf --loglevel verbose\n\n"); fprintf(stderr,"Sentinel mode:\n"); - fprintf(stderr," ./keydb-server /etc/sentinel.conf --sentinel\n"); + fprintf(stderr," ./keydb-pro-server /etc/sentinel.conf --sentinel\n"); exit(1); } @@ -5475,7 +5475,7 @@ int main(int argc, char **argv) { exit(0); } else { fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n"); - fprintf(stderr,"Example: ./keydb-server --test-memory 4096\n\n"); + fprintf(stderr,"Example: ./keydb-pro-server --test-memory 4096\n\n"); exit(1); } } diff --git a/tests/cluster/tests/06-slave-stop-cond.tcl b/tests/cluster/tests/06-slave-stop-cond.tcl index f2e67050b..52110856d 100644 --- a/tests/cluster/tests/06-slave-stop-cond.tcl +++ b/tests/cluster/tests/06-slave-stop-cond.tcl @@ -65,7 +65,7 @@ test "Slave #5 is reachable and alive" { test "Slave #5 should not be able to failover" { after 10000 - assert {[RI 5 role] eq {slave}} + assert_equal {slave} [RI 5 role] } test "Cluster should be down" { diff --git a/utils/install_server.sh b/utils/install_server.sh index a1574a2fa..2e3ee9d4a 100755 --- a/utils/install_server.sh +++ b/utils/install_server.sh @@ -37,7 +37,7 @@ # REDIS_CONFIG_FILE=/etc/redis/1234.conf \ # REDIS_LOG_FILE=/var/log/redis_1234.log \ # REDIS_DATA_DIR=/var/lib/redis/1234 \ -# REDIS_EXECUTABLE=`command -v keydb-server` ./utils/install_server.sh +# REDIS_EXECUTABLE=`command -v keydb-pro-server` ./utils/install_server.sh # # This generates a redis config file and an /etc/init.d script, and installs them. # @@ -129,7 +129,7 @@ fi if [ ! -x "$REDIS_EXECUTABLE" ] ; then _MANUAL_EXECUTION=true #get the redis executable path - _REDIS_EXECUTABLE=`command -v keydb-server` + _REDIS_EXECUTABLE=`command -v keydb-pro-server` read -p "Please select the redis executable path [$_REDIS_EXECUTABLE] " REDIS_EXECUTABLE if [ ! -x "$REDIS_EXECUTABLE" ] ; then REDIS_EXECUTABLE=$_REDIS_EXECUTABLE diff --git a/utils/redis_init_script b/utils/redis_init_script index bee5545ef..589792ee3 100755 --- a/utils/redis_init_script +++ b/utils/redis_init_script @@ -12,7 +12,7 @@ ### END INIT INFO REDISPORT=6379 -EXEC=/usr/local/bin/keydb-server +EXEC=/usr/local/bin/keydb-pro-server CLIEXEC=/usr/local/bin/keydb-cli PIDFILE=/var/run/redis_${REDISPORT}.pid diff --git a/utils/speed-regression.tcl b/utils/speed-regression.tcl index 8d5220c75..073f01a19 100755 --- a/utils/speed-regression.tcl +++ b/utils/speed-regression.tcl @@ -27,8 +27,8 @@ proc run-tests branches { } # Start the Redis server - puts " starting the server... [exec ./keydb-server -v]" - set pids [exec echo "port $::port\nloglevel warning\n" | ./keydb-server - > /dev/null 2> /dev/null &] + puts " starting the server... [exec ./keydb-pro-server -v]" + set pids [exec echo "port $::port\nloglevel warning\n" | ./keydb-pro-server - > /dev/null 2> /dev/null &] puts " pids: $pids" after 1000 puts " running the benchmark" From 6bb1429a460b42eba6ed1ac4ba0f13ba7298076a Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 4 Feb 2020 01:22:25 -0500 Subject: [PATCH 02/33] module fixes Former-commit-id: ef4e11ecb8a6f1a05bb21f014120b0ef9e771b60 --- src/module.cpp | 83 ++++++++++++++++++++++++++++++++++++---------- src/networking.cpp | 42 +++++++++++++++++------ src/server.h | 3 ++ 3 files changed, 100 insertions(+), 28 deletions(-) diff --git a/src/module.cpp b/src/module.cpp index c9cc29bfd..b93ad36ad 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -30,6 +30,7 @@ #include "server.h" #include "cluster.h" #include "rdb.h" +#include "aelocker.h" #include #include #include @@ -1276,7 +1277,10 @@ client *moduleGetReplyClient(RedisModuleCtx *ctx) { int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyLongLong(c,ll); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyLongLongAsync(c,ll); return REDISMODULE_OK; } @@ -1286,9 +1290,12 @@ int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) { int replyWithStatus(RedisModuleCtx *ctx, const char *msg, const char *prefix) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyProto(c,prefix,strlen(prefix)); - addReplyProto(c,msg,strlen(msg)); - addReplyProto(c,"\r\n",2); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyProtoAsync(c,prefix,strlen(prefix)); + addReplyProtoAsync(c,msg,strlen(msg)); + addReplyProtoAsync(c,"\r\n",2); return REDISMODULE_OK; } @@ -1332,15 +1339,19 @@ int RM_ReplyWithSimpleString(RedisModuleCtx *ctx, const char *msg) { * The function always returns REDISMODULE_OK. */ int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) { client *c = moduleGetReplyClient(ctx); + AeLocker locker; + if (c == NULL) return REDISMODULE_OK; + std::unique_lock lock(c->lock); + locker.arm(c); if (len == REDISMODULE_POSTPONED_ARRAY_LEN) { ctx->postponed_arrays = (void**)zrealloc(ctx->postponed_arrays,sizeof(void*)* (ctx->postponed_arrays_count+1), MALLOC_LOCAL); ctx->postponed_arrays[ctx->postponed_arrays_count] = - addReplyDeferredLen(c); + addReplyDeferredLenAsync(c); ctx->postponed_arrays_count++; } else { - addReplyArrayLen(c,len); + addReplyArrayLenAsync(c,len); } return REDISMODULE_OK; } @@ -1352,7 +1363,10 @@ int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) { int RM_ReplyWithNullArray(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyNullArray(c); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyNullArrayAsync(c); return REDISMODULE_OK; } @@ -1362,7 +1376,10 @@ int RM_ReplyWithNullArray(RedisModuleCtx *ctx) { int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReply(c,shared.emptyarray); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyAsync(c,shared.emptyarray); return REDISMODULE_OK; } @@ -1395,6 +1412,9 @@ int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) { void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return; + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); if (ctx->postponed_arrays_count == 0) { serverLog(LL_WARNING, "API misuse detected in module %s: " @@ -1404,7 +1424,7 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) { return; } ctx->postponed_arrays_count--; - setDeferredArrayLen(c, + setDeferredArrayLenAsync(c, ctx->postponed_arrays[ctx->postponed_arrays_count], len); if (ctx->postponed_arrays_count == 0) { @@ -1419,7 +1439,10 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) { int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyBulkCBuffer(c,(char*)buf,len); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyBulkCBufferAsync(c,(char*)buf,len); return REDISMODULE_OK; } @@ -1430,7 +1453,10 @@ int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) { int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyBulkCString(c,(char*)buf); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyBulkCStringAsync(c,(char*)buf); return REDISMODULE_OK; } @@ -1440,7 +1466,10 @@ int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) { int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyBulk(c,str); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyBulkAsync(c,str); return REDISMODULE_OK; } @@ -1450,7 +1479,10 @@ int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) { int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReply(c,shared.emptybulk); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyAsync(c,shared.emptybulk); return REDISMODULE_OK; } @@ -1461,7 +1493,10 @@ int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) { int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyVerbatim(c, buf, len, "txt"); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyVerbatimAsync(c, buf, len, "txt"); return REDISMODULE_OK; } @@ -1471,7 +1506,10 @@ int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) int RM_ReplyWithNull(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyNull(c); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyNullAsync(c); return REDISMODULE_OK; } @@ -1484,8 +1522,11 @@ int RM_ReplyWithNull(RedisModuleCtx *ctx) { int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); sds proto = sdsnewlen(reply->proto, reply->protolen); - addReplySds(c,proto); + addReplySdsAsync(c,proto); return REDISMODULE_OK; } @@ -1498,7 +1539,10 @@ int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) { int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyDouble(c,d); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyDoubleAsync(c,d); return REDISMODULE_OK; } @@ -1513,7 +1557,10 @@ int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) { int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyHumanLongDouble(c, ld); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyHumanLongDoubleAsync(c, ld); return REDISMODULE_OK; } diff --git a/src/networking.cpp b/src/networking.cpp index 8624c4e2c..746e48628 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -661,23 +661,33 @@ void addReplyDoubleAsync(client *c, double d) { addReplyDoubleCore(c, d, true); } +void addReplyBulkCore(client *c, robj_roptr obj, bool fAsync); + /* Add a long double as a bulk reply, but uses a human readable formatting * of the double instead of exposing the crude behavior of doubles to the * dear user. */ -void addReplyHumanLongDouble(client *c, long double d) { +void addReplyHumanLongDoubleCore(client *c, long double d, bool fAsync) { if (c->resp == 2) { robj *o = createStringObjectFromLongDouble(d,1); - addReplyBulk(c,o); + addReplyBulkCore(c,o,fAsync); decrRefCount(o); } else { char buf[MAX_LONG_DOUBLE_CHARS]; int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN); - addReplyProto(c,",",1); - addReplyProto(c,buf,len); - addReplyProto(c,"\r\n",2); + addReplyProtoCore(c,",",1,fAsync); + addReplyProtoCore(c,buf,len,fAsync); + addReplyProtoCore(c,"\r\n",2,fAsync); } } +void addReplyHumanLongDouble(client *c, long double d) { + addReplyHumanLongDoubleCore(c, d, false); +} + +void addReplyHumanLongDoubleAsync(client *c, long double d) { + addReplyHumanLongDoubleCore(c, d, true); +} + /* Add a long long as integer reply or bulk len / multi bulk count. * Basically this is used to output . */ void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync) { @@ -912,6 +922,10 @@ void addReplyBulkCString(client *c, const char *s) { addReplyBulkCStringCore(c, s, false); } +void addReplyBulkCStringAsync(client *c, const char *s) { + addReplyBulkCStringCore(c, s, true); +} + /* Add a long long as a bulk reply */ void addReplyBulkLongLong(client *c, long long ll) { char buf[64]; @@ -930,9 +944,9 @@ void addReplyBulkLongLong(client *c, long long ll) { * three first characters of the extension are used, and if the * provided one is shorter than that, the remaining is filled with * spaces. */ -void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { +void addReplyVerbatimCore(client *c, const char *s, size_t len, const char *ext, bool fAsync) { if (c->resp == 2) { - addReplyBulkCBuffer(c,s,len); + addReplyBulkCBufferCore(c,s,len,fAsync); } else { char buf[32]; size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4); @@ -944,12 +958,20 @@ void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { p[i] = *ext++; } } - addReplyProto(c,buf,preflen); - addReplyProto(c,s,len); - addReplyProto(c,"\r\n",2); + addReplyProtoCore(c,buf,preflen,fAsync); + addReplyProtoCore(c,s,len,fAsync); + addReplyProtoCore(c,"\r\n",2,fAsync); } } +void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { + addReplyVerbatimCore(c, s, len, ext, false); +} + +void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext) { + addReplyVerbatimCore(c, s, len, ext, true); +} + /* Add an array of C strings as status replies with a heading. * This function is typically invoked by from commands that support * subcommands in response to the 'help' subcommand. The help array diff --git a/src/server.h b/src/server.h index c3a59db8a..8d976253d 100644 --- a/src/server.h +++ b/src/server.h @@ -2121,10 +2121,12 @@ void addReplyNullArray(client *c); void addReplyNullArrayAsync(client *c); void addReplyBool(client *c, int b); void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext); +void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext); void addReplyProto(client *c, const char *s, size_t len); void addReplyBulk(client *c, robj_roptr obj); void AddReplyFromClient(client *c, client *src); void addReplyBulkCString(client *c, const char *s); +void addReplyBulkCStringAsync(client *c, const char *s); void addReplyBulkCBuffer(client *c, const void *p, size_t len); void addReplyBulkLongLong(client *c, long long ll); void addReply(client *c, robj_roptr obj); @@ -2134,6 +2136,7 @@ void addReplyError(client *c, const char *err); void addReplyStatus(client *c, const char *status); void addReplyDouble(client *c, double d); void addReplyHumanLongDouble(client *c, long double d); +void addReplyHumanLongDoubleAsync(client *c, long double d); void addReplyLongLong(client *c, long long ll); #ifdef __cplusplus void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync); From 8800a516d152190c46e547bd69da3aca6515944c Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 6 Feb 2020 23:31:12 -0500 Subject: [PATCH 03/33] Add test to detect issue #137 and #132 Former-commit-id: 49d86746edef497a568c6f3a64695d420305cca8 --- src/debug.cpp | 20 +++++++++++++++++++- tests/integration/replication.tcl | 2 ++ tests/test_helper.tcl | 1 + tests/unit/rreplay.tcl | 6 +++--- 4 files changed, 25 insertions(+), 4 deletions(-) diff --git a/src/debug.cpp b/src/debug.cpp index 234f197be..1916dc79b 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -685,8 +685,26 @@ NULL } else if (!strcasecmp(szFromObj(c->argv[1]),"stringmatch-test") && c->argc == 2) { stringmatchlen_fuzz_test(); addReplyStatus(c,"Apparently Redis did not crash: test passed"); - } else if (!strcasecmp(szFromObj(c->argv[1]), "force-master") && c->argc == 2) { + } else if (!strcasecmp(szFromObj(c->argv[1]), "force-master") && c->argc == 3) { c->flags |= CLIENT_MASTER | CLIENT_MASTER_FORCE_REPLY; + if (!strcasecmp(szFromObj(c->argv[2]), "yes")) + { + redisMaster *mi = (redisMaster*)zcalloc(sizeof(redisMaster), MALLOC_LOCAL); + mi->master = c; + listAddNodeHead(g_pserver->masters, mi); + } + else if (strcasecmp(szFromObj(c->argv[2]), "flagonly")) // if we didn't set flagonly assume its an unset + { + serverAssert(c->flags & CLIENT_MASTER); + if (listLength(g_pserver->masters)) + { + redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters)); + serverAssert(mi->master == c); + listDelNode(g_pserver->masters, listFirst(g_pserver->masters)); + zfree(mi); + } + c->flags &= ~(CLIENT_MASTER | CLIENT_MASTER_FORCE_REPLY); + } addReply(c, shared.ok); } else { addReplySubcommandSyntaxError(c); diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 0e50c20a9..5ed1ffc52 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -309,3 +309,5 @@ start_server {tags {"repl"}} { } } } + + diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index ec1d7cd31..0711d4cc2 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -37,6 +37,7 @@ set ::all_tests { unit/acl unit/rreplay unit/cron + unit/replication integration/block-repl integration/replication integration/replication-2 diff --git a/tests/unit/rreplay.tcl b/tests/unit/rreplay.tcl index 2029f521d..2fd1d3714 100644 --- a/tests/unit/rreplay.tcl +++ b/tests/unit/rreplay.tcl @@ -1,7 +1,7 @@ -start_server {tags {"rreplay"}} { +start_server {tags {"rreplay"} overrides {active-replica yes}} { test {RREPLAY use current db} { - r debug force-master + r debug force-master flagonly r select 4 r set dbnum invalid r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n" @@ -10,7 +10,7 @@ start_server {tags {"rreplay"}} { reconnect test {RREPLAY db different} { - r debug force-master + r debug force-master flagonly r select 4 r set testkey four r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2 From eabc436814c58c112d56bda275d70b0622843168 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 6 Feb 2020 23:31:31 -0500 Subject: [PATCH 04/33] Fix issue #137 and #132 Former-commit-id: 050d58007f84e4f71b0ae8b053ae4d6fd5bb4ec7 --- src/db.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index 605baaf40..9190d22f1 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -177,9 +177,8 @@ robj_roptr lookupKeyRead(redisDb *db, robj *key) { * Returns the linked value object if the key exists or NULL if the key * does not exist in the specified DB. */ robj *lookupKeyWrite(redisDb *db, robj *key) { + expireIfNeeded(db,key); robj *o = lookupKey(db,key,LOOKUP_UPDATEMVCC); - if (expireIfNeeded(db,key)) - o = NULL; return o; } From 6c2cef768707fa662e5ce152c3ec556ef5a70794 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 8 Feb 2020 16:49:41 -0500 Subject: [PATCH 05/33] Addmissing test file Former-commit-id: fb2bdf7d05e27b15dcb53b09d6820416a99a3ba7 --- tests/unit/replication.tcl | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 tests/unit/replication.tcl diff --git a/tests/unit/replication.tcl b/tests/unit/replication.tcl new file mode 100644 index 000000000..ada6af1e5 --- /dev/null +++ b/tests/unit/replication.tcl @@ -0,0 +1,12 @@ + +start_server {tags {"repl"}} { + test "incr of expired key on replica doesn't cause a crash" { + r debug force-master yes + r set testkey 1 + r pexpire testkey 1 + after 500 + r incr testkey + r incr testkey + r debug force-master no + } +} From 25ef65463e2c08191009c656883d6602d46ae342 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 10 Feb 2020 00:25:03 -0500 Subject: [PATCH 06/33] Ensure multi-master works for ring topologies Former-commit-id: a7cc3aac28ccec4dadb80aa2cc7279c53982bc28 --- src/multi.cpp | 8 ++- src/replication.cpp | 77 ++++++++++++++++++++---- src/server.h | 2 + tests/integration/replication-active.tcl | 18 ++++++ tests/test_helper.tcl | 1 + tests/unit/rreplay.tcl | 4 +- 6 files changed, 94 insertions(+), 16 deletions(-) diff --git a/src/multi.cpp b/src/multi.cpp index 2018e08d9..c9485b0ae 100644 --- a/src/multi.cpp +++ b/src/multi.cpp @@ -28,6 +28,7 @@ */ #include "server.h" +bool FInReplicaReplay(); /* ================================ MULTI/EXEC ============================== */ @@ -172,12 +173,15 @@ void execCommand(client *c) { * This way we'll deliver the MULTI/..../EXEC block as a whole and * both the AOF and the replication link will have the same consistency * and atomicity guarantees. */ - if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN))) { + if (!must_propagate && !(c->cmd->flags & (CMD_READONLY|CMD_ADMIN)) && !(FInReplicaReplay())) { execCommandPropagateMulti(c); must_propagate = 1; } - call(c,g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL); + int flags = g_pserver->loading ? CMD_CALL_NONE : CMD_CALL_FULL; + if (FInReplicaReplay()) + flags &= ~CMD_CALL_PROPAGATE; + call(c,flags); /* Commands may alter argc/argv, restore mstate. */ c->mstate.commands[j].argc = c->argc; diff --git a/src/replication.cpp b/src/replication.cpp index b8236420d..e42872cd4 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -42,6 +42,7 @@ #include #include #include +#include void replicationDiscardCachedMaster(redisMaster *mi); void replicationResurrectCachedMaster(redisMaster *mi, int newfd); @@ -353,6 +354,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { cchDbNum = std::min(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that char szMvcc[128]; + incrementMvccTstamp(); uint64_t mvccTstamp = getMvccTstamp(); int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp); int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp); @@ -432,6 +434,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { clientReplyBlock* reply = (clientReplyBlock*)listNodeValue(lnReply); addReplyProtoAsync(replica, reply->buf(), reply->used); } + if (!fSendRaw) { addReplyAsync(replica,shared.crlf); @@ -2420,6 +2423,8 @@ void freeMasterInfo(redisMaster *mi) { zfree(mi->masterauth); zfree(mi->masteruser); + if (mi->clientFake) + freeClient(mi->clientFake); delete mi->staleKeyMap; zfree(mi); } @@ -2477,6 +2482,11 @@ void replicationHandleMasterDisconnection(redisMaster *mi) { mi->master = NULL; mi->repl_state = REPL_STATE_CONNECT; mi->repl_down_since = g_pserver->unixtime; + if (mi->clientFake) { + freeClient(mi->clientFake); + mi->clientFake = nullptr; + + } /* We lost connection with our master, don't disconnect slaves yet, * maybe we'll be able to PSYNC with our master later. We'll disconnect * the slaves only if we'll have to do a full resync with our master. */ @@ -3344,14 +3354,33 @@ public: return m_cnesting == 1; } + redisMaster *getMi(client *c) + { + if (m_mi == nullptr) + m_mi = MasterInfoFromClient(c); + return m_mi; + } + + int nesting() const { return m_cnesting; } + private: int m_cnesting = 0; bool m_fCancelled = false; + redisMaster *m_mi = nullptr; }; +static thread_local ReplicaNestState *s_pstate = nullptr; + +bool FInReplicaReplay() +{ + return s_pstate != nullptr && s_pstate->nesting() > 0; +} + + +static std::unordered_map g_mapmvcc; + void replicaReplayCommand(client *c) { - static thread_local ReplicaNestState *s_pstate = nullptr; if (s_pstate == nullptr) s_pstate = new (MALLOC_LOCAL) ReplicaNestState; @@ -3375,9 +3404,10 @@ void replicaReplayCommand(client *c) return; } - unsigned char uuid[UUID_BINARY_LEN]; + std::string uuid; + uuid.resize(UUID_BINARY_LEN); if (c->argv[1]->type != OBJ_STRING || sdslen((sds)ptrFromObj(c->argv[1])) != 36 - || uuid_parse((sds)ptrFromObj(c->argv[1]), uuid) != 0) + || uuid_parse((sds)ptrFromObj(c->argv[1]), (unsigned char*)uuid.data()) != 0) { addReplyError(c, "Expected UUID arg1"); s_pstate->Cancel(); @@ -3413,7 +3443,7 @@ void replicaReplayCommand(client *c) } } - if (FSameUuidNoNil(uuid, cserver.uuid)) + if (FSameUuidNoNil((unsigned char*)uuid.data(), cserver.uuid)) { addReply(c, shared.ok); s_pstate->Cancel(); @@ -3423,33 +3453,56 @@ void replicaReplayCommand(client *c) if (!s_pstate->FPush()) return; + redisMaster *mi = s_pstate->getMi(c); + client *cFake = mi->clientFake; + if (mi->clientFakeNesting != s_pstate->nesting()) + cFake = nullptr; + serverAssert(mi != nullptr); + if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc && (mi->clientFake == nullptr)) + { + s_pstate->Cancel(); + s_pstate->Pop(); + return; + } + // OK We've recieved a command lets execute client *current_clientSave = serverTL->current_client; - client *cFake = createClient(-1, c->iel); + if (cFake == nullptr) + cFake = createClient(-1, c->iel); cFake->lock.lock(); cFake->authenticated = c->authenticated; cFake->puser = c->puser; cFake->querybuf = sdscatsds(cFake->querybuf,(sds)ptrFromObj(c->argv[2])); selectDb(cFake, c->db->id); auto ccmdPrev = serverTL->commandsExecuted; + cFake->flags |= CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP; processInputBuffer(cFake, (CMD_CALL_FULL & (~CMD_CALL_PROPAGATE))); + cFake->flags &= ~(CLIENT_MASTER | CLIENT_PREVENT_REPL_PROP); bool fExec = ccmdPrev != serverTL->commandsExecuted; cFake->lock.unlock(); - if (fExec) + if (fExec || cFake->flags & CLIENT_MULTI) { addReply(c, shared.ok); selectDb(c, cFake->db->id); - redisMaster *mi = MasterInfoFromClient(c); - if (mi != nullptr) // this should never be null but I'd prefer not to crash - { - mi->mvccLastSync = mvcc; - } + g_mapmvcc[uuid] = mvcc; } else { + serverLog(LL_WARNING, "Command didn't execute: %s", cFake->buf); addReplyError(c, "command did not execute"); } - freeClient(cFake); + serverAssert(sdslen(cFake->querybuf) == 0); + if (cFake->flags & CLIENT_MULTI) + { + mi->clientFake = cFake; + mi->clientFakeNesting = s_pstate->nesting(); + } + else + { + if (mi->clientFake == cFake) + mi->clientFake = nullptr; + freeClient(cFake); + } serverTL->current_client = current_clientSave; // call() will not propogate this for us, so we do so here diff --git a/src/server.h b/src/server.h index 781f043f0..22d983b0f 100644 --- a/src/server.h +++ b/src/server.h @@ -1535,6 +1535,8 @@ struct redisMaster { int masterport; /* Port of master */ client *cached_master; /* Cached master to be reused for PSYNC. */ client *master; + client *clientFake; + int clientFakeNesting; /* The following two fields is where we store master PSYNC replid/offset * while the PSYNC is in progress. At the end we'll copy the fields into * the server->master client structure. */ diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl index 4f37f2adf..0c28eb85d 100644 --- a/tests/integration/replication-active.tcl +++ b/tests/integration/replication-active.tcl @@ -59,6 +59,24 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { } } + test {Active replicas propogate transaction} { + $master set testkey 0 + $master multi + $master incr testkey + $master incr testkey + after 5000 + $master get testkey + $master exec + assert_equal 2 [$master get testkey] + after 500 + wait_for_condition 50 500 { + [string match "2" [$slave get testkey]] + } else { + fail "Transaction failed to replicate" + } + $master flushall + } + test {Active replicas WAIT} { # Test that wait succeeds since replicas should be syncronized $master set testkey foo diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 0711d4cc2..376004041 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -45,6 +45,7 @@ set ::all_tests { integration/replication-4 integration/replication-psync integration/replication-active + integration/replication-multimaster integration/aof integration/rdb integration/convert-zipmap-hash-on-load diff --git a/tests/unit/rreplay.tcl b/tests/unit/rreplay.tcl index 2fd1d3714..e11030f95 100644 --- a/tests/unit/rreplay.tcl +++ b/tests/unit/rreplay.tcl @@ -1,7 +1,7 @@ start_server {tags {"rreplay"} overrides {active-replica yes}} { test {RREPLAY use current db} { - r debug force-master flagonly + r debug force-master yes r select 4 r set dbnum invalid r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$5\r\ndbnum\r\n\$4\r\nfour\r\n" @@ -10,7 +10,7 @@ start_server {tags {"rreplay"} overrides {active-replica yes}} { reconnect test {RREPLAY db different} { - r debug force-master flagonly + r debug force-master yes r select 4 r set testkey four r rreplay "f4d5b2b5-4f07-4ee5-a4f2-5dc98507dfce" "*3\r\n\$3\r\nSET\r\n\$7\r\ntestkey\r\n\$4\r\nbebe\r\n" 2 From eac3cffe416623af74a4d7b88fc39c5092d111a6 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 10 Feb 2020 00:29:26 -0500 Subject: [PATCH 07/33] CLANG build fix Former-commit-id: dc78bf1ccbd3dfd2de582d2a0d0be3223de3c7c3 --- src/replication.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/replication.cpp b/src/replication.cpp index e42872cd4..fc632a6cd 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -43,6 +43,7 @@ #include #include #include +#include void replicationDiscardCachedMaster(redisMaster *mi); void replicationResurrectCachedMaster(redisMaster *mi, int newfd); From d346ad77347901d519f87514b817fdbe1db3e895 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 10 Feb 2020 18:15:29 -0500 Subject: [PATCH 08/33] Add missing test file Former-commit-id: 0c101dccc825668cb7ff07c23e82db0f5642b786 --- tests/integration/replication-multimaster.tcl | 74 +++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 tests/integration/replication-multimaster.tcl diff --git a/tests/integration/replication-multimaster.tcl b/tests/integration/replication-multimaster.tcl new file mode 100644 index 000000000..e5e77fdad --- /dev/null +++ b/tests/integration/replication-multimaster.tcl @@ -0,0 +1,74 @@ +foreach topology {mesh ring} { +start_server {tags {"multi-master"} overrides {hz 500 active-replica yes multi-master yes}} { +start_server {overrides {hz 500 active-replica yes multi-master yes}} { +start_server {overrides {hz 500 active-replica yes multi-master yes}} { +start_server {overrides {hz 500 active-replica yes multi-master yes}} { + + for {set j 0} {$j < 4} {incr j} { + set R($j) [srv [expr 0-$j] client] + set R_host($j) [srv [expr 0-$j] host] + set R_port($j) [srv [expr 0-$j] port] + } + + # Initialize as mesh + if [string equal $topology "mesh"] { + for {set j 0} {$j < 4} {incr j} { + for {set k 0} {$k < 4} {incr k} { + if $j!=$k { + $R($j) replicaof $R_host($k) $R_port($k) + after 100 + } + } + }} + #Else Ring + if [string equal $topology "ring"] { + $R(0) replicaof $R_host(3) $R_port(3) + after 100 + $R(1) replicaof $R_host(0) $R_port(0) + after 100 + $R(2) replicaof $R_host(1) $R_port(1) + after 100 + $R(3) replicaof $R_host(2) $R_port(2) + } + + after 2000 + + test "$topology replicates to all nodes" { + $R(0) set testkey foo + after 500 + assert_equal foo [$R(1) get testkey] "replicates to 1" + assert_equal foo [$R(2) get testkey] "replicates to 2" + } + + test "$topology replicates only once" { + $R(0) set testkey 1 + after 500 + $R(1) incr testkey + after 500 + $R(2) incr testkey + after 500 + assert_equal 3 [$R(0) get testkey] + assert_equal 3 [$R(1) get testkey] + assert_equal 3 [$R(2) get testkey] + assert_equal 3 [$R(3) get testkey] + } + + test "$topology transaction replicates only once" { + for {set j 0} {$j < 1000} {incr j} { + $R(0) set testkey 1 + $R(0) multi + $R(0) incr testkey + $R(0) incr testkey + $R(0) exec + after 1 + assert_equal 3 [$R(0) get testkey] "node 0" + assert_equal 3 [$R(1) get testkey] "node 1" + assert_equal 3 [$R(2) get testkey] "node 2" + assert_equal 3 [$R(3) get testkey] "node 3" + } + } +} +} +} +} +} From 68235881e99a41b53cd2f0cc365edc9d0476d1ce Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 30 Jan 2020 17:55:48 -0500 Subject: [PATCH 09/33] Fix memory leak in cron Former-commit-id: f1748f8c7611ad96d7ba4fed66439cd1f043e6f3 --- src/cron.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cron.cpp b/src/cron.cpp index fd777d43d..230cf4ed4 100644 --- a/src/cron.cpp +++ b/src/cron.cpp @@ -67,6 +67,7 @@ void cronCommand(client *c) robj *o = createObject(OBJ_CRON, spjob.release()); setKey(c->db, c->argv[ARG_NAME], o); + decrRefCount(o); // use an expire to trigger execution. Note: We use a subkey expire here so legacy clients don't delete it. setExpire(c, c->db, c->argv[ARG_NAME], c->argv[ARG_NAME], base + interval); addReply(c, shared.ok); From 30ece138d5e4faa414d98f585791538eb3c93455 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 10 Feb 2020 19:52:57 -0500 Subject: [PATCH 10/33] Fix issue #119 Former-commit-id: 46224721237616c345f6726b721a354d7bda71df --- src/server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index c6f360905..2d6027348 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1690,7 +1690,7 @@ void clientsCron(int iel) { /* The following functions do different service checks on the client. * The protocol is that they return non-zero if the client was * terminated. */ - if (clientsCronHandleTimeout(c,now)) goto LContinue; + if (clientsCronHandleTimeout(c,now)) continue; // Client free'd so don't release the lock if (clientsCronResizeQueryBuffer(c)) goto LContinue; if (clientsCronTrackExpansiveClients(c)) goto LContinue; LContinue: From e4d74b993f928dce069996e32a90f4b98e17a4ca Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 11 Feb 2020 00:59:07 -0500 Subject: [PATCH 11/33] Fix cases where duplicate RREPLAY is applied Former-commit-id: c3317686f8b8d94a3b2295def899ae30e208f327 --- src/replication.cpp | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index fc632a6cd..9b8570c49 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -1259,19 +1259,19 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) { aePostFunction(g_pserver->rgthreadvar[replica->iel].el, [replica] { // Because the client could have been closed while the lambda waited to run we need to - // verify the replica is still connected + // verify the replica is still connected listIter li; - listNode *ln; - listRewind(g_pserver->slaves,&li); - bool fFound = false; - while ((ln = listNext(&li))) { - if (listNodeValue(ln) == replica) { - fFound = true; - break; - } - } - if (!fFound) - return; + listNode *ln; + listRewind(g_pserver->slaves,&li); + bool fFound = false; + while ((ln = listNext(&li))) { + if (listNodeValue(ln) == replica) { + fFound = true; + break; + } + } + if (!fFound) + return; aeDeleteFileEvent(g_pserver->rgthreadvar[replica->iel].el,replica->fd,AE_WRITABLE); if (aeCreateFileEvent(g_pserver->rgthreadvar[replica->iel].el, replica->fd, AE_WRITABLE, sendBulkToSlave, replica) == AE_ERR) { freeClient(replica); @@ -2329,10 +2329,11 @@ int connectWithMaster(redisMaster *mi) { void undoConnectWithMaster(redisMaster *mi) { int fd = mi->repl_transfer_s; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fd]{ + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fd]{ aeDeleteFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,fd,AE_READABLE|AE_WRITABLE); close(fd); }); + serverAssert(res == AE_OK); mi->repl_transfer_s = -1; } @@ -3459,7 +3460,7 @@ void replicaReplayCommand(client *c) if (mi->clientFakeNesting != s_pstate->nesting()) cFake = nullptr; serverAssert(mi != nullptr); - if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc && (mi->clientFake == nullptr)) + if (mvcc != 0 && g_mapmvcc[uuid] >= mvcc) { s_pstate->Cancel(); s_pstate->Pop(); @@ -3485,7 +3486,8 @@ void replicaReplayCommand(client *c) { addReply(c, shared.ok); selectDb(c, cFake->db->id); - g_mapmvcc[uuid] = mvcc; + if (mvcc > g_mapmvcc[uuid]) + g_mapmvcc[uuid] = mvcc; } else { From fef9925b7f90e622cfded283d8cd637c68def82a Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 11 Feb 2020 01:00:21 -0500 Subject: [PATCH 12/33] Fix higher latency at low load by grouping clients to threads. This fixes slow perf in cluster benchmarks mentioned in issue #102 Former-commit-id: 1a4c3224c9848f02fbdb49674045b593cfc41d31 --- src/ae.cpp | 5 +++- src/aof.cpp | 9 ++++--- src/cluster.cpp | 12 +++++++++- src/config.cpp | 5 ++++ src/networking.cpp | 58 ++++++++++++++++++++++++++++++++++++++++++---- src/server.h | 1 + 6 files changed, 81 insertions(+), 9 deletions(-) diff --git a/src/ae.cpp b/src/ae.cpp index 90c148510..b92cd4a67 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -273,7 +273,8 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg) cmd.proc = proc; cmd.clientData = arg; auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); - AE_ASSERT(size == sizeof(cmd)); + if (size != sizeof(cmd)) + return AE_ERR; return AE_OK; } @@ -296,6 +297,8 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch } auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); + if (size != sizeof(cmd)) + return AE_ERR; AE_ASSERT(size == sizeof(cmd)); int ret = AE_OK; if (fSynchronous) diff --git a/src/aof.cpp b/src/aof.cpp index b82be9a34..65647bbba 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -167,11 +167,12 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { * not one already. */ if (!g_pserver->aof_rewrite_pending) { g_pserver->aof_rewrite_pending = true; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { g_pserver->aof_rewrite_pending = false; if (g_pserver->aof_pipe_write_data_to_child >= 0) aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); }); + serverAssert(res == AE_OK); // we can't handle an error here } } @@ -1563,16 +1564,18 @@ error: void aofClosePipes(void) { int fdAofAckPipe = g_pserver->aof_pipe_read_ack_from_child; - aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{ + int res = aePostFunction(g_pserver->el_alf_pip_read_ack_from_child, [fdAofAckPipe]{ aeDeleteFileEventAsync(serverTL->el,fdAofAckPipe,AE_READABLE); close (fdAofAckPipe); }); + serverAssert(res == AE_OK); int fdAofWritePipe = g_pserver->aof_pipe_write_data_to_child; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{ + res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [fdAofWritePipe]{ aeDeleteFileEventAsync(serverTL->el,fdAofWritePipe,AE_WRITABLE); close(fdAofWritePipe); }); + serverAssert(res == AE_OK); g_pserver->aof_pipe_write_data_to_child = -1; close(g_pserver->aof_pipe_read_data_from_parent); diff --git a/src/cluster.cpp b/src/cluster.cpp index f6a6e03dc..c20b0f4c4 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -295,6 +295,15 @@ int clusterLoadConfig(char *filename) { if (clusterGetMaxEpoch() > g_pserver->cluster->currentEpoch) { g_pserver->cluster->currentEpoch = clusterGetMaxEpoch(); } + + if (dictSize(g_pserver->cluster->nodes) > 1 && cserver.thread_min_client_threshold < 100) + { + // Because we expect the individual load of a client to be much less in a cluster (it will spread over multiple server) + // we can increase the grouping of clients on a single thread within reason + cserver.thread_min_client_threshold *= dictSize(g_pserver->cluster->nodes); + cserver.thread_min_client_threshold = std::min(cserver.thread_min_client_threshold, 200); + serverLog(LL_NOTICE, "Expanding min-clients-per-thread to %d due to cluster", cserver.thread_min_client_threshold); + } return C_OK; fmterr: @@ -623,9 +632,10 @@ void freeClusterLink(clusterLink *link) { if (link->node) link->node->link = NULL; link->node = nullptr; - aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{ + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [link]{ freeClusterLink(link); }); + serverAssert(res == AE_OK); return; } if (link->fd != -1) { diff --git a/src/config.cpp b/src/config.cpp index c056b98dc..2aaad825e 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -805,6 +805,11 @@ void loadServerConfigFromString(char *config) { } else if (!strcasecmp(argv[0],"enable-pro")) { cserver.fUsePro = true; break; + } else if (!strcasecmp(argv[0],"min-clients-per-thread") && argc == 2) { + cserver.thread_min_client_threshold = atoi(argv[1]); + if (cserver.thread_min_client_threshold < 0 || cserver.thread_min_client_threshold > 400) { + err = "min-thread-client must be between 0 and 400"; goto loaderr; + } } else { err = "Bad directive or wrong number of arguments"; goto loaderr; } diff --git a/src/networking.cpp b/src/networking.cpp index 97744c410..097df0f87 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1003,6 +1003,33 @@ int clientHasPendingReplies(client *c) { return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP); } +int chooseBestThreadForAccept(int ielCur) +{ + listIter li; + listNode *ln; + int rgcclients[MAX_EVENT_LOOPS] = {0}; + + listRewind(g_pserver->clients, &li); + while ((ln = listNext(&li)) != nullptr) + { + client *c = (client*)listNodeValue(ln); + if (c->iel < 0) + continue; + + rgcclients[c->iel]++; + } + + int ielMinLoad = 0; + for (int iel = 0; iel < cserver.cthreads; ++iel) + { + if (rgcclients[iel] < cserver.thread_min_client_threshold) + return iel; + if (rgcclients[iel] < rgcclients[ielMinLoad]) + ielMinLoad = iel; + } + return ielMinLoad; +} + #define MAX_ACCEPTS_PER_CALL 1000 static void acceptCommonHandler(int fd, int flags, char *ip, int iel) { client *c; @@ -1105,7 +1132,22 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { if (!g_fTestMode) { - // We always accept on the same thread + { + int ielTarget = chooseBestThreadForAccept(ielCur); + if (ielTarget != ielCur) + { + char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); + memcpy(szT, cip, NET_IP_STR_LEN); + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{ + acceptCommonHandler(cfd,0,szT, ielTarget); + zfree(szT); + }); + + if (res == AE_OK) + continue; + } + } + LLocalThread: aeAcquireLock(); acceptCommonHandler(cfd,0,cip, ielCur); @@ -1122,10 +1164,15 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { goto LLocalThread; char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); memcpy(szT, cip, NET_IP_STR_LEN); - aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{ + int res = aePostFunction(g_pserver->rgthreadvar[iel].el, [cfd, iel, szT]{ acceptCommonHandler(cfd,0,szT, iel); zfree(szT); }); + if (res != AE_OK) + { + zfree(szT); + goto LLocalThread; + } } } } @@ -1151,13 +1198,16 @@ void acceptUnixHandler(aeEventLoop *el, int fd, void *privdata, int mask) { int ielTarget = rand() % cserver.cthreads; if (ielTarget == ielCur) { + LLocalThread: acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielCur); } else { - aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{ + int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget]{ acceptCommonHandler(cfd,CLIENT_UNIX_SOCKET,NULL, ielTarget); }); + if (res != AE_OK) + goto LLocalThread; } aeReleaseLock(); @@ -2529,7 +2579,7 @@ NULL { int iel = client->iel; freeClientAsync(client); - aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { + aePostFunction(g_pserver->rgthreadvar[client->iel].el, [iel] { // note: failure is OK freeClientsInAsyncFreeQueue(iel); }); } diff --git a/src/server.h b/src/server.h index 22d983b0f..3ff023677 100644 --- a/src/server.h +++ b/src/server.h @@ -1604,6 +1604,7 @@ struct redisServerConst { unsigned char uuid[UUID_BINARY_LEN]; /* This server's UUID - populated on boot */ bool fUsePro = false; + int thread_min_client_threshold = 50; }; struct redisServer { From d4c1e981247cb8ac2c33d64dcef9937ee422f451 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 11 Feb 2020 01:41:00 -0500 Subject: [PATCH 13/33] Implement an error handler so bug #125 can't happen Former-commit-id: 16a019dba053fd0654116ff98a2ad0b66a9ed4e6 --- src/aof.cpp | 28 +++++++++++++++++++--------- src/networking.cpp | 4 ++-- src/server.cpp | 1 + src/server.h | 1 + 4 files changed, 23 insertions(+), 11 deletions(-) diff --git a/src/aof.cpp b/src/aof.cpp index 65647bbba..de8a8260e 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -124,6 +124,21 @@ void aofChildWriteDiffData(aeEventLoop *el, int fd, void *privdata, int mask) { } } +void installAofRewriteEvent() +{ + serverTL->fRetrySetAofEvent = false; + if (!g_pserver->aof_rewrite_pending) { + g_pserver->aof_rewrite_pending = true; + int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { + g_pserver->aof_rewrite_pending = false; + if (g_pserver->aof_pipe_write_data_to_child >= 0) + aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); + }); + if (res != AE_OK) + serverTL->fRetrySetAofEvent = true; + } +} + /* Append data to the AOF rewrite buffer, allocating new blocks if needed. */ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { listNode *ln = listLast(g_pserver->aof_rewrite_buf_blocks); @@ -165,15 +180,7 @@ void aofRewriteBufferAppend(unsigned char *s, unsigned long len) { /* Install a file event to send data to the rewrite child if there is * not one already. */ - if (!g_pserver->aof_rewrite_pending) { - g_pserver->aof_rewrite_pending = true; - int res = aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [] { - g_pserver->aof_rewrite_pending = false; - if (g_pserver->aof_pipe_write_data_to_child >= 0) - aeCreateFileEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, g_pserver->aof_pipe_write_data_to_child, AE_WRITABLE, aofChildWriteDiffData, NULL); - }); - serverAssert(res == AE_OK); // we can't handle an error here - } + installAofRewriteEvent(); } /* Write the buffer (possibly composed of multiple blocks) into the specified @@ -349,6 +356,9 @@ void flushAppendOnlyFile(int force) { int sync_in_progress = 0; mstime_t latency; + if (serverTL->fRetrySetAofEvent) + installAofRewriteEvent(); + if (sdslen(g_pserver->aof_buf) == 0) { /* Check if we need to do fsync even the aof buffer is empty, * because previously in AOF_FSYNC_EVERYSEC mode, fsync is diff --git a/src/networking.cpp b/src/networking.cpp index 097df0f87..54e04406f 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1003,7 +1003,7 @@ int clientHasPendingReplies(client *c) { return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP); } -int chooseBestThreadForAccept(int ielCur) +int chooseBestThreadForAccept() { listIter li; listNode *ln; @@ -1133,7 +1133,7 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { if (!g_fTestMode) { { - int ielTarget = chooseBestThreadForAccept(ielCur); + int ielTarget = chooseBestThreadForAccept(); if (ielTarget != ielCur) { char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); diff --git a/src/server.cpp b/src/server.cpp index 2d6027348..15f52ab52 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2884,6 +2884,7 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR); pvar->current_client = nullptr; pvar->clients_paused = 0; + pvar->fRetrySetAofEvent = false; if (pvar->el == NULL) { serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'", diff --git a/src/server.h b/src/server.h index 3ff023677..3ec2e8948 100644 --- a/src/server.h +++ b/src/server.h @@ -1526,6 +1526,7 @@ struct redisServerThreadVars { struct fastlock lockPendingWrite { "thread pending write" }; char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ long unsigned commandsExecuted = 0; + bool fRetrySetAofEvent = false; }; struct redisMaster { From 47480943560ab4d110ac4d80f26356c93e733096 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 11 Feb 2020 03:44:28 -0500 Subject: [PATCH 14/33] Fix race condition in allocating connections to threads Former-commit-id: 52434a583aa7114ff5658226441ab82ed3110a57 --- src/networking.cpp | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 54e04406f..9d94f5171 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1003,29 +1003,23 @@ int clientHasPendingReplies(client *c) { return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP); } +static std::atomic rgacceptsInFlight[MAX_EVENT_LOOPS]; int chooseBestThreadForAccept() { - listIter li; - listNode *ln; - int rgcclients[MAX_EVENT_LOOPS] = {0}; - - listRewind(g_pserver->clients, &li); - while ((ln = listNext(&li)) != nullptr) - { - client *c = (client*)listNodeValue(ln); - if (c->iel < 0) - continue; - - rgcclients[c->iel]++; - } - int ielMinLoad = 0; + int cclientsMin = INT_MAX; for (int iel = 0; iel < cserver.cthreads; ++iel) { - if (rgcclients[iel] < cserver.thread_min_client_threshold) + int cclientsThread; + atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread); + cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed); + if (cclientsThread < cserver.thread_min_client_threshold) return iel; - if (rgcclients[iel] < rgcclients[ielMinLoad]) + if (cclientsThread < cclientsMin) + { + cclientsMin = cclientsThread; ielMinLoad = iel; + } } return ielMinLoad; } @@ -1134,18 +1128,21 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { { { int ielTarget = chooseBestThreadForAccept(); + rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed); if (ielTarget != ielCur) { char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); memcpy(szT, cip, NET_IP_STR_LEN); int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{ acceptCommonHandler(cfd,0,szT, ielTarget); + rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); zfree(szT); }); if (res == AE_OK) continue; } + rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); } LLocalThread: From 3b496ea2829aec27f9f1c9ae86823c008476617e Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 03:33:45 -0500 Subject: [PATCH 15/33] aeDeleteEventLoop use after free and leak fixes Former-commit-id: 77820e5d50e0426570285a2c740c47b05a7c6c2f --- src/ae.cpp | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/ae.cpp b/src/ae.cpp index d84a5b0d6..075e73ebd 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -402,10 +402,18 @@ extern "C" void aeDeleteEventLoop(aeEventLoop *eventLoop) { aeApiFree(eventLoop); zfree(eventLoop->events); zfree(eventLoop->fired); - zfree(eventLoop); fastlock_free(&eventLoop->flock); close(eventLoop->fdCmdRead); close(eventLoop->fdCmdWrite); + + auto *te = eventLoop->timeEventHead; + while (te) + { + auto *teNext = te->next; + zfree(te); + te = teNext; + } + zfree(eventLoop); } extern "C" void aeStop(aeEventLoop *eventLoop) { From 4bf9beb484ecc83330a99c76509dba47733ebcbf Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 03:33:45 -0500 Subject: [PATCH 16/33] aeDeleteEventLoop use after free and leak fixes Former-commit-id: 2fd93c5789a4e81455d51b2a4786f708e8d6a2d7 --- src/ae.cpp | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/ae.cpp b/src/ae.cpp index b92cd4a67..16ac3ebba 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -390,10 +390,18 @@ extern "C" void aeDeleteEventLoop(aeEventLoop *eventLoop) { aeApiFree(eventLoop); zfree(eventLoop->events); zfree(eventLoop->fired); - zfree(eventLoop); fastlock_free(&eventLoop->flock); close(eventLoop->fdCmdRead); close(eventLoop->fdCmdWrite); + + auto *te = eventLoop->timeEventHead; + while (te) + { + auto *teNext = te->next; + zfree(te); + te = teNext; + } + zfree(eventLoop); } extern "C" void aeStop(aeEventLoop *eventLoop) { From 00f80cce27e7c25d89b5ddd2e15d9a5b28f74d23 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 03:36:47 -0500 Subject: [PATCH 17/33] Fix leak when tombstone exists Former-commit-id: 3d0ccdf6d2ddc523a3532c46cf905023d207b8cb --- src/db.cpp | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/db.cpp b/src/db.cpp index 29067c975..c22ccaaf5 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -412,7 +412,9 @@ bool redisDbPersistentData::syncDelete(robj *key) auto itr = m_pdbSnapshot->find_cached_threadsafe(szFromObj(key)); if (itr != nullptr) { - dictAdd(m_pdictTombstone, sdsdup(szFromObj(key)), nullptr); + sds keyTombstone = sdsdup(szFromObj(key)); + if (dictAdd(m_pdictTombstone, keyTombstone, nullptr) != DICT_OK) + sdsfree(keyTombstone); } } if (g_pserver->cluster_enabled) slotToKeyDel(key); From 41c75234bd592de8cb531188767a2964596a829c Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 03:43:29 -0500 Subject: [PATCH 18/33] Fix memory leak of ReplicaNestState on shutdown Former-commit-id: 4781eda7225c2640e25387663c33ef74cd98b0c4 --- src/replication.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index 9b8570c49..ba2008c0a 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3371,7 +3371,7 @@ private: redisMaster *m_mi = nullptr; }; -static thread_local ReplicaNestState *s_pstate = nullptr; +static thread_local std::unique_ptr s_pstate; bool FInReplicaReplay() { @@ -3384,7 +3384,7 @@ static std::unordered_map g_mapmvcc; void replicaReplayCommand(client *c) { if (s_pstate == nullptr) - s_pstate = new (MALLOC_LOCAL) ReplicaNestState; + s_pstate = std::make_unique(); // the replay command contains two arguments: // 1: The UUID of the source From b1ccb096a429f0e06ff18d9c62c3f360ac75585b Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 16:49:04 -0500 Subject: [PATCH 19/33] Add GC shutdown command Former-commit-id: 28e8f68016b554f3d410502c2af1641368c2bb92 --- src/gc.h | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/gc.h b/src/gc.h index 6612b0afb..9717dcbee 100644 --- a/src/gc.h +++ b/src/gc.h @@ -30,6 +30,12 @@ public: return m_epochNext; } + void shutdown() + { + std::unique_lock lock(m_lock); + m_vecepochs.clear(); + } + void endEpoch(uint64_t epoch, bool fNoFree = false) { std::unique_lock lock(m_lock); From 8f6f496c7e64fa50f72777d9ec290da85e77e722 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 17:08:00 -0500 Subject: [PATCH 20/33] Memory leak fix on config, and redisDb dtor Former-commit-id: b92bbf4de8ffc3edc965e2f9da4dd82ed7071559 --- src/config.cpp | 2 +- src/db.cpp | 8 ++++++++ src/server.h | 7 +++++-- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/config.cpp b/src/config.cpp index 2aaad825e..2c9a9d518 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -405,7 +405,7 @@ void loadServerConfigFromString(char *config) { } else if ((!strcasecmp(argv[0],"slaveof") || !strcasecmp(argv[0],"replicaof")) && argc == 3) { slaveof_linenum = linenum; - replicationAddMaster(sdsnew(argv[1]), atoi(argv[2])); + replicationAddMaster(argv[1], atoi(argv[2])); } else if ((!strcasecmp(argv[0],"repl-ping-slave-period") || !strcasecmp(argv[0],"repl-ping-replica-period")) && argc == 2) diff --git a/src/db.cpp b/src/db.cpp index 9190d22f1..e94e0cdb1 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1305,6 +1305,14 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) rememberSlaveKeyWithExpire(db,key); } +redisDb::~redisDb() +{ + dictRelease(watched_keys); + dictRelease(ready_keys); + dictRelease(blocking_keys); + listRelease(defrag_later); +} + void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) { dictEntry *kde; diff --git a/src/server.h b/src/server.h index 3ec2e8948..135a216ae 100644 --- a/src/server.h +++ b/src/server.h @@ -1127,10 +1127,13 @@ typedef struct clientReplyBlock { /* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ -typedef struct redisDb { +struct redisDb { redisDb() : expireitr(nullptr) {}; + + ~redisDb(); + dict *pdict; /* The keyspace for this DB */ expireset *setexpire; expireset::setiter expireitr; @@ -1142,7 +1145,7 @@ typedef struct redisDb { long long last_expire_set; /* when the last expire was set */ double avg_ttl; /* Average TTL, just for stats */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ -} redisDb; +}; /* Client MULTI/EXEC state */ typedef struct multiCmd { From ccb4ce65c842b504a35d2ff71c88979ce2c9797f Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 18:40:00 -0500 Subject: [PATCH 21/33] Fix leak deserializing expire Former-commit-id: 1a7e14d8ef96f7849ecdb120f3339b2423ca25d8 --- src/db.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/db.cpp b/src/db.cpp index d3b79b730..78860ebb9 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2465,6 +2465,9 @@ std::unique_ptr deserializeExpire(sds key, const char *str, size_t spexpire = std::make_unique(key, subkey, when); else spexpire->update(subkey, when); + + if (subkey) + sdsfree(subkey); } *poffset = offset; From 0337da40da7f16f831420e38cd2e39cae88241fb Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 18:41:29 -0500 Subject: [PATCH 22/33] Fix memory leak in RDB load Former-commit-id: a424194f42e61a324489464a0fed14837b8191e4 --- src/rdb.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index 3179e6e2d..e350adf15 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1676,9 +1676,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) { == NULL) return NULL; if (rdbtype == RDB_TYPE_ZSET_2) { - if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) return NULL; + if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) { + sdsfree(sdsele); + return NULL; + } } else { - if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL; + if (rdbLoadDoubleValue(rdb,&score) == -1) { + sdsfree(sdsele); + return NULL; + } } /* Don't care about integer-encoded strings. */ @@ -2487,6 +2493,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { decrRefCount(val); val = nullptr; } + decrRefCount(key); + key = nullptr; } if (g_pserver->key_load_delay) usleep(g_pserver->key_load_delay); From d3a69998e4e513b799fcf72a2d13bb8ec883c30f Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 18:41:29 -0500 Subject: [PATCH 23/33] Fix memory leak in RDB load Former-commit-id: 06ad1c15d719a34fed36244b12a593f749bbb8a6 --- src/rdb.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index 48205f430..5b316a4a8 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1545,9 +1545,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) { == NULL) return NULL; if (rdbtype == RDB_TYPE_ZSET_2) { - if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) return NULL; + if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) { + sdsfree(sdsele); + return NULL; + } } else { - if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL; + if (rdbLoadDoubleValue(rdb,&score) == -1) { + sdsfree(sdsele); + return NULL; + } } /* Don't care about integer-encoded strings. */ @@ -2181,6 +2187,8 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { decrRefCount(val); val = nullptr; } + decrRefCount(key); + key = nullptr; } /* Reset the state that is key-specified and is populated by From dde7fc4c18c705625415d05e1836f8063f342370 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 18:41:29 -0500 Subject: [PATCH 24/33] Fix memory leak in RDB load Former-commit-id: 4e9d2f08b11cc76bb2716514fa6d55f76160fd13 --- src/rdb.cpp | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index a3351fe59..ebace63e5 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1597,9 +1597,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, robj *key, uint64_t mvcc_tstamp) { == NULL) return NULL; if (rdbtype == RDB_TYPE_ZSET_2) { - if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) return NULL; + if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) { + sdsfree(sdsele); + return NULL; + } } else { - if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL; + if (rdbLoadDoubleValue(rdb,&score) == -1) { + sdsfree(sdsele); + return NULL; + } } /* Don't care about integer-encoded strings. */ @@ -2403,6 +2409,8 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { decrRefCount(val); val = nullptr; } + decrRefCount(key); + key = nullptr; } if (g_pserver->key_load_delay) usleep(g_pserver->key_load_delay); From 6011a072d15109edca29cf296848120c06cb3e93 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 18:43:36 -0500 Subject: [PATCH 25/33] Virtual dtor for DB base class Former-commit-id: 953b1cb3a2fec808751d2e7507efe8896f671921 --- src/server.h | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/server.h b/src/server.h index c9f3d0dda..5077a9ddd 100644 --- a/src/server.h +++ b/src/server.h @@ -1226,7 +1226,7 @@ class redisDbPersistentData friend class redisDbPersistentDataSnapshot; public: - ~redisDbPersistentData(); + virtual ~redisDbPersistentData(); redisDbPersistentData() = default; redisDbPersistentData(redisDbPersistentData &&) = default; @@ -1424,6 +1424,7 @@ struct redisDb : public redisDbPersistentDataSnapshot redisDb() : expireitr(nullptr) {} + void initialize(int id); virtual ~redisDb(); From b6db2d32ad14e24f968d780f975b8fd08c08df44 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 18:53:46 -0500 Subject: [PATCH 26/33] Graceful shutdown of server threads when quit is requested Former-commit-id: b9db899f6ccea62222170c6eec264d403a7a911d --- src/db.cpp | 2 +- src/server.cpp | 25 ++++++++++++++++++++++--- src/server.h | 3 +++ 3 files changed, 26 insertions(+), 4 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index e94e0cdb1..562485941 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -977,7 +977,7 @@ void shutdownCommand(client *c) { * Also when in Sentinel mode clear the SAVE flag and force NOSAVE. */ if (g_pserver->loading || g_pserver->sentinel_mode) flags = (flags & ~SHUTDOWN_SAVE) | SHUTDOWN_NOSAVE; - if (prepareForShutdown(flags) == C_OK) exit(0); + if (prepareForShutdown(flags) == C_OK) throw ShutdownException(); addReplyError(c,"Errors trying to SHUTDOWN. Check logs."); } diff --git a/src/server.cpp b/src/server.cpp index 15f52ab52..6fee88732 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1894,7 +1894,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* We received a SIGTERM, shutting down here in a safe way, as it is * not ok doing so inside the signal handler. */ if (g_pserver->shutdown_asap) { - if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0); + if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) throw ShutdownException(); serverLog(LL_WARNING,"SIGTERM received but errors trying to shut down the server, check the logs for more information"); g_pserver->shutdown_asap = 0; } @@ -3812,8 +3812,17 @@ int prepareForShutdown(int flags) { /* Close the listening sockets. Apparently this allows faster restarts. */ closeListeningSockets(1); + + for (int iel = 0; iel < cserver.cthreads; ++iel) + { + aePostFunction(g_pserver->rgthreadvar[iel].el, [iel]{ + g_pserver->rgthreadvar[iel].el->stop = 1; + }); + } + serverLog(LL_WARNING,"%s is now ready to exit, bye bye...", g_pserver->sentinel_mode ? "Sentinel" : "KeyDB"); + return C_OK; } @@ -5033,8 +5042,16 @@ void *workerThreadMain(void *parg) aeEventLoop *el = g_pserver->rgthreadvar[iel].el; aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE); aeSetAfterSleepProc(el, afterSleep, AE_SLEEP_THREADSAFE); - aeMain(el); + try + { + aeMain(el); + } + catch (ShutdownException) + { + } + serverAssert(!GlobalLocksAcquired()); aeDeleteEventLoop(el); + return NULL; } @@ -5334,7 +5351,9 @@ int main(int argc, char **argv) { /* The main thread sleeps until all the workers are done. this is so that all worker threads are orthogonal in their startup/shutdown */ void *pvRet; - pthread_join(rgthread[IDX_EVENT_LOOP_MAIN], &pvRet); + for (int iel = 0; iel < cserver.cthreads; ++iel) + pthread_join(rgthread[iel], &pvRet); + return 0; } diff --git a/src/server.h b/src/server.h index 135a216ae..58ea3e620 100644 --- a/src/server.h +++ b/src/server.h @@ -2922,6 +2922,9 @@ inline int FCorrectThread(client *c) } #define AssertCorrectThread(c) serverAssert(FCorrectThread(c)) +class ShutdownException +{}; + #define redisDebug(fmt, ...) \ printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__) #define redisDebugMark() \ From d229b03b2c69b4f63bc569169452661d321ca2e6 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 28 Jan 2020 19:06:33 -0500 Subject: [PATCH 27/33] Support C++14 and remove dependency on future standard 2a Former-commit-id: 18496b62853738bf7bd48f65fe34aafcba8bbe0b --- src/Makefile | 2 +- src/config.cpp | 200 ++++++++++++++++++++++++--------------------- src/connection.cpp | 26 +++--- src/server.h | 2 +- 4 files changed, 122 insertions(+), 108 deletions(-) diff --git a/src/Makefile b/src/Makefile index b89f69b53..fd4cf9926 100644 --- a/src/Makefile +++ b/src/Makefile @@ -21,7 +21,7 @@ NODEPS:=clean distclean # Default settings STD=-std=c11 -pedantic -DREDIS_STATIC='' -CXX_STD=-std=c++2a -pedantic -fno-rtti -D__STDC_FORMAT_MACROS +CXX_STD=-std=c++14 -pedantic -fno-rtti -D__STDC_FORMAT_MACROS ifneq (,$(findstring clang,$(CC))) ifneq (,$(findstring FreeBSD,$(uname_S))) STD+=-Wno-c11-extensions diff --git a/src/config.cpp b/src/config.cpp index 442309a82..8f6cfbad5 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -123,7 +123,7 @@ clientBufferLimitsConfig clientBufferLimitsDefaults[CLIENT_TYPE_OBUF_COUNT] = { * rewrite. */ typedef struct boolConfigData { int *config; /* The pointer to the server config this value is stored in */ - const int default_value; /* The default value of the config on rewrite */ + int default_value; /* The default value of the config on rewrite */ int (*is_valid_fn)(int val, const char **err); /* Optional function to check validity of new value (generic doc above) */ int (*update_fn)(int val, int prev, const char **err); /* Optional function to apply new value at runtime (generic doc above) */ } boolConfigData; @@ -140,7 +140,7 @@ typedef struct stringConfigData { typedef struct enumConfigData { int *config; /* The pointer to the server config this value is stored in */ configEnum *enum_value; /* The underlying enum type this data represents */ - const int default_value; /* The default value of the config on rewrite */ + int default_value; /* The default value of the config on rewrite */ int (*is_valid_fn)(int val, const char **err); /* Optional function to check validity of new value (generic doc above) */ int (*update_fn)(int val, int prev, const char **err); /* Optional function to apply new value at runtime (generic doc above) */ } enumConfigData; @@ -162,7 +162,7 @@ typedef struct numericConfigData { int is_memory; /* Indicates if this value can be loaded as a memory value */ long long lower_bound; /* The lower bound of this numeric value */ long long upper_bound; /* The upper bound of this numeric value */ - const long long default_value; /* The default value of the config on rewrite */ + long long default_value; /* The default value of the config on rewrite */ int (*is_valid_fn)(long long val, const char **err); /* Optional function to check validity of new value (generic doc above) */ int (*update_fn)(long long val, long long prev, const char **err); /* Optional function to apply new value at runtime (generic doc above) */ numericType numeric_type; /* An enum indicating the type of this value */ @@ -1669,15 +1669,17 @@ static void boolConfigRewrite(typeData data, const char *name, struct rewriteCon rewriteConfigYesNoOption(state, name,*(data.yesno.config), data.yesno.default_value); } -#define createBoolConfig(name, alias, modifiable, config_addr, default, is_valid, update) { \ - embedCommonConfig(name, alias, modifiable) \ - {boolConfigInit, boolConfigLoad, boolConfigSet, boolConfigGet, boolConfigRewrite}, \ - { { /* .data.yesno */ \ - &(config_addr), \ - (default), \ - (is_valid), \ - (update), \ - } } \ +constexpr standardConfig createBoolConfig(const char *name, const char *alias, int modifiable, int &config_addr, int defaultValue, int (*is_valid)(int val, const char **err), int (*update)(int val, int prev, const char **err)) +{ + standardConfig conf = { + embedCommonConfig(name, alias, modifiable) + { boolConfigInit, boolConfigLoad, boolConfigSet, boolConfigGet, boolConfigRewrite } + }; + conf.data.yesno.config = &config_addr; + conf.data.yesno.default_value = defaultValue; + conf.data.yesno.is_valid_fn = is_valid; + conf.data.yesno.update_fn = update; + return conf; } /* String Configs */ @@ -1734,16 +1736,19 @@ static void stringConfigRewrite(typeData data, const char *name, struct rewriteC #define ALLOW_EMPTY_STRING 0 #define EMPTY_STRING_IS_NULL 1 -#define createStringConfig(name, alias, modifiable, empty_to_null, config_addr, default, is_valid, update) { \ - embedCommonConfig(name, alias, modifiable) \ - embedConfigInterface(stringConfigInit, stringConfigLoad, stringConfigSet, stringConfigGet, stringConfigRewrite) \ - { .string = { \ - &(config_addr), \ - (default), \ - (is_valid), \ - (update), \ - (empty_to_null), \ - } } \ +constexpr standardConfig createStringConfig(const char *name, const char *alias, int modifiable, int empty_to_null, char *&config_addr, const char *defaultValue, int (*is_valid)(char*,const char**), int (*update)(char*,char*,const char**)) { + standardConfig conf = { + embedCommonConfig(name, alias, modifiable) + embedConfigInterface(stringConfigInit, stringConfigLoad, stringConfigSet, stringConfigGet, stringConfigRewrite) + }; + conf.data.string = { + &(config_addr), + (defaultValue), + (is_valid), + (update), + (empty_to_null), + }; + return conf; } /* Enum configs */ @@ -1805,16 +1810,20 @@ static void configEnumRewrite(typeData data, const char *name, struct rewriteCon rewriteConfigEnumOption(state, name,*(data.enumd.config), data.enumd.enum_value, data.enumd.default_value); } -#define createEnumConfig(name, alias, modifiable, enum, config_addr, default, is_valid, update) { \ - embedCommonConfig(name, alias, modifiable) \ - embedConfigInterface(configEnumInit, configEnumLoad, configEnumSet, configEnumGet, configEnumRewrite) \ - { .enumd = { \ - &(config_addr), \ - (enum), \ - (default), \ - (is_valid), \ - (update), \ - } } \ +constexpr standardConfig createEnumConfig(const char *name, const char *alias, int modifiable, configEnum *enumVal, int &config_addr, int defaultValue, int (*is_valid)(int,const char**), int (*update)(int,int,const char**)) { + standardConfig c = { + embedCommonConfig(name, alias, modifiable) + embedConfigInterface(configEnumInit, configEnumLoad, configEnumSet, configEnumGet, configEnumRewrite) + }; + c.data.enumd = { + &(config_addr), + (enumVal), + (defaultValue), + (is_valid), + (update), + }; + + return c; } /* Gets a 'long long val' and sets it into the union, using a macro to get @@ -1986,85 +1995,90 @@ static void numericConfigRewrite(typeData data, const char *name, struct rewrite #define INTEGER_CONFIG 0 #define MEMORY_CONFIG 1 -#define embedCommonNumericalConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) { \ - embedCommonConfig(name, alias, modifiable) \ - embedConfigInterface(numericConfigInit, numericConfigLoad, numericConfigSet, numericConfigGet, numericConfigRewrite) \ - { .numeric = { \ - .is_memory = (memory), \ - .lower_bound = (lower), \ - .upper_bound = (upper), \ - .default_value = (default), \ - .is_valid_fn = (is_valid), \ - .update_fn = (update), - -#define createIntConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - .numeric_type = NUMERIC_TYPE_INT, \ - .config { .i = &(config_addr) } \ - } } \ +constexpr standardConfig embedCommonNumericalConfig(const char *name, const char *alias, int modifiable, long long lower, long long upper, long long defaultValue, int memory, int (*is_valid)(long long, const char**), int (*update)(long long, long long, const char**)) { + standardConfig conf = { + embedCommonConfig(name, alias, modifiable) + embedConfigInterface(numericConfigInit, numericConfigLoad, numericConfigSet, numericConfigGet, numericConfigRewrite) + }; + conf.data.numeric.is_memory = (memory); + conf.data.numeric.lower_bound = (lower); + conf.data.numeric.upper_bound = (upper); + conf.data.numeric.default_value = (defaultValue); + conf.data.numeric.is_valid_fn = (is_valid); + conf.data.numeric.update_fn = (update); + return conf; } -#define createUIntConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - .numeric_type = NUMERIC_TYPE_UINT, \ - .config { .ui = &(config_addr) } \ - } } \ +constexpr standardConfig createIntConfig(const char *name, const char *alias, int modifiable, long long lower, long long upper, int &config_addr, long long defaultValue, int memory, int (*is_valid)(long long, const char**), int (*update)(long long, long long, const char**)) +{ + standardConfig conf = embedCommonNumericalConfig(name, alias, modifiable, lower, upper, defaultValue, memory, is_valid, update); + conf.data.numeric.numeric_type = NUMERIC_TYPE_INT; + conf.data.numeric.config.i = &config_addr; + return conf; } -#define createLongConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - .numeric_type = NUMERIC_TYPE_LONG, \ - .config { .l = &(config_addr) } \ - } } \ +constexpr standardConfig createUIntConfig(const char *name, const char *alias, int modifiable, long long lower, long long upper, unsigned int &config_addr, long long defaultValue, int memory, int (*is_valid)(long long, const char**), int (*update)(long long, long long, const char**)) +{ + auto conf = embedCommonNumericalConfig(name, alias, modifiable, lower, upper, defaultValue, memory, is_valid, update); + conf.data.numeric.numeric_type = NUMERIC_TYPE_UINT; + conf.data.numeric.config.ui = &(config_addr); + return conf; } -#define createULongConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - .numeric_type = NUMERIC_TYPE_ULONG, \ - .config { .ul = &(config_addr) } \ - } } \ +constexpr standardConfig createLongConfig(const char *name, const char *alias, int modifiable, long long lower, long long upper, long &config_addr, long long defaultValue, int memory, int (*is_valid)(long long, const char**), int (*update)(long long, long long, const char**)) { + auto conf = embedCommonNumericalConfig(name, alias, modifiable, lower, upper, defaultValue, memory, is_valid, update); + conf.data.numeric.numeric_type = NUMERIC_TYPE_LONG; + conf.data.numeric.config.l = &(config_addr); + return conf; } -#define createLongLongConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - .numeric_type = NUMERIC_TYPE_LONG_LONG, \ - .config { .ll = &(config_addr) } \ - } } \ +constexpr standardConfig createULongConfig(const char *name, const char *alias, int modifiable, long long lower, long long upper, unsigned long &config_addr, long long defaultValue, int memory, int (*is_valid)(long long, const char**), int (*update)(long long, long long, const char**)) { + auto conf = embedCommonNumericalConfig(name, alias, modifiable, lower, upper, defaultValue, memory, is_valid, update); + conf.data.numeric.numeric_type = NUMERIC_TYPE_ULONG; + conf.data.numeric.config.ul = &(config_addr); + return conf; } -#define createULongLongConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - .numeric_type = NUMERIC_TYPE_ULONG_LONG, \ - .config { .ull = &(config_addr) } \ - } } \ +constexpr standardConfig createLongLongConfig(const char *name, const char *alias, int modifiable, long long lower, long long upper, long long &config_addr, long long defaultValue, int memory, int (*is_valid)(long long, const char**), int (*update)(long long, long long, const char**)) { + auto conf = embedCommonNumericalConfig(name, alias, modifiable, lower, upper, defaultValue, memory, is_valid, update); + conf.data.numeric.numeric_type = NUMERIC_TYPE_LONG_LONG; + conf.data.numeric.config.ll = &(config_addr); + return conf; } -#define createSizeTConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - .numeric_type = NUMERIC_TYPE_SIZE_T, \ - .config { .st = &(config_addr) } \ - } } \ +constexpr standardConfig createULongLongConfig(const char *name, const char *alias, int modifiable, long long lower, long long upper, unsigned long long &config_addr, long long defaultValue, int memory, int (*is_valid)(long long, const char**), int (*update)(long long, long long, const char**)) { + auto conf = embedCommonNumericalConfig(name, alias, modifiable, lower, upper, defaultValue, memory, is_valid, update); + conf.data.numeric.numeric_type = NUMERIC_TYPE_ULONG_LONG; + conf.data.numeric.config.ull = &(config_addr); + return conf; } -#define createSSizeTConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - .numeric_type = NUMERIC_TYPE_SSIZE_T, \ - .config { .sst = &(config_addr) } \ - } } \ +constexpr standardConfig createSizeTConfig(const char *name, const char *alias, int modifiable, long long lower, long long upper, size_t &config_addr, long long defaultValue, int memory, int (*is_valid)(long long, const char**), int (*update)(long long, long long, const char**)) { + auto conf = embedCommonNumericalConfig(name, alias, modifiable, lower, upper, defaultValue, memory, is_valid, update); + conf.data.numeric.numeric_type = NUMERIC_TYPE_SIZE_T; + conf.data.numeric.config.st = &(config_addr); + return conf; } -#define createTimeTConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - .numeric_type = NUMERIC_TYPE_TIME_T, \ - .config { .tt = &(config_addr) } \ - } } \ +constexpr standardConfig createSSizeTConfig(const char *name, const char *alias, int modifiable, long long lower, long long upper, ssize_t &config_addr, long long defaultValue, int memory, int (*is_valid)(long long, const char**), int (*update)(long long, long long, const char**)) { + auto conf = embedCommonNumericalConfig(name, alias, modifiable, lower, upper, defaultValue, memory, is_valid, update); + conf.data.numeric.numeric_type = NUMERIC_TYPE_SSIZE_T; + conf.data.numeric.config.sst = &(config_addr); + return conf; } -#define createOffTConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - embedCommonNumericalConfig(name, alias, modifiable, lower, upper, config_addr, default, memory, is_valid, update) \ - .numeric_type = NUMERIC_TYPE_OFF_T, \ - .config { .ot = &(config_addr) } \ - } } \ +constexpr standardConfig createTimeTConfig(const char *name, const char *alias, int modifiable, long long lower, long long upper, time_t &config_addr, long long defaultValue, int memory, int (*is_valid)(long long, const char**), int (*update)(long long, long long, const char**)) { + auto conf = embedCommonNumericalConfig(name, alias, modifiable, lower, upper, defaultValue, memory, is_valid, update); + conf.data.numeric.numeric_type = NUMERIC_TYPE_TIME_T; + conf.data.numeric.config.tt = &(config_addr); + return conf; +} + +constexpr standardConfig createOffTConfig(const char *name, const char *alias, int modifiable, long long lower, long long upper, off_t &config_addr, long long defaultValue, int memory, int (*is_valid)(long long, const char**), int (*update)(long long, long long, const char**)) { + auto conf = embedCommonNumericalConfig(name, alias, modifiable, lower, upper, defaultValue, memory, is_valid, update); + conf.data.numeric.numeric_type = NUMERIC_TYPE_OFF_T; + conf.data.numeric.config.ot = &(config_addr); + return conf; } static int isValidActiveDefrag(int val, const char **err) { diff --git a/src/connection.cpp b/src/connection.cpp index b819c982a..fe987e3f0 100644 --- a/src/connection.cpp +++ b/src/connection.cpp @@ -330,19 +330,19 @@ static ssize_t connSocketSyncReadLine(connection *conn, char *ptr, ssize_t size, ConnectionType CT_Socket = { - .ae_handler = connSocketEventHandler, - .connect = connSocketConnect, - .write = connSocketWrite, - .read = connSocketRead, - .close = connSocketClose, - .accept = connSocketAccept, - .set_write_handler = connSocketSetWriteHandler, - .set_read_handler = connSocketSetReadHandler, - .get_last_error = connSocketGetLastError, - .blocking_connect = connSocketBlockingConnect, - .sync_write = connSocketSyncWrite, - .sync_read = connSocketSyncRead, - .sync_readline = connSocketSyncReadLine + connSocketEventHandler, + connSocketConnect, + connSocketWrite, + connSocketRead, + connSocketClose, + connSocketAccept, + connSocketSetWriteHandler, + connSocketSetReadHandler, + connSocketGetLastError, + connSocketBlockingConnect, + connSocketSyncWrite, + connSocketSyncRead, + connSocketSyncReadLine }; diff --git a/src/server.h b/src/server.h index d5cb48c92..0c7f6cc1a 100644 --- a/src/server.h +++ b/src/server.h @@ -241,7 +241,7 @@ public: #define C_ERR -1 /* Static server configuration */ -#define CONFIG_DEFAULT_HZ 20 /* Time interrupt calls/sec. */ +#define CONFIG_DEFAULT_HZ 10 /* Time interrupt calls/sec. */ #define CONFIG_MIN_HZ 1 #define CONFIG_MAX_HZ 500 #define MAX_CLIENTS_PER_CLOCK_TICK 200 /* HZ is adapted based on that. */ From 0adc93ab41bd3ebc2477e06dab14b1cf9beeb64e Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 17 Feb 2020 18:57:13 -0500 Subject: [PATCH 28/33] processEventsWhileBlocked not exception safe Former-commit-id: 432543956c74351a12af2cb258a7a0ba5daa9b27 --- src/networking.cpp | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 3306844da..ec827b22e 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -3191,6 +3191,7 @@ void unpauseClientsIfNecessary() * * The function returns the total number of events processed. */ int processEventsWhileBlocked(int iel) { + serverAssert(GlobalLocksAcquired()); int iterations = 4; /* See the function top-comment. */ int count = 0; @@ -3202,12 +3203,25 @@ int processEventsWhileBlocked(int iel) { } aeReleaseLock(); serverAssertDebug(!GlobalLocksAcquired()); - while (iterations--) { - int events = 0; - events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT); - events += handleClientsWithPendingWrites(iel); - if (!events) break; - count += events; + try + { + while (iterations--) { + int events = 0; + events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT); + events += handleClientsWithPendingWrites(iel); + if (!events) break; + count += events; + } + } + catch (...) + { + // Caller expects us to be locked so fix and rethrow + AeLocker locker; + if (c != nullptr) + c->lock.lock(); + locker.arm(c); + locker.release(); + throw; } AeLocker locker; if (c != nullptr) From 4b12a63b3a74089c5d41e0247a2c308ccba6835a Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 17 Feb 2020 19:00:31 -0500 Subject: [PATCH 29/33] Fix race condition with IStorage batches Former-commit-id: a2eabd044c9048662a63ff0f980ed947dc145912 --- src/db.cpp | 2 -- src/server.h | 1 - 2 files changed, 3 deletions(-) diff --git a/src/db.cpp b/src/db.cpp index d1f9758ea..9adf8cd11 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -2265,7 +2265,6 @@ redisDbPersistentData::changelist redisDbPersistentData::processChanges() serverAssert(m_fTrackingChanges >= 0); changelist vecRet; - fastlock_lock(&m_lockStorage); if (m_spstorage != nullptr) { m_spstorage->beginWriteBatch(); @@ -2305,7 +2304,6 @@ void redisDbPersistentData::commitChanges(const changelist &vec) } if (m_spstorage != nullptr) m_spstorage->endWriteBatch(); - fastlock_unlock(&m_lockStorage); } redisDbPersistentData::~redisDbPersistentData() diff --git a/src/server.h b/src/server.h index 28dcf509e..8a4ef3d22 100644 --- a/src/server.h +++ b/src/server.h @@ -1358,7 +1358,6 @@ private: std::unique_ptr m_spdbSnapshotHOLDER; const redisDbPersistentDataSnapshot *m_pdbSnapshotASYNC = nullptr; int m_refCount = 0; - fastlock m_lockStorage { "storage" }; }; class redisDbPersistentDataSnapshot : protected redisDbPersistentData From c810823abed5967e17c1e12b45329c20248c5821 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 17 Feb 2020 18:57:13 -0500 Subject: [PATCH 30/33] processEventsWhileBlocked not exception safe Former-commit-id: 1ef187533c26bfa0c084a815b8b80de92ba1cf0b --- src/networking.cpp | 29 +++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 9d94f5171..803edb5ac 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -3065,6 +3065,7 @@ void unpauseClientsIfNecessary() * * The function returns the total number of events processed. */ int processEventsWhileBlocked(int iel) { + serverAssert(GlobalLocksAcquired()); int iterations = 4; /* See the function top-comment. */ int count = 0; @@ -3074,14 +3075,30 @@ int processEventsWhileBlocked(int iel) { serverAssert(c->flags & CLIENT_PROTECTED); c->lock.unlock(); } + aeReleaseLock(); - while (iterations--) { - int events = 0; - events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT); - events += handleClientsWithPendingWrites(iel); - if (!events) break; - count += events; + serverAssertDebug(!GlobalLocksAcquired()); + try + { + while (iterations--) { + int events = 0; + events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT); + events += handleClientsWithPendingWrites(iel); + if (!events) break; + count += events; + } } + catch (...) + { + // Caller expects us to be locked so fix and rethrow + AeLocker locker; + if (c != nullptr) + c->lock.lock(); + locker.arm(c); + locker.release(); + throw; + } + AeLocker locker; if (c != nullptr) c->lock.lock(); From 2c28a3f3d779e05017e5d9dc53acf945adab3abf Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 17 Feb 2020 18:57:13 -0500 Subject: [PATCH 31/33] processEventsWhileBlocked not exception safe Former-commit-id: 45b711f9d124a948a66e667992176b5387943512 --- src/networking.cpp | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index bbb24495c..242e69798 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -3183,6 +3183,7 @@ void unpauseClientsIfNecessary() * * The function returns the total number of events processed. */ int processEventsWhileBlocked(int iel) { + serverAssert(GlobalLocksAcquired()); int iterations = 4; /* See the function top-comment. */ int count = 0; @@ -3194,12 +3195,25 @@ int processEventsWhileBlocked(int iel) { } aeReleaseLock(); serverAssertDebug(!GlobalLocksAcquired()); - while (iterations--) { - int events = 0; - events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT); - events += handleClientsWithPendingWrites(iel); - if (!events) break; - count += events; + try + { + while (iterations--) { + int events = 0; + events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT); + events += handleClientsWithPendingWrites(iel); + if (!events) break; + count += events; + } + } + catch (...) + { + // Caller expects us to be locked so fix and rethrow + AeLocker locker; + if (c != nullptr) + c->lock.lock(); + locker.arm(c); + locker.release(); + throw; } AeLocker locker; if (c != nullptr) From 950cda8ea565ecb2c3eead85856148577f7f8ae1 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 17 Feb 2020 19:07:13 -0500 Subject: [PATCH 32/33] Incorrect use of std::atomic Former-commit-id: 5dc84850addff084c95f9280718fcb1d6fee3b82 --- src/server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index e23e5a54d..12f388023 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1780,7 +1780,7 @@ void updateCachedTime(int update_daylight_info) { t /= 1000; __atomic_store(&g_pserver->mstime, &t, __ATOMIC_RELAXED); t /= 1000; - __atomic_store(&g_pserver->unixtime, &t, __ATOMIC_RELAXED); + g_pserver->unixtime = t; /* To get information about daylight saving time, we need to call * localtime_r and cache the result. However calling localtime_r in this From 1d368edc10610851e56e8bb5b6e239d1ca5a7fcf Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 17 Feb 2020 19:54:05 -0500 Subject: [PATCH 33/33] Add double unlock detection and improve fastlock_unlock assmebly Former-commit-id: 98aefac09b6b59371e6c1c77d1ef2794bfc5ae62 --- src/fastlock.cpp | 56 ++++++++++++++++++++++++-------------------- src/fastlock_x64.asm | 32 +++++++------------------ src/networking.cpp | 1 - 3 files changed, 39 insertions(+), 50 deletions(-) diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 19375cd0e..4f8b2e6dc 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -74,6 +74,10 @@ extern int g_fInCrash; #define __has_feature(x) 0 #endif +#ifdef __linux__ +extern "C" void unlock_futex(struct fastlock *lock, uint16_t ifutex); +#endif + #if __has_feature(thread_sanitizer) /* Report that a lock has been created at address "lock". */ @@ -206,6 +210,11 @@ DeadlockDetector g_dlock; static_assert(sizeof(pid_t) <= sizeof(fastlock::m_pidOwner), "fastlock::m_pidOwner not large enough"); uint64_t g_longwaits = 0; +extern "C" void fastlock_panic(struct fastlock *lock) +{ + _serverPanic(__FILE__, __LINE__, "fastlock lock/unlock mismatch for: %s", lock->szName); +} + uint64_t fastlock_getlongwaitcount() { uint64_t rval; @@ -337,31 +346,6 @@ extern "C" int fastlock_trylock(struct fastlock *lock, int fWeak) return false; } -#ifdef __linux__ -#define ROL32(v, shift) ((v << shift) | (v >> (32-shift))) -void unlock_futex(struct fastlock *lock, uint16_t ifutex) -{ - unsigned mask = (1U << (ifutex % 32)); - unsigned futexT; - __atomic_load(&lock->futex, &futexT, __ATOMIC_RELAXED); - futexT &= mask; - - if (futexT == 0) - return; - - for (;;) - { - __atomic_load(&lock->futex, &futexT, __ATOMIC_ACQUIRE); - futexT &= mask; - if (!futexT) - break; - - if (futex(&lock->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask) == 1) - break; - } -} -#endif - extern "C" void fastlock_unlock(struct fastlock *lock) { --lock->m_depth; @@ -384,6 +368,26 @@ extern "C" void fastlock_unlock(struct fastlock *lock) } #endif +#ifdef __linux__ +#define ROL32(v, shift) ((v << shift) | (v >> (32-shift))) +extern "C" void unlock_futex(struct fastlock *lock, uint16_t ifutex) +{ + unsigned mask = (1U << (ifutex % 32)); + unsigned futexT; + + for (;;) + { + __atomic_load(&lock->futex, &futexT, __ATOMIC_ACQUIRE); + futexT &= mask; + if (!futexT) + break; + + if (futex(&lock->m_ticket.u, FUTEX_WAKE_BITSET_PRIVATE, INT_MAX, nullptr, mask) == 1) + break; + } +} +#endif + extern "C" void fastlock_free(struct fastlock *lock) { // NOP @@ -413,4 +417,4 @@ void fastlock_lock_recursive(struct fastlock *lock, int nesting) { fastlock_lock(lock); lock->m_depth = nesting; -} \ No newline at end of file +} diff --git a/src/fastlock_x64.asm b/src/fastlock_x64.asm index 7c9990a6d..bcea0e095 100644 --- a/src/fastlock_x64.asm +++ b/src/fastlock_x64.asm @@ -126,6 +126,7 @@ fastlock_trylock: .ALIGN 16 .global fastlock_unlock +.type fastlock_unlock,@function fastlock_unlock: # RDI points to the struct: # int32_t m_pidOwner @@ -133,34 +134,19 @@ fastlock_unlock: # [rdi+64] ... # uint16_t active # uint16_t avail - push r11 sub dword ptr [rdi+4], 1 # decrement m_depth, don't use dec because it partially writes the flag register and we don't know its state jnz .LDone # if depth is non-zero this is a recursive unlock, and we still hold it mov dword ptr [rdi], -1 # pidOwner = -1 (we don't own it anymore) - mov ecx, [rdi+64] # get current active (this one) - inc ecx # bump it to the next thread - mov [rdi+64], cx # give up our ticket (note: lock is not required here because the spinlock itself guards this variable) + mov esi, [rdi+64] # get current active (this one) + inc esi # bump it to the next thread + mov word ptr [rdi+64], si # give up our ticket (note: lock is not required here because the spinlock itself guards this variable) mfence # sync other threads # At this point the lock is removed, however we must wake up any pending futexs - mov r9d, 1 # eax is the bitmask for 2 threads - rol r9d, cl # place the mask in the right spot for the next 2 threads - add rdi, 64 # rdi now points to the token + mov edx, [rdi+64+4] # load the futex mask + bt edx, esi # is the next thread waiting on a futex? + jc unlock_futex # unlock the futex if necessary + ret # if not we're done. .ALIGN 16 -.LRetryWake: - mov r11d, [rdi+4] # load the futex mask - and r11d, r9d # are any threads waiting on a futex? - jz .LDone # if not we're done. - # we have to wake the futexs - # rdi ARG1 futex (already in rdi) - mov esi, (10 | 128) # rsi ARG2 FUTEX_WAKE_BITSET_PRIVATE - mov edx, 0x7fffffff # rdx ARG3 INT_MAX (number of threads to wake) - xor r10d, r10d # r10 ARG4 NULL - mov r8, rdi # r8 ARG5 dup rdi - # r9 ARG6 mask (already set above) - mov eax, 202 # sys_futex - syscall - cmp eax, 1 # did we wake as many as we expected? - jnz .LRetryWake .LDone: - pop r11 + js fastlock_panic # panic if we made m_depth negative ret diff --git a/src/networking.cpp b/src/networking.cpp index 803edb5ac..00322df72 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -3077,7 +3077,6 @@ int processEventsWhileBlocked(int iel) { } aeReleaseLock(); - serverAssertDebug(!GlobalLocksAcquired()); try { while (iterations--) {