From 6726b3c2cb41e700c8cfd2f821df6c8c847a5ddc Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Fri, 24 Apr 2020 17:20:28 +0300 Subject: [PATCH 1/5] optimize memory usage of deferred replies - fixed When deffered reply is added the previous reply node cannot be used so all the extra space we allocated in it is wasted. in case someone uses deffered replies in a loop, each time adding a small reply, each of these reply nodes (the small string reply) would have consumed a 16k block. now when we add anther diferred reply node, we trim the unused portion of the previous reply block. see #7123 cherry picked from commit fb732f7a944a4d4c90bb7375cb6030e88211f5aa with fix to handle a crash with LIBC allocator, which apparently can return the same pointer despite changing it's size. i.e. shrinking an allocation of 16k into 56 bytes without changing the pointer. --- src/networking.c | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/networking.c b/src/networking.c index c4a277e0a..d62533e3e 100644 --- a/src/networking.c +++ b/src/networking.c @@ -436,6 +436,34 @@ void addReplyStatusFormat(client *c, const char *fmt, ...) { sdsfree(s); } +/* Sometimes we are forced to create a new reply node, and we can't append to + * the previous one, when that happens, we wanna try to trim the unused space + * at the end of the last reply node which we won't use anymore. */ +void trimReplyUnusedTailSpace(client *c) { + listNode *ln = listLast(c->reply); + clientReplyBlock *tail = ln? listNodeValue(ln): NULL; + + /* Note that 'tail' may be NULL even if we have a tail node, becuase when + * addDeferredMultiBulkLength() is used */ + if (!tail) return; + + /* We only try to trim the space is relatively high (more than a 1/4 of the + * allocation), otherwise there's a high chance realloc will NOP. + * Also, to avoid large memmove which happens as part of realloc, we only do + * that if the used part is small. */ + if (tail->size - tail->used > tail->size / 4 && + tail->used < PROTO_REPLY_CHUNK_BYTES) + { + size_t old_size = tail->size; + tail = zrealloc(tail, tail->used + sizeof(clientReplyBlock)); + /* take over the allocation's internal fragmentation (at least for + * memory usage tracking) */ + tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock); + c->reply_bytes += tail->size - old_size; + listNodeValue(ln) = tail; + } +} + /* Adds an empty object to the reply list that will contain the multi bulk * length, which is not known when this function is called. */ void *addReplyDeferredLen(client *c) { @@ -443,6 +471,7 @@ void *addReplyDeferredLen(client *c) { * ready to be sent, since we are sure that before returning to the * event loop setDeferredAggregateLen() will be called. */ if (prepareClientToWrite(c) != C_OK) return NULL; + trimReplyUnusedTailSpace(c); listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */ return listLast(c->reply); } From 1a0deab2a548fa306171f03439e858c00836fe69 Mon Sep 17 00:00:00 2001 From: zhenwei pi Date: Sat, 2 May 2020 20:05:39 +0800 Subject: [PATCH 2/5] Support setcpuaffinity on linux/bsd Currently, there are several types of threads/child processes of a redis server. Sometimes we need deeply optimise the performance of redis, so we would like to isolate threads/processes. There were some discussion about cpu affinity cases in the issue: https://github.com/antirez/redis/issues/2863 So implement cpu affinity setting by redis.conf in this patch, then we can config server_cpulist/bio_cpulist/aof_rewrite_cpulist/ bgsave_cpulist by cpu list. Examples of cpulist in redis.conf: server_cpulist 0-7:2 means cpu affinity 0,2,4,6 bio_cpulist 1,3 means cpu affinity 1,3 aof_rewrite_cpulist 8-11 means cpu affinity 8,9,10,11 bgsave_cpulist 1,10-11 means cpu affinity 1,10,11 Test on linux/freebsd, both work fine. Signed-off-by: zhenwei pi --- redis.conf | 15 ++++ src/Makefile | 2 +- src/aof.c | 1 + src/bio.c | 2 + src/config.c | 4 ++ src/config.h | 6 ++ src/networking.c | 1 + src/rdb.c | 2 + src/server.c | 9 +++ src/server.h | 6 ++ src/setcpuaffinity.c | 129 +++++++++++++++++++++++++++++++++++ tests/unit/introspection.tcl | 4 ++ 12 files changed, 180 insertions(+), 1 deletion(-) create mode 100644 src/setcpuaffinity.c diff --git a/redis.conf b/redis.conf index d96d26e1c..756073a27 100644 --- a/redis.conf +++ b/redis.conf @@ -1780,3 +1780,18 @@ rdb-save-incremental-fsync yes # Maximum number of set/hash/zset/list fields that will be processed from # the main dictionary scan # active-defrag-max-scan-fields 1000 + +# Redis server/IO threads, bio threads, aof rewrite child process, and bgsave +# child process cpu affinity list config. syntax of cpu list looks like taskset +# command. serveral examples: +# set redis server/io threads to cpu affinity 0,2,4,6 +# server_cpulist 0-7:2 +# +# set bio threads to cpu affinity 1,3 +# bio_cpulist 1,3 +# +# set aof rewrite child process to cpu affinity 8,9,10,11 +# aof_rewrite_cpulist 8-11 +# +# set bgsave child process to cpu affinity 1,10,11 +# bgsave_cpulist 1,10-11 diff --git a/src/Makefile b/src/Makefile index f9922afce..55f862cfc 100644 --- a/src/Makefile +++ b/src/Makefile @@ -206,7 +206,7 @@ endif REDIS_SERVER_NAME=redis-server REDIS_SENTINEL_NAME=redis-sentinel -REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o timeout.o +REDIS_SERVER_OBJ=adlist.o quicklist.o ae.o anet.o dict.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o crc16.o endianconv.o slowlog.o scripting.o bio.o rio.o rand.o memtest.o crcspeed.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o redis-check-rdb.o redis-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o gopher.o tracking.o connection.o tls.o sha256.o timeout.o setcpuaffinity.o REDIS_CLI_NAME=redis-cli REDIS_CLI_OBJ=anet.o adlist.o dict.o redis-cli.o zmalloc.o release.o ae.o crcspeed.o crc64.o siphash.o crc16.o REDIS_BENCHMARK_NAME=redis-benchmark diff --git a/src/aof.c b/src/aof.c index 301a40848..02409abe6 100644 --- a/src/aof.c +++ b/src/aof.c @@ -1596,6 +1596,7 @@ int rewriteAppendOnlyFileBackground(void) { /* Child */ redisSetProcTitle("redis-aof-rewrite"); + redisSetCpuAffinity(server.aof_rewrite_cpulist); snprintf(tmpfile,256,"temp-rewriteaof-bg-%d.aof", (int) getpid()); if (rewriteAppendOnlyFile(tmpfile) == C_OK) { sendChildCOWInfo(CHILD_INFO_TYPE_AOF, "AOF rewrite"); diff --git a/src/bio.c b/src/bio.c index 85f681185..69c62fc6f 100644 --- a/src/bio.c +++ b/src/bio.c @@ -166,6 +166,8 @@ void *bioProcessBackgroundJobs(void *arg) { break; } + redisSetCpuAffinity(server.bio_cpulist); + /* Make the thread killable at any time, so that bioKillThreads() * can work reliably. */ pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL); diff --git a/src/config.c b/src/config.c index e0cbcc281..64854592c 100644 --- a/src/config.c +++ b/src/config.c @@ -2133,6 +2133,10 @@ standardConfig configs[] = { createStringConfig("syslog-ident", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.syslog_ident, "redis", NULL, NULL), createStringConfig("dbfilename", NULL, MODIFIABLE_CONFIG, ALLOW_EMPTY_STRING, server.rdb_filename, "dump.rdb", isValidDBfilename, NULL), createStringConfig("appendfilename", NULL, IMMUTABLE_CONFIG, ALLOW_EMPTY_STRING, server.aof_filename, "appendonly.aof", isValidAOFfilename, NULL), + createStringConfig("server_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.server_cpulist, NULL, NULL, NULL), + createStringConfig("bio_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.bio_cpulist, NULL, NULL, NULL), + createStringConfig("aof_rewrite_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.aof_rewrite_cpulist, NULL, NULL, NULL), + createStringConfig("bgsave_cpulist", NULL, IMMUTABLE_CONFIG, EMPTY_STRING_IS_NULL, server.bgsave_cpulist, NULL, NULL, NULL), /* Enum Configs */ createEnumConfig("supervised", NULL, IMMUTABLE_CONFIG, supervised_mode_enum, server.supervised_mode, SUPERVISED_NONE, NULL, NULL), diff --git a/src/config.h b/src/config.h index 40dc683ce..a322ce354 100644 --- a/src/config.h +++ b/src/config.h @@ -244,4 +244,10 @@ int pthread_setname_np(const char *name); #endif #endif +/* Check if we can use setcpuaffinity(). */ +#if (defined __linux || defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__) +#define USE_SETCPUAFFINITY +void setcpuaffinity(const char *cpulist); +#endif + #endif diff --git a/src/networking.c b/src/networking.c index c4a277e0a..9d10c4bb4 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2843,6 +2843,7 @@ void *IOThreadMain(void *myid) { snprintf(thdname, sizeof(thdname), "io_thd_%ld", id); redis_set_thread_title(thdname); + redisSetCpuAffinity(server.server_cpulist); while(1) { /* Wait for start */ diff --git a/src/rdb.c b/src/rdb.c index 9f6bf13f1..e2a2fb39f 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1351,6 +1351,7 @@ int rdbSaveBackground(char *filename, rdbSaveInfo *rsi) { /* Child */ redisSetProcTitle("redis-rdb-bgsave"); + redisSetCpuAffinity(server.bgsave_cpulist); retval = rdbSave(filename,rsi); if (retval == C_OK) { sendChildCOWInfo(CHILD_INFO_TYPE_RDB, "RDB"); @@ -2458,6 +2459,7 @@ int rdbSaveToSlavesSockets(rdbSaveInfo *rsi) { rioInitWithFd(&rdb,server.rdb_pipe_write); redisSetProcTitle("redis-rdb-to-slaves"); + redisSetCpuAffinity(server.bgsave_cpulist); retval = rdbSaveRioWithEOFMark(&rdb,NULL,rsi); if (retval == C_OK && rioFlush(&rdb) == 0) diff --git a/src/server.c b/src/server.c index 659604ef3..1baa044be 100644 --- a/src/server.c +++ b/src/server.c @@ -4850,6 +4850,14 @@ void redisSetProcTitle(char *title) { #endif } +void redisSetCpuAffinity(const char *cpulist) { +#ifdef USE_SETCPUAFFINITY + setcpuaffinity(cpulist); +#else + UNUSED(cpulist); +#endif +} + /* * Check whether systemd or upstart have been used to start redis. */ @@ -5118,6 +5126,7 @@ int main(int argc, char **argv) { serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory); } + redisSetCpuAffinity(server.server_cpulist); aeSetBeforeSleepProc(server.el,beforeSleep); aeSetAfterSleepProc(server.el,afterSleep); aeMain(server.el); diff --git a/src/server.h b/src/server.h index 41d767e13..af435b148 100644 --- a/src/server.h +++ b/src/server.h @@ -1433,6 +1433,11 @@ struct redisServer { int tls_replication; int tls_auth_clients; redisTLSContextConfig tls_ctx_config; + /* cpu affinity */ + char *server_cpulist; /* cpu affinity list of redis server main/io thread. */ + char *bio_cpulist; /* cpu affinity list of bio thread. */ + char *aof_rewrite_cpulist; /* cpu affinity list of aof rewrite process. */ + char *bgsave_cpulist; /* cpu affinity list of bgsave process. */ }; typedef struct pubsubPattern { @@ -1585,6 +1590,7 @@ void exitFromChild(int retcode); size_t redisPopcount(void *s, long count); void redisSetProcTitle(char *title); int redisCommunicateSystemd(const char *sd_notify_msg); +void redisSetCpuAffinity(const char *cpulist); /* networking.c -- Networking and Client related operations */ client *createClient(connection *conn); diff --git a/src/setcpuaffinity.c b/src/setcpuaffinity.c new file mode 100644 index 000000000..1a12795cd --- /dev/null +++ b/src/setcpuaffinity.c @@ -0,0 +1,129 @@ +/* ========================================================================== + * setproctitle.c - Linux/BSD setcpuaffinity. + * -------------------------------------------------------------------------- + * Copyright (C) 2020 zhenwei pi + * + * Permission is hereby granted, free of charge, to any person obtaining a + * copy of this software and associated documentation files (the + * "Software"), to deal in the Software without restriction, including + * without limitation the rights to use, copy, modify, merge, publish, + * distribute, sublicense, and/or sell copies of the Software, and to permit + * persons to whom the Software is furnished to do so, subject to the + * following conditions: + * + * The above copyright notice and this permission notice shall be included + * in all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS + * OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN + * NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, + * DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR + * OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE + * USE OR OTHER DEALINGS IN THE SOFTWARE. + * ========================================================================== + */ +#ifndef _GNU_SOURCE +#define _GNU_SOURCE +#endif +#include +#include +#include +#ifdef __linux__ +#include +#endif +#ifdef __FreeBSD__ +#include +#include +#endif +#include "config.h" + +#ifdef USE_SETCPUAFFINITY +static const char *next_token(const char *q, int sep) { + if (q) + q = strchr(q, sep); + if (q) + q++; + + return q; +} + +static int next_num(const char *str, char **end, int *result) { + if (!str || *str == '\0' || !isdigit(*str)) + return -1; + + *result = strtoul(str, end, 10); + if (str == *end) + return -1; + + return 0; +} + +/* set current thread cpu affinity to cpu list, this function works like + * taskset command (actually cpulist parsing logic reference to util-linux). + * example of this function: "0,2,3", "0,2-3", "0-20:2". */ +void setcpuaffinity(const char *cpulist) { + const char *p, *q; + char *end = NULL; +#ifdef __linux__ + cpu_set_t cpuset; +#endif +#ifdef __FreeBSD__ + cpuset_t cpuset; +#endif + + if (!cpulist) + return; + + CPU_ZERO(&cpuset); + + q = cpulist; + while (p = q, q = next_token(q, ','), p) { + int a, b, s; + const char *c1, *c2; + + if (next_num(p, &end, &a) != 0) + return; + + b = a; + s = 1; + p = end; + + c1 = next_token(p, '-'); + c2 = next_token(p, ','); + + if (c1 != NULL && (c2 == NULL || c1 < c2)) { + if (next_num(c1, &end, &b) != 0) + return; + + c1 = end && *end ? next_token(end, ':') : NULL; + if (c1 != NULL && (c2 == NULL || c1 < c2)) { + if (next_num(c1, &end, &s) != 0) + return; + + if (s == 0) + return; + } + } + + if ((a > b)) + return; + + while (a <= b) { + CPU_SET(a, &cpuset); + a += s; + } + } + + if (end && *end) + return; + +#ifdef __linux__ + sched_setaffinity(0, sizeof(cpuset), &cpuset); +#endif +#ifdef __FreeBSD__ + cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID, -1, sizeof(cpuset), &cpuset); +#endif +} + +#endif /* USE_SETCPUAFFINITY */ diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index cd905084a..b60ca0d48 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -94,6 +94,10 @@ start_server {tags {"introspection"}} { slaveof bind requirepass + server_cpulist + bio_cpulist + aof_rewrite_cpulist + bgsave_cpulist } set configs {} From e598d9b59ed5592eb7fda764733f9d88c1ade0ee Mon Sep 17 00:00:00 2001 From: Deliang Yang Date: Mon, 4 May 2020 01:57:38 +0800 Subject: [PATCH 3/5] reformat code --- src/ziplist.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ziplist.c b/src/ziplist.c index ef40d6aa2..ddae0d96f 100644 --- a/src/ziplist.c +++ b/src/ziplist.c @@ -440,7 +440,7 @@ unsigned int zipStorePrevEntryLength(unsigned char *p, unsigned int len) { if ((prevlensize) == 1) { \ (prevlen) = (ptr)[0]; \ } else if ((prevlensize) == 5) { \ - assert(sizeof((prevlen)) == 4); \ + assert(sizeof((prevlen)) == 4); \ memcpy(&(prevlen), ((char*)(ptr)) + 1, 4); \ memrev32ifbe(&prevlen); \ } \ From deee2c1ef2249120ae7db0e7523bfad4041b21a6 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 3 May 2020 09:31:50 +0300 Subject: [PATCH 4/5] add daily github actions with libc malloc and valgrind * fix memlry leaks with diskless replica short read. * fix a few timing issues with valgrind runs * fix issue with valgrind and watchdog schedule signal about the valgrind WD issue: the stack trace test in logging.tcl, has issues with valgrind: ==28808== Can't extend stack to 0x1ffeffdb38 during signal delivery for thread 1: ==28808== too small or bad protection modes it seems to be some valgrind bug with SA_ONSTACK. SA_ONSTACK seems unneeded since WD is not recursive (SA_NODEFER was removed), also, not sure if it's even valid without a call to sigaltstack() --- .github/workflows/daily.yml | 48 +++++++++++++++++++++++ src/debug.c | 2 +- src/rdb.c | 60 +++++++++++++++++++++-------- tests/integration/aof.tcl | 12 ++++++ tests/integration/replication-3.tcl | 2 +- 5 files changed, 106 insertions(+), 18 deletions(-) create mode 100644 .github/workflows/daily.yml diff --git a/.github/workflows/daily.yml b/.github/workflows/daily.yml new file mode 100644 index 000000000..b6a9abb68 --- /dev/null +++ b/.github/workflows/daily.yml @@ -0,0 +1,48 @@ +name: Daily + +on: + schedule: + - cron: '0 7 * * *' + +jobs: + test-jemalloc: + runs-on: ubuntu-latest + timeout-minutes: 1200 + steps: + - uses: actions/checkout@v1 + - name: make + run: make + - name: test + run: | + sudo apt-get install tcl8.5 + ./runtest --accurate --verbose + - name: module api test + run: ./runtest-moduleapi --verbose + + test-libc-malloc: + runs-on: ubuntu-latest + timeout-minutes: 1200 + steps: + - uses: actions/checkout@v1 + - name: make + run: make MALLOC=libc + - name: test + run: | + sudo apt-get install tcl8.5 + ./runtest --accurate --verbose + - name: module api test + run: ./runtest-moduleapi --verbose + + test-valgrind: + runs-on: ubuntu-latest + timeout-minutes: 14400 + steps: + - uses: actions/checkout@v1 + - name: make + run: make valgrind + - name: test + run: | + sudo apt-get install tcl8.5 valgrind -y + ./runtest --valgrind --verbose --clients 1 + - name: module api test + run: ./runtest-moduleapi --valgrind --verbose --clients 1 diff --git a/src/debug.c b/src/debug.c index cbb56cb71..587ff7c5d 100644 --- a/src/debug.c +++ b/src/debug.c @@ -1636,7 +1636,7 @@ void enableWatchdog(int period) { /* Watchdog was actually disabled, so we have to setup the signal * handler. */ sigemptyset(&act.sa_mask); - act.sa_flags = SA_ONSTACK | SA_SIGINFO; + act.sa_flags = SA_SIGINFO; act.sa_sigaction = watchdogSignalHandler; sigaction(SIGALRM, &act, NULL); } diff --git a/src/rdb.c b/src/rdb.c index 9f6bf13f1..14c5579e0 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1441,7 +1441,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { /* Load every single element of the list */ while(len--) { - if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) return NULL; + if ((ele = rdbLoadEncodedStringObject(rdb)) == NULL) { + decrRefCount(o); + return NULL; + } dec = getDecodedObject(ele); size_t len = sdslen(dec->ptr); quicklistPushTail(o->ptr, dec->ptr, len); @@ -1468,8 +1471,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { long long llval; sds sdsele; - if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; + if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + decrRefCount(o); + return NULL; + } if (o->encoding == OBJ_ENCODING_INTSET) { /* Fetch integer value from element. */ @@ -1508,13 +1513,23 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { double score; zskiplistNode *znode; - if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; + if ((sdsele = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + decrRefCount(o); + return NULL; + } if (rdbtype == RDB_TYPE_ZSET_2) { - if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) return NULL; + if (rdbLoadBinaryDoubleValue(rdb,&score) == -1) { + decrRefCount(o); + sdsfree(sdsele); + return NULL; + } } else { - if (rdbLoadDoubleValue(rdb,&score) == -1) return NULL; + if (rdbLoadDoubleValue(rdb,&score) == -1) { + decrRefCount(o); + sdsfree(sdsele); + return NULL; + } } /* Don't care about integer-encoded strings. */ @@ -1546,10 +1561,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { while (o->encoding == OBJ_ENCODING_ZIPLIST && len > 0) { len--; /* Load raw strings */ - if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; - if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; + if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + decrRefCount(o); + return NULL; + } + if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + sdsfree(field); + decrRefCount(o); + return NULL; + } /* Add pair to ziplist */ o->ptr = ziplistPush(o->ptr, (unsigned char*)field, @@ -1577,10 +1597,15 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { while (o->encoding == OBJ_ENCODING_HT && len > 0) { len--; /* Load encoded strings */ - if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; - if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) - == NULL) return NULL; + if ((field = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + decrRefCount(o); + return NULL; + } + if ((value = rdbGenericLoadStringObject(rdb,RDB_LOAD_SDS,NULL)) == NULL) { + sdsfree(field); + decrRefCount(o); + return NULL; + } /* Add pair to hash table */ ret = dictAdd((dict*)o->ptr, field, value); @@ -1600,7 +1625,10 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { while (len--) { unsigned char *zl = rdbGenericLoadStringObject(rdb,RDB_LOAD_PLAIN,NULL); - if (zl == NULL) return NULL; + if (zl == NULL) { + decrRefCount(o); + return NULL; + } quicklistAppendZiplist(o->ptr, zl); } } else if (rdbtype == RDB_TYPE_HASH_ZIPMAP || diff --git a/tests/integration/aof.tcl b/tests/integration/aof.tcl index 2734de7f1..b82c87d71 100644 --- a/tests/integration/aof.tcl +++ b/tests/integration/aof.tcl @@ -54,6 +54,12 @@ tags {"aof"} { set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls] + wait_for_condition 50 100 { + [catch {$client ping} e] == 0 + } else { + fail "Loading DB is taking too much time." + } + test "Truncated AOF loaded: we expect foo to be equal to 5" { assert {[$client get foo] eq "5"} } @@ -71,6 +77,12 @@ tags {"aof"} { set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls] + wait_for_condition 50 100 { + [catch {$client ping} e] == 0 + } else { + fail "Loading DB is taking too much time." + } + test "Truncated AOF loaded: we expect foo to be equal to 6 now" { assert {[$client get foo] eq "6"} } diff --git a/tests/integration/replication-3.tcl b/tests/integration/replication-3.tcl index 198e698f2..43eb53538 100644 --- a/tests/integration/replication-3.tcl +++ b/tests/integration/replication-3.tcl @@ -118,7 +118,7 @@ start_server {tags {"repl"}} { # correctly the RDB file: such file will contain "lua" AUX # sections with scripts already in the memory of the master. - wait_for_condition 50 100 { + wait_for_condition 500 100 { [s -1 master_link_status] eq {up} } else { fail "Replication not started." From bce3d08c66a1bf22ea852295a57b92a8024d4a1b Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Sun, 3 May 2020 16:49:45 +0300 Subject: [PATCH 5/5] XPENDING should not update consumer's seen-time Same goes for XGROUP DELCONSUMER (But in this case, it doesn't have any visible effect) --- src/blocked.c | 7 ++++--- src/rdb.c | 4 ++-- src/stream.h | 7 ++++++- src/t_stream.c | 33 ++++++++++++++++++++------------- 4 files changed, 32 insertions(+), 19 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 045369e93..92f1cee65 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -371,9 +371,10 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { int noack = 0; if (group) { - consumer = streamLookupConsumer(group, - receiver->bpop.xread_consumer->ptr, - 1); + consumer = + streamLookupConsumer(group, + receiver->bpop.xread_consumer->ptr, + SLC_NONE); noack = receiver->bpop.xread_group_noack; } diff --git a/src/rdb.c b/src/rdb.c index 9f6bf13f1..def8585ac 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1823,8 +1823,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { decrRefCount(o); return NULL; } - streamConsumer *consumer = streamLookupConsumer(cgroup,cname, - 1); + streamConsumer *consumer = + streamLookupConsumer(cgroup,cname,SLC_NONE); sdsfree(cname); consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); if (rioGetReadError(rdb)) { diff --git a/src/stream.h b/src/stream.h index b69073994..0d3bf63fc 100644 --- a/src/stream.h +++ b/src/stream.h @@ -96,6 +96,11 @@ typedef struct streamPropInfo { /* Prototypes of exported APIs. */ struct client; +/* Flags for streamLookupConsumer */ +#define SLC_NONE 0 +#define SLC_NOCREAT (1<<0) /* Do not create the consumer if it doesn't exist */ +#define SLC_NOREFRESH (1<<1) /* Do not update consumer's seen-time */ + stream *streamNew(void); void freeStream(stream *s); unsigned long streamLength(const robj *subject); @@ -105,7 +110,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); void streamIteratorStop(streamIterator *si); streamCG *streamLookupCG(stream *s, sds groupname); -streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create); +streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags); streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); diff --git a/src/t_stream.c b/src/t_stream.c index 5c1b9a523..676ddd9bb 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1570,7 +1570,8 @@ void xreadCommand(client *c) { addReplyBulk(c,c->argv[streams_arg+i]); streamConsumer *consumer = NULL; if (groups) consumer = streamLookupConsumer(groups[i], - consumername->ptr,1); + consumername->ptr, + SLC_NONE); streamPropInfo spi = {c->argv[i+streams_arg],groupname}; int flags = 0; if (noack) flags |= STREAM_RWR_NOACK; @@ -1706,7 +1707,9 @@ streamCG *streamLookupCG(stream *s, sds groupname) { * consumer does not exist it is automatically created as a side effect * of calling this function, otherwise its last seen time is updated and * the existing consumer reference returned. */ -streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { +streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) { + int create = !(flags & SLC_NOCREAT); + int refresh = !(flags & SLC_NOREFRESH); streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, sdslen(name)); if (consumer == raxNotFound) { @@ -1717,7 +1720,7 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { raxInsert(cg->consumers,(unsigned char*)name,sdslen(name), consumer,NULL); } - consumer->seen_time = mstime(); + if (refresh) consumer->seen_time = mstime(); return consumer; } @@ -1725,7 +1728,8 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { * may have pending messages: they are removed from the PEL, and the number * of pending messages "lost" is returned. */ uint64_t streamDelConsumer(streamCG *cg, sds name) { - streamConsumer *consumer = streamLookupConsumer(cg,name,0); + streamConsumer *consumer = + streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH); if (consumer == NULL) return 0; uint64_t retval = raxSize(consumer->pel); @@ -2068,15 +2072,18 @@ void xpendingCommand(client *c) { } /* XPENDING [] variant. */ else { - streamConsumer *consumer = consumername ? - streamLookupConsumer(group,consumername->ptr,0): - NULL; + streamConsumer *consumer = NULL; + if (consumername) { + consumer = streamLookupConsumer(group, + consumername->ptr, + SLC_NOCREAT|SLC_NOREFRESH); - /* If a consumer name was mentioned but it does not exist, we can - * just return an empty array. */ - if (consumername && consumer == NULL) { - addReplyArrayLen(c,0); - return; + /* If a consumer name was mentioned but it does not exist, we can + * just return an empty array. */ + if (consumer == NULL) { + addReplyArrayLen(c,0); + return; + } } rax *pel = consumer ? consumer->pel : group->pel; @@ -2338,7 +2345,7 @@ void xclaimCommand(client *c) { raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); /* Update the consumer and idle time. */ if (consumer == NULL) - consumer = streamLookupConsumer(group,c->argv[3]->ptr,1); + consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE); nack->consumer = consumer; nack->delivery_time = deliverytime; /* Set the delivery attempts counter if given, otherwise