From 41f26be34a9df652f8d84d1ed5fe7a28b7ff0cba Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 4 Feb 2020 00:30:13 -0500 Subject: [PATCH 1/4] rename to pro binary Former-commit-id: a8854bfd83de72d6aa418ee6d9b44fae1f622787 --- .gitignore | 2 +- README.md | 12 +++++------ deps/hiredis/Makefile | 4 ++-- keydb.conf | 2 +- src/Makefile | 2 +- src/debug.cpp | 2 +- src/server.cpp | 24 +++++++++++----------- tests/cluster/tests/06-slave-stop-cond.tcl | 2 +- utils/install_server.sh | 4 ++-- utils/redis_init_script | 2 +- utils/speed-regression.tcl | 4 ++-- 11 files changed, 30 insertions(+), 30 deletions(-) diff --git a/.gitignore b/.gitignore index c54671d59..a2d8033da 100644 --- a/.gitignore +++ b/.gitignore @@ -19,7 +19,7 @@ keydb-cli redis-sentinel keydb-sentinel redis-server -keydb-server +keydb-pro-server doc-tools release misc/* diff --git a/README.md b/README.md index c06a32d6c..e081ed024 100644 --- a/README.md +++ b/README.md @@ -158,19 +158,19 @@ Running KeyDB To run KeyDB with the default configuration just type: % cd src - % ./keydb-server + % ./keydb-pro-server If you want to provide your keydb.conf, you have to run it using an additional parameter (the path of the configuration file): % cd src - % ./keydb-server /path/to/keydb.conf + % ./keydb-pro-server /path/to/keydb.conf It is possible to alter the KeyDB configuration by passing parameters directly as options using the command line. Examples: - % ./keydb-server --port 9999 --replicaof 127.0.0.1 6379 - % ./keydb-server /etc/keydb/6379.conf --loglevel debug + % ./keydb-pro-server --port 9999 --replicaof 127.0.0.1 6379 + % ./keydb-pro-server /etc/keydb/6379.conf --loglevel debug All the options in keydb.conf are also supported as options using the command line, with exactly the same name. @@ -178,7 +178,7 @@ line, with exactly the same name. Playing with KeyDB ------------------ -You can use keydb-cli to play with KeyDB. Start a keydb-server instance, +You can use keydb-cli to play with KeyDB. Start a keydb-pro-server instance, then in another terminal try the following: % cd src @@ -243,7 +243,7 @@ Simply make a directory you would like to have the latest binaries dumped in, th ``` $ docker run -it --rm -v /path-to-dump-binaries:/keydb_bin eqalpha/keydb-build-bin ``` -You should receive the following files: keydb-benchmark, keydb-check-aof, keydb-check-rdb, keydb-cli, keydb-sentinel, keydb-server +You should receive the following files: keydb-benchmark, keydb-check-aof, keydb-check-rdb, keydb-cli, keydb-sentinel, keydb-pro-server If you are looking to enable flash support with the build (make MALLOC=memkind) then use the following command: ``` diff --git a/deps/hiredis/Makefile b/deps/hiredis/Makefile index 841990d5c..c231a9b93 100644 --- a/deps/hiredis/Makefile +++ b/deps/hiredis/Makefile @@ -29,9 +29,9 @@ INSTALL_INCLUDE_PATH= $(DESTDIR)$(PREFIX)/$(INCLUDE_PATH) INSTALL_LIBRARY_PATH= $(DESTDIR)$(PREFIX)/$(LIBRARY_PATH) INSTALL_PKGCONF_PATH= $(INSTALL_LIBRARY_PATH)/$(PKGCONF_PATH) -# keydb-server configuration used for testing +# keydb-pro-server configuration used for testing REDIS_PORT=56379 -REDIS_SERVER=keydb-server +REDIS_SERVER=keydb-pro-server define REDIS_TEST_CONFIG daemonize yes pidfile /tmp/hiredis-test-redis.pid diff --git a/keydb.conf b/keydb.conf index 896528241..32442744c 100644 --- a/keydb.conf +++ b/keydb.conf @@ -3,7 +3,7 @@ # Note that in order to read the configuration file, KeyDB must be # started with the file path as first argument: # -# ./keydb-server /path/to/keydb.conf +# ./keydb-pro-server /path/to/keydb.conf # Note on units: when memory size is needed, it is possible to specify # it in the usual form of 1k 5GB 4M and so forth: diff --git a/src/Makefile b/src/Makefile index 7548c6a1d..45c70f6d6 100644 --- a/src/Makefile +++ b/src/Makefile @@ -319,7 +319,7 @@ else endif @touch $@ -# keydb-server +# keydb-pro-server $(REDIS_SERVER_NAME): $(REDIS_SERVER_OBJ) $(REDIS_LD) -o $@ $^ ../deps/hiredis/libhiredis.a ../deps/lua/src/liblua.a ../deps/rocksdb/librocksdb.a $(FINAL_LIBS) diff --git a/src/debug.cpp b/src/debug.cpp index 99e7a0950..b054ed403 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -1542,7 +1542,7 @@ void sigsegvHandler(int sig, siginfo_t *info, void *secret) { "\n=== KEYDB BUG REPORT END. Make sure to include from START to END. ===\n\n" " Please report the crash by opening an issue on github:\n\n" " https://github.com/JohnSully/KeyDB/issues\n\n" -" Suspect RAM error? Use keydb-server --test-memory to verify it.\n\n" +" Suspect RAM error? Use keydb-pro-server --test-memory to verify it.\n\n" ); /* free(messages); Don't call free() with possibly corrupted memory. */ diff --git a/src/server.cpp b/src/server.cpp index 5c6a2873f..382d98c08 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4969,19 +4969,19 @@ void version(void) { } void usage(void) { - fprintf(stderr,"Usage: ./keydb-server [/path/to/keydb.conf] [options]\n"); - fprintf(stderr," ./keydb-server - (read config from stdin)\n"); - fprintf(stderr," ./keydb-server -v or --version\n"); - fprintf(stderr," ./keydb-server -h or --help\n"); - fprintf(stderr," ./keydb-server --test-memory \n\n"); + fprintf(stderr,"Usage: ./keydb-pro-server [/path/to/keydb.conf] [options]\n"); + fprintf(stderr," ./keydb-pro-server - (read config from stdin)\n"); + fprintf(stderr," ./keydb-pro-server -v or --version\n"); + fprintf(stderr," ./keydb-pro-server -h or --help\n"); + fprintf(stderr," ./keydb-pro-server --test-memory \n\n"); fprintf(stderr,"Examples:\n"); - fprintf(stderr," ./keydb-server (run the server with default conf)\n"); - fprintf(stderr," ./keydb-server /etc/redis/6379.conf\n"); - fprintf(stderr," ./keydb-server --port 7777\n"); - fprintf(stderr," ./keydb-server --port 7777 --replicaof 127.0.0.1 8888\n"); - fprintf(stderr," ./keydb-server /etc/mykeydb.conf --loglevel verbose\n\n"); + fprintf(stderr," ./keydb-pro-server (run the server with default conf)\n"); + fprintf(stderr," ./keydb-pro-server /etc/redis/6379.conf\n"); + fprintf(stderr," ./keydb-pro-server --port 7777\n"); + fprintf(stderr," ./keydb-pro-server --port 7777 --replicaof 127.0.0.1 8888\n"); + fprintf(stderr," ./keydb-pro-server /etc/mykeydb.conf --loglevel verbose\n\n"); fprintf(stderr,"Sentinel mode:\n"); - fprintf(stderr," ./keydb-server /etc/sentinel.conf --sentinel\n"); + fprintf(stderr," ./keydb-pro-server /etc/sentinel.conf --sentinel\n"); exit(1); } @@ -5475,7 +5475,7 @@ int main(int argc, char **argv) { exit(0); } else { fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n"); - fprintf(stderr,"Example: ./keydb-server --test-memory 4096\n\n"); + fprintf(stderr,"Example: ./keydb-pro-server --test-memory 4096\n\n"); exit(1); } } diff --git a/tests/cluster/tests/06-slave-stop-cond.tcl b/tests/cluster/tests/06-slave-stop-cond.tcl index f2e67050b..52110856d 100644 --- a/tests/cluster/tests/06-slave-stop-cond.tcl +++ b/tests/cluster/tests/06-slave-stop-cond.tcl @@ -65,7 +65,7 @@ test "Slave #5 is reachable and alive" { test "Slave #5 should not be able to failover" { after 10000 - assert {[RI 5 role] eq {slave}} + assert_equal {slave} [RI 5 role] } test "Cluster should be down" { diff --git a/utils/install_server.sh b/utils/install_server.sh index a1574a2fa..2e3ee9d4a 100755 --- a/utils/install_server.sh +++ b/utils/install_server.sh @@ -37,7 +37,7 @@ # REDIS_CONFIG_FILE=/etc/redis/1234.conf \ # REDIS_LOG_FILE=/var/log/redis_1234.log \ # REDIS_DATA_DIR=/var/lib/redis/1234 \ -# REDIS_EXECUTABLE=`command -v keydb-server` ./utils/install_server.sh +# REDIS_EXECUTABLE=`command -v keydb-pro-server` ./utils/install_server.sh # # This generates a redis config file and an /etc/init.d script, and installs them. # @@ -129,7 +129,7 @@ fi if [ ! -x "$REDIS_EXECUTABLE" ] ; then _MANUAL_EXECUTION=true #get the redis executable path - _REDIS_EXECUTABLE=`command -v keydb-server` + _REDIS_EXECUTABLE=`command -v keydb-pro-server` read -p "Please select the redis executable path [$_REDIS_EXECUTABLE] " REDIS_EXECUTABLE if [ ! -x "$REDIS_EXECUTABLE" ] ; then REDIS_EXECUTABLE=$_REDIS_EXECUTABLE diff --git a/utils/redis_init_script b/utils/redis_init_script index bee5545ef..589792ee3 100755 --- a/utils/redis_init_script +++ b/utils/redis_init_script @@ -12,7 +12,7 @@ ### END INIT INFO REDISPORT=6379 -EXEC=/usr/local/bin/keydb-server +EXEC=/usr/local/bin/keydb-pro-server CLIEXEC=/usr/local/bin/keydb-cli PIDFILE=/var/run/redis_${REDISPORT}.pid diff --git a/utils/speed-regression.tcl b/utils/speed-regression.tcl index 8d5220c75..073f01a19 100755 --- a/utils/speed-regression.tcl +++ b/utils/speed-regression.tcl @@ -27,8 +27,8 @@ proc run-tests branches { } # Start the Redis server - puts " starting the server... [exec ./keydb-server -v]" - set pids [exec echo "port $::port\nloglevel warning\n" | ./keydb-server - > /dev/null 2> /dev/null &] + puts " starting the server... [exec ./keydb-pro-server -v]" + set pids [exec echo "port $::port\nloglevel warning\n" | ./keydb-pro-server - > /dev/null 2> /dev/null &] puts " pids: $pids" after 1000 puts " running the benchmark" From 6bb1429a460b42eba6ed1ac4ba0f13ba7298076a Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 4 Feb 2020 01:22:25 -0500 Subject: [PATCH 2/4] module fixes Former-commit-id: ef4e11ecb8a6f1a05bb21f014120b0ef9e771b60 --- src/module.cpp | 83 ++++++++++++++++++++++++++++++++++++---------- src/networking.cpp | 42 +++++++++++++++++------ src/server.h | 3 ++ 3 files changed, 100 insertions(+), 28 deletions(-) diff --git a/src/module.cpp b/src/module.cpp index c9cc29bfd..b93ad36ad 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -30,6 +30,7 @@ #include "server.h" #include "cluster.h" #include "rdb.h" +#include "aelocker.h" #include #include #include @@ -1276,7 +1277,10 @@ client *moduleGetReplyClient(RedisModuleCtx *ctx) { int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyLongLong(c,ll); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyLongLongAsync(c,ll); return REDISMODULE_OK; } @@ -1286,9 +1290,12 @@ int RM_ReplyWithLongLong(RedisModuleCtx *ctx, long long ll) { int replyWithStatus(RedisModuleCtx *ctx, const char *msg, const char *prefix) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyProto(c,prefix,strlen(prefix)); - addReplyProto(c,msg,strlen(msg)); - addReplyProto(c,"\r\n",2); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyProtoAsync(c,prefix,strlen(prefix)); + addReplyProtoAsync(c,msg,strlen(msg)); + addReplyProtoAsync(c,"\r\n",2); return REDISMODULE_OK; } @@ -1332,15 +1339,19 @@ int RM_ReplyWithSimpleString(RedisModuleCtx *ctx, const char *msg) { * The function always returns REDISMODULE_OK. */ int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) { client *c = moduleGetReplyClient(ctx); + AeLocker locker; + if (c == NULL) return REDISMODULE_OK; + std::unique_lock lock(c->lock); + locker.arm(c); if (len == REDISMODULE_POSTPONED_ARRAY_LEN) { ctx->postponed_arrays = (void**)zrealloc(ctx->postponed_arrays,sizeof(void*)* (ctx->postponed_arrays_count+1), MALLOC_LOCAL); ctx->postponed_arrays[ctx->postponed_arrays_count] = - addReplyDeferredLen(c); + addReplyDeferredLenAsync(c); ctx->postponed_arrays_count++; } else { - addReplyArrayLen(c,len); + addReplyArrayLenAsync(c,len); } return REDISMODULE_OK; } @@ -1352,7 +1363,10 @@ int RM_ReplyWithArray(RedisModuleCtx *ctx, long len) { int RM_ReplyWithNullArray(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyNullArray(c); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyNullArrayAsync(c); return REDISMODULE_OK; } @@ -1362,7 +1376,10 @@ int RM_ReplyWithNullArray(RedisModuleCtx *ctx) { int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReply(c,shared.emptyarray); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyAsync(c,shared.emptyarray); return REDISMODULE_OK; } @@ -1395,6 +1412,9 @@ int RM_ReplyWithEmptyArray(RedisModuleCtx *ctx) { void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return; + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); if (ctx->postponed_arrays_count == 0) { serverLog(LL_WARNING, "API misuse detected in module %s: " @@ -1404,7 +1424,7 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) { return; } ctx->postponed_arrays_count--; - setDeferredArrayLen(c, + setDeferredArrayLenAsync(c, ctx->postponed_arrays[ctx->postponed_arrays_count], len); if (ctx->postponed_arrays_count == 0) { @@ -1419,7 +1439,10 @@ void RM_ReplySetArrayLength(RedisModuleCtx *ctx, long len) { int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyBulkCBuffer(c,(char*)buf,len); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyBulkCBufferAsync(c,(char*)buf,len); return REDISMODULE_OK; } @@ -1430,7 +1453,10 @@ int RM_ReplyWithStringBuffer(RedisModuleCtx *ctx, const char *buf, size_t len) { int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyBulkCString(c,(char*)buf); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyBulkCStringAsync(c,(char*)buf); return REDISMODULE_OK; } @@ -1440,7 +1466,10 @@ int RM_ReplyWithCString(RedisModuleCtx *ctx, const char *buf) { int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyBulk(c,str); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyBulkAsync(c,str); return REDISMODULE_OK; } @@ -1450,7 +1479,10 @@ int RM_ReplyWithString(RedisModuleCtx *ctx, RedisModuleString *str) { int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReply(c,shared.emptybulk); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyAsync(c,shared.emptybulk); return REDISMODULE_OK; } @@ -1461,7 +1493,10 @@ int RM_ReplyWithEmptyString(RedisModuleCtx *ctx) { int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyVerbatim(c, buf, len, "txt"); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyVerbatimAsync(c, buf, len, "txt"); return REDISMODULE_OK; } @@ -1471,7 +1506,10 @@ int RM_ReplyWithVerbatimString(RedisModuleCtx *ctx, const char *buf, size_t len) int RM_ReplyWithNull(RedisModuleCtx *ctx) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyNull(c); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyNullAsync(c); return REDISMODULE_OK; } @@ -1484,8 +1522,11 @@ int RM_ReplyWithNull(RedisModuleCtx *ctx) { int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); sds proto = sdsnewlen(reply->proto, reply->protolen); - addReplySds(c,proto); + addReplySdsAsync(c,proto); return REDISMODULE_OK; } @@ -1498,7 +1539,10 @@ int RM_ReplyWithCallReply(RedisModuleCtx *ctx, RedisModuleCallReply *reply) { int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyDouble(c,d); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyDoubleAsync(c,d); return REDISMODULE_OK; } @@ -1513,7 +1557,10 @@ int RM_ReplyWithDouble(RedisModuleCtx *ctx, double d) { int RM_ReplyWithLongDouble(RedisModuleCtx *ctx, long double ld) { client *c = moduleGetReplyClient(ctx); if (c == NULL) return REDISMODULE_OK; - addReplyHumanLongDouble(c, ld); + AeLocker locker; + std::unique_lock lock(c->lock); + locker.arm(c); + addReplyHumanLongDoubleAsync(c, ld); return REDISMODULE_OK; } diff --git a/src/networking.cpp b/src/networking.cpp index 8624c4e2c..746e48628 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -661,23 +661,33 @@ void addReplyDoubleAsync(client *c, double d) { addReplyDoubleCore(c, d, true); } +void addReplyBulkCore(client *c, robj_roptr obj, bool fAsync); + /* Add a long double as a bulk reply, but uses a human readable formatting * of the double instead of exposing the crude behavior of doubles to the * dear user. */ -void addReplyHumanLongDouble(client *c, long double d) { +void addReplyHumanLongDoubleCore(client *c, long double d, bool fAsync) { if (c->resp == 2) { robj *o = createStringObjectFromLongDouble(d,1); - addReplyBulk(c,o); + addReplyBulkCore(c,o,fAsync); decrRefCount(o); } else { char buf[MAX_LONG_DOUBLE_CHARS]; int len = ld2string(buf,sizeof(buf),d,LD_STR_HUMAN); - addReplyProto(c,",",1); - addReplyProto(c,buf,len); - addReplyProto(c,"\r\n",2); + addReplyProtoCore(c,",",1,fAsync); + addReplyProtoCore(c,buf,len,fAsync); + addReplyProtoCore(c,"\r\n",2,fAsync); } } +void addReplyHumanLongDouble(client *c, long double d) { + addReplyHumanLongDoubleCore(c, d, false); +} + +void addReplyHumanLongDoubleAsync(client *c, long double d) { + addReplyHumanLongDoubleCore(c, d, true); +} + /* Add a long long as integer reply or bulk len / multi bulk count. * Basically this is used to output . */ void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync) { @@ -912,6 +922,10 @@ void addReplyBulkCString(client *c, const char *s) { addReplyBulkCStringCore(c, s, false); } +void addReplyBulkCStringAsync(client *c, const char *s) { + addReplyBulkCStringCore(c, s, true); +} + /* Add a long long as a bulk reply */ void addReplyBulkLongLong(client *c, long long ll) { char buf[64]; @@ -930,9 +944,9 @@ void addReplyBulkLongLong(client *c, long long ll) { * three first characters of the extension are used, and if the * provided one is shorter than that, the remaining is filled with * spaces. */ -void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { +void addReplyVerbatimCore(client *c, const char *s, size_t len, const char *ext, bool fAsync) { if (c->resp == 2) { - addReplyBulkCBuffer(c,s,len); + addReplyBulkCBufferCore(c,s,len,fAsync); } else { char buf[32]; size_t preflen = snprintf(buf,sizeof(buf),"=%zu\r\nxxx:",len+4); @@ -944,12 +958,20 @@ void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { p[i] = *ext++; } } - addReplyProto(c,buf,preflen); - addReplyProto(c,s,len); - addReplyProto(c,"\r\n",2); + addReplyProtoCore(c,buf,preflen,fAsync); + addReplyProtoCore(c,s,len,fAsync); + addReplyProtoCore(c,"\r\n",2,fAsync); } } +void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext) { + addReplyVerbatimCore(c, s, len, ext, false); +} + +void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext) { + addReplyVerbatimCore(c, s, len, ext, true); +} + /* Add an array of C strings as status replies with a heading. * This function is typically invoked by from commands that support * subcommands in response to the 'help' subcommand. The help array diff --git a/src/server.h b/src/server.h index c3a59db8a..8d976253d 100644 --- a/src/server.h +++ b/src/server.h @@ -2121,10 +2121,12 @@ void addReplyNullArray(client *c); void addReplyNullArrayAsync(client *c); void addReplyBool(client *c, int b); void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext); +void addReplyVerbatimAsync(client *c, const char *s, size_t len, const char *ext); void addReplyProto(client *c, const char *s, size_t len); void addReplyBulk(client *c, robj_roptr obj); void AddReplyFromClient(client *c, client *src); void addReplyBulkCString(client *c, const char *s); +void addReplyBulkCStringAsync(client *c, const char *s); void addReplyBulkCBuffer(client *c, const void *p, size_t len); void addReplyBulkLongLong(client *c, long long ll); void addReply(client *c, robj_roptr obj); @@ -2134,6 +2136,7 @@ void addReplyError(client *c, const char *err); void addReplyStatus(client *c, const char *status); void addReplyDouble(client *c, double d); void addReplyHumanLongDouble(client *c, long double d); +void addReplyHumanLongDoubleAsync(client *c, long double d); void addReplyLongLong(client *c, long long ll); #ifdef __cplusplus void addReplyLongLongWithPrefixCore(client *c, long long ll, char prefix, bool fAsync); From 47480943560ab4d110ac4d80f26356c93e733096 Mon Sep 17 00:00:00 2001 From: John Sully Date: Tue, 11 Feb 2020 03:44:28 -0500 Subject: [PATCH 3/4] Fix race condition in allocating connections to threads Former-commit-id: 52434a583aa7114ff5658226441ab82ed3110a57 --- src/networking.cpp | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/src/networking.cpp b/src/networking.cpp index 54e04406f..9d94f5171 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1003,29 +1003,23 @@ int clientHasPendingReplies(client *c) { return (c->bufpos || listLength(c->reply)) && !(c->flags & CLIENT_CLOSE_ASAP); } +static std::atomic rgacceptsInFlight[MAX_EVENT_LOOPS]; int chooseBestThreadForAccept() { - listIter li; - listNode *ln; - int rgcclients[MAX_EVENT_LOOPS] = {0}; - - listRewind(g_pserver->clients, &li); - while ((ln = listNext(&li)) != nullptr) - { - client *c = (client*)listNodeValue(ln); - if (c->iel < 0) - continue; - - rgcclients[c->iel]++; - } - int ielMinLoad = 0; + int cclientsMin = INT_MAX; for (int iel = 0; iel < cserver.cthreads; ++iel) { - if (rgcclients[iel] < cserver.thread_min_client_threshold) + int cclientsThread; + atomicGet(g_pserver->rgthreadvar[iel].cclients, cclientsThread); + cclientsThread += rgacceptsInFlight[iel].load(std::memory_order_relaxed); + if (cclientsThread < cserver.thread_min_client_threshold) return iel; - if (rgcclients[iel] < rgcclients[ielMinLoad]) + if (cclientsThread < cclientsMin) + { + cclientsMin = cclientsThread; ielMinLoad = iel; + } } return ielMinLoad; } @@ -1134,18 +1128,21 @@ void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { { { int ielTarget = chooseBestThreadForAccept(); + rgacceptsInFlight[ielTarget].fetch_add(1, std::memory_order_relaxed); if (ielTarget != ielCur) { char *szT = (char*)zmalloc(NET_IP_STR_LEN, MALLOC_LOCAL); memcpy(szT, cip, NET_IP_STR_LEN); int res = aePostFunction(g_pserver->rgthreadvar[ielTarget].el, [cfd, ielTarget, szT]{ acceptCommonHandler(cfd,0,szT, ielTarget); + rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); zfree(szT); }); if (res == AE_OK) continue; } + rgacceptsInFlight[ielTarget].fetch_sub(1, std::memory_order_relaxed); } LLocalThread: From 8f6f496c7e64fa50f72777d9ec290da85e77e722 Mon Sep 17 00:00:00 2001 From: John Sully Date: Sun, 16 Feb 2020 17:08:00 -0500 Subject: [PATCH 4/4] Memory leak fix on config, and redisDb dtor Former-commit-id: b92bbf4de8ffc3edc965e2f9da4dd82ed7071559 --- src/config.cpp | 2 +- src/db.cpp | 8 ++++++++ src/server.h | 7 +++++-- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/src/config.cpp b/src/config.cpp index 2aaad825e..2c9a9d518 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -405,7 +405,7 @@ void loadServerConfigFromString(char *config) { } else if ((!strcasecmp(argv[0],"slaveof") || !strcasecmp(argv[0],"replicaof")) && argc == 3) { slaveof_linenum = linenum; - replicationAddMaster(sdsnew(argv[1]), atoi(argv[2])); + replicationAddMaster(argv[1], atoi(argv[2])); } else if ((!strcasecmp(argv[0],"repl-ping-slave-period") || !strcasecmp(argv[0],"repl-ping-replica-period")) && argc == 2) diff --git a/src/db.cpp b/src/db.cpp index 9190d22f1..e94e0cdb1 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1305,6 +1305,14 @@ void setExpire(client *c, redisDb *db, robj *key, robj *subkey, long long when) rememberSlaveKeyWithExpire(db,key); } +redisDb::~redisDb() +{ + dictRelease(watched_keys); + dictRelease(ready_keys); + dictRelease(blocking_keys); + listRelease(defrag_later); +} + void setExpire(client *c, redisDb *db, robj *key, expireEntry &&e) { dictEntry *kde; diff --git a/src/server.h b/src/server.h index 3ec2e8948..135a216ae 100644 --- a/src/server.h +++ b/src/server.h @@ -1127,10 +1127,13 @@ typedef struct clientReplyBlock { /* Redis database representation. There are multiple databases identified * by integers from 0 (the default database) up to the max configured * database. The database number is the 'id' field in the structure. */ -typedef struct redisDb { +struct redisDb { redisDb() : expireitr(nullptr) {}; + + ~redisDb(); + dict *pdict; /* The keyspace for this DB */ expireset *setexpire; expireset::setiter expireitr; @@ -1142,7 +1145,7 @@ typedef struct redisDb { long long last_expire_set; /* when the last expire was set */ double avg_ttl; /* Average TTL, just for stats */ list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */ -} redisDb; +}; /* Client MULTI/EXEC state */ typedef struct multiCmd {