From efd66faa9c7d88da02549f435e56050152cba3b9 Mon Sep 17 00:00:00 2001 From: vattezhang Date: Mon, 18 Feb 2019 22:48:55 +0800 Subject: [PATCH 01/50] benchmark: add auth check in benchmark When we run benchmark but forget to set the right requirepass, benchmark should return error. --- src/redis-benchmark.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 31f91eb0f..4f0f3404a 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -204,6 +204,12 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { if (redisBufferRead(c->context) != REDIS_OK) { fprintf(stderr,"Error: %s\n",c->context->errstr); exit(1); + } + else if (strlen(c->context->reader->buf)>=32 + && !strncmp(c->context->reader->buf,"-NOAUTH Authentication required.", 32)) + { + fprintf(stderr,"Error: %s\n",c->context->reader->buf); + exit(1); } else { while(c->pending) { if (redisGetReply(c->context,&reply) != REDIS_OK) { From f66aed65984f731d7183a78cfa12d7fc4d7d06f0 Mon Sep 17 00:00:00 2001 From: vattezhang Date: Wed, 27 Feb 2019 21:20:00 +0800 Subject: [PATCH 02/50] fix: fix sentinel command table and new flags format --- src/sentinel.c | 13 +++++++++---- src/server.h | 1 + 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/src/sentinel.c b/src/sentinel.c index 4d03c9c12..92ea75436 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -450,11 +450,11 @@ struct redisCommand sentinelcmds[] = { {"punsubscribe",punsubscribeCommand,-1,"",0,NULL,0,0,0,0,0}, {"publish",sentinelPublishCommand,3,"",0,NULL,0,0,0,0,0}, {"info",sentinelInfoCommand,-1,"",0,NULL,0,0,0,0,0}, - {"role",sentinelRoleCommand,1,"l",0,NULL,0,0,0,0,0}, - {"client",clientCommand,-2,"rs",0,NULL,0,0,0,0,0}, + {"role",sentinelRoleCommand,1,"ok-loading",0,NULL,0,0,0,0,0}, + {"client",clientCommand,-2,"read-only no-script",0,NULL,0,0,0,0,0}, {"shutdown",shutdownCommand,-1,"",0,NULL,0,0,0,0,0}, - {"auth",authCommand,2,"sltF",0,NULL,0,0,0,0,0}, - {"hello",helloCommand,-2,"sF",0,NULL,0,0,0,0,0} + {"auth",authCommand,2,"no-script ok-loading ok-stale fast",0,NULL,0,0,0,0,0}, + {"hello",helloCommand,-2,"no-script fast",0,NULL,0,0,0,0,0} }; /* This function overwrites a few normal Redis config default with Sentinel @@ -477,6 +477,11 @@ void initSentinel(void) { retval = dictAdd(server.commands, sdsnew(cmd->name), cmd); serverAssert(retval == DICT_OK); + + /* Translate the command string flags description into an actual + * set of flags. */ + if (populateCommandTableParseFlags(cmd,cmd->sflags) == C_ERR) + serverPanic("Unsupported command flag"); } /* Initialize various data structures. */ diff --git a/src/server.h b/src/server.h index 994952654..c29a40b6b 100644 --- a/src/server.h +++ b/src/server.h @@ -2264,6 +2264,7 @@ void serverLogHexDump(int level, char *descr, void *value, size_t len); int memtest_preserving_test(unsigned long *m, size_t bytes, int passes); void mixDigest(unsigned char *digest, void *ptr, size_t len); void xorDigest(unsigned char *digest, void *ptr, size_t len); +int populateCommandTableParseFlags(struct redisCommand *c, char *strflags); #define redisDebug(fmt, ...) \ printf("DEBUG %s:%d > " fmt "\n", __FILE__, __LINE__, __VA_ARGS__) From 4bfc1763c0d283204cfbaaacefe87a4883f819c8 Mon Sep 17 00:00:00 2001 From: vattezhang Date: Wed, 13 Mar 2019 20:46:33 +0800 Subject: [PATCH 03/50] fix: fix benchmark cannot exit when NOAUTH err happens --- src/redis-benchmark.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 2c53bc936..edeaf3a25 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -419,11 +419,10 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { fprintf(stderr,"Error: %s\n",c->context->errstr); exit(1); } - else if (strlen(c->context->reader->buf)>=32 - && !strncmp(c->context->reader->buf,"-NOAUTH Authentication required.", 32)) + else if (NULL != strstr(c->context->reader->buf,"NOAUTH")) { fprintf(stderr,"Error: %s\n",c->context->reader->buf); - exit(1); + exit(1); } else { while(c->pending) { if (redisGetReply(c->context,&reply) != REDIS_OK) { From 2d4635b483460c18335089141db306a3bc122123 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Sun, 24 Mar 2019 12:00:33 +0200 Subject: [PATCH 04/50] fix: missing initialization. --- src/module.c | 1 + src/modules/hellofilter.c => tests/modules/commandfilter.c | 0 tests/{modules => unit/moduleapi}/commandfilter.tcl | 0 3 files changed, 1 insertion(+) rename src/modules/hellofilter.c => tests/modules/commandfilter.c (100%) rename tests/{modules => unit/moduleapi}/commandfilter.tcl (100%) diff --git a/src/module.c b/src/module.c index ff7f27cdd..f468d4996 100644 --- a/src/module.c +++ b/src/module.c @@ -753,6 +753,7 @@ void RM_SetModuleAttribs(RedisModuleCtx *ctx, const char *name, int ver, int api module->usedby = listCreate(); module->using = listCreate(); module->filters = listCreate(); + module->in_call = 0; ctx->module = module; } diff --git a/src/modules/hellofilter.c b/tests/modules/commandfilter.c similarity index 100% rename from src/modules/hellofilter.c rename to tests/modules/commandfilter.c diff --git a/tests/modules/commandfilter.tcl b/tests/unit/moduleapi/commandfilter.tcl similarity index 100% rename from tests/modules/commandfilter.tcl rename to tests/unit/moduleapi/commandfilter.tcl From a631f66710954a35f4854a98d395e29b898ff6c6 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Sun, 24 Mar 2019 12:03:03 +0200 Subject: [PATCH 05/50] Add runtest-moduleapi with commandfilter coverage. --- runtest-moduleapi | 16 +++++++++++++++ src/modules/Makefile | 7 +------ tests/modules/Makefile | 24 ++++++++++++++++++++++ tests/modules/commandfilter.c | 28 +++++++++++++------------- tests/test_helper.tcl | 1 - tests/unit/moduleapi/commandfilter.tcl | 16 +++++++-------- 6 files changed, 63 insertions(+), 29 deletions(-) create mode 100755 runtest-moduleapi create mode 100644 tests/modules/Makefile diff --git a/runtest-moduleapi b/runtest-moduleapi new file mode 100755 index 000000000..84cdb9bb8 --- /dev/null +++ b/runtest-moduleapi @@ -0,0 +1,16 @@ +#!/bin/sh +TCL_VERSIONS="8.5 8.6" +TCLSH="" + +for VERSION in $TCL_VERSIONS; do + TCL=`which tclsh$VERSION 2>/dev/null` && TCLSH=$TCL +done + +if [ -z $TCLSH ] +then + echo "You need tcl 8.5 or newer in order to run the Redis test" + exit 1 +fi + +make -C tests/modules && \ +$TCLSH tests/test_helper.tcl --single unit/moduleapi/commandfilter "${@}" diff --git a/src/modules/Makefile b/src/modules/Makefile index 537aa0daf..4f6b50f2e 100644 --- a/src/modules/Makefile +++ b/src/modules/Makefile @@ -13,7 +13,7 @@ endif .SUFFIXES: .c .so .xo .o -all: helloworld.so hellotype.so helloblock.so testmodule.so hellocluster.so hellotimer.so hellodict.so hellofilter.so +all: helloworld.so hellotype.so helloblock.so testmodule.so hellocluster.so hellotimer.so hellodict.so .c.xo: $(CC) -I. $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@ @@ -47,11 +47,6 @@ hellodict.xo: ../redismodule.h hellodict.so: hellodict.xo -hellofilter.xo: ../redismodule.h - -hellofilter.so: hellofilter.xo - $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc - testmodule.xo: ../redismodule.h testmodule.so: testmodule.xo diff --git a/tests/modules/Makefile b/tests/modules/Makefile new file mode 100644 index 000000000..014d20afa --- /dev/null +++ b/tests/modules/Makefile @@ -0,0 +1,24 @@ + +# find the OS +uname_S := $(shell sh -c 'uname -s 2>/dev/null || echo not') + +# Compile flags for linux / osx +ifeq ($(uname_S),Linux) + SHOBJ_CFLAGS ?= -W -Wall -fno-common -g -ggdb -std=c99 -O2 + SHOBJ_LDFLAGS ?= -shared +else + SHOBJ_CFLAGS ?= -W -Wall -dynamic -fno-common -g -ggdb -std=c99 -O2 + SHOBJ_LDFLAGS ?= -bundle -undefined dynamic_lookup +endif + +.SUFFIXES: .c .so .xo .o + +all: commandfilter.so + +.c.xo: + $(CC) -I../../src $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@ + +commandfilter.xo: ../../src/redismodule.h + +commandfilter.so: commandfilter.xo + $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc diff --git a/tests/modules/commandfilter.c b/tests/modules/commandfilter.c index 448e12983..d25d49c44 100644 --- a/tests/modules/commandfilter.c +++ b/tests/modules/commandfilter.c @@ -1,18 +1,18 @@ #define REDISMODULE_EXPERIMENTAL_API -#include "../redismodule.h" +#include "redismodule.h" #include static RedisModuleString *log_key_name; -static const char log_command_name[] = "hellofilter.log"; -static const char ping_command_name[] = "hellofilter.ping"; -static const char unregister_command_name[] = "hellofilter.unregister"; +static const char log_command_name[] = "commandfilter.log"; +static const char ping_command_name[] = "commandfilter.ping"; +static const char unregister_command_name[] = "commandfilter.unregister"; static int in_log_command = 0; static RedisModuleCommandFilter *filter = NULL; -int HelloFilter_UnregisterCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +int CommandFilter_UnregisterCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { (void) argc; (void) argv; @@ -23,7 +23,7 @@ int HelloFilter_UnregisterCommand(RedisModuleCtx *ctx, RedisModuleString **argv, return REDISMODULE_OK; } -int HelloFilter_PingCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +int CommandFilter_PingCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { (void) argc; (void) argv; @@ -39,7 +39,7 @@ int HelloFilter_PingCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int a return REDISMODULE_OK; } -int HelloFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) +int CommandFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { RedisModuleString *s = RedisModule_CreateString(ctx, "", 0); @@ -74,9 +74,9 @@ int HelloFilter_LogCommand(RedisModuleCtx *ctx, RedisModuleString **argv, int ar return REDISMODULE_OK; } -void HelloFilter_CommandFilter(RedisModuleCommandFilterCtx *filter) +void CommandFilter_CommandFilter(RedisModuleCommandFilterCtx *filter) { - if (in_log_command) return; /* don't process our own RM_Call() from HelloFilter_LogCommand() */ + if (in_log_command) return; /* don't process our own RM_Call() from CommandFilter_LogCommand() */ /* Fun manipulations: * - Remove @delme @@ -117,7 +117,7 @@ void HelloFilter_CommandFilter(RedisModuleCommandFilterCtx *filter) } int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) { - if (RedisModule_Init(ctx,"hellofilter",1,REDISMODULE_APIVER_1) + if (RedisModule_Init(ctx,"commandfilter",1,REDISMODULE_APIVER_1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (argc != 2) { @@ -130,18 +130,18 @@ int RedisModule_OnLoad(RedisModuleCtx *ctx, RedisModuleString **argv, int argc) RedisModule_StringToLongLong(argv[1], &noself); if (RedisModule_CreateCommand(ctx,log_command_name, - HelloFilter_LogCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + CommandFilter_LogCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,ping_command_name, - HelloFilter_PingCommand,"deny-oom",1,1,1) == REDISMODULE_ERR) + CommandFilter_PingCommand,"deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; if (RedisModule_CreateCommand(ctx,unregister_command_name, - HelloFilter_UnregisterCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) + CommandFilter_UnregisterCommand,"write deny-oom",1,1,1) == REDISMODULE_ERR) return REDISMODULE_ERR; - if ((filter = RedisModule_RegisterCommandFilter(ctx, HelloFilter_CommandFilter, + if ((filter = RedisModule_RegisterCommandFilter(ctx, CommandFilter_CommandFilter, noself ? REDISMODULE_CMDFILTER_NOSELF : 0)) == NULL) return REDISMODULE_ERR; diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index d2f281526..568eacdee 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -63,7 +63,6 @@ set ::all_tests { unit/lazyfree unit/wait unit/pendingquerybuf - modules/commandfilter } # Index to the next test to run in the ::all_tests list. set ::next_test 0 diff --git a/tests/unit/moduleapi/commandfilter.tcl b/tests/unit/moduleapi/commandfilter.tcl index 1e5c41d2b..6078f64f2 100644 --- a/tests/unit/moduleapi/commandfilter.tcl +++ b/tests/unit/moduleapi/commandfilter.tcl @@ -1,4 +1,4 @@ -set testmodule [file normalize src/modules/hellofilter.so] +set testmodule [file normalize tests/modules/commandfilter.so] start_server {tags {"modules"}} { r module load $testmodule log-key 0 @@ -27,7 +27,7 @@ start_server {tags {"modules"}} { test {Command Filter applies on RM_Call() commands} { r del log-key - r hellofilter.ping + r commandfilter.ping r lrange log-key 0 -1 } "{ping @log}" @@ -39,13 +39,13 @@ start_server {tags {"modules"}} { test {Command Filter applies on Lua redis.call() that calls a module} { r del log-key - r eval "redis.call('hellofilter.ping')" 0 + r eval "redis.call('commandfilter.ping')" 0 r lrange log-key 0 -1 } "{ping @log}" test {Command Filter is unregistered implicitly on module unload} { r del log-key - r module unload hellofilter + r module unload commandfilter r set mykey @log r lrange log-key 0 -1 } {} @@ -59,14 +59,14 @@ start_server {tags {"modules"}} { assert_equal "{set mykey @log}" [r lrange log-key 0 -1] # Unregister - r hellofilter.unregister + r commandfilter.unregister r del log-key r set mykey @log r lrange log-key 0 -1 } {} - r module unload hellofilter + r module unload commandfilter r module load $testmodule log-key 1 test {Command Filter REDISMODULE_CMDFILTER_NOSELF works as expected} { @@ -74,10 +74,10 @@ start_server {tags {"modules"}} { assert_equal "{set mykey @log}" [r lrange log-key 0 -1] r del log-key - r hellofilter.ping + r commandfilter.ping assert_equal {} [r lrange log-key 0 -1] - r eval "redis.call('hellofilter.ping')" 0 + r eval "redis.call('commandfilter.ping')" 0 assert_equal {} [r lrange log-key 0 -1] } From 48d14e5aa71b43de0e7747648908b26e34e59ddb Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 24 Mar 2019 13:10:55 +0200 Subject: [PATCH 06/50] slave corrupts replication stream when module blocked client uses large reply (or POSTPONED_ARRAY) when redis appends the blocked client reply list to the real client, it didn't bother to check if it is in fact the master client. so a slave executing that module command will send replies to the master, causing the master to send the slave error responses, which will mess up the replication offset (slave will advance it's replication offset, and the master does not) --- src/module.c | 7 +------ src/networking.c | 13 +++++++++++++ src/server.h | 1 + 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/module.c b/src/module.c index ff7f27cdd..0c8197ac7 100644 --- a/src/module.c +++ b/src/module.c @@ -3747,12 +3747,7 @@ void moduleHandleBlockedClients(void) { * We need to glue such replies to the client output buffer and * free the temporary client we just used for the replies. */ if (c) { - if (bc->reply_client->bufpos) - addReplyProto(c,bc->reply_client->buf, - bc->reply_client->bufpos); - if (listLength(bc->reply_client->reply)) - listJoin(c->reply,bc->reply_client->reply); - c->reply_bytes += bc->reply_client->reply_bytes; + AddReplyFromClient(c, bc->reply_client); } freeClient(bc->reply_client); diff --git a/src/networking.c b/src/networking.c index 09cbff387..7fdd1984d 100644 --- a/src/networking.c +++ b/src/networking.c @@ -744,6 +744,19 @@ void addReplySubcommandSyntaxError(client *c) { sdsfree(cmd); } +/* Append 'src' client output buffers into 'dst' client output buffers. + * This function clears the output buffers of 'src' */ +void AddReplyFromClient(client *dst, client *src) { + if (prepareClientToWrite(dst) != C_OK) + return; + addReplyProto(dst,src->buf, src->bufpos); + if (listLength(src->reply)) + listJoin(dst->reply,src->reply); + dst->reply_bytes += src->reply_bytes; + src->reply_bytes = 0; + src->bufpos = 0; +} + /* Copy 'src' client output buffers into 'dst' client output buffers. * The function takes care of freeing the old output buffers of the * destination client. */ diff --git a/src/server.h b/src/server.h index 95e0355a6..dfd9f7698 100644 --- a/src/server.h +++ b/src/server.h @@ -1529,6 +1529,7 @@ void addReplyNullArray(client *c); void addReplyBool(client *c, int b); void addReplyVerbatim(client *c, const char *s, size_t len, const char *ext); void addReplyProto(client *c, const char *s, size_t len); +void AddReplyFromClient(client *c, client *src); void addReplyBulk(client *c, robj *obj); void addReplyBulkCString(client *c, const char *s); void addReplyBulkCBuffer(client *c, const void *p, size_t len); From 0e00a99f3275355dcd4f8fb5d6e67b421ce40d17 Mon Sep 17 00:00:00 2001 From: David Carlier Date: Thu, 28 Mar 2019 06:38:16 +0000 Subject: [PATCH 07/50] build fix --- src/networking.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/networking.c b/src/networking.c index 09cbff387..02fc44e75 100644 --- a/src/networking.c +++ b/src/networking.c @@ -29,6 +29,7 @@ #include "server.h" #include "atomicvar.h" +#include #include #include #include From 41fc29512c191363058175b9b255d54e0918b60f Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 8 Apr 2019 17:39:22 +0200 Subject: [PATCH 08/50] Fix assert comparison in fetchClusterSlotsConfiguration(). --- src/redis-benchmark.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 12e9f7e41..4e2662f21 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -1192,7 +1192,7 @@ static int fetchClusterSlotsConfiguration(client c) { assert(reply->type == REDIS_REPLY_ARRAY); for (i = 0; i < reply->elements; i++) { redisReply *r = reply->element[i]; - assert(r->type = REDIS_REPLY_ARRAY); + assert(r->type == REDIS_REPLY_ARRAY); assert(r->elements >= 3); int from, to, slot; from = r->element[0]->integer; From b364f3fc21ca0130f56eb1ec13ff608062e67b96 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 8 Apr 2019 18:06:50 +0200 Subject: [PATCH 09/50] ACL: regression test for #5998. --- tests/unit/acl.tcl | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/unit/acl.tcl b/tests/unit/acl.tcl index 82c75f82d..90f2c9bbf 100644 --- a/tests/unit/acl.tcl +++ b/tests/unit/acl.tcl @@ -108,4 +108,11 @@ start_server {tags {"acl"}} { assert_match {*+debug|segfault*} $cmdstr assert_match {*+acl*} $cmdstr } + + test {ACL regression: memory leaks adding / removing subcommands} { + r AUTH default "" + r ACL setuser newuser reset -debug +debug|a +debug|b +debug|c + r ACL setuser newuser -debug + # The test framework will detect a leak if any. + } } From 1a505a3ba9b44c0cb420b37458a4ee2f1c4f92b4 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 8 Apr 2019 18:08:37 +0200 Subject: [PATCH 10/50] ACL: Fix memory leak in ACLResetSubcommandsForCommand(). This commit fixes bug reported at #5998. Thanks to @tomcat1102. --- src/acl.c | 2 ++ tests/unit/acl.tcl | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/acl.c b/src/acl.c index d9f431f4f..0205e51ad 100644 --- a/src/acl.c +++ b/src/acl.c @@ -542,6 +542,8 @@ struct redisCommand *ACLLookupCommand(const char *name) { * and command ID. */ void ACLResetSubcommandsForCommand(user *u, unsigned long id) { if (u->allowed_subcommands && u->allowed_subcommands[id]) { + for (int i = 0; u->allowed_subcommands[id][i]; i++) + sdsfree(u->allowed_subcommands[id][i]); zfree(u->allowed_subcommands[id]); u->allowed_subcommands[id] = NULL; } diff --git a/tests/unit/acl.tcl b/tests/unit/acl.tcl index 90f2c9bbf..058441433 100644 --- a/tests/unit/acl.tcl +++ b/tests/unit/acl.tcl @@ -109,7 +109,7 @@ start_server {tags {"acl"}} { assert_match {*+acl*} $cmdstr } - test {ACL regression: memory leaks adding / removing subcommands} { + test {ACL #5998 regression: memory leaks adding / removing subcommands} { r AUTH default "" r ACL setuser newuser reset -debug +debug|a +debug|b +debug|c r ACL setuser newuser -debug From 36fff343fb1052c62d1fbdd0c846782118117fb6 Mon Sep 17 00:00:00 2001 From: yongman Date: Tue, 9 Apr 2019 09:24:22 +0800 Subject: [PATCH 11/50] Fix memleak in bitfieldCommand --- src/bitops.c | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/bitops.c b/src/bitops.c index 8d03a7699..ee1ce0460 100644 --- a/src/bitops.c +++ b/src/bitops.c @@ -994,12 +994,18 @@ void bitfieldCommand(client *c) { /* Lookup for read is ok if key doesn't exit, but errors * if it's not a string. */ o = lookupKeyRead(c->db,c->argv[1]); - if (o != NULL && checkType(c,o,OBJ_STRING)) return; + if (o != NULL && checkType(c,o,OBJ_STRING)) { + zfree(ops); + return; + } } else { /* Lookup by making room up to the farest bit reached by * this operation. */ if ((o = lookupStringForBitCommand(c, - highest_write_offset)) == NULL) return; + highest_write_offset)) == NULL) { + zfree(ops); + return; + } } addReplyArrayLen(c,numops); From f4ae084e94caab308595824c6ee0857fc1cc3d29 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 10 Apr 2019 18:53:27 +0200 Subject: [PATCH 12/50] Aesthetic change to #5962 to conform to Redis style. --- src/module.c | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/module.c b/src/module.c index 0c8197ac7..1e7c0eca3 100644 --- a/src/module.c +++ b/src/module.c @@ -3746,9 +3746,7 @@ void moduleHandleBlockedClients(void) { * replies to send to the client in a thread safe context. * We need to glue such replies to the client output buffer and * free the temporary client we just used for the replies. */ - if (c) { - AddReplyFromClient(c, bc->reply_client); - } + if (c) AddReplyFromClient(c, bc->reply_client); freeClient(bc->reply_client); if (c != NULL) { From 154914e6540f165363a2979974c7334271e5eee3 Mon Sep 17 00:00:00 2001 From: James Rouzier Date: Thu, 11 Apr 2019 12:19:02 -0400 Subject: [PATCH 13/50] Fix start and end key initialize --- src/t_stream.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/t_stream.c b/src/t_stream.c index f4ace87a2..9e7d3d126 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -492,14 +492,14 @@ void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamI streamEncodeID(si->start_key,start); } else { si->start_key[0] = 0; - si->start_key[0] = 0; + si->start_key[1] = 0; } if (end) { streamEncodeID(si->end_key,end); } else { si->end_key[0] = UINT64_MAX; - si->end_key[0] = UINT64_MAX; + si->end_key[1] = UINT64_MAX; } /* Seek the correct node in the radix tree. */ From d21b3b9d9e94932e1c4114717a864fcf10387f25 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 15 Apr 2019 16:50:26 +0200 Subject: [PATCH 14/50] Test: disable module testing for now. --- tests/test_helper.tcl | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index d2f281526..568eacdee 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -63,7 +63,6 @@ set ::all_tests { unit/lazyfree unit/wait unit/pendingquerybuf - modules/commandfilter } # Index to the next test to run in the ::all_tests list. set ::next_test 0 From 7bda78088dbdcb7acefe6b51339bb634adaca096 Mon Sep 17 00:00:00 2001 From: git-hulk Date: Tue, 23 Apr 2019 20:08:14 +0800 Subject: [PATCH 15/50] FIX: core dump in redis-benchmark when the `-r` is the last arg --- src/redis-benchmark.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 4e2662f21..2759e6a3c 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -1294,7 +1294,7 @@ int parseOptions(int argc, const char **argv) { if (*p < '0' || *p > '9') goto invalid; } config.randomkeys = 1; - config.randomkeys_keyspacelen = atoi(argv[++i]); + config.randomkeys_keyspacelen = atoi(next); if (config.randomkeys_keyspacelen < 0) config.randomkeys_keyspacelen = 0; } else if (!strcmp(argv[i],"-q")) { From 03c7e580d70c0b2bf026259cec717e08538ea234 Mon Sep 17 00:00:00 2001 From: vattezhang Date: Fri, 26 Apr 2019 18:50:51 +0800 Subject: [PATCH 16/50] update --- src/redis-benchmark.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 7bac6fdd4..4e2662f21 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -418,11 +418,6 @@ static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) { if (redisBufferRead(c->context) != REDIS_OK) { fprintf(stderr,"Error: %s\n",c->context->errstr); exit(1); - } - else if (NULL != strstr(c->context->reader->buf,"NOAUTH")) - { - fprintf(stderr,"Error: %s\n",c->context->reader->buf); - exit(1); } else { while(c->pending) { if (redisGetReply(c->context,&reply) != REDIS_OK) { From 42bccb59c5c2bd5adc188de6aa3e68bc3601630d Mon Sep 17 00:00:00 2001 From: vattezhang Date: Fri, 26 Apr 2019 19:47:07 +0800 Subject: [PATCH 17/50] fix: benchmark auth fails when server have requirepass --- src/redis-benchmark.c | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 4e2662f21..e4134c9ea 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -254,6 +254,19 @@ static redisConfig *getRedisConfig(const char *ip, int port, else fprintf(stderr,"%s: %s\n",hostsocket,err); goto fail; } + + if(config.auth){ + void *authReply = NULL; + redisAppendCommand(c, "AUTH %s", config.auth); + if (REDIS_OK != redisGetReply(c, &authReply)) goto fail; + if (reply) freeReplyObject(reply); + reply = ((redisReply *) authReply); + if (reply->type == REDIS_REPLY_ERROR) { + fprintf(stderr, "ERROR: %s\n", reply->str); + goto fail; + } + } + redisAppendCommand(c, "CONFIG GET %s", "save"); redisAppendCommand(c, "CONFIG GET %s", "appendonly"); int i = 0; From b19e3e0e4cb8dfe615a5c11330660fd6804279a6 Mon Sep 17 00:00:00 2001 From: abhay Date: Thu, 25 Apr 2019 13:50:25 +0530 Subject: [PATCH 18/50] removed obsolete warning as per - https://github.com/antirez/redis/issues/5291 --- redis.conf | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/redis.conf b/redis.conf index 5ea915905..060510768 100644 --- a/redis.conf +++ b/redis.conf @@ -942,13 +942,7 @@ aof-use-rdb-preamble yes lua-time-limit 5000 ################################ REDIS CLUSTER ############################### -# -# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -# WARNING EXPERIMENTAL: Redis Cluster is considered to be stable code, however -# in order to mark it as "mature" we need to wait for a non trivial percentage -# of users to deploy it in production. -# ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ -# + # Normal Redis instances can't be part of a Redis Cluster; only nodes that are # started as cluster nodes can. In order to start a Redis instance as a # cluster node enable the cluster support uncommenting the following: From 7ff5c6011de7749eda50b728d26fa7bb56ea7d88 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Mon, 29 Apr 2019 14:38:28 +0800 Subject: [PATCH 19/50] aof: enhance AOF_FSYNC_EVERYSEC, more details in #5985 --- src/aof.c | 34 +++++++++++++++++++++++++++++++--- src/server.h | 1 + 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/src/aof.c b/src/aof.c index 615eebd01..4744847d2 100644 --- a/src/aof.c +++ b/src/aof.c @@ -197,6 +197,12 @@ ssize_t aofRewriteBufferWrite(int fd) { * AOF file implementation * ------------------------------------------------------------------------- */ +/* Return true if an AOf fsync is currently already in progress in a + * BIO thread. */ +int aofFsyncInProgress(void) { + return bioPendingJobsOfType(BIO_AOF_FSYNC) != 0; +} + /* Starts a background task that performs fsync() against the specified * file descriptor (the one of the AOF file) in another thread. */ void aof_background_fsync(int fd) { @@ -335,10 +341,24 @@ void flushAppendOnlyFile(int force) { int sync_in_progress = 0; mstime_t latency; - if (sdslen(server.aof_buf) == 0) return; + if (sdslen(server.aof_buf) == 0) { + /* Check if we need to do fsync even the aof buffer is empty, + * because previously in AOF_FSYNC_EVERYSEC mode, fsync is + * called only when aof buffer is not empty, so if users + * stop write commands before fsync called in one second, + * the data in page cache cannot be flushed in time. */ + if (server.aof_fsync == AOF_FSYNC_EVERYSEC && + server.aof_fsync_offset != server.aof_current_size && + server.unixtime > server.aof_last_fsync && + !(sync_in_progress = aofFsyncInProgress())) { + goto try_fsync; + } else { + return; + } + } if (server.aof_fsync == AOF_FSYNC_EVERYSEC) - sync_in_progress = bioPendingJobsOfType(BIO_AOF_FSYNC) != 0; + sync_in_progress = aofFsyncInProgress(); if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { /* With this append fsync policy we do background fsyncing. @@ -470,6 +490,7 @@ void flushAppendOnlyFile(int force) { server.aof_buf = sdsempty(); } +try_fsync: /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are * children doing I/O in the background. */ if (server.aof_no_fsync_on_rewrite && @@ -484,10 +505,14 @@ void flushAppendOnlyFile(int force) { redis_fsync(server.aof_fd); /* Let's try to get this data on the disk */ latencyEndMonitor(latency); latencyAddSampleIfNeeded("aof-fsync-always",latency); + server.aof_fsync_offset = server.aof_current_size; server.aof_last_fsync = server.unixtime; } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && server.unixtime > server.aof_last_fsync)) { - if (!sync_in_progress) aof_background_fsync(server.aof_fd); + if (!sync_in_progress) { + aof_background_fsync(server.aof_fd); + server.aof_fsync_offset = server.aof_current_size; + } server.aof_last_fsync = server.unixtime; } } @@ -694,6 +719,7 @@ int loadAppendOnlyFile(char *filename) { * operation is received. */ if (fp && redis_fstat(fileno(fp),&sb) != -1 && sb.st_size == 0) { server.aof_current_size = 0; + server.aof_fsync_offset = server.aof_current_size; fclose(fp); return C_ERR; } @@ -832,6 +858,7 @@ loaded_ok: /* DB loaded, cleanup and return C_OK to the caller. */ stopLoading(); aofUpdateCurrentSize(); server.aof_rewrite_base_size = server.aof_current_size; + server.aof_fsync_offset = server.aof_current_size; return C_OK; readerr: /* Read error. If feof(fp) is true, fall through to unexpected EOF. */ @@ -1741,6 +1768,7 @@ void backgroundRewriteDoneHandler(int exitcode, int bysignal) { server.aof_selected_db = -1; /* Make sure SELECT is re-issued */ aofUpdateCurrentSize(); server.aof_rewrite_base_size = server.aof_current_size; + server.aof_current_size = server.aof_current_size; /* Clear regular AOF buffer since its contents was just written to * the new AOF from the background rewrite buffer. */ diff --git a/src/server.h b/src/server.h index dfd9f7698..e7f01b2ef 100644 --- a/src/server.h +++ b/src/server.h @@ -1140,6 +1140,7 @@ struct redisServer { off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */ off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */ off_t aof_current_size; /* AOF current size. */ + off_t aof_fsync_offset; /* AOF offset which is already synced to disk. */ int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */ pid_t aof_child_pid; /* PID if rewriting process */ list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite. */ From c76bb465f2b7a2d5af77040c08128ce9310b2b44 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 5 May 2019 08:19:52 +0300 Subject: [PATCH 20/50] make replication tests more stable on slow machines solving few replication related tests race conditions which fail on slow machines bugfix in slave buffers test: since the test is executed twice, each time with a different commands count, the threshold for the delta can't be a constant. --- tests/integration/psync2.tcl | 5 ++++- tests/integration/replication-psync.tcl | 26 +++++++++++++++++++++++++ tests/unit/maxmemory.tcl | 7 ++++--- 3 files changed, 34 insertions(+), 4 deletions(-) diff --git a/tests/integration/psync2.tcl b/tests/integration/psync2.tcl index 8663d6fcc..d1212b640 100644 --- a/tests/integration/psync2.tcl +++ b/tests/integration/psync2.tcl @@ -166,12 +166,15 @@ start_server {} { # Pick a random slave set slave_id [expr {($master_id+1)%5}] set sync_count [status $R($master_id) sync_full] + set sync_partial [status $R($master_id) sync_partial_ok] catch { $R($slave_id) config rewrite $R($slave_id) debug restart } + # note: just waiting for connected_slaves==4 has a race condition since + # we might do the check before the master realized that the slave disconnected wait_for_condition 50 1000 { - [status $R($master_id) connected_slaves] == 4 + [status $R($master_id) sync_partial_ok] == $sync_partial + 1 } else { fail "Replica not reconnecting" } diff --git a/tests/integration/replication-psync.tcl b/tests/integration/replication-psync.tcl index a3bce2a4c..bf8682446 100644 --- a/tests/integration/replication-psync.tcl +++ b/tests/integration/replication-psync.tcl @@ -79,6 +79,32 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond diskless rec stop_bg_complex_data $load_handle0 stop_bg_complex_data $load_handle1 stop_bg_complex_data $load_handle2 + + # Wait for the slave to reach the "online" + # state from the POV of the master. + set retry 5000 + while {$retry} { + set info [$master info] + if {[string match {*slave0:*state=online*} $info]} { + break + } else { + incr retry -1 + after 100 + } + } + if {$retry == 0} { + error "assertion:Slave not correctly synchronized" + } + + # Wait that slave acknowledge it is online so + # we are sure that DBSIZE and DEBUG DIGEST will not + # fail because of timing issues. (-LOADING error) + wait_for_condition 5000 100 { + [lindex [$slave role] 3] eq {connected} + } else { + fail "Slave still not connected after some time" + } + set retry 10 while {$retry && ([$master debug digest] ne [$slave debug digest])}\ { diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index 1def57af5..0f64ddc18 100644 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -161,7 +161,7 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline} } # make sure master doesn't disconnect slave because of timeout - $master config set repl-timeout 300 ;# 5 minutes + $master config set repl-timeout 1200 ;# 20 minutes (for valgrind and slow machines) $master config set maxmemory-policy allkeys-random $master config set client-output-buffer-limit "replica 100000000 100000000 300" $master config set repl-backlog-size [expr {10*1024}] @@ -212,7 +212,8 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline} assert {[$master dbsize] == 100} assert {$slave_buf > 2*1024*1024} ;# some of the data may have been pushed to the OS buffers - assert {$delta < 50*1024 && $delta > -50*1024} ;# 1 byte unaccounted for, with 1M commands will consume some 1MB + set delta_max [expr {$cmd_count / 2}] ;# 1 byte unaccounted for, with 1M commands will consume some 1MB + assert {$delta < $delta_max && $delta > -$delta_max} $master client kill type slave set killed_used [s -1 used_memory] @@ -221,7 +222,7 @@ proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline} set killed_used_no_repl [expr {$killed_used - $killed_mem_not_counted_for_evict}] set delta_no_repl [expr {$killed_used_no_repl - $used_no_repl}] assert {$killed_slave_buf == 0} - assert {$delta_no_repl > -50*1024 && $delta_no_repl < 50*1024} ;# 1 byte unaccounted for, with 1M commands will consume some 1MB + assert {$delta_no_repl > -$delta_max && $delta_no_repl < $delta_max} } # unfreeze slave process (after the 'test' succeeded or failed, but before we attempt to terminate the server From 7653918c7a265ddb2581c23f035fb97ffb4692e0 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Sun, 5 May 2019 20:32:53 +0300 Subject: [PATCH 21/50] Preserve client->id for blocked clients. --- src/module.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/module.c b/src/module.c index c29521670..7dee7e776 100644 --- a/src/module.c +++ b/src/module.c @@ -3866,7 +3866,10 @@ RedisModuleCtx *RM_GetThreadSafeContext(RedisModuleBlockedClient *bc) { * in order to keep things like the currently selected database and similar * things. */ ctx->client = createClient(-1); - if (bc) selectDb(ctx->client,bc->dbid); + if (bc) { + selectDb(ctx->client,bc->dbid); + ctx->client->id = bc->client->id; + } return ctx; } From 0bf3acce381acbf322629d6e00b6815ce449b51d Mon Sep 17 00:00:00 2001 From: WuYunlong Date: Mon, 6 May 2019 11:46:07 +0800 Subject: [PATCH 22/50] Do not active expire keys in the background when the switch is off. --- src/server.c | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/server.c b/src/server.c index fb5d679cd..eaa7172a7 100644 --- a/src/server.c +++ b/src/server.c @@ -1674,10 +1674,12 @@ void clientsCron(void) { void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ - if (server.active_expire_enabled && server.masterhost == NULL) { - activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); - } else if (server.masterhost != NULL) { - expireSlaveKeys(); + if (server.active_expire_enabled) { + if (server.masterhost == NULL) { + activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); + } else { + expireSlaveKeys(); + } } /* Defrag keys gradually. */ From e374c6eb5503a5eb20f8ed7b5c36fa2549824a4a Mon Sep 17 00:00:00 2001 From: liaotonglang Date: Mon, 6 May 2019 17:15:49 +0800 Subject: [PATCH 23/50] delete sdsTest() from REDIS_TEST sdsTest() defined in sds.c dit not match the call in server.c. remove it from REDIS_TEST, since test-sds defined in Makefile. --- src/server.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/server.c b/src/server.c index fb5d679cd..674eef206 100644 --- a/src/server.c +++ b/src/server.c @@ -4718,8 +4718,6 @@ int main(int argc, char **argv) { return sha1Test(argc, argv); } else if (!strcasecmp(argv[2], "util")) { return utilTest(argc, argv); - } else if (!strcasecmp(argv[2], "sds")) { - return sdsTest(argc, argv); } else if (!strcasecmp(argv[2], "endianconv")) { return endianconvTest(argc, argv); } else if (!strcasecmp(argv[2], "crc64")) { From 62261aa905583015cf30c47bdaabbd637d1da7ac Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 24 Oct 2017 08:35:05 +0200 Subject: [PATCH 24/50] Threaded IO: implement handleClientsWithPendingWritesUsingThreads(). This is just an experiment for now, there are a couple of race conditions, mostly harmless for the performance gain experiment that this commit represents so far. The general idea here is to take Redis single threaded and instead fan-out on expansive kernel calls: write(2) in this case, but the same concept could be easily implemented for read(2) and protcol parsing. However just threading writes like in this commit, is enough to evaluate if the approach is sounding. --- src/networking.c | 156 +++++++++++++++++++++++++++++++++++++++++++++-- src/server.c | 11 ++-- src/server.h | 4 ++ 3 files changed, 162 insertions(+), 9 deletions(-) diff --git a/src/networking.c b/src/networking.c index ffb435625..3958e4f5e 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1065,9 +1065,17 @@ void freeClient(client *c) { * a context where calling freeClient() is not possible, because the client * should be valid for the continuation of the flow of the program. */ void freeClientAsync(client *c) { + /* We need to handle concurrent access to the server.clients_to_close list + * only in the freeClientAsync() function, since it's the only function that + * may access the list while Redis uses I/O threads. All the other accesses + * are in the context of the main thread while the other threads are + * idle. */ + static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER; if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return; c->flags |= CLIENT_CLOSE_ASAP; + pthread_mutex_lock(&async_free_queue_mutex); listAddNodeTail(server.clients_to_close,c); + pthread_mutex_unlock(&async_free_queue_mutex); } void freeClientsInAsyncFreeQueue(void) { @@ -1091,7 +1099,12 @@ client *lookupClientByID(uint64_t id) { } /* Write data in output buffers to client. Return C_OK if the client - * is still valid after the call, C_ERR if it was freed. */ + * is still valid after the call, C_ERR if it was freed because of some + * error. + * + * This function is called by threads, but always with handler_installed + * set to 0. So when handler_installed is set to 0 the function must be + * thread safe. */ int writeToClient(int fd, client *c, int handler_installed) { ssize_t nwritten = 0, totwritten = 0; size_t objlen; @@ -1153,14 +1166,15 @@ int writeToClient(int fd, client *c, int handler_installed) { zmalloc_used_memory() < server.maxmemory) && !(c->flags & CLIENT_SLAVE)) break; } + /* FIXME: Fixme, use atomic var for this. */ server.stat_net_output_bytes += totwritten; if (nwritten == -1) { if (errno == EAGAIN) { nwritten = 0; } else { - serverLog(LL_VERBOSE, - "Error writing to client: %s", strerror(errno)); - freeClient(c); + // serverLog(LL_VERBOSE, + // "Error writing to client: %s", strerror(errno)); + freeClientAsync(c); return C_ERR; } } @@ -1173,11 +1187,15 @@ int writeToClient(int fd, client *c, int handler_installed) { } if (!clientHasPendingReplies(c)) { c->sentlen = 0; + /* Note that writeToClient() is called in a threaded way, but + * adDeleteFileEvent() is not thread safe: however writeToClient() + * is always called with handler_installed set to 0 from threads + * so we are fine. */ if (handler_installed) aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); /* Close connection after entire reply has been sent. */ if (c->flags & CLIENT_CLOSE_AFTER_REPLY) { - freeClient(c); + freeClientAsync(c); return C_ERR; } } @@ -2452,3 +2470,131 @@ int processEventsWhileBlocked(void) { } return count; } + +/* ============================================================================= + * Threaded I/O + * =========================================================================== */ + +#define SERVER_MAX_IO_THREADS 32 + +pthread_t io_threads[SERVER_MAX_IO_THREADS]; +pthread_mutex_t io_threads_done_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t io_threads_done_cond = PTHREAD_COND_INITIALIZER; +pthread_mutex_t io_threads_idle_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t io_threads_idle_cond = PTHREAD_COND_INITIALIZER; +pthread_cond_t io_threads_start_cond = PTHREAD_COND_INITIALIZER; +int io_threads_done = 0; /* Number of threads that completed the work. */ +int io_threads_idle = 0; /* Number of threads in idle state ready to go. */ +list *io_threads_list[SERVER_MAX_IO_THREADS]; + +void *IOThreadMain(void *myid) { + /* The ID is the thread number (from 0 to server.iothreads_num-1), and is + * used by the thread to just manipulate a single sub-array of clients. */ + long id = (unsigned long)myid; + + while(1) { + /* ... Wait for start ... */ + pthread_mutex_lock(&io_threads_idle_mutex); + io_threads_idle++; + pthread_cond_signal(&io_threads_idle_cond); + printf("[%ld] Waiting start...\n", id); + pthread_cond_wait(&io_threads_start_cond,&io_threads_idle_mutex); + printf("[%ld] Started\n", id); + pthread_mutex_unlock(&io_threads_idle_mutex); + printf("%d to handle\n", (int)listLength(io_threads_list[id])); + + /* ... Process ... */ + listIter li; + listNode *ln; + listRewind(io_threads_list[id],&li); + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + writeToClient(c->fd,c,0); + } + listEmpty(io_threads_list[id]); + + /* Report success. */ + pthread_mutex_lock(&io_threads_done_mutex); + io_threads_done++; + pthread_cond_signal(&io_threads_done_cond); + pthread_mutex_unlock(&io_threads_done_mutex); + printf("[%ld] Done\n", id); + } +} + +/* Initialize the data structures needed for threaded I/O. */ +void initThreadedIO(void) { + pthread_t tid; + + server.io_threads_num = 4; + for (int i = 0; i < server.io_threads_num; i++) { + if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { + serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); + exit(1); + } + io_threads[i] = tid; + io_threads_list[i] = listCreate(); + } +} + +int handleClientsWithPendingWritesUsingThreads(void) { + int processed = listLength(server.clients_pending_write); + if (processed == 0) return 0; /* Return ASAP if there are no clients. */ + + printf("%d TOTAL\n", processed); + + /* Wait for all threads to be ready. */ + pthread_mutex_lock(&io_threads_idle_mutex); + while(io_threads_idle < server.io_threads_num) { + pthread_cond_wait(&io_threads_idle_cond,&io_threads_idle_mutex); + } + printf("All threads are idle: %d\n", io_threads_idle); + io_threads_idle = 0; + pthread_mutex_unlock(&io_threads_idle_mutex); + + /* Distribute the clients across N different lists. */ + listIter li; + listNode *ln; + listRewind(server.clients_pending_write,&li); + int item_id = 0; + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + c->flags &= ~CLIENT_PENDING_WRITE; + int target_id = item_id % server.io_threads_num; + listAddNodeTail(io_threads_list[target_id],c); + item_id++; + } + + /* Start all threads. */ + printf("Send start condition\n"); + pthread_mutex_lock(&io_threads_done_mutex); + io_threads_done = 0; + pthread_cond_broadcast(&io_threads_start_cond); + pthread_mutex_unlock(&io_threads_done_mutex); + + /* Wait for all threads to end their work. */ + pthread_mutex_lock(&io_threads_done_mutex); + while(io_threads_done < server.io_threads_num) { + pthread_cond_wait(&io_threads_done_cond,&io_threads_done_mutex); + } + pthread_mutex_unlock(&io_threads_done_mutex); + printf("All threads finshed\n"); + + /* Run the list of clients again to install the write handler where + * needed. */ + listRewind(server.clients_pending_write,&li); + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + + /* Install the write handler if there are pending writes in some + * of the clients. */ + if (clientHasPendingReplies(c) && + aeCreateFileEvent(server.el, c->fd, AE_WRITABLE, + sendReplyToClient, c) == AE_ERR) + { + freeClientAsync(c); + } + } + listEmpty(server.clients_pending_write); + return processed; +} diff --git a/src/server.c b/src/server.c index fb5d679cd..c437880d5 100644 --- a/src/server.c +++ b/src/server.c @@ -1981,9 +1981,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { flushAppendOnlyFile(0); } - /* Close clients that need to be closed asynchronous */ - freeClientsInAsyncFreeQueue(); - /* Clear the paused clients flag if needed. */ clientsArePaused(); /* Don't check return value, just use the side effect.*/ @@ -2075,7 +2072,12 @@ void beforeSleep(struct aeEventLoop *eventLoop) { flushAppendOnlyFile(0); /* Handle writes with pending output buffers. */ - handleClientsWithPendingWrites(); + /* XXX: Put a condition based on number of waiting clients: if we + * have less than a given number of clients, use non threaded code. */ + handleClientsWithPendingWritesUsingThreads(); + + /* Close clients that need to be closed asynchronous */ + freeClientsInAsyncFreeQueue(); /* Before we are going to sleep, let the threads access the dataset by * releasing the GIL. Redis main thread will not touch anything at this @@ -2861,6 +2863,7 @@ void initServer(void) { slowlogInit(); latencyMonitorInit(); bioInit(); + initThreadedIO(); server.initial_memory_usage = zmalloc_used_memory(); } diff --git a/src/server.h b/src/server.h index dfd9f7698..d2a563c96 100644 --- a/src/server.h +++ b/src/server.h @@ -1062,6 +1062,8 @@ struct redisServer { int protected_mode; /* Don't accept external connections. */ int gopher_enabled; /* If true the server will reply to gopher queries. Will still serve RESP2 queries. */ + int io_threads_num; /* Number of IO threads to use. */ + /* RDB / AOF loading information */ int loading; /* We are loading data from disk if true */ off_t loading_total_bytes; @@ -1576,12 +1578,14 @@ void pauseClients(mstime_t duration); int clientsArePaused(void); int processEventsWhileBlocked(void); int handleClientsWithPendingWrites(void); +int handleClientsWithPendingWritesUsingThreads(void); int clientHasPendingReplies(client *c); void unlinkClient(client *c); int writeToClient(int fd, client *c, int handler_installed); void linkClient(client *c); void protectClient(client *c); void unprotectClient(client *c); +void initThreadedIO(void); #ifdef __GNUC__ void addReplyErrorFormat(client *c, const char *fmt, ...) From c1cb2ae69532671c1372cf76be5bb25b07f95e24 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 25 Mar 2019 12:16:13 +0100 Subject: [PATCH 25/50] Threaded IO: allow to disable debug printf. --- src/networking.c | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/networking.c b/src/networking.c index 3958e4f5e..5dfaaf8ba 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2471,9 +2471,11 @@ int processEventsWhileBlocked(void) { return count; } -/* ============================================================================= +/* ========================================================================== * Threaded I/O - * =========================================================================== */ + * ========================================================================== */ + +int tio_debug = 0; #define SERVER_MAX_IO_THREADS 32 @@ -2497,11 +2499,11 @@ void *IOThreadMain(void *myid) { pthread_mutex_lock(&io_threads_idle_mutex); io_threads_idle++; pthread_cond_signal(&io_threads_idle_cond); - printf("[%ld] Waiting start...\n", id); + if (tio_debug) printf("[%ld] Waiting start...\n", id); pthread_cond_wait(&io_threads_start_cond,&io_threads_idle_mutex); - printf("[%ld] Started\n", id); + if (tio_debug) printf("[%ld] Started\n", id); pthread_mutex_unlock(&io_threads_idle_mutex); - printf("%d to handle\n", (int)listLength(io_threads_list[id])); + if (tio_debug) printf("%d to handle\n", (int)listLength(io_threads_list[id])); /* ... Process ... */ listIter li; @@ -2518,7 +2520,7 @@ void *IOThreadMain(void *myid) { io_threads_done++; pthread_cond_signal(&io_threads_done_cond); pthread_mutex_unlock(&io_threads_done_mutex); - printf("[%ld] Done\n", id); + if (tio_debug) printf("[%ld] Done\n", id); } } @@ -2541,14 +2543,14 @@ int handleClientsWithPendingWritesUsingThreads(void) { int processed = listLength(server.clients_pending_write); if (processed == 0) return 0; /* Return ASAP if there are no clients. */ - printf("%d TOTAL\n", processed); + if (tio_debug) printf("%d TOTAL\n", processed); /* Wait for all threads to be ready. */ pthread_mutex_lock(&io_threads_idle_mutex); while(io_threads_idle < server.io_threads_num) { pthread_cond_wait(&io_threads_idle_cond,&io_threads_idle_mutex); } - printf("All threads are idle: %d\n", io_threads_idle); + if (tio_debug) printf("All threads are idle: %d\n", io_threads_idle); io_threads_idle = 0; pthread_mutex_unlock(&io_threads_idle_mutex); @@ -2566,7 +2568,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { } /* Start all threads. */ - printf("Send start condition\n"); + if (tio_debug) printf("Send start condition\n"); pthread_mutex_lock(&io_threads_done_mutex); io_threads_done = 0; pthread_cond_broadcast(&io_threads_start_cond); @@ -2578,7 +2580,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { pthread_cond_wait(&io_threads_done_cond,&io_threads_done_mutex); } pthread_mutex_unlock(&io_threads_done_mutex); - printf("All threads finshed\n"); + if (tio_debug) printf("All threads finshed\n"); /* Run the list of clients again to install the write handler where * needed. */ From df0a28f661380bd0bad90e8d9fa8eccedcb1d57d Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 25 Mar 2019 12:56:48 +0100 Subject: [PATCH 26/50] Threaded IO: second attempt without signaling conditions. --- src/networking.c | 104 +++++++++++++++++++++++++---------------------- src/server.c | 2 - 2 files changed, 55 insertions(+), 51 deletions(-) diff --git a/src/networking.c b/src/networking.c index 5dfaaf8ba..cd241dac2 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2480,13 +2480,9 @@ int tio_debug = 0; #define SERVER_MAX_IO_THREADS 32 pthread_t io_threads[SERVER_MAX_IO_THREADS]; -pthread_mutex_t io_threads_done_mutex = PTHREAD_MUTEX_INITIALIZER; -pthread_cond_t io_threads_done_cond = PTHREAD_COND_INITIALIZER; -pthread_mutex_t io_threads_idle_mutex = PTHREAD_MUTEX_INITIALIZER; -pthread_cond_t io_threads_idle_cond = PTHREAD_COND_INITIALIZER; -pthread_cond_t io_threads_start_cond = PTHREAD_COND_INITIALIZER; -int io_threads_done = 0; /* Number of threads that completed the work. */ -int io_threads_idle = 0; /* Number of threads in idle state ready to go. */ +pthread_mutex_t io_threads_mutex[SERVER_MAX_IO_THREADS]; +_Atomic unsigned long io_threads_pending[SERVER_MAX_IO_THREADS]; +int io_threads_active; list *io_threads_list[SERVER_MAX_IO_THREADS]; void *IOThreadMain(void *myid) { @@ -2496,30 +2492,23 @@ void *IOThreadMain(void *myid) { while(1) { /* ... Wait for start ... */ - pthread_mutex_lock(&io_threads_idle_mutex); - io_threads_idle++; - pthread_cond_signal(&io_threads_idle_cond); - if (tio_debug) printf("[%ld] Waiting start...\n", id); - pthread_cond_wait(&io_threads_start_cond,&io_threads_idle_mutex); - if (tio_debug) printf("[%ld] Started\n", id); - pthread_mutex_unlock(&io_threads_idle_mutex); - if (tio_debug) printf("%d to handle\n", (int)listLength(io_threads_list[id])); + pthread_mutex_lock(&io_threads_mutex[id]); + if (io_threads_pending[id]) { + if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id])); - /* ... Process ... */ - listIter li; - listNode *ln; - listRewind(io_threads_list[id],&li); - while((ln = listNext(&li))) { - client *c = listNodeValue(ln); - writeToClient(c->fd,c,0); + /* ... Process ... */ + listIter li; + listNode *ln; + listRewind(io_threads_list[id],&li); + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + writeToClient(c->fd,c,0); + io_threads_pending[id]--; + } + listEmpty(io_threads_list[id]); } - listEmpty(io_threads_list[id]); - /* Report success. */ - pthread_mutex_lock(&io_threads_done_mutex); - io_threads_done++; - pthread_cond_signal(&io_threads_done_cond); - pthread_mutex_unlock(&io_threads_done_mutex); + pthread_mutex_unlock(&io_threads_mutex[id]); if (tio_debug) printf("[%ld] Done\n", id); } } @@ -2529,30 +2518,50 @@ void initThreadedIO(void) { pthread_t tid; server.io_threads_num = 4; + io_threads_active = 0; /* We start with threads not active. */ for (int i = 0; i < server.io_threads_num; i++) { + pthread_mutex_init(&io_threads_mutex[i],NULL); + io_threads_pending[i] = 0; + io_threads_list[i] = listCreate(); + pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); exit(1); } io_threads[i] = tid; - io_threads_list[i] = listCreate(); } } +void startThreadedIO(void) { + if (tio_debug) printf("--- STARTING THREADED IO ---\n"); + serverAssert(io_threads_active == 0); + for (int j = 0; j < server.io_threads_num; j++) + pthread_mutex_unlock(&io_threads_mutex[j]); + io_threads_active = 1; +} + +void stopThreadedIO(void) { + if (tio_debug) printf("--- STOPPING THREADED IO ---\n"); + serverAssert(io_threads_active == 1); + for (int j = 0; j < server.io_threads_num; j++) + pthread_mutex_lock(&io_threads_mutex[j]); + io_threads_active = 0; +} + int handleClientsWithPendingWritesUsingThreads(void) { int processed = listLength(server.clients_pending_write); if (processed == 0) return 0; /* Return ASAP if there are no clients. */ - if (tio_debug) printf("%d TOTAL\n", processed); - - /* Wait for all threads to be ready. */ - pthread_mutex_lock(&io_threads_idle_mutex); - while(io_threads_idle < server.io_threads_num) { - pthread_cond_wait(&io_threads_idle_cond,&io_threads_idle_mutex); + /* If we have just a few clients to serve, don't use I/O threads, but the + * boring synchronous code. */ + if (processed < (server.io_threads_num*2)) { + if (io_threads_active) stopThreadedIO(); + return handleClientsWithPendingWrites(); + } else { + if (!io_threads_active) startThreadedIO(); } - if (tio_debug) printf("All threads are idle: %d\n", io_threads_idle); - io_threads_idle = 0; - pthread_mutex_unlock(&io_threads_idle_mutex); + + if (tio_debug) printf("%d TOTAL pending clients\n", processed); /* Distribute the clients across N different lists. */ listIter li; @@ -2563,23 +2572,20 @@ int handleClientsWithPendingWritesUsingThreads(void) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; int target_id = item_id % server.io_threads_num; + pthread_mutex_lock(&io_threads_mutex[target_id]); listAddNodeTail(io_threads_list[target_id],c); + io_threads_pending[target_id]++; + pthread_mutex_unlock(&io_threads_mutex[target_id]); item_id++; } - /* Start all threads. */ - if (tio_debug) printf("Send start condition\n"); - pthread_mutex_lock(&io_threads_done_mutex); - io_threads_done = 0; - pthread_cond_broadcast(&io_threads_start_cond); - pthread_mutex_unlock(&io_threads_done_mutex); - /* Wait for all threads to end their work. */ - pthread_mutex_lock(&io_threads_done_mutex); - while(io_threads_done < server.io_threads_num) { - pthread_cond_wait(&io_threads_done_cond,&io_threads_done_mutex); + while(1) { + unsigned long pending = 0; + for (int j = 0; j < server.io_threads_num; j++) + pending += io_threads_pending[j]; + if (pending == 0) break; } - pthread_mutex_unlock(&io_threads_done_mutex); if (tio_debug) printf("All threads finshed\n"); /* Run the list of clients again to install the write handler where diff --git a/src/server.c b/src/server.c index c437880d5..de5a814d0 100644 --- a/src/server.c +++ b/src/server.c @@ -2072,8 +2072,6 @@ void beforeSleep(struct aeEventLoop *eventLoop) { flushAppendOnlyFile(0); /* Handle writes with pending output buffers. */ - /* XXX: Put a condition based on number of waiting clients: if we - * have less than a given number of clients, use non threaded code. */ handleClientsWithPendingWritesUsingThreads(); /* Close clients that need to be closed asynchronous */ From 444b5f2af5f448f35a459bf0619b9168d3897aa7 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 25 Mar 2019 16:33:23 +0100 Subject: [PATCH 27/50] Threaded IO: 3rd version: use the mutex only to stop the thread. --- src/networking.c | 52 ++++++++++++++++++++++++++++++------------------ 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/src/networking.c b/src/networking.c index cd241dac2..17d6b1866 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2491,24 +2491,34 @@ void *IOThreadMain(void *myid) { long id = (unsigned long)myid; while(1) { - /* ... Wait for start ... */ - pthread_mutex_lock(&io_threads_mutex[id]); - if (io_threads_pending[id]) { - if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id])); - - /* ... Process ... */ - listIter li; - listNode *ln; - listRewind(io_threads_list[id],&li); - while((ln = listNext(&li))) { - client *c = listNodeValue(ln); - writeToClient(c->fd,c,0); - io_threads_pending[id]--; - } - listEmpty(io_threads_list[id]); + /* Wait for start */ + for (int j = 0; j < 1000000; j++) { + if (io_threads_pending[id] != 0) break; } - pthread_mutex_unlock(&io_threads_mutex[id]); + /* Give the main thread a chance to stop this thread. */ + if (io_threads_pending[id] == 0) { + pthread_mutex_lock(&io_threads_mutex[id]); + pthread_mutex_unlock(&io_threads_mutex[id]); + continue; + } + + serverAssert(io_threads_pending[id] != 0); + + if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id])); + + /* Process: note that the main thread will never touch our list + * before we drop the pending count to 0. */ + listIter li; + listNode *ln; + listRewind(io_threads_list[id],&li); + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + writeToClient(c->fd,c,0); + } + listEmpty(io_threads_list[id]); + io_threads_pending[id] = 0; + if (tio_debug) printf("[%ld] Done\n", id); } } @@ -2572,13 +2582,17 @@ int handleClientsWithPendingWritesUsingThreads(void) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_WRITE; int target_id = item_id % server.io_threads_num; - pthread_mutex_lock(&io_threads_mutex[target_id]); listAddNodeTail(io_threads_list[target_id],c); - io_threads_pending[target_id]++; - pthread_mutex_unlock(&io_threads_mutex[target_id]); item_id++; } + /* Give the start condition to the waiting threads, by setting the + * start condition atomic var. */ + for (int j = 0; j < server.io_threads_num; j++) { + int count = listLength(io_threads_list[j]); + io_threads_pending[j] = count; + } + /* Wait for all threads to end their work. */ while(1) { unsigned long pending = 0; From 0353d133217ffe8ba91b4fa2bbc37e48f62f2738 Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Mon, 25 Mar 2019 17:05:06 +0000 Subject: [PATCH 28/50] Threaded IO: stop threads when no longer needed + C11 in Makefile. Now threads are stopped even when the connections drop immediately to zero, not allowing the networking code to detect the condition and stop the threads. serverCron() will handle that. --- src/Makefile | 2 +- src/networking.c | 29 ++++++++++++++++++++++++----- src/server.c | 3 +++ src/server.h | 1 + 4 files changed, 29 insertions(+), 6 deletions(-) diff --git a/src/Makefile b/src/Makefile index 93cfdc28e..1c80e547f 100644 --- a/src/Makefile +++ b/src/Makefile @@ -20,7 +20,7 @@ DEPENDENCY_TARGETS=hiredis linenoise lua NODEPS:=clean distclean # Default settings -STD=-std=c99 -pedantic -DREDIS_STATIC='' +1TD=-std=c11 -pedantic -DREDIS_STATIC='' ifneq (,$(findstring clang,$(CC))) ifneq (,$(findstring FreeBSD,$(uname_S))) STD+=-Wno-c11-extensions diff --git a/src/networking.c b/src/networking.c index 17d6b1866..d61e1f044 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2527,7 +2527,7 @@ void *IOThreadMain(void *myid) { void initThreadedIO(void) { pthread_t tid; - server.io_threads_num = 4; + server.io_threads_num = 8; io_threads_active = 0; /* We start with threads not active. */ for (int i = 0; i < server.io_threads_num; i++) { pthread_mutex_init(&io_threads_mutex[i],NULL); @@ -2543,6 +2543,7 @@ void initThreadedIO(void) { } void startThreadedIO(void) { + printf("S"); fflush(stdout); if (tio_debug) printf("--- STARTING THREADED IO ---\n"); serverAssert(io_threads_active == 0); for (int j = 0; j < server.io_threads_num; j++) @@ -2551,6 +2552,7 @@ void startThreadedIO(void) { } void stopThreadedIO(void) { + printf("E"); fflush(stdout); if (tio_debug) printf("--- STOPPING THREADED IO ---\n"); serverAssert(io_threads_active == 1); for (int j = 0; j < server.io_threads_num; j++) @@ -2558,19 +2560,36 @@ void stopThreadedIO(void) { io_threads_active = 0; } +/* This function checks if there are not enough pending clients to justify + * taking the I/O threads active: in that case I/O threads are stopped if + * currently active. + * + * The function returns 0 if the I/O threading should be used becuase there + * are enough active threads, otherwise 1 is returned and the I/O threads + * could be possibly stopped (if already active) as a side effect. */ +int stopThreadedIOIfNeeded(void) { + int pending = listLength(server.clients_pending_write); + if (pending < (server.io_threads_num*2)) { + if (io_threads_active) stopThreadedIO(); + return 1; + } else { + return 0; + } +} + int handleClientsWithPendingWritesUsingThreads(void) { int processed = listLength(server.clients_pending_write); if (processed == 0) return 0; /* Return ASAP if there are no clients. */ /* If we have just a few clients to serve, don't use I/O threads, but the * boring synchronous code. */ - if (processed < (server.io_threads_num*2)) { - if (io_threads_active) stopThreadedIO(); + if (stopThreadedIOIfNeeded()) { return handleClientsWithPendingWrites(); - } else { - if (!io_threads_active) startThreadedIO(); } + /* Start threads if needed. */ + if (!io_threads_active) startThreadedIO(); + if (tio_debug) printf("%d TOTAL pending clients\n", processed); /* Distribute the clients across N different lists. */ diff --git a/src/server.c b/src/server.c index de5a814d0..325c9010c 100644 --- a/src/server.c +++ b/src/server.c @@ -2001,6 +2001,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { migrateCloseTimedoutSockets(); } + /* Stop the I/O threads if we don't have enough pending work. */ + stopThreadedIOIfNeeded(); + /* Start a scheduled BGSAVE if the corresponding flag is set. This is * useful when we are forced to postpone a BGSAVE because an AOF * rewrite is in progress. diff --git a/src/server.h b/src/server.h index d2a563c96..96ee37887 100644 --- a/src/server.h +++ b/src/server.h @@ -1579,6 +1579,7 @@ int clientsArePaused(void); int processEventsWhileBlocked(void); int handleClientsWithPendingWrites(void); int handleClientsWithPendingWritesUsingThreads(void); +int stopThreadedIOIfNeeded(void); int clientHasPendingReplies(client *c); void unlinkClient(client *c); int writeToClient(int fd, client *c, int handler_installed); From 79626ca3400b7ac2d1f8dd529ecfd8f2d169b73b Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 27 Mar 2019 18:39:13 +0100 Subject: [PATCH 29/50] Threaded IO: use main thread if num of threads is 1. --- src/networking.c | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/networking.c b/src/networking.c index d61e1f044..916f29ebc 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2525,11 +2525,16 @@ void *IOThreadMain(void *myid) { /* Initialize the data structures needed for threaded I/O. */ void initThreadedIO(void) { - pthread_t tid; - server.io_threads_num = 8; io_threads_active = 0; /* We start with threads not active. */ + + /* Don't spawn any thread if the user selected a single thread: + * we'll handle I/O directly from the main thread. */ + if (server.io_threads_num == 1) return; + + /* Spawn the I/O threads. */ for (int i = 0; i < server.io_threads_num; i++) { + pthread_t tid; pthread_mutex_init(&io_threads_mutex[i],NULL); io_threads_pending[i] = 0; io_threads_list[i] = listCreate(); @@ -2569,6 +2574,10 @@ void stopThreadedIO(void) { * could be possibly stopped (if already active) as a side effect. */ int stopThreadedIOIfNeeded(void) { int pending = listLength(server.clients_pending_write); + + /* Return ASAP if IO threads are disabled (single threaded mode). */ + if (server.io_threads_num == 1) return 0; + if (pending < (server.io_threads_num*2)) { if (io_threads_active) stopThreadedIO(); return 1; From dc4d13e7511bfd82f1bc872e55e4fb1d11cc45a0 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 27 Mar 2019 18:58:45 +0100 Subject: [PATCH 30/50] Threaded IO: make num of I/O threads configurable. --- src/config.c | 7 +++++++ src/networking.c | 3 +-- src/server.c | 1 + src/server.h | 1 + 4 files changed, 10 insertions(+), 2 deletions(-) diff --git a/src/config.c b/src/config.c index 1e0525594..c4a18f3bb 100644 --- a/src/config.c +++ b/src/config.c @@ -313,6 +313,11 @@ void loadServerConfigFromString(char *config) { if (server.dbnum < 1) { err = "Invalid number of databases"; goto loaderr; } + } else if (!strcasecmp(argv[0],"io-threads") && argc == 2) { + server.io_threads_num = atoi(argv[1]); + if (server.io_threads_num < 1 || server.io_threads_num > 512) { + err = "Invalid number of I/O threads"; goto loaderr; + } } else if (!strcasecmp(argv[0],"include") && argc == 2) { loadServerConfig(argv[1],NULL); } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) { @@ -1426,6 +1431,7 @@ void configGetCommand(client *c) { config_get_numerical_field("cluster-announce-bus-port",server.cluster_announce_bus_port); config_get_numerical_field("tcp-backlog",server.tcp_backlog); config_get_numerical_field("databases",server.dbnum); + config_get_numerical_field("io-threads",server.io_threads_num); config_get_numerical_field("repl-ping-slave-period",server.repl_ping_slave_period); config_get_numerical_field("repl-ping-replica-period",server.repl_ping_slave_period); config_get_numerical_field("repl-timeout",server.repl_timeout); @@ -2239,6 +2245,7 @@ int rewriteConfig(char *path) { rewriteConfigSaveOption(state); rewriteConfigUserOption(state); rewriteConfigNumericalOption(state,"databases",server.dbnum,CONFIG_DEFAULT_DBNUM); + rewriteConfigNumericalOption(state,"io-threads",server.dbnum,CONFIG_DEFAULT_IO_THREADS_NUM); rewriteConfigYesNoOption(state,"stop-writes-on-bgsave-error",server.stop_writes_on_bgsave_err,CONFIG_DEFAULT_STOP_WRITES_ON_BGSAVE_ERROR); rewriteConfigYesNoOption(state,"rdbcompression",server.rdb_compression,CONFIG_DEFAULT_RDB_COMPRESSION); rewriteConfigYesNoOption(state,"rdbchecksum",server.rdb_checksum,CONFIG_DEFAULT_RDB_CHECKSUM); diff --git a/src/networking.c b/src/networking.c index 916f29ebc..275338a6f 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2525,7 +2525,6 @@ void *IOThreadMain(void *myid) { /* Initialize the data structures needed for threaded I/O. */ void initThreadedIO(void) { - server.io_threads_num = 8; io_threads_active = 0; /* We start with threads not active. */ /* Don't spawn any thread if the user selected a single thread: @@ -2576,7 +2575,7 @@ int stopThreadedIOIfNeeded(void) { int pending = listLength(server.clients_pending_write); /* Return ASAP if IO threads are disabled (single threaded mode). */ - if (server.io_threads_num == 1) return 0; + if (server.io_threads_num == 1) return 1; if (pending < (server.io_threads_num*2)) { if (io_threads_active) stopThreadedIO(); diff --git a/src/server.c b/src/server.c index 325c9010c..f6d2b47f3 100644 --- a/src/server.c +++ b/src/server.c @@ -2317,6 +2317,7 @@ void initServerConfig(void) { server.lazyfree_lazy_server_del = CONFIG_DEFAULT_LAZYFREE_LAZY_SERVER_DEL; server.always_show_logo = CONFIG_DEFAULT_ALWAYS_SHOW_LOGO; server.lua_time_limit = LUA_SCRIPT_TIME_LIMIT; + server.io_threads_num = CONFIG_DEFAULT_IO_THREADS_NUM; unsigned int lruclock = getLRUClock(); atomicSet(server.lruclock,lruclock); diff --git a/src/server.h b/src/server.h index 96ee37887..2e4de2bb1 100644 --- a/src/server.h +++ b/src/server.h @@ -87,6 +87,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define CONFIG_DEFAULT_TCP_BACKLOG 511 /* TCP listen backlog. */ #define CONFIG_DEFAULT_CLIENT_TIMEOUT 0 /* Default client timeout: infinite */ #define CONFIG_DEFAULT_DBNUM 16 +#define CONFIG_DEFAULT_IO_THREADS_NUM 1 /* Single threaded by default */ #define CONFIG_MAX_LINE 1024 #define CRON_DBS_PER_CALL 16 #define NET_MAX_WRITES_PER_EVENT (1024*64) From 793134d0e665f7fe04580df0723d5fc4fb49830c Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 27 Mar 2019 18:59:39 +0100 Subject: [PATCH 31/50] Threaded IO: hide more debugging printfs under conditional. --- src/networking.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/networking.c b/src/networking.c index 275338a6f..caffd3be7 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2547,7 +2547,7 @@ void initThreadedIO(void) { } void startThreadedIO(void) { - printf("S"); fflush(stdout); + if (tio_debug) printf("S"); fflush(stdout); if (tio_debug) printf("--- STARTING THREADED IO ---\n"); serverAssert(io_threads_active == 0); for (int j = 0; j < server.io_threads_num; j++) @@ -2556,7 +2556,7 @@ void startThreadedIO(void) { } void stopThreadedIO(void) { - printf("E"); fflush(stdout); + if (tio_debug) printf("E"); fflush(stdout); if (tio_debug) printf("--- STOPPING THREADED IO ---\n"); serverAssert(io_threads_active == 1); for (int j = 0; j < server.io_threads_num; j++) From 7bae8afef8f3e6ded0d626f2d565ef4db43a69ae Mon Sep 17 00:00:00 2001 From: antirez Date: Sat, 30 Mar 2019 11:26:58 +0100 Subject: [PATCH 32/50] Threaded IO: read side WIP. --- src/evict.c | 2 +- src/networking.c | 61 ++++++++++++++++++++++++++++++++++++++---------- src/server.c | 30 ++++++++++-------------- src/server.h | 28 +++++++++++----------- 4 files changed, 75 insertions(+), 46 deletions(-) diff --git a/src/evict.c b/src/evict.c index 773916ce8..176f4c362 100644 --- a/src/evict.c +++ b/src/evict.c @@ -78,7 +78,7 @@ unsigned int getLRUClock(void) { unsigned int LRU_CLOCK(void) { unsigned int lruclock; if (1000/server.hz <= LRU_CLOCK_RESOLUTION) { - atomicGet(server.lruclock,lruclock); + lruclock = server.lruclock; } else { lruclock = getLRUClock(); } diff --git a/src/networking.c b/src/networking.c index caffd3be7..fd4e990f4 100644 --- a/src/networking.c +++ b/src/networking.c @@ -35,6 +35,7 @@ #include static void setProtocolError(const char *errstr, client *c); +int postponeClientRead(client *c); /* Return the size consumed from the allocator, for the specified SDS string, * including internal fragmentation. This function is used in order to compute @@ -105,8 +106,7 @@ client *createClient(int fd) { } selectDb(c,0); - uint64_t client_id; - atomicGetIncr(server.next_client_id,client_id,1); + uint64_t client_id = ++server.next_client_id; c->id = client_id; c->resp = 2; c->fd = fd; @@ -950,6 +950,14 @@ void unlinkClient(client *c) { c->flags &= ~CLIENT_PENDING_WRITE; } + /* Remove from the list of pending reads if needed. */ + if (c->flags & CLIENT_PENDING_READ) { + ln = listSearchKey(server.clients_pending_read,c); + serverAssert(ln != NULL); + listDelNode(server.clients_pending_read,ln); + c->flags &= ~CLIENT_PENDING_READ; + } + /* When client was just unblocked because of a blocking operation, * remove it from the list of unblocked clients. */ if (c->flags & CLIENT_UNBLOCKED) { @@ -1642,13 +1650,19 @@ void processInputBuffer(client *c) { } /* This is a wrapper for processInputBuffer that also cares about handling - * the replication forwarding to the sub-slaves, in case the client 'c' + * the replication forwarding to the sub-replicas, in case the client 'c' * is flagged as master. Usually you want to call this instead of the * raw processInputBuffer(). */ void processInputBufferAndReplicate(client *c) { if (!(c->flags & CLIENT_MASTER)) { processInputBuffer(c); } else { + /* If the client is a master we need to compute the difference + * between the applied offset before and after processing the buffer, + * to understand how much of the replication stream was actually + * applied to the master state: this quantity, and its corresponding + * part of the replication stream, will be propagated to the + * sub-replicas and to the replication backlog. */ size_t prev_offset = c->reploff; processInputBuffer(c); size_t applied = c->reploff - prev_offset; @@ -1667,6 +1681,10 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(el); UNUSED(mask); + /* Check if we want to read from the client later when exiting from + * the event loop. This is the case if threaded I/O is enabled. */ + if (postponeClientRead(c)) return; + readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply * that is large enough, try to maximize the probability that the query @@ -1716,20 +1734,21 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64); - serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); +// FIXME: This may be called from an I/O thread and it is not safe to +// log from there for now. +// serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); freeClient(c); return; } - /* Time to process the buffer. If the client is a master we need to - * compute the difference between the applied offset before and after - * processing the buffer, to understand how much of the replication stream - * was actually applied to the master state: this quantity, and its - * corresponding part of the replication stream, will be propagated to - * the sub-slaves and to the replication backlog. */ - processInputBufferAndReplicate(c); + /* There is more data in the client input buffer, continue parsing it + * in case to check if there is a full command to execute. + * Don't do it if the client is flagged as CLIENT_PENDING_READ: it means + * we are currently in the context of an I/O thread. */ + if (!(c->flags & CLIENT_PENDING_READ)) + processInputBufferAndReplicate(c); } void getClientsMaxBuffers(unsigned long *longest_output_list, @@ -2566,7 +2585,9 @@ void stopThreadedIO(void) { /* This function checks if there are not enough pending clients to justify * taking the I/O threads active: in that case I/O threads are stopped if - * currently active. + * currently active. We track the pending writes as a measure of clients + * we need to handle in parallel, however the I/O threading is disabled + * globally for reads as well if we have too little pending clients. * * The function returns 0 if the I/O threading should be used becuase there * are enough active threads, otherwise 1 is returned and the I/O threads @@ -2647,3 +2668,19 @@ int handleClientsWithPendingWritesUsingThreads(void) { listEmpty(server.clients_pending_write); return processed; } + +/* Return 1 if we want to handle the client read later using threaded I/O. + * This is called by the readable handler of the event loop. + * As a side effect of calling this function the client is put in the + * pending read clients and flagged as such. */ +int postponeClientRead(client *c) { + if (io_threads_active && + !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))) + { + c->flags |= CLIENT_PENDING_READ; + listAddNodeHead(server.clients_pending_read,c); + return 1; + } else { + return 0; + } +} diff --git a/src/server.c b/src/server.c index f6d2b47f3..ef6b85c44 100644 --- a/src/server.c +++ b/src/server.c @@ -1728,16 +1728,17 @@ void databasesCron(void) { * every object access, and accuracy is not needed. To access a global var is * a lot faster than calling time(NULL) */ void updateCachedTime(void) { - time_t unixtime = time(NULL); - atomicSet(server.unixtime,unixtime); + server.unixtime = time(NULL); server.mstime = mstime(); - /* To get information about daylight saving time, we need to call localtime_r - * and cache the result. However calling localtime_r in this context is safe - * since we will never fork() while here, in the main thread. The logging - * function will call a thread safe version of localtime that has no locks. */ + /* To get information about daylight saving time, we need to call + * localtime_r and cache the result. However calling localtime_r in this + * context is safe since we will never fork() while here, in the main + * thread. The logging function will call a thread safe version of + * localtime that has no locks. */ struct tm tm; - localtime_r(&server.unixtime,&tm); + time_t ut = server.unixtime; + localtime_r(&ut,&tm); server.daylight_active = tm.tm_isdst; } @@ -1807,8 +1808,7 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { * * Note that you can change the resolution altering the * LRU_CLOCK_RESOLUTION define. */ - unsigned long lruclock = getLRUClock(); - atomicSet(server.lruclock,lruclock); + server.lruclock = getLRUClock(); /* Record the max memory used since the server was started. */ if (zmalloc_used_memory() > server.stat_peak_memory) @@ -2202,10 +2202,6 @@ void createSharedObjects(void) { void initServerConfig(void) { int j; - pthread_mutex_init(&server.next_client_id_mutex,NULL); - pthread_mutex_init(&server.lruclock_mutex,NULL); - pthread_mutex_init(&server.unixtime_mutex,NULL); - updateCachedTime(); getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE); server.runid[CONFIG_RUN_ID_SIZE] = '\0'; @@ -2319,8 +2315,7 @@ void initServerConfig(void) { server.lua_time_limit = LUA_SCRIPT_TIME_LIMIT; server.io_threads_num = CONFIG_DEFAULT_IO_THREADS_NUM; - unsigned int lruclock = getLRUClock(); - atomicSet(server.lruclock,lruclock); + server.lruclock = getLRUClock(); resetServerSaveParams(); appendServerSaveParams(60*60,1); /* save after 1 hour and 1 change */ @@ -2718,6 +2713,7 @@ void initServer(void) { server.slaves = listCreate(); server.monitors = listCreate(); server.clients_pending_write = listCreate(); + server.clients_pending_read = listCreate(); server.slaveseldb = -1; /* Force to emit the first SELECT command. */ server.unblocked_clients = listCreate(); server.ready_keys = listCreate(); @@ -3821,8 +3817,6 @@ sds genRedisInfoString(char *section) { call_uname = 0; } - unsigned int lruclock; - atomicGet(server.lruclock,lruclock); info = sdscatprintf(info, "# Server\r\n" "redis_version:%s\r\n" @@ -3866,7 +3860,7 @@ sds genRedisInfoString(char *section) { (intmax_t)(uptime/(3600*24)), server.hz, server.config_hz, - (unsigned long) lruclock, + (unsigned long) server.lruclock, server.executable ? server.executable : "", server.configfile ? server.configfile : ""); } diff --git a/src/server.h b/src/server.h index 2e4de2bb1..dcfcb55fb 100644 --- a/src/server.h +++ b/src/server.h @@ -285,6 +285,9 @@ typedef long long mstime_t; /* millisecond time type. */ #define CLIENT_LUA_DEBUG_SYNC (1<<26) /* EVAL debugging without fork() */ #define CLIENT_MODULE (1<<27) /* Non connected client used by some module. */ #define CLIENT_PROTECTED (1<<28) /* Client should not be freed for now. */ +#define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put + in the list of clients we can read + from. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -1018,7 +1021,7 @@ struct redisServer { dict *commands; /* Command table */ dict *orig_commands; /* Command table before command renaming. */ aeEventLoop *el; - unsigned int lruclock; /* Clock for LRU eviction */ + _Atomic unsigned int lruclock; /* Clock for LRU eviction */ int shutdown_asap; /* SHUTDOWN needed ASAP */ int activerehashing; /* Incremental rehash in serverCron() */ int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */ @@ -1052,6 +1055,7 @@ struct redisServer { list *clients; /* List of active clients */ list *clients_to_close; /* Clients to close asynchronously */ list *clients_pending_write; /* There is to write or install handler. */ + list *clients_pending_read; /* Client has pending read socket buffers. */ list *slaves, *monitors; /* List of slaves and MONITORs */ client *current_client; /* Current client, only used on crash report */ rax *clients_index; /* Active clients dictionary by client ID. */ @@ -1059,7 +1063,7 @@ struct redisServer { mstime_t clients_pause_end_time; /* Time when we undo clients_paused */ char neterr[ANET_ERR_LEN]; /* Error buffer for anet.c */ dict *migrate_cached_sockets;/* MIGRATE cached sockets */ - uint64_t next_client_id; /* Next client unique ID. Incremental. */ + _Atomic uint64_t next_client_id; /* Next client unique ID. Incremental. */ int protected_mode; /* Don't accept external connections. */ int gopher_enabled; /* If true the server will reply to gopher queries. Will still serve RESP2 queries. */ @@ -1104,8 +1108,8 @@ struct redisServer { long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */ unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */ struct malloc_stats cron_malloc_stats; /* sampled in serverCron(). */ - long long stat_net_input_bytes; /* Bytes read from network. */ - long long stat_net_output_bytes; /* Bytes written to network. */ + _Atomic long long stat_net_input_bytes; /* Bytes read from network. */ + _Atomic long long stat_net_output_bytes; /* Bytes written to network. */ size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */ size_t stat_aof_cow_bytes; /* Copy on write bytes during AOF rewrite. */ /* The following two are used to track instantaneous metrics, like @@ -1128,7 +1132,7 @@ struct redisServer { int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */ int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */ unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */ - size_t client_max_querybuf_len; /* Limit for client query buffer length */ + _Atomic size_t client_max_querybuf_len; /* Limit for client query buffer length */ int dbnum; /* Total number of configured DBs */ int supervised; /* 1 if supervised, 0 otherwise. */ int supervised_mode; /* See SUPERVISED_* */ @@ -1297,10 +1301,10 @@ struct redisServer { int list_max_ziplist_size; int list_compress_depth; /* time cache */ - time_t unixtime; /* Unix time sampled every cron cycle. */ - time_t timezone; /* Cached timezone. As set by tzset(). */ - int daylight_active; /* Currently in daylight saving time. */ - long long mstime; /* Like 'unixtime' but with milliseconds resolution. */ + _Atomic time_t unixtime; /* Unix time sampled every cron cycle. */ + time_t timezone; /* Cached timezone. As set by tzset(). */ + int daylight_active; /* Currently in daylight saving time. */ + long long mstime; /* 'unixtime' with milliseconds resolution. */ /* Pubsub */ dict *pubsub_channels; /* Map channels to list of subscribed clients */ list *pubsub_patterns; /* A list of pubsub_patterns */ @@ -1360,12 +1364,6 @@ struct redisServer { int watchdog_period; /* Software watchdog period in ms. 0 = off */ /* System hardware info */ size_t system_memory_size; /* Total memory in system as reported by OS */ - - /* Mutexes used to protect atomic variables when atomic builtins are - * not available. */ - pthread_mutex_t lruclock_mutex; - pthread_mutex_t next_client_id_mutex; - pthread_mutex_t unixtime_mutex; }; typedef struct pubsubPattern { From 1c3b585e78cde384959731171c124d682f28c764 Mon Sep 17 00:00:00 2001 From: antirez Date: Sun, 31 Mar 2019 15:58:54 +0200 Subject: [PATCH 33/50] Threaded IO: read side WIP 2. --- src/networking.c | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/src/networking.c b/src/networking.c index fd4e990f4..7d8470489 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2496,13 +2496,16 @@ int processEventsWhileBlocked(void) { int tio_debug = 0; -#define SERVER_MAX_IO_THREADS 32 +#define IO_THREADS_MAX_NUM 128 +#define IO_THREADS_OP_READ 0 +#define IO_THREADS_OP_WRITE 1 -pthread_t io_threads[SERVER_MAX_IO_THREADS]; -pthread_mutex_t io_threads_mutex[SERVER_MAX_IO_THREADS]; -_Atomic unsigned long io_threads_pending[SERVER_MAX_IO_THREADS]; -int io_threads_active; -list *io_threads_list[SERVER_MAX_IO_THREADS]; +pthread_t io_threads[IO_THREADS_MAX_NUM]; +pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM]; +_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; +int io_threads_active; /* Are the threads currently spinning waiting I/O? */ +int io_threads_op; /* IO_THREADS_OP_WRITE or IO_THREADS_OP_READ. */ +list *io_threads_list[IO_THREADS_MAX_NUM]; void *IOThreadMain(void *myid) { /* The ID is the thread number (from 0 to server.iothreads_num-1), and is @@ -2533,7 +2536,11 @@ void *IOThreadMain(void *myid) { listRewind(io_threads_list[id],&li); while((ln = listNext(&li))) { client *c = listNodeValue(ln); - writeToClient(c->fd,c,0); + if (io_threads_op == IO_THREADS_OP_WRITE) { + writeToClient(c->fd,c,0); + } else { + readQueryFromClient(NULL,c->fd,c,0); + } } listEmpty(io_threads_list[id]); io_threads_pending[id] = 0; @@ -2550,6 +2557,12 @@ void initThreadedIO(void) { * we'll handle I/O directly from the main thread. */ if (server.io_threads_num == 1) return; + if (server.io_threads_num > IO_THREADS_MAX_NUM) { + serverLog(LL_WARNING,"Fatal: too many I/O threads configured. " + "The maximum number is %d.", IO_THREADS_MAX_NUM); + exit(1); + } + /* Spawn the I/O threads. */ for (int i = 0; i < server.io_threads_num; i++) { pthread_t tid; @@ -2684,3 +2697,6 @@ int postponeClientRead(client *c) { return 0; } } + +int handleClientsWithPendingReadsUsingThreads(void) { +} From b9b29641cfa4af61abe0fc7d6ab6e6b3ce816af2 Mon Sep 17 00:00:00 2001 From: antirez Date: Sun, 31 Mar 2019 21:59:50 +0200 Subject: [PATCH 34/50] Threaded IO: read side WIP 3. --- src/networking.c | 59 +++++++++++++++++++++++++++++++++++++++++++----- src/server.c | 1 + src/server.h | 1 + 3 files changed, 55 insertions(+), 6 deletions(-) diff --git a/src/networking.c b/src/networking.c index 7d8470489..3a36badb8 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1711,12 +1711,12 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { return; } else { serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno)); - freeClient(c); + freeClientAsync(c); return; } } else if (nread == 0) { serverLog(LL_VERBOSE, "Client closed connection"); - freeClient(c); + freeClientAsync(c); return; } else if (c->flags & CLIENT_MASTER) { /* Append the query buffer to the pending (not applied) buffer @@ -1739,7 +1739,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { // serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); - freeClient(c); + freeClientAsync(c); return; } @@ -2538,8 +2538,10 @@ void *IOThreadMain(void *myid) { client *c = listNodeValue(ln); if (io_threads_op == IO_THREADS_OP_WRITE) { writeToClient(c->fd,c,0); - } else { + } else if (io_threads_op == IO_THREADS_OP_READ) { readQueryFromClient(NULL,c->fd,c,0); + } else { + serverPanic("io_threads_op value is unknown"); } } listEmpty(io_threads_list[id]); @@ -2632,7 +2634,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { /* Start threads if needed. */ if (!io_threads_active) startThreadedIO(); - if (tio_debug) printf("%d TOTAL pending clients\n", processed); + if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed); /* Distribute the clients across N different lists. */ listIter li; @@ -2649,6 +2651,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { /* Give the start condition to the waiting threads, by setting the * start condition atomic var. */ + io_threads_op = IO_THREADS_OP_WRITE; for (int j = 0; j < server.io_threads_num; j++) { int count = listLength(io_threads_list[j]); io_threads_pending[j] = count; @@ -2661,7 +2664,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { pending += io_threads_pending[j]; if (pending == 0) break; } - if (tio_debug) printf("All threads finshed\n"); + if (tio_debug) printf("I/O WRITE All threads finshed\n"); /* Run the list of clients again to install the write handler where * needed. */ @@ -2699,4 +2702,48 @@ int postponeClientRead(client *c) { } int handleClientsWithPendingReadsUsingThreads(void) { + if (!io_threads_active) return 0; + int processed = listLength(server.clients_pending_read); + if (processed == 0) return 0; + + if (tio_debug) printf("%d TOTAL READ pending clients\n", processed); + + /* Distribute the clients across N different lists. */ + listIter li; + listNode *ln; + listRewind(server.clients_pending_read,&li); + int item_id = 0; + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + int target_id = item_id % server.io_threads_num; + listAddNodeTail(io_threads_list[target_id],c); + item_id++; + } + + /* Give the start condition to the waiting threads, by setting the + * start condition atomic var. */ + io_threads_op = IO_THREADS_OP_READ; + for (int j = 0; j < server.io_threads_num; j++) { + int count = listLength(io_threads_list[j]); + io_threads_pending[j] = count; + } + + /* Wait for all threads to end their work. */ + while(1) { + unsigned long pending = 0; + for (int j = 0; j < server.io_threads_num; j++) + pending += io_threads_pending[j]; + if (pending == 0) break; + } + if (tio_debug) printf("I/O READ All threads finshed\n"); + + /* Run the list of clients again to process the new buffers. */ + listRewind(server.clients_pending_read,&li); + while((ln = listNext(&li))) { + client *c = listNodeValue(ln); + c->flags &= ~CLIENT_PENDING_READ; + processInputBufferAndReplicate(c); + } + listEmpty(server.clients_pending_read); + return processed; } diff --git a/src/server.c b/src/server.c index ef6b85c44..e0c48b097 100644 --- a/src/server.c +++ b/src/server.c @@ -2092,6 +2092,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { void afterSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); if (moduleCount()) moduleAcquireGIL(); + handleClientsWithPendingReadsUsingThreads(); } /* =========================== Server initialization ======================== */ diff --git a/src/server.h b/src/server.h index dcfcb55fb..0d7882419 100644 --- a/src/server.h +++ b/src/server.h @@ -1578,6 +1578,7 @@ int clientsArePaused(void); int processEventsWhileBlocked(void); int handleClientsWithPendingWrites(void); int handleClientsWithPendingWritesUsingThreads(void); +int handleClientsWithPendingReadsUsingThreads(void); int stopThreadedIOIfNeeded(void); int clientHasPendingReplies(client *c); void unlinkClient(client *c); From 4e10311fe5fefb4e4831459287ebb82851132554 Mon Sep 17 00:00:00 2001 From: antirez Date: Sun, 31 Mar 2019 22:06:00 +0200 Subject: [PATCH 35/50] Threaded IO: process read queue before stopping threads. --- src/networking.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index 3a36badb8..29a56e983 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2590,8 +2590,13 @@ void startThreadedIO(void) { } void stopThreadedIO(void) { + /* We may have still clients with pending reads when this function + * is called: handle them before stopping the threads. */ + handleClientsWithPendingReadsUsingThreads(); if (tio_debug) printf("E"); fflush(stdout); - if (tio_debug) printf("--- STOPPING THREADED IO ---\n"); + if (tio_debug) printf("--- STOPPING THREADED IO [R%d] [W%d] ---\n", + (int) listLength(server.clients_pending_read), + (int) listLength(server.clients_pending_write)); serverAssert(io_threads_active == 1); for (int j = 0; j < server.io_threads_num; j++) pthread_mutex_lock(&io_threads_mutex[j]); From 17b4ac3f8cb12c523676cc1cb6ff7c0ced36b3db Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 8 Apr 2019 13:12:10 +0200 Subject: [PATCH 36/50] Threaded IO: logging should be safe in I/O threads. Potentially it is possible that we get interleaved writes, even if serverLog() makes sure to write into a buffer and then use printf(), so even this should be ok. However in general POSIX guarantees that writing to the same file pointer object from multiple threads is safe. Anyway currently we *reopen* the file at each call, but for the standard output logging. The logging functions actually also access global configuration while performing the log (for instance in order to check the log level, the log filename and so forth), however dunring the I/O threads execution we cannot alter such shared state in any way. --- src/networking.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/networking.c b/src/networking.c index 29a56e983..0e11e1f3f 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1174,14 +1174,13 @@ int writeToClient(int fd, client *c, int handler_installed) { zmalloc_used_memory() < server.maxmemory) && !(c->flags & CLIENT_SLAVE)) break; } - /* FIXME: Fixme, use atomic var for this. */ server.stat_net_output_bytes += totwritten; if (nwritten == -1) { if (errno == EAGAIN) { nwritten = 0; } else { - // serverLog(LL_VERBOSE, - // "Error writing to client: %s", strerror(errno)); + serverLog(LL_VERBOSE, + "Error writing to client: %s", strerror(errno)); freeClientAsync(c); return C_ERR; } From 67d79b9b10aa6074524d1f99cc549f18f829536c Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 12 Apr 2019 17:18:10 +0200 Subject: [PATCH 37/50] Threaded IO: parsing WIP 1: set current_client in a better scoped way. --- src/networking.c | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/networking.c b/src/networking.c index 0e11e1f3f..3faaf4a12 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1563,7 +1563,7 @@ int processMultibulkBuffer(client *c) { * or because a client was blocked and later reactivated, so there could be * pending query buffer, already representing a full command, to process. */ void processInputBuffer(client *c) { - server.current_client = c; + int deadclient = 0; /* Keep processing while there is something in the input buffer */ while(c->qb_pos < sdslen(c->querybuf)) { @@ -1619,6 +1619,7 @@ void processInputBuffer(client *c) { resetClient(c); } else { /* Only reset the client when the command was executed. */ + server.current_client = c; if (processCommand(c) == C_OK) { if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ @@ -1629,23 +1630,26 @@ void processInputBuffer(client *c) { * module blocking command, so that the reply callback will * still be able to access the client argv and argc field. * The client will be reset in unblockClientFromModule(). */ - if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) + if (!(c->flags & CLIENT_BLOCKED) || + c->btype != BLOCKED_MODULE) + { resetClient(c); + } } + if (server.current_client == NULL) deadclient = 1; + server.current_client = NULL; /* freeMemoryIfNeeded may flush slave output buffers. This may * result into a slave, that may be the active client, to be * freed. */ - if (server.current_client == NULL) break; + if (deadclient) break; } } /* Trim to pos */ - if (server.current_client != NULL && c->qb_pos) { + if (!deadclient && c->qb_pos) { sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } - - server.current_client = NULL; } /* This is a wrapper for processInputBuffer that also cares about handling @@ -1743,11 +1747,8 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { } /* There is more data in the client input buffer, continue parsing it - * in case to check if there is a full command to execute. - * Don't do it if the client is flagged as CLIENT_PENDING_READ: it means - * we are currently in the context of an I/O thread. */ - if (!(c->flags & CLIENT_PENDING_READ)) - processInputBufferAndReplicate(c); + * in case to check if there is a full command to execute. */ + processInputBufferAndReplicate(c); } void getClientsMaxBuffers(unsigned long *longest_output_list, From 1347c13551f34b05f6fb6e03595047c8d16b8f96 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 26 Apr 2019 19:29:50 +0200 Subject: [PATCH 38/50] Threaded IO: parsing WIP 2: refactoring to parse from thread. --- src/networking.c | 89 ++++++++++++++++++++++++++++++++---------------- src/server.h | 1 + 2 files changed, 61 insertions(+), 29 deletions(-) diff --git a/src/networking.c b/src/networking.c index 3faaf4a12..4361ab1af 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1558,13 +1558,47 @@ int processMultibulkBuffer(client *c) { return C_ERR; } +/* This function calls processCommand(), but also performs a few sub tasks + * that are useful in that context: + * + * 1. It sets the current client to the client 'c'. + * 2. In the case of master clients, the replication offset is updated. + * 3. The client is reset unless there are reasons to avoid doing it. + * + * The function returns C_ERR in case the client was freed as a side effect + * of processing the command, otherwise C_OK is returned. */ +int processCommandAndResetClient(client *c) { + int deadclient = 0; + server.current_client = c; + if (processCommand(c) == C_OK) { + if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { + /* Update the applied replication offset of our master. */ + c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; + } + + /* Don't reset the client structure for clients blocked in a + * module blocking command, so that the reply callback will + * still be able to access the client argv and argc field. + * The client will be reset in unblockClientFromModule(). */ + if (!(c->flags & CLIENT_BLOCKED) || + c->btype != BLOCKED_MODULE) + { + resetClient(c); + } + } + if (server.current_client == NULL) deadclient = 1; + server.current_client = NULL; + /* freeMemoryIfNeeded may flush slave output buffers. This may + * result into a slave, that may be the active client, to be + * freed. */ + return deadclient ? C_ERR : C_OK; +} + /* This function is called every time, in the client structure 'c', there is * more query buffer to process, because we read more data from the socket * or because a client was blocked and later reactivated, so there could be * pending query buffer, already representing a full command, to process. */ void processInputBuffer(client *c) { - int deadclient = 0; - /* Keep processing while there is something in the input buffer */ while(c->qb_pos < sdslen(c->querybuf)) { /* Return if clients are paused. */ @@ -1573,6 +1607,10 @@ void processInputBuffer(client *c) { /* Immediately abort if the client is in the middle of something. */ if (c->flags & CLIENT_BLOCKED) break; + /* Don't process more buffers from clients that have already pending + * commands to execute in c->argv. */ + if (c->flags & CLIENT_PENDING_COMMAND) break; + /* Don't process input from the master while there is a busy script * condition on the slave. We want just to accumulate the replication * stream (instead of replying -BUSY like we do with other clients) and @@ -1618,35 +1656,26 @@ void processInputBuffer(client *c) { if (c->argc == 0) { resetClient(c); } else { - /* Only reset the client when the command was executed. */ - server.current_client = c; - if (processCommand(c) == C_OK) { - if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { - /* Update the applied replication offset of our master. */ - c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; - } - - /* Don't reset the client structure for clients blocked in a - * module blocking command, so that the reply callback will - * still be able to access the client argv and argc field. - * The client will be reset in unblockClientFromModule(). */ - if (!(c->flags & CLIENT_BLOCKED) || - c->btype != BLOCKED_MODULE) - { - resetClient(c); - } + /* If we are in the context of an I/O thread, we can't really + * execute the command here. All we can do is to flag the client + * as one that needs to process the command. */ + if (c->flags & CLIENT_PENDING_READ) { + c->flags |= CLIENT_PENDING_COMMAND; + break; + } + + /* We are finally ready to execute the command. */ + if (processCommandAndResetClient(c) == C_ERR) { + /* If the client is no longer valid, we avoid exiting this + * loop and trimming the client buffer later. So we return + * ASAP in that case. */ + return; } - if (server.current_client == NULL) deadclient = 1; - server.current_client = NULL; - /* freeMemoryIfNeeded may flush slave output buffers. This may - * result into a slave, that may be the active client, to be - * freed. */ - if (deadclient) break; } } /* Trim to pos */ - if (!deadclient && c->qb_pos) { + if (c->qb_pos) { sdsrange(c->querybuf,c->qb_pos,-1); c->qb_pos = 0; } @@ -1737,9 +1766,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty(); bytes = sdscatrepr(bytes,c->querybuf,64); -// FIXME: This may be called from an I/O thread and it is not safe to -// log from there for now. -// serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); + serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes); sdsfree(ci); sdsfree(bytes); freeClientAsync(c); @@ -2747,6 +2774,10 @@ int handleClientsWithPendingReadsUsingThreads(void) { while((ln = listNext(&li))) { client *c = listNodeValue(ln); c->flags &= ~CLIENT_PENDING_READ; + if (c->flags & CLIENT_PENDING_COMMAND) { + c->flags &= ~ CLIENT_PENDING_COMMAND; + processCommandAndResetClient(c); + } processInputBufferAndReplicate(c); } listEmpty(server.clients_pending_read); diff --git a/src/server.h b/src/server.h index 0d7882419..c088d356a 100644 --- a/src/server.h +++ b/src/server.h @@ -288,6 +288,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define CLIENT_PENDING_READ (1<<29) /* The client has pending reads and was put in the list of clients we can read from. */ +#define CLIENT_PENDING_COMMAND (1<<30) /* */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ From 6e29b364a869eb30ec9644d7d7d99acd3026a1de Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 29 Apr 2019 12:46:23 +0200 Subject: [PATCH 39/50] Threaded IO: put fflush() inside tio_debug conditional. --- src/networking.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/networking.c b/src/networking.c index 4361ab1af..74bd0f13d 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2608,7 +2608,7 @@ void initThreadedIO(void) { } void startThreadedIO(void) { - if (tio_debug) printf("S"); fflush(stdout); + if (tio_debug) { printf("S"); fflush(stdout); } if (tio_debug) printf("--- STARTING THREADED IO ---\n"); serverAssert(io_threads_active == 0); for (int j = 0; j < server.io_threads_num; j++) @@ -2620,7 +2620,7 @@ void stopThreadedIO(void) { /* We may have still clients with pending reads when this function * is called: handle them before stopping the threads. */ handleClientsWithPendingReadsUsingThreads(); - if (tio_debug) printf("E"); fflush(stdout); + if (tio_debug) { printf("E"); fflush(stdout); } if (tio_debug) printf("--- STOPPING THREADED IO [R%d] [W%d] ---\n", (int) listLength(server.clients_pending_read), (int) listLength(server.clients_pending_write)); From 50b09bc408eea72e601f5723a331da20ac705302 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 30 Apr 2019 15:39:27 +0200 Subject: [PATCH 40/50] Threaded IO: ability to disable reads from threaded path. --- src/networking.c | 3 ++- src/server.c | 1 + src/server.h | 2 ++ 3 files changed, 5 insertions(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index 74bd0f13d..651dbdb8a 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2723,6 +2723,7 @@ int handleClientsWithPendingWritesUsingThreads(void) { * pending read clients and flagged as such. */ int postponeClientRead(client *c) { if (io_threads_active && + server.io_threads_do_reads && !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ))) { c->flags |= CLIENT_PENDING_READ; @@ -2734,7 +2735,7 @@ int postponeClientRead(client *c) { } int handleClientsWithPendingReadsUsingThreads(void) { - if (!io_threads_active) return 0; + if (!io_threads_active || !server.io_threads_do_reads) return 0; int processed = listLength(server.clients_pending_read); if (processed == 0) return 0; diff --git a/src/server.c b/src/server.c index e0c48b097..2643d7266 100644 --- a/src/server.c +++ b/src/server.c @@ -2315,6 +2315,7 @@ void initServerConfig(void) { server.always_show_logo = CONFIG_DEFAULT_ALWAYS_SHOW_LOGO; server.lua_time_limit = LUA_SCRIPT_TIME_LIMIT; server.io_threads_num = CONFIG_DEFAULT_IO_THREADS_NUM; + server.io_threads_do_reads = CONFIG_DEFAULT_IO_THREADS_DO_READS; server.lruclock = getLRUClock(); resetServerSaveParams(); diff --git a/src/server.h b/src/server.h index c088d356a..3987ab5fc 100644 --- a/src/server.h +++ b/src/server.h @@ -88,6 +88,7 @@ typedef long long mstime_t; /* millisecond time type. */ #define CONFIG_DEFAULT_CLIENT_TIMEOUT 0 /* Default client timeout: infinite */ #define CONFIG_DEFAULT_DBNUM 16 #define CONFIG_DEFAULT_IO_THREADS_NUM 1 /* Single threaded by default */ +#define CONFIG_DEFAULT_IO_THREADS_DO_READS 0 /* Read + parse from threads? */ #define CONFIG_MAX_LINE 1024 #define CRON_DBS_PER_CALL 16 #define NET_MAX_WRITES_PER_EVENT (1024*64) @@ -1069,6 +1070,7 @@ struct redisServer { int gopher_enabled; /* If true the server will reply to gopher queries. Will still serve RESP2 queries. */ int io_threads_num; /* Number of IO threads to use. */ + int io_threads_do_reads; /* Read and parse from IO threads? */ /* RDB / AOF loading information */ int loading; /* We are loading data from disk if true */ From a83dcf766ccd855c6bfb1f11d19f317a5440c978 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 30 Apr 2019 15:55:02 +0200 Subject: [PATCH 41/50] Threaded IO: configuration directive for turning on/off reads. --- src/config.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/config.c b/src/config.c index c4a18f3bb..1686743a0 100644 --- a/src/config.c +++ b/src/config.c @@ -318,6 +318,10 @@ void loadServerConfigFromString(char *config) { if (server.io_threads_num < 1 || server.io_threads_num > 512) { err = "Invalid number of I/O threads"; goto loaderr; } + } else if (!strcasecmp(argv[0],"io-threads-do-reads") && argc == 2) { + if ((server.io_threads_do_reads = yesnotoi(argv[1])) == -1) { + err = "argument must be 'yes' or 'no'"; goto loaderr; + } } else if (!strcasecmp(argv[0],"include") && argc == 2) { loadServerConfig(argv[1],NULL); } else if (!strcasecmp(argv[0],"maxclients") && argc == 2) { @@ -1485,6 +1489,7 @@ void configGetCommand(client *c) { config_get_bool_field("activedefrag", server.active_defrag_enabled); config_get_bool_field("protected-mode", server.protected_mode); config_get_bool_field("gopher-enabled", server.gopher_enabled); + config_get_bool_field("io-threads-do-reads", server.io_threads_do_reads); config_get_bool_field("repl-disable-tcp-nodelay", server.repl_disable_tcp_nodelay); config_get_bool_field("repl-diskless-sync", @@ -2316,6 +2321,7 @@ int rewriteConfig(char *path) { rewriteConfigYesNoOption(state,"activedefrag",server.active_defrag_enabled,CONFIG_DEFAULT_ACTIVE_DEFRAG); rewriteConfigYesNoOption(state,"protected-mode",server.protected_mode,CONFIG_DEFAULT_PROTECTED_MODE); rewriteConfigYesNoOption(state,"gopher-enabled",server.gopher_enabled,CONFIG_DEFAULT_GOPHER_ENABLED); + rewriteConfigYesNoOption(state,"io-threads-do-reads",server.io_threads_do_reads,CONFIG_DEFAULT_IO_THREADS_DO_READS); rewriteConfigClientoutputbufferlimitOption(state); rewriteConfigNumericalOption(state,"hz",server.config_hz,CONFIG_DEFAULT_HZ); rewriteConfigYesNoOption(state,"aof-rewrite-incremental-fsync",server.aof_rewrite_incremental_fsync,CONFIG_DEFAULT_AOF_REWRITE_INCREMENTAL_FSYNC); From cd4845c478a792bd0b567fbba4b49cd45e1f776c Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 30 Apr 2019 15:59:23 +0200 Subject: [PATCH 42/50] Threaded IO: handleClientsWithPendingReadsUsingThreads top comment. --- src/networking.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/networking.c b/src/networking.c index 651dbdb8a..6fec97605 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2734,6 +2734,12 @@ int postponeClientRead(client *c) { } } +/* When threaded I/O is also enabled for the reading + parsing side, the + * readable handler will just put normal clients into a queue of clients to + * process (instead of serving them synchronously). This function runs + * the queue using the I/O threads, and process them in order to accumulate + * the reads in the buffers, and also parse the first command available + * rendering it in the client structures. */ int handleClientsWithPendingReadsUsingThreads(void) { if (!io_threads_active || !server.io_threads_do_reads) return 0; int processed = listLength(server.clients_pending_read); From d6da0c14153f54f36bc0dae989a5343f4c633b65 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Tue, 7 May 2019 13:35:27 +0800 Subject: [PATCH 43/50] Makefile: 1TD -> STD --- src/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Makefile b/src/Makefile index 1c80e547f..f35685eff 100644 --- a/src/Makefile +++ b/src/Makefile @@ -20,7 +20,7 @@ DEPENDENCY_TARGETS=hiredis linenoise lua NODEPS:=clean distclean # Default settings -1TD=-std=c11 -pedantic -DREDIS_STATIC='' +STD=-std=c11 -pedantic -DREDIS_STATIC='' ifneq (,$(findstring clang,$(CC))) ifneq (,$(findstring FreeBSD,$(uname_S))) STD+=-Wno-c11-extensions From 0ba31aface761e8de215d32fa48238b6f870d23a Mon Sep 17 00:00:00 2001 From: stan011 Date: Tue, 7 May 2019 14:22:40 +0800 Subject: [PATCH 44/50] change the comments there may have a mis type --- src/t_list.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/t_list.c b/src/t_list.c index 45d2e3317..54e4959b9 100644 --- a/src/t_list.c +++ b/src/t_list.c @@ -617,7 +617,7 @@ void rpoplpushCommand(client *c) { * the AOF and replication channel. * * The argument 'where' is LIST_TAIL or LIST_HEAD, and indicates if the - * 'value' element was popped fron the head (BLPOP) or tail (BRPOP) so that + * 'value' element was popped from the head (BLPOP) or tail (BRPOP) so that * we can propagate the command properly. * * The function returns C_OK if we are able to serve the client, otherwise From 4c83175fa4dd4bfab8de4341cf32c9b3dcd6a8eb Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Tue, 7 May 2019 15:59:16 +0800 Subject: [PATCH 45/50] fix memory leak when rewrite config file --- src/config.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/config.c b/src/config.c index 1686743a0..7f0e9af89 100644 --- a/src/config.c +++ b/src/config.c @@ -1711,12 +1711,11 @@ void rewriteConfigMarkAsProcessed(struct rewriteConfigState *state, const char * * If the old file does not exist at all, an empty state is returned. */ struct rewriteConfigState *rewriteConfigReadOldFile(char *path) { FILE *fp = fopen(path,"r"); - struct rewriteConfigState *state = zmalloc(sizeof(*state)); - char buf[CONFIG_MAX_LINE+1]; - int linenum = -1; - if (fp == NULL && errno != ENOENT) return NULL; + char buf[CONFIG_MAX_LINE+1]; + int linenum = -1; + struct rewriteConfigState *state = zmalloc(sizeof(*state)); state->option_to_line = dictCreate(&optionToLineDictType,NULL); state->rewritten = dictCreate(&optionSetDictType,NULL); state->numlines = 0; From 871308297337d29f5922fe7baa244385e1b60105 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=94=90=E6=9D=83?= Date: Wed, 8 May 2019 12:53:56 +0800 Subject: [PATCH 46/50] Update ziplist.c Hi, @antirez In the code, to get the size of ziplist, "unsigned int bytes = ZIPLIST_HEADER_SIZE+1;" is correct, but why not make it more readable and easy to understand --- src/ziplist.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ziplist.c b/src/ziplist.c index 1579d1109..ef40d6aa2 100644 --- a/src/ziplist.c +++ b/src/ziplist.c @@ -576,7 +576,7 @@ void zipEntry(unsigned char *p, zlentry *e) { /* Create a new empty ziplist. */ unsigned char *ziplistNew(void) { - unsigned int bytes = ZIPLIST_HEADER_SIZE+1; + unsigned int bytes = ZIPLIST_HEADER_SIZE+ZIPLIST_END_SIZE; unsigned char *zl = zmalloc(bytes); ZIPLIST_BYTES(zl) = intrev32ifbe(bytes); ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE); From 58fb679d82d3014b1cc36ccf1917a8ac10054e96 Mon Sep 17 00:00:00 2001 From: yongman Date: Wed, 8 May 2019 16:13:42 +0800 Subject: [PATCH 47/50] Fix uint64_t hash value in active defrag --- src/defrag.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/defrag.c b/src/defrag.c index d67b6e253..ecf0255dc 100644 --- a/src/defrag.c +++ b/src/defrag.c @@ -47,7 +47,7 @@ int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util); /* forward declarations*/ void defragDictBucketCallback(void *privdata, dictEntry **bucketref); -dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, long *defragged); +dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged); /* Defrag helper for generic allocations. * @@ -355,7 +355,7 @@ long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) { sdsele = ln->value; if ((newsds = activeDefragSds(sdsele))) { /* When defragging an sds value, we need to update the dict key */ - unsigned int hash = dictGetHash(d, sdsele); + uint64_t hash = dictGetHash(d, sdsele); replaceSateliteDictKeyPtrAndOrDefragDictEntry(d, sdsele, newsds, hash, &defragged); ln->value = newsds; defragged++; @@ -392,7 +392,7 @@ long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) { * moved. Return value is the the dictEntry if found, or NULL if not found. * NOTE: this is very ugly code, but it let's us avoid the complication of * doing a scan on another dict. */ -dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, long *defragged) { +dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, uint64_t hash, long *defragged) { dictEntry **deref = dictFindEntryRefByPtrAndHash(d, oldkey, hash); if (deref) { dictEntry *de = *deref; From 285bcac99249eb54e0d71a9780a28c98772f3562 Mon Sep 17 00:00:00 2001 From: Angus Pearson Date: Wed, 8 May 2019 11:36:31 +0100 Subject: [PATCH 48/50] Add include to deps/hiredis/read.c to fix Implicit Declaration of strcasecmp warning --- deps/hiredis/read.c | 1 + 1 file changed, 1 insertion(+) diff --git a/deps/hiredis/read.c b/deps/hiredis/read.c index c75c3435f..cc0f3cc72 100644 --- a/deps/hiredis/read.c +++ b/deps/hiredis/read.c @@ -31,6 +31,7 @@ #include "fmacros.h" #include +#include #include #ifndef _MSC_VER #include From 8752868c1d60f4c2f205e03773990726c60ccfd9 Mon Sep 17 00:00:00 2001 From: Angus Pearson Date: Wed, 8 May 2019 12:13:45 +0100 Subject: [PATCH 49/50] Enlarge error buffer in redis-check-aof.c to remove compiler warning of output truncation through snprintf format string --- src/redis-check-aof.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/redis-check-aof.c b/src/redis-check-aof.c index 54ed85f0d..eedb09db5 100644 --- a/src/redis-check-aof.c +++ b/src/redis-check-aof.c @@ -37,7 +37,7 @@ snprintf(error, sizeof(error), "0x%16llx: %s", (long long)epos, __buf); \ } -static char error[1024]; +static char error[1044]; static off_t epos; int consumeNewline(char *buf) { From 1e45354eb6bfc075a98e894c9d601eea82a9eeb4 Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" Date: Fri, 10 May 2019 16:27:25 +0800 Subject: [PATCH 50/50] test cases: skiptill -> skip-till --- tests/test_helper.tcl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 568eacdee..1442067f5 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -503,7 +503,7 @@ for {set j 0} {$j < [llength $argv]} {incr j} { } elseif {$opt eq {--only}} { lappend ::only_tests $arg incr j - } elseif {$opt eq {--skiptill}} { + } elseif {$opt eq {--skip-till}} { set ::skip_till $arg incr j } elseif {$opt eq {--list-tests}} {