diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b2c723289..d4cfc42a6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -3,6 +3,7 @@ name: CI on: [push, pull_request] jobs: + test-ubuntu-latest: runs-on: self-hosted steps: @@ -14,19 +15,19 @@ jobs: sudo apt-get -y remove libzstd || true sudo apt-get -y install uuid-dev libcurl4-openssl-dev libbz2-dev zlib1g-dev libsnappy-dev liblz4-dev libzstd-dev libgflags-dev sudo apt-get -y install uuid-dev libcurl4-openssl-dev - make -j$(nproc) - - name: test + make BUILD_TLS=yes -j2 + - name: gen-cert + run: ./utils/gen-test-certs.sh + - name: test-tls run: | - sudo apt-get -y install tcl8.5 - ./runtest --clients 4 --verbose + sudo apt-get -y install tcl8.5 tcl-tls + ./runtest --clients 2 --verbose --tls - name: cluster-test run: | - ./runtest-cluster + ./runtest-cluster --tls - name: sentinel test run: | ./runtest-sentinel - name: module tests run: | ./runtest-moduleapi - - diff --git a/.github/workflows/daily.yml b/.github/workflows/daily.yml index e03715d5c..3c10236b4 100644 --- a/.github/workflows/daily.yml +++ b/.github/workflows/daily.yml @@ -5,6 +5,7 @@ on: - cron: '0 7 * * *' jobs: + test-jemalloc: runs-on: ubuntu-latest timeout-minutes: 1200 @@ -37,6 +38,40 @@ jobs: - name: module api test run: ./runtest-moduleapi --verbose + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v1 + - name: make + run: | + sudo apt-get -y install uuid-dev libcurl4-openssl-dev + make BUILD_TLS=yes -j2 + - name: "test (tls)" + run: | + sudo apt-get install tcl8.5 tcl-tls + ./utils/gen-test-certs.sh + ./runtest --accurate --verbose --tls + - name: test + run: ./runtest --accurate --verbose + - name: module api test (tls) + run: ./runtest-moduleapi --verbose --tls + + test-ubuntu-arm: + runs-on: [self-hosted, linux, arm] + steps: + - uses: actions/checkout@v1 + - name: make + run: | + sudo apt-get -y install uuid-dev libcurl4-openssl-dev + make -j4 + - name: test + run: | + sudo 
apt-get -y install tcl8.5 + ./runtest --clients 2 --verbose + - name: module tests + run: | + ./runtest-moduleapi + test-valgrind: runs-on: ubuntu-latest timeout-minutes: 14400 diff --git a/.github/workflows/endurance.yml b/.github/workflows/endurance.yml new file mode 100644 index 000000000..24c5f540d --- /dev/null +++ b/.github/workflows/endurance.yml @@ -0,0 +1,22 @@ +name: Endurance + +on: + schedule: + - cron: '30 * * * *' + +jobs: + + test-endurance: + runs-on: octocore + timeout-minutes: 60 + steps: + - uses: actions/checkout@v1 + - name: make + run: | + sudo apt-get -y install uuid-dev libcurl4-openssl-dev + make -j8 + - name: test-multithread (5X) + run: | + sudo apt-get install -y tcl8.5 + ./runtest --loopn 5 --config server-threads 3 --clients 5 + diff --git a/00-RELEASENOTES b/00-RELEASENOTES index 7c5a75412..7ce88c556 100644 --- a/00-RELEASENOTES +++ b/00-RELEASENOTES @@ -11,6 +11,283 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP. SECURITY: There are security fixes in the release. -------------------------------------------------------------------------------- +================================================================================ +Redis 6.0.4 Released Thu May 28 11:36:45 CEST 2020 +================================================================================ + +Upgrade urgency CRITICAL: this release fixes a severe replication bug. + +Redis 6.0.4 fixes a critical replication bug caused by a new feature introduced +in Redis 6. The feature, called "meaningful offset" and strongly wanted by +myself (antirez) was an improvement that avoided that masters were no longer +able, during a failover where they were demoted to replicas, to partially +synchronize with the new master. In short the feature was able to avoid full +synchronizations with RDB. How did it work? 
By trimming the replication backlog +of the final "PING" commands the master was sending in the replication channel: +this way the replication offset would no longer go "after" the one of the +promoted replica, allowing the master to just continue in the same replication +history, receiving only a small data difference. + +However after the introduction of the feature we (the Redis core team) quickly +understood there was something wrong: the apparently harmless feature had +many bugs, and the last bug we discovered, after a joined effort of multiple +people, we were not even able to fully understand after fixing it. Enough was +enough, we decided that the complexity cost of this feature was too high. +So Redis 6.0.4 removes the feature entirely, and fixes the data corruption that +it was able to cause. + +However there are two facts to take in mind. + +Fact 1: Setups using chained replication, that means that certain replicas +are replicating from other replicas, up to Redis 6.0.3 can experience data +corruption. For chained replication we mean that: + + +--------+ +---------+ +-------------+ + | master |--------->| replica |-------->| sub-replica | + +--------+ +---------+ +-------------+ + + +People using chained replication SHOULD UPGRADE ASAP away from Redis 6.0.0, +6.0.1, 6.0.2 or 6.0.3 to Redis 6.0.4. + +To be clear, people NOT using this setup, but having just replicas attached +directly to the master, SHOUDL NOT BE in danger of any problem. But we +are no longer confident on 6.0.x replication implementation complexities +so we suggest to upgrade to 6.0.4 to everybody using an older 6.0.3 release. +We just so far didn't find any bug that affects Redis 6.0.3 that does not +involve chained replication. + +People starting with Redis 6.0.4 are fine. People with Redis 5 are fine. +People upgrading from Redis 5 to Redis 6.0.4 are fine. +TLDR: The problem is with users of 6.0.0, 6.0.1, 6.0.2, 6.0.3. 
+ +Fact 2: Upgrading from Redis 6.0.x to Redis 6.0.4, IF AND ONLY IF you +use chained replication, requires some extra care: + +1. Once you attach your new Redis 6.0.4 instance as a replica of the current + Redis 6.0.x master, you should wait for the first full synchronization, + then you should promote it right away, if your setup involves chained + replication. Don't give it the time to do a new partial synchronization + in the case the link between the master and the replica will break in + the mean time. + +2. As an additional care, you may want to set the replication ping period + to a very large value (for instance 1000000) using the following command: + + CONFIG SET repl-ping-replica-period 1000000 + + Note that if you do "1" with care, "2" is not needed. + However if you do it, make sure to later restore it to its default: + + CONFIG SET repl-ping-replica-period 10 + +So this is the main change in Redis 6. Later we'll find a different way in +order to achieve what we wanted to achieve with the Meaningful Offset feature, +but without the same complexity. + +Other changes in this release: + +* PSYNC2 tests improved. +* Fix a rare active defrag edge case bug leading to stagnation +* Fix Redis 6 asserting at startup in 32 bit systems. +* Redis 6 32 bit is now added back to our testing environments. +* Fix server crash for STRALGO command, +* Implement sendfile for RDB transfer. +* TLS fixes. +* Make replication more resistant by disconnecting the master if we + detect a protocol error. Basically we no longer accept inline protocol + from the master. +* Other improvements in the tests. + +Regards, +antirez + +This is the full list of commits: + +antirez in commit 59cd4c9f6: + Test: take PSYNC2 test master timeout high during switch. + 1 file changed, 1 deletion(-) + +antirez in commit 6c1bb7b19: + Test: add the tracking unit as default. 
+ 1 file changed, 1 insertion(+) + +Oran Agra in commit 1aee695e5: + tests: find_available_port start search from next port + 1 file changed, 12 insertions(+), 7 deletions(-) + +Oran Agra in commit a2ae46352: + tests: each test client work on a distinct port range + 5 files changed, 39 insertions(+), 27 deletions(-) + +Oran Agra in commit 86e562d69: + 32bit CI needs to build modules correctly + 2 files changed, 7 insertions(+), 2 deletions(-) + +Oran Agra in commit ab2984b1e: + adjust revived meaningful offset tests + 1 file changed, 39 insertions(+), 20 deletions(-) + +Oran Agra in commit 1ff5a222d: + revive meaningful offset tests + 2 files changed, 213 insertions(+) + +antirez in commit cc549b46a: + Replication: showLatestBacklog() refactored out. + 3 files changed, 36 insertions(+), 25 deletions(-) + +antirez in commit 377dd0515: + Drop useless line from replicationCacheMaster(). + 1 file changed, 2 deletions(-) + +antirez in commit 3f8d113f1: + Another meaningful offset test removed. + 1 file changed, 100 deletions(-) + +antirez in commit d4541349d: + Remove the PSYNC2 meaningful offset test. + 2 files changed, 113 deletions(-) + +antirez in commit 2112a5702: + Remove the meaningful offset feature. + 4 files changed, 10 insertions(+), 93 deletions(-) + +antirez in commit d2eb6e0b4: + Set a protocol error if master use the inline protocol. + 1 file changed, 17 insertions(+), 2 deletions(-) + +Oran Agra in commit 9c1df3b76: + daily CI test with tls + 1 file changed, 15 insertions(+) + +Oran Agra in commit 115ed1911: + avoid using sendfile if tls-replication is enabled + 1 file changed, 34 insertions(+), 27 deletions(-) + +antirez in commit 11c748aac: + Replication: log backlog creation event. + 1 file changed, 3 insertions(+) + +antirez in commit 8f1013722: + Test: PSYNC2 test can now show server logs. + 1 file changed, 88 insertions(+), 25 deletions(-) + +antirez in commit 2e591fc4a: + Clarify what is happening in PR #7320. 
+ 1 file changed, 5 insertions(+), 1 deletion(-) + +zhaozhao.zz in commit cbb51fb8f: + PSYNC2: second_replid_offset should be real meaningful offset + 1 file changed, 3 insertions(+), 3 deletions(-) + +Oran Agra in commit e0fc88b4d: + add CI for 32bit build + 2 files changed, 34 insertions(+) + +antirez in commit e3f864b5f: + Make disconnectSlaves() synchronous in the base case. + 3 files changed, 20 insertions(+), 9 deletions(-) + +ShooterIT in commit 8af1e513f: + Implements sendfile for redis. + 2 files changed, 55 insertions(+), 2 deletions(-) + +antirez in commit 3c21418cd: + Fix #7306 less aggressively. + 2 files changed, 29 insertions(+), 17 deletions(-) + +Madelyn Olson in commit e201f83ce: + EAGAIN for tls during diskless load + 1 file changed, 4 insertions(+) + +Qu Chen in commit 58fc456cb: + Disconnect chained replicas when the replica performs PSYNC with the master always to avoid replication offset mismatch between master and chained replicas. + 2 files changed, 60 insertions(+), 3 deletions(-) + +hwware in commit 3febc5c29: + using moreargs variable + 1 file changed, 2 insertions(+), 2 deletions(-) + +hwware in commit 8d6738559: + fix server crash for STRALGO command + 1 file changed, 2 insertions(+), 2 deletions(-) + +ShooterIT in commit 7a35eec54: + Replace addDeferredMultiBulkLength with addReplyDeferredLen in comment + 1 file changed, 2 insertions(+), 2 deletions(-) + +Yossi Gottlieb in commit f93e1417b: + TLS: Improve tls-protocols clarity in redis.conf. 
+ 1 file changed, 3 insertions(+), 2 deletions(-) + +ShooterIT in commit d0c9e4454: + Fix reply bytes calculation error + 1 file changed, 1 insertion(+), 1 deletion(-) + +zhaozhao.zz in commit 1cde6a060: + Tracking: flag CLIENT_TRACKING_BROKEN_REDIR when redir broken + 1 file changed, 1 insertion(+) + +Oran Agra in commit 436be3498: + fix a rare active defrag edge case bug leading to stagnation + 4 files changed, 146 insertions(+), 23 deletions(-) + +Oran Agra in commit f9d2ffdc5: + improve DEBUG MALLCTL to be able to write to write only fields. + 1 file changed, 27 insertions(+), 7 deletions(-) + +hujie in commit d7968ee92: + fix clear USER_FLAG_ALLCOMMANDS flag in acl + 1 file changed, 5 insertions(+), 4 deletions(-) + +ShooterIT in commit a902e6b25: + Redis Benchmark: generate random test data + 1 file changed, 12 insertions(+), 1 deletion(-) + +hwware in commit 9564ed7c3: + Redis-Benchmark: avoid potentical memmory leaking + 1 file changed, 1 insertion(+), 1 deletion(-) + +WuYunlong in commit 2e4182743: + Handle keys with hash tag when computing hash slot using tcl cluster client. + 1 file changed, 23 insertions(+), 2 deletions(-) + +WuYunlong in commit eb2c8b2c6: + Add a test to prove current tcl cluster client can not handle keys with hash tag. + 1 file changed, 7 insertions(+), 1 deletion(-) + +ShooterIT in commit 928e6976b: + Use dictSize to get the size of dict in dict.c + 1 file changed, 2 insertions(+), 2 deletions(-) + +Madelyn Olson in commit cdcf5af5a: + Converge hash validation for adding and removing + 1 file changed, 21 insertions(+), 14 deletions(-) + +Benjamin Sergeant in commit e8b09d220: + do not handle --cluster-yes for cluster fix mode + 1 file changed, 16 insertions(+), 7 deletions(-) + +Benjamin Sergeant in commit 57b4fb0d8: + fix typo ... 
+ 1 file changed, 1 insertion(+), 1 deletion(-) + +Benjamin Sergeant in commit 29f25e411: + Redis-cli 6.0.1 `--cluster-yes` doesn't work (fix #7246) + 1 file changed, 5 insertions(+), 1 deletion(-) + +Oran Agra in commit 00d8b92b8: + fix valgrind test failure in replication test + 1 file changed, 1 insertion(+), 1 deletion(-) + +Oran Agra in commit 5e17e6276: + add regression test for the race in #7205 + 1 file changed, 52 insertions(+) + +antirez in commit 96e7c011e: + Improve the PSYNC2 test reliability. + 1 file changed, 33 insertions(+), 15 deletions(-) + ================================================================================ Redis 6.0.3 Released Sat May 16 18:10:21 CEST 2020 ================================================================================ diff --git a/deps/depot_tools b/deps/depot_tools deleted file mode 160000 index aaf566999..000000000 --- a/deps/depot_tools +++ /dev/null @@ -1 +0,0 @@ -Subproject commit aaf566999558aa8ead38811228cd539a6e6e2fda diff --git a/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h b/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h index 290e5cf99..2685802b8 100644 --- a/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h +++ b/deps/jemalloc/include/jemalloc/internal/jemalloc_internal_inlines_c.h @@ -216,7 +216,7 @@ ixalloc(tsdn_t *tsdn, void *ptr, size_t oldsize, size_t size, size_t extra, } JEMALLOC_ALWAYS_INLINE int -iget_defrag_hint(tsdn_t *tsdn, void* ptr, int *bin_util, int *run_util) { +iget_defrag_hint(tsdn_t *tsdn, void* ptr) { int defrag = 0; rtree_ctx_t rtree_ctx_fallback; rtree_ctx_t *rtree_ctx = tsdn_rtree_ctx(tsdn, &rtree_ctx_fallback); @@ -232,11 +232,22 @@ iget_defrag_hint(tsdn_t *tsdn, void* ptr, int *bin_util, int *run_util) { malloc_mutex_lock(tsdn, &bin->lock); /* don't bother moving allocations from the slab currently used for new allocations */ if (slab != bin->slabcur) { - const bin_info_t *bin_info = &bin_infos[binind]; - 
size_t availregs = bin_info->nregs * bin->stats.curslabs; - *bin_util = ((long long)bin->stats.curregs<<16) / availregs; - *run_util = ((long long)(bin_info->nregs - extent_nfree_get(slab))<<16) / bin_info->nregs; - defrag = 1; + int free_in_slab = extent_nfree_get(slab); + if (free_in_slab) { + const bin_info_t *bin_info = &bin_infos[binind]; + int curslabs = bin->stats.curslabs; + size_t curregs = bin->stats.curregs; + if (bin->slabcur) { + /* remove slabcur from the overall utilization */ + curregs -= bin_info->nregs - extent_nfree_get(bin->slabcur); + curslabs -= 1; + } + /* Compare the utilization ratio of the slab in question to the total average, + * to avoid precision lost and division, we do that by extrapolating the usage + * of the slab as if all slabs have the same usage. If this slab is less used + * than the average, we'll prefer to evict the data to hopefully more used ones */ + defrag = (bin_info->nregs - free_in_slab) * curslabs <= curregs; + } } malloc_mutex_unlock(tsdn, &bin->lock); } diff --git a/deps/jemalloc/src/jemalloc.c b/deps/jemalloc/src/jemalloc.c index 5b936cb48..585645a28 100644 --- a/deps/jemalloc/src/jemalloc.c +++ b/deps/jemalloc/src/jemalloc.c @@ -3326,12 +3326,10 @@ jemalloc_postfork_child(void) { /******************************************************************************/ /* Helps the application decide if a pointer is worth re-allocating in order to reduce fragmentation. - * returns 0 if the allocation is in the currently active run, - * or when it is not causing any frag issue (large or huge bin) - * returns the bin utilization and run utilization both in fixed point 16:16. + * returns 1 if the allocation should be moved, and 0 if the allocation be kept. * If the application decides to re-allocate it should use MALLOCX_TCACHE_NONE when doing so. 
*/ JEMALLOC_EXPORT int JEMALLOC_NOTHROW -get_defrag_hint(void* ptr, int *bin_util, int *run_util) { +get_defrag_hint(void* ptr) { assert(ptr != NULL); - return iget_defrag_hint(TSDN_NULL, ptr, bin_util, run_util); + return iget_defrag_hint(TSDN_NULL, ptr); } diff --git a/keydb.conf b/keydb.conf index 39bde2789..7ac2099da 100644 --- a/keydb.conf +++ b/keydb.conf @@ -176,9 +176,10 @@ tcp-keepalive 300 # tls-cluster yes # Explicitly specify TLS versions to support. Allowed values are case insensitive -# and include "TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3" (OpenSSL >= 1.1.1) +# and include "TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3" (OpenSSL >= 1.1.1) or +# any combination. To enable only TLSv1.2 and TLSv1.3, use: # -# tls-protocols TLSv1.2 +# tls-protocols "TLSv1.2 TLSv1.3" # Configure allowed ciphers. See the ciphers(1ssl) manpage for more information # about the syntax of this string. diff --git a/src/acl.cpp b/src/acl.cpp index b3a4ea522..b51dcc295 100644 --- a/src/acl.cpp +++ b/src/acl.cpp @@ -169,6 +169,25 @@ sds ACLHashPassword(unsigned char *cleartext, size_t len) { return sdsnewlen(hex,HASH_PASSWORD_LEN); } +/* Given a hash and the hash length, returns C_OK if it is a valid password + * hash, or C_ERR otherwise. */ +int ACLCheckPasswordHash(unsigned char *hash, int hashlen) { + if (hashlen != HASH_PASSWORD_LEN) { + return C_ERR; + } + + /* Password hashes can only be characters that represent + * hexadecimal values, which are numbers and lowercase + * characters 'a' through 'f'. */ + for(int i = 0; i < HASH_PASSWORD_LEN; i++) { + char c = hash[i]; + if ((c < 'a' || c > 'f') && (c < '0' || c > '9')) { + return C_ERR; + } + } + return C_OK; +} + /* ============================================================================= * Low level ACL API * ==========================================================================*/ @@ -359,12 +378,13 @@ int ACLUserCanExecuteFutureCommands(user *u) { * to skip the command bit explicit test. 
*/ void ACLSetUserCommandBit(user *u, unsigned long id, int value) { uint64_t word, bit; - if (value == 0) u->flags &= ~USER_FLAG_ALLCOMMANDS; if (ACLGetCommandBitCoordinates(id,&word,&bit) == C_ERR) return; - if (value) + if (value) { u->allowed_commands[word] |= bit; - else + } else { u->allowed_commands[word] &= ~bit; + u->flags &= ~USER_FLAG_ALLCOMMANDS; + } } /* This is like ACLSetUserCommandBit(), but instead of setting the specified @@ -756,22 +776,10 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) { if (op[0] == '>') { newpass = ACLHashPassword((unsigned char*)op+1,oplen-1); } else { - if (oplen != HASH_PASSWORD_LEN + 1) { + if (ACLCheckPasswordHash((unsigned char*)op+1,oplen-1) == C_ERR) { errno = EBADMSG; return C_ERR; } - - /* Password hashes can only be characters that represent - * hexadecimal values, which are numbers and lowercase - * characters 'a' through 'f'. - */ - for(int i = 1; i < HASH_PASSWORD_LEN + 1; i++) { - char c = op[i]; - if ((c < 'a' || c > 'f') && (c < '0' || c > '9')) { - errno = EBADMSG; - return C_ERR; - } - } newpass = sdsnewlen(op+1,oplen-1); } @@ -787,7 +795,7 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) { if (op[0] == '<') { delpass = ACLHashPassword((unsigned char*)op+1,oplen-1); } else { - if (oplen != HASH_PASSWORD_LEN + 1) { + if (ACLCheckPasswordHash((unsigned char*)op+1,oplen-1) == C_ERR) { errno = EBADMSG; return C_ERR; } @@ -841,7 +849,6 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) { errno = ENOENT; return C_ERR; } - unsigned long id = ACLGetCommandID(copy); /* The subcommand cannot be empty, so things like DEBUG| * are syntax errors of course. */ @@ -854,6 +861,7 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) { /* The command should not be set right now in the command * bitmap, because adding a subcommand of a fully added * command is probably an error on the user side. 
*/ + unsigned long id = ACLGetCommandID(copy); if (ACLGetUserCommandBit(u,id) == 1) { zfree(copy); errno = EBUSY; diff --git a/src/ae.cpp b/src/ae.cpp index 33b7b184e..58e23e3f0 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -122,6 +122,7 @@ struct aeCommand AE_ASYNC_OP op; int fd; int mask; + bool fLock = true; union { aePostFunctionProc *proc; aeFileProc *fproc; @@ -169,7 +170,9 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) case AE_ASYNC_OP::PostFunction: { - std::unique_lock ulock(g_lock); + std::unique_lock ulock(g_lock, std::defer_lock); + if (cmd.fLock) + ulock.lock(); ((aePostFunctionProc*)cmd.proc)(cmd.clientData); break; } @@ -179,7 +182,9 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int ) if (cmd.pctl != nullptr) cmd.pctl->mutexcv.lock(); - std::unique_lock ulock(g_lock); + std::unique_lock ulock(g_lock, std::defer_lock); + if (cmd.fLock) + ulock.lock(); (*cmd.pfn)(); if (cmd.pctl != nullptr) @@ -239,6 +244,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask, cmd.fproc = proc; cmd.clientData = clientData; cmd.pctl = nullptr; + cmd.fLock = true; if (fSynchronous) { cmd.pctl = new (MALLOC_LOCAL) aeCommandControl(); @@ -275,13 +281,14 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg) cmd.op = AE_ASYNC_OP::PostFunction; cmd.proc = proc; cmd.clientData = arg; + cmd.fLock = true; auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); if (size != sizeof(cmd)) return AE_ERR; return AE_OK; } -int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous) +int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous, bool fLock) { if (eventLoop == g_eventLoopThisThread) { @@ -293,6 +300,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynch cmd.op = AE_ASYNC_OP::PostCppFunction; cmd.pfn = new (MALLOC_LOCAL) std::function(fn); cmd.pctl = nullptr; + cmd.fLock = fLock; if (fSynchronous) { cmd.pctl = new (MALLOC_LOCAL) 
aeCommandControl(); @@ -453,6 +461,7 @@ void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask) cmd.op = AE_ASYNC_OP::DeleteFileEvent; cmd.fd = fd; cmd.mask = mask; + cmd.fLock = true; auto cb = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd)); AE_ASSERT(cb == sizeof(cmd)); } diff --git a/src/ae.h b/src/ae.h index 156c219ef..3f1ddbf06 100644 --- a/src/ae.h +++ b/src/ae.h @@ -135,7 +135,7 @@ aeEventLoop *aeCreateEventLoop(int setsize); int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg); #ifdef __cplusplus } // EXTERN C -int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous = false); +int aePostFunction(aeEventLoop *eventLoop, std::function fn, bool fSynchronous = false, bool fLock = true); extern "C" { #endif void aeDeleteEventLoop(aeEventLoop *eventLoop); diff --git a/src/aof.cpp b/src/aof.cpp index c678dcdde..0a15204d0 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -751,6 +751,7 @@ struct client *createAOFClient(void) { c->bufpos = 0; c->flags = 0; c->fPendingAsyncWrite = FALSE; + c->fPendingAsyncWriteHandler = FALSE; c->btype = BLOCKED_NONE; /* We set the fake client as a replica waiting for the synchronization * so that Redis will not try to send replies to this client. 
*/ diff --git a/src/cluster.cpp b/src/cluster.cpp index 72f12a375..389c6dd98 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -2235,6 +2235,8 @@ void clusterWriteHandler(connection *conn) { void clusterLinkConnectHandler(connection *conn) { clusterLink *link = (clusterLink*)connGetPrivateData(conn); clusterNode *node = link->node; + if (node == nullptr) + return; // we're about to be freed /* Check if connection succeeded */ if (connGetState(conn) != CONN_STATE_CONNECTED) { @@ -5289,6 +5291,7 @@ try_again: zfree(ov); zfree(kv); return; /* error sent to the client by migrateGetSocket() */ } + connMarshalThread(cs->conn); rioInitWithBuffer(&cmd,sdsempty()); diff --git a/src/debug.cpp b/src/debug.cpp index 871d6c5a6..9aebe8885 100644 --- a/src/debug.cpp +++ b/src/debug.cpp @@ -319,6 +319,13 @@ void mallctl_int(client *c, robj **argv, int argc) { size_t sz = sizeof(old); while (sz > 0) { if ((ret=je_mallctl(szFromObj(argv[0]), &old, &sz, argc > 1? &val: NULL, argc > 1?sz: 0))) { + if (ret == EPERM && argc > 1) { + /* if this option is write only, try just writing to it. */ + if (!(ret=je_mallctl(szFromObj(argv[0]), NULL, 0, &val, sz))) { + addReply(c, shared.ok); + return; + } + } if (ret==EINVAL) { /* size might be wrong, try a smaller one */ sz /= 2; @@ -341,17 +348,30 @@ void mallctl_int(client *c, robj **argv, int argc) { } void mallctl_string(client *c, robj **argv, int argc) { - int ret; + int rret, wret; char *old; size_t sz = sizeof(old); /* for strings, it seems we need to first get the old value, before overriding it. */ - if ((ret=je_mallctl(szFromObj(argv[0]), &old, &sz, NULL, 0))) { - addReplyErrorFormat(c,"%s", strerror(ret)); - return; + if ((rret=je_mallctl(szFromObj(argv[0]), &old, &sz, NULL, 0))) { + /* return error unless this option is write only. 
*/ + if (!(rret == EPERM && argc > 1)) { + addReplyErrorFormat(c,"%s", strerror(rret)); + return; + } } - addReplyBulkCString(c, old); - if(argc > 1) - je_mallctl(szFromObj(argv[0]), NULL, 0, &argv[1]->m_ptr, sizeof(char*)); + if(argc > 1) { + char *val = szFromObj(argv[1]); + char **valref = &val; + if ((!strcmp(val,"VOID"))) + valref = NULL, sz = 0; + wret = je_mallctl(szFromObj(argv[0]), NULL, 0, valref, sz); + } + if (!rret) + addReplyBulkCString(c, old); + else if (wret) + addReplyErrorFormat(c,"%s", strerror(wret)); + else + addReply(c, shared.ok); } #endif diff --git a/src/defrag.cpp b/src/defrag.cpp index 2536a1020..e60705a3e 100644 --- a/src/defrag.cpp +++ b/src/defrag.cpp @@ -43,7 +43,7 @@ /* this method was added to jemalloc in order to help us understand which * pointers are worthwhile moving and which aren't */ -extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util); +extern "C" int je_get_defrag_hint(void* ptr); /* forward declarations*/ void defragDictBucketCallback(void *privdata, dictEntry **bucketref); @@ -56,18 +56,11 @@ bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey); * when it returns a non-null value, the old pointer was already released * and should NOT be accessed. */ void* activeDefragAlloc(void *ptr) { - int bin_util, run_util; size_t size; void *newptr; - if(!je_get_defrag_hint(ptr, &bin_util, &run_util)) { - g_pserver->stat_active_defrag_misses++; - return NULL; - } - /* if this run is more utilized than the average utilization in this bin - * (or it is full), skip it. This will eventually move all the allocations - * from relatively empty runs into relatively full runs. */ - if (run_util > bin_util || run_util == 1<<16) { + if(!je_get_defrag_hint(ptr)) { g_pserver->stat_active_defrag_misses++; + size = zmalloc_size(ptr); return NULL; } /* move this allocation to a new allocation. 
diff --git a/src/dict.cpp b/src/dict.cpp index 4f28c0a5c..7434804af 100644 --- a/src/dict.cpp +++ b/src/dict.cpp @@ -536,7 +536,7 @@ dictEntry *dictFind(dict *d, const void *key) dictEntry *he; uint64_t h, idx, table; - if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */ + if (dictSize(d) == 0) return NULL; /* dict is empty */ if (dictIsRehashing(d)) _dictRehashStep(d); h = dictHashKey(d, key); for (table = 0; table <= 1; table++) { @@ -1102,7 +1102,7 @@ dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t h dictEntry *he, **heref; unsigned long idx, table; - if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */ + if (dictSize(d) == 0) return NULL; /* dict is empty */ for (table = 0; table <= 1; table++) { idx = hash & d->ht[table].sizemask; heref = &d->ht[table].table[idx]; diff --git a/src/fastlock.cpp b/src/fastlock.cpp index 28092828f..60eb653bf 100644 --- a/src/fastlock.cpp +++ b/src/fastlock.cpp @@ -40,6 +40,7 @@ #include #ifdef __linux__ #include +#include #endif #include #include @@ -69,6 +70,8 @@ __attribute__((weak)) void logStackTrace(ucontext_t *) {} #endif extern int g_fInCrash; +extern int g_fTestMode; +int g_fHighCpuPressure = false; /**************************************************** * @@ -290,12 +293,21 @@ uint64_t fastlock_getlongwaitcount() return rval; } -extern "C" void fastlock_sleep(fastlock *lock, pid_t pid, unsigned wake, unsigned mask) +extern "C" void fastlock_sleep(fastlock *lock, pid_t pid, unsigned wake, unsigned myticket) { #ifdef __linux__ g_dlock.registerwait(lock, pid); + unsigned mask = (1U << (myticket % 32)); __atomic_fetch_or(&lock->futex, mask, __ATOMIC_ACQUIRE); - futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, wake, nullptr, mask); + + // double check the lock wasn't release between the last check and us setting the futex mask + uint32_t u; + __atomic_load(&lock->m_ticket.u, &u, __ATOMIC_ACQUIRE); + if ((u & 0xffff) != myticket) + { + 
futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, wake, nullptr, mask); + } + __atomic_fetch_and(&lock->futex, ~mask, __ATOMIC_RELEASE); g_dlock.clearwait(lock, pid); #endif @@ -329,9 +341,9 @@ extern "C" void fastlock_lock(struct fastlock *lock) int tid = gettid(); unsigned myticket = __atomic_fetch_add(&lock->m_ticket.m_avail, 1, __ATOMIC_RELEASE); - unsigned mask = (1U << (myticket % 32)); unsigned cloops = 0; ticket ticketT; + unsigned loopLimit = g_fHighCpuPressure ? 0x10000 : 0x100000; for (;;) { @@ -344,9 +356,9 @@ extern "C" void fastlock_lock(struct fastlock *lock) #elif defined(__aarch64__) __asm__ __volatile__ ("yield"); #endif - if ((++cloops % 0x100000) == 0) + if ((++cloops % loopLimit) == 0) { - fastlock_sleep(lock, tid, ticketT.u, mask); + fastlock_sleep(lock, tid, ticketT.u, myticket); } } @@ -461,3 +473,24 @@ void fastlock_lock_recursive(struct fastlock *lock, int nesting) fastlock_lock(lock); lock->m_depth = nesting; } + +void fastlock_auto_adjust_waits() +{ +#ifdef __linux__ + struct sysinfo sysinf; + auto fHighPressurePrev = g_fHighCpuPressure; + memset(&sysinf, 0, sizeof sysinf); + if (!sysinfo(&sysinf)) { + auto avgCoreLoad = sysinf.loads[0] / get_nprocs(); + g_fHighCpuPressure = (avgCoreLoad > ((1 << SI_LOAD_SHIFT) * 0.9)); + if (g_fHighCpuPressure) + serverLog(!fHighPressurePrev ? 
3 /*LL_WARNING*/ : 1 /* LL_VERBOSE */, "NOTICE: Detuning locks due to high load per core: %.2f%%", avgCoreLoad / (double)(1 << SI_LOAD_SHIFT)*100.0); + } + + if (!g_fHighCpuPressure && fHighPressurePrev) { + serverLog(3 /*LL_WARNING*/, "NOTICE: CPU pressure reduced"); + } +#else + g_fHighCpuPressure = g_fTestMode; +#endif +} \ No newline at end of file diff --git a/src/fastlock.h b/src/fastlock.h index cdf8fe454..e4ab1874f 100644 --- a/src/fastlock.h +++ b/src/fastlock.h @@ -15,6 +15,7 @@ void fastlock_unlock(struct fastlock *lock); void fastlock_free(struct fastlock *lock); int fastlock_unlock_recursive(struct fastlock *lock); void fastlock_lock_recursive(struct fastlock *lock, int nesting); +void fastlock_auto_adjust_waits(); uint64_t fastlock_getlongwaitcount(); // this is a global value diff --git a/src/fastlock_x64.asm b/src/fastlock_x64.asm index bcea0e095..26c302434 100644 --- a/src/fastlock_x64.asm +++ b/src/fastlock_x64.asm @@ -3,6 +3,7 @@ .extern gettid .extern fastlock_sleep +.extern g_fHighCpuPressure # This is the first use of assembly in this codebase, a valid question is WHY? # The spinlock we implement here is performance critical, and simply put GCC @@ -33,6 +34,13 @@ fastlock_lock: cmp [rdi], esi # Is the TID we got back the owner of the lock? je .LLocked # Don't spin in that case + mov r9d, 0x1000 # 1000h is set so we overflow on the 1024*1024'th iteration (like the C code) + mov eax, [rip+g_fHighCpuPressure] + test eax, eax + jz .LNoTestMode + mov r9d, 0x10000 +.LNoTestMode: + xor eax, eax # eliminate partial register dependency inc eax # we want to add one lock xadd [rdi+66], ax # do the xadd, ax contains the value before the addition @@ -45,8 +53,7 @@ fastlock_lock: cmp dx, ax # is our ticket up? je .LLocked # leave the loop pause - add ecx, 0x1000 # Have we been waiting a long time? (oflow if we have) - # 1000h is set so we overflow on the 1024*1024'th iteration (like the C code) + add ecx, r9d # Have we been waiting a long time? 
(oflow if we have) jnc .LLoop # If so, give up our timeslice to someone who's doing real work # Like the compiler, you're probably thinking: "Hey! I should take these pushs out of the loop" # But the compiler doesn't know that we rarely hit this, and when we do we know the lock is @@ -62,7 +69,7 @@ fastlock_lock: # rdi ARG1 futex (already in rdi) # rsi ARG2 tid (already in esi) # rdx ARG3 ticketT.u (already in edx) - bts ecx, eax # rcx ARG4 mask + mov ecx, eax # rcx ARG4 myticket call fastlock_sleep # cleanup and continue pop rax diff --git a/src/module.cpp b/src/module.cpp index 799bbcb01..4f79b9f95 100644 --- a/src/module.cpp +++ b/src/module.cpp @@ -5439,7 +5439,9 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod if (memcmp(ri.key,&key,sizeof(key)) == 0) { /* This is the first key, we need to re-install the timer according * to the just added event. */ - aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{ + aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer); + }, true /* synchronous */, false /* fLock */); aeTimer = -1; } raxStop(&ri); @@ -5447,8 +5449,11 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod /* If we have no main timer (the old one was invalidated, or this is the * first module timer we have), install one. 
*/ - if (aeTimer == -1) - aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL); + if (aeTimer == -1) { + aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{ + aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL); + }, true /* synchronous */, false /* fLock */); + } return key; } diff --git a/src/networking.cpp b/src/networking.cpp index 286b0e94e..89efceb8d 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -135,6 +135,7 @@ client *createClient(connection *conn, int iel) { c->sentlenAsync = 0; c->flags = 0; c->fPendingAsyncWrite = FALSE; + c->fPendingAsyncWriteHandler = FALSE; c->ctime = c->lastinteraction = g_pserver->unixtime; /* If the default user does not require authentication, the user is * directly authenticated. */ @@ -333,7 +334,7 @@ void _addReplyProtoToList(client *c, const char *s, size_t len) { clientReplyBlock *tail = (clientReplyBlock*) (ln? listNodeValue(ln): NULL); /* Note that 'tail' may be NULL even if we have a tail node, becuase when - * addDeferredMultiBulkLength() is used, it sets a dummy node to NULL just + * addReplyDeferredLen() is used, it sets a dummy node to NULL just * fo fill it later, when the size of the bulk length is set. */ /* Append to tail string when possible. */ @@ -513,31 +514,7 @@ void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync) if (ctype == CLIENT_TYPE_MASTER && g_pserver->repl_backlog && g_pserver->repl_backlog_histlen > 0) { - long long dumplen = 256; - if (g_pserver->repl_backlog_histlen < dumplen) - dumplen = g_pserver->repl_backlog_histlen; - - /* Identify the first byte to dump. */ - long long idx = - (g_pserver->repl_backlog_idx + (g_pserver->repl_backlog_size - dumplen)) % - g_pserver->repl_backlog_size; - - /* Scan the circular buffer to collect 'dumplen' bytes. 
*/ - sds dump = sdsempty(); - while(dumplen) { - long long thislen = - ((g_pserver->repl_backlog_size - idx) < dumplen) ? - (g_pserver->repl_backlog_size - idx) : dumplen; - - dump = sdscatrepr(dump,g_pserver->repl_backlog+idx,thislen); - dumplen -= thislen; - idx = 0; - } - - /* Finally log such bytes: this is vital debugging info to - * understand what happened. */ - serverLog(LL_WARNING,"Latest backlog is: '%s'", dump); - sdsfree(dump); + showLatestBacklog(); } g_pserver->stat_unexpected_error_replies++; } @@ -599,7 +576,7 @@ void trimReplyUnusedTailSpace(client *c) { clientReplyBlock *tail = ln? (clientReplyBlock*)listNodeValue(ln): NULL; /* Note that 'tail' may be NULL even if we have a tail node, becuase when - * addDeferredMultiBulkLength() is used */ + * addReplyDeferredLen() is used */ if (!tail) return; /* We only try to trim the space is relatively high (more than a 1/4 of the @@ -1881,29 +1858,21 @@ void ProcessPendingAsyncWrites() std::atomic_thread_fence(std::memory_order_seq_cst); - if (c->casyncOpsPending == 0 || c->btype == BLOCKED_ASYNC) // It's ok to send data if we're in a bgthread op + if (FCorrectThread(c)) { - if (FCorrectThread(c)) - { - prepareClientToWrite(c, false); // queue an event - } - else - { - // We need to start the write on the client's thread - if (aePostFunction(g_pserver->rgthreadvar[c->iel].el, [c]{ - // Install a write handler. 
Don't do the actual write here since we don't want - // to duplicate the throttling and safety mechanisms of the normal write code - std::lock_guardlock)> lock(c->lock); - serverAssert(c->casyncOpsPending > 0); - c->casyncOpsPending--; - connSetWriteHandler(c->conn, sendReplyToClient, true); - }, false) == AE_ERR - ) - { - // Posting the function failed - continue; // We can retry later in the cron - } - ++c->casyncOpsPending; // race is handled by the client lock in the lambda + prepareClientToWrite(c, false); // queue an event + } + else + { + if (!c->fPendingAsyncWriteHandler) { + c->fPendingAsyncWriteHandler = true; + bool fResult = c->postFunction([](client *c) { + c->fPendingAsyncWriteHandler = false; + connSetWriteHandler(c->conn, sendReplyToClient, true); + }); + + if (!fResult) + c->fPendingAsyncWriteHandler = false; // if we failed to set the handler then prevent this from never being reset } } } @@ -2079,6 +2048,19 @@ int processInlineBuffer(client *c) { if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE) c->repl_ack_time = g_pserver->unixtime; + /* Masters should never send us inline protocol to run actual + * commands. If this happens, it is likely due to a bug in Redis where + * we got some desynchronization in the protocol, for example + * beause of a PSYNC gone bad. + * + * However the is an exception: masters may send us just a newline + * to keep the connection active. */ + if (querylen != 0 && c->flags & CLIENT_MASTER) { + serverLog(LL_WARNING,"WARNING: Receiving inline protocol from master, master stream corruption? Closing the master connection and discarding the cached master."); + setProtocolError("Master using the inline protocol. Desync?",c); + return C_ERR; + } + /* Move querybuffer position to the next query in the buffer. */ c->qb_pos += querylen+linefeed_chars; @@ -2102,7 +2084,7 @@ int processInlineBuffer(client *c) { * CLIENT_PROTOCOL_ERROR. 
*/ #define PROTO_DUMP_LEN 128 static void setProtocolError(const char *errstr, client *c) { - if (cserver.verbosity <= LL_VERBOSE) { + if (cserver.verbosity <= LL_VERBOSE || c->flags & CLIENT_MASTER) { sds client = catClientInfoString(sdsempty(),c); /* Sample some protocol to given an idea about what was inside. */ @@ -2121,7 +2103,9 @@ static void setProtocolError(const char *errstr, client *c) { } /* Log all the client and protocol info. */ - serverLog(LL_VERBOSE, + int loglevel = (c->flags & CLIENT_MASTER) ? LL_WARNING : + LL_VERBOSE; + serverLog(loglevel, "Protocol error (%s) from client: %s. %s", errstr, client, buf); sdsfree(client); } @@ -2280,7 +2264,6 @@ int processMultibulkBuffer(client *c) { * 2. In the case of master clients, the replication offset is updated. * 3. Propagate commands we got from our master to replicas down the line. */ void commandProcessed(client *c) { - int cmd_is_ping = c->cmd && c->cmd->proc == pingCommand; long long prev_offset = c->reploff; if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { /* Update the applied replication offset of our master. */ @@ -2307,7 +2290,6 @@ void commandProcessed(client *c) { AeLocker ae; ae.arm(c); long long applied = c->reploff - prev_offset; - long long prev_master_repl_meaningful_offset = g_pserver->master_repl_meaningful_offset; if (applied) { if (!g_pserver->fActiveReplica) { @@ -2316,10 +2298,6 @@ void commandProcessed(client *c) { } sdsrange(c->pending_querybuf,applied,-1); } - /* The g_pserver->master_repl_meaningful_offset variable represents - * the offset of the replication stream without the pending PINGs. 
*/ - if (cmd_is_ping) - g_pserver->master_repl_meaningful_offset = prev_master_repl_meaningful_offset; } } @@ -2852,7 +2830,7 @@ NULL if (target && target->flags & CLIENT_BLOCKED) { std::unique_lock ul(target->lock); if (unblock_error) - addReplyError(target, + addReplyErrorAsync(target, "-UNBLOCKED client unblocked via CLIENT UNBLOCK"); else replyToBlockedClientTimedOut(target); diff --git a/src/rdb.cpp b/src/rdb.cpp index 3a4d4be74..775799b3d 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2243,6 +2243,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { size_t ckeysLoaded = 0; robj *subexpireKey = nullptr; sds key = nullptr; + bool fLastKeyExpired = false; for (int idb = 0; idb < cserver.dbnum; ++idb) { @@ -2380,11 +2381,22 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { static_assert(sizeof(unsigned long long) == sizeof(uint64_t), "Ensure long long is 64-bits"); mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10); } else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-key")) { + if (subexpireKey != nullptr) { + serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping. key: %s subkey: %s", key != nullptr ? key : "(null)", subexpireKey != nullptr ? szFromObj(subexpireKey) : "(null)"); + decrRefCount(subexpireKey); + subexpireKey = nullptr; + } subexpireKey = auxval; incrRefCount(subexpireKey); } else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-when")) { if (key == nullptr || subexpireKey == nullptr) { - serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping. key: %s subkey: %s", key != nullptr ? key : "(null)", subexpireKey != nullptr ? szFromObj(subexpireKey) : "(null)"); + if (!fLastKeyExpired) { // This is not an error if we just expired the key associated with this subexpire + serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping. key: %s subkey: %s", key != nullptr ? key : "(null)", subexpireKey != nullptr ? 
szFromObj(subexpireKey) : "(null)"); + } + if (subexpireKey) { + decrRefCount(subexpireKey); + subexpireKey = nullptr; + } } else { redisObject keyobj; @@ -2489,6 +2501,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { rsi->mi->staleKeyMap->operator[](dbid).push_back(objKeyDup); decrRefCount(objKeyDup); } + fLastKeyExpired = true; sdsfree(key); key = nullptr; decrRefCount(val); @@ -2512,6 +2525,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { /* Add the new object in the hash table */ int fInserted = dbMerge(db, &keyobj, val, (rsi && rsi->fForceSetKey) || (rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef + fLastKeyExpired = false; if (fInserted) { diff --git a/src/redis-benchmark.cpp b/src/redis-benchmark.cpp index 37f6dca3c..e3935cae6 100644 --- a/src/redis-benchmark.cpp +++ b/src/redis-benchmark.cpp @@ -66,6 +66,8 @@ struct benchmarkThread; struct clusterNode; struct redisConfig; +int g_fTestMode = false; + static struct config { aeEventLoop *el; const char *hostip; @@ -308,7 +310,7 @@ fail: else fprintf(stderr, "%s\n", hostsocket); freeReplyObject(reply); redisFree(c); - zfree(cfg); + freeRedisConfig(cfg); return NULL; } static void freeRedisConfig(redisConfig *cfg) { @@ -1284,6 +1286,17 @@ static void updateClusterSlotsConfiguration() { pthread_mutex_unlock(&config.is_updating_slots_mutex); } +/* Generate random data for redis benchmark. See #7196. */ +static void genBenchmarkRandomData(char *data, int count) { + static uint32_t state = 1234; + int i = 0; + + while (count--) { + state = (state*1103515245+12345); + data[i++] = '0'+((state>>16)&63); + } +} + /* Returns number of consumed options. */ int parseOptions(int argc, const char **argv) { int i; @@ -1646,7 +1659,7 @@ int main(int argc, const char **argv) { /* Run default benchmark suite. 
*/ data = (char*)zmalloc(config.datasize+1, MALLOC_LOCAL); do { - memset(data,'x',config.datasize); + genBenchmarkRandomData(data, config.datasize); data[config.datasize] = '\0'; if (test_is_selected("ping_inline") || test_is_selected("ping")) diff --git a/src/redis-cli-cpphelper.cpp b/src/redis-cli-cpphelper.cpp index 585b0decf..527eac698 100644 --- a/src/redis-cli-cpphelper.cpp +++ b/src/redis-cli-cpphelper.cpp @@ -108,10 +108,41 @@ extern "C" void freeClusterManager(void) { dictRelease(clusterManagerUncoveredSlots); } +/* This function returns a random master node, return NULL if none */ + +static clusterManagerNode *clusterManagerNodeMasterRandom() { + int master_count = 0; + int idx; + listIter li; + listNode *ln; + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = (clusterManagerNode*) ln->value; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; + master_count++; + } + + srand(time(NULL)); + idx = rand() % master_count; + listRewind(cluster_manager.nodes, &li); + while ((ln = listNext(&li)) != NULL) { + clusterManagerNode *n = (clusterManagerNode*) ln->value; + if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; + if (!idx--) { + return n; + } + } + /* Can not be reached */ + return NULL; +} + static int clusterManagerFixSlotsCoverage(char *all_slots) { - dictIterator *iter = nullptr; int force_fix = config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_FIX_WITH_UNREACHABLE_MASTERS; + /* we want explicit manual confirmation from users for all the fix cases */ + int force = 0; + + dictIterator *iter = nullptr; if (cluster_manager.unreachable_masters > 0 && !force_fix) { clusterManagerLogWarn("*** Fixing slots coverage with %d unreachable masters is dangerous: redis-cli will assume that slots about masters that are not reachable are not covered, and will try to reassign them to the reachable nodes. This can cause data loss and is rarely what you want to do. 
If you really want to proceed use the --cluster-fix-with-unreachable-masters option.\n", cluster_manager.unreachable_masters); @@ -181,7 +212,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { printf("The following uncovered slots have no keys " "across the cluster:\n"); clusterManagerPrintSlotsList(none); - if (confirmWithYes("Fix these slots by covering with a random node?")){ + if (confirmWithYes("Fix these slots by covering with a random node?", + force)) { listIter li; listNode *ln; listRewind(none, &li); @@ -207,7 +239,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { if (listLength(single) > 0) { printf("The following uncovered slots have keys in just one node:\n"); clusterManagerPrintSlotsList(single); - if (confirmWithYes("Fix these slots by covering with those nodes?")){ + if (confirmWithYes("Fix these slots by covering with those nodes?", + force)) { listIter li; listNode *ln; listRewind(single, &li); @@ -239,7 +272,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) { printf("The following uncovered slots have keys in multiple nodes:\n"); clusterManagerPrintSlotsList(multi); if (confirmWithYes("Fix these slots by moving keys " - "into a single node?")) { + "into a single node?", force)) { listIter li; listNode *ln; listRewind(multi, &li); @@ -539,35 +572,6 @@ dict *clusterManagerGetLinkStatus(void) { return status; } - -/* This function returns a random master node, return NULL if none */ - -static clusterManagerNode *clusterManagerNodeMasterRandom() { - int master_count = 0; - int idx; - listIter li; - listNode *ln; - listRewind(cluster_manager.nodes, &li); - while ((ln = listNext(&li)) != NULL) { - clusterManagerNode *n = (clusterManagerNode*) ln->value; - if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; - master_count++; - } - - srand(time(NULL)); - idx = rand() % master_count; - listRewind(cluster_manager.nodes, &li); - while ((ln = listNext(&li)) != NULL) { - clusterManagerNode *n = 
(clusterManagerNode*) ln->value; - if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue; - if (!idx--) { - return n; - } - } - /* Can not be reached */ - return NULL; -} - extern "C" int clusterManagerCheckCluster(int quiet) { listNode *ln = listFirst(cluster_manager.nodes); if (!ln) return 0; diff --git a/src/redis-cli.c b/src/redis-cli.c index fe9b3ce4a..d818fa4e9 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -69,6 +69,8 @@ redisContext *context; struct config config; +int g_fTestMode = 0; + /* User preferences. */ static struct pref { int hints; @@ -1607,7 +1609,14 @@ static void usage(void) { exit(1); } -int confirmWithYes(const char *msg) { +int confirmWithYes(const char *msg, int force) { + /* if force is true and --cluster-yes option is on, + * do not prompt for an answer */ + if (force && + (config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_YES)) { + return 1; + } + printf("%s (type 'yes' to accept): ", msg); fflush(stdout); char buf[4]; @@ -4586,7 +4595,8 @@ assign_replicas: } clusterManagerOptimizeAntiAffinity(ip_nodes, ip_count); clusterManagerShowNodes(); - if (confirmWithYes("Can I set the above configuration?")) { + int force = 1; + if (confirmWithYes("Can I set the above configuration?", force)) { listRewind(cluster_manager.nodes, &li); while ((ln = listNext(&li)) != NULL) { clusterManagerNode *node = ln->value; diff --git a/src/redis-cli.h b/src/redis-cli.h index f4693dff7..dac839410 100644 --- a/src/redis-cli.h +++ b/src/redis-cli.h @@ -273,7 +273,7 @@ int parseClusterNodeAddress(char *addr, char **ip_ptr, int *port_ptr, int *bus_port_ptr); int clusterManagerCheckRedisReply(clusterManagerNode *n, redisReply *r, char **err); -int confirmWithYes(const char *msg); +int confirmWithYes(const char *msg, int force); int clusterManagerSetSlotOwner(clusterManagerNode *owner, int slot, int do_clear); diff --git a/src/replication.cpp b/src/replication.cpp index bd33cb810..7273ad0fc 100644 --- a/src/replication.cpp +++ 
b/src/replication.cpp @@ -47,7 +47,6 @@ #include #include -long long adjustMeaningfulReplOffset(); void replicationDiscardCachedMaster(redisMaster *mi); void replicationResurrectCachedMaster(redisMaster *mi, connection *conn); void replicationSendAck(redisMaster *mi); @@ -249,7 +248,6 @@ void feedReplicationBacklog(const void *ptr, size_t len) { const unsigned char *p = (const unsigned char*)ptr; g_pserver->master_repl_offset += len; - g_pserver->master_repl_meaningful_offset = g_pserver->master_repl_offset; /* This is a circular buffer, so write as much data we can at every * iteration and rewind the "idx" index if we reach the limit. */ @@ -543,6 +541,40 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { listEmpty(fake->reply); } +/* This is a debugging function that gets called when we detect something + * wrong with the replication protocol: the goal is to peek into the + * replication backlog and show a few final bytes to make simpler to + * guess what kind of bug it could be. */ +void showLatestBacklog(void) { + if (g_pserver->repl_backlog == NULL) return; + + long long dumplen = 256; + if (g_pserver->repl_backlog_histlen < dumplen) + dumplen = g_pserver->repl_backlog_histlen; + + /* Identify the first byte to dump. */ + long long idx = + (g_pserver->repl_backlog_idx + (g_pserver->repl_backlog_size - dumplen)) % + g_pserver->repl_backlog_size; + + /* Scan the circular buffer to collect 'dumplen' bytes. */ + sds dump = sdsempty(); + while(dumplen) { + long long thislen = + ((g_pserver->repl_backlog_size - idx) < dumplen) ? + (g_pserver->repl_backlog_size - idx) : dumplen; + + dump = sdscatrepr(dump,g_pserver->repl_backlog+idx,thislen); + dumplen -= thislen; + idx = 0; + } + + /* Finally log such bytes: this is vital debugging info to + * understand what happened. 
*/ + serverLog(LL_WARNING,"Latest backlog is: '%s'", dump); + sdsfree(dump); +} + /* This function is used in order to proxy what we receive from our master * to our sub-slaves. */ #include @@ -1006,6 +1038,9 @@ void syncCommand(client *c) { changeReplicationId(); clearReplicationId2(); createReplicationBacklog(); + serverLog(LL_NOTICE,"Replication backlog created, my new " + "replication IDs are '%s' and '%s'", + g_pserver->replid, g_pserver->replid2); } /* CASE 1: BGSAVE is in progress, with disk target. */ @@ -1086,6 +1121,25 @@ void processReplconfUuid(client *c, robj *arg) if (uuid_parse(remoteUUID, c->uuid) != 0) goto LError; + listIter liMi; + listNode *lnMi; + listRewind(g_pserver->masters, &liMi); + + // Enforce a fair ordering for connection, if they attempt to connect before us close them out + // This must be consistent so that both make the same decision of who should proceed first + while ((lnMi = listNext(&liMi))) { + redisMaster *mi = (redisMaster*)listNodeValue(lnMi); + if (mi->repl_state == REPL_STATE_CONNECTED) + continue; + if (FSameUuidNoNil(mi->master_uuid, c->uuid)) { + // Decide based on UUID so both clients make the same decision of which host loses + // otherwise we may entere a loop where neither client can proceed + if (memcmp(mi->master_uuid, c->uuid, UUID_BINARY_LEN) < 0) { + freeClientAsync(c); + } + } + } + char szServerUUID[36 + 2]; // 1 for the '+', another for '\0' szServerUUID[0] = '+'; uuid_unparse(cserver.uuid, szServerUUID+1); @@ -1301,8 +1355,7 @@ void sendBulkToSlave(connection *conn) { client *replica = (client*)connGetPrivateData(conn); serverAssert(FCorrectThread(replica)); - char buf[PROTO_IOBUF_LEN]; - ssize_t nwritten, buflen; + ssize_t nwritten; AeLocker aeLock; std::unique_lock ul(replica->lock); @@ -1331,27 +1384,36 @@ void sendBulkToSlave(connection *conn) { } } - /* If the preamble was already transferred, send the RDB bulk data. 
*/ - lseek(replica->repldbfd,replica->repldboff,SEEK_SET); - buflen = read(replica->repldbfd,buf,PROTO_IOBUF_LEN); - if (buflen <= 0) { - serverLog(LL_WARNING,"Read error sending DB to replica: %s", - (buflen == 0) ? "premature EOF" : strerror(errno)); - ul.unlock(); - aeLock.arm(nullptr); - freeClient(replica); - return; - } - if ((nwritten = connWrite(conn,buf,buflen)) == -1) { - if (connGetState(conn) != CONN_STATE_CONNECTED) { - serverLog(LL_WARNING,"Write error sending DB to replica: %s", - connGetLastError(conn)); + /* If the preamble was already transferred, send the RDB bulk data. + * try to use sendfile system call if supported, unless tls is enabled. + * fallback to normal read+write otherwise. */ + nwritten = 0; + if (!nwritten) { + ssize_t buflen; + char buf[PROTO_IOBUF_LEN]; + + lseek(replica->repldbfd,replica->repldboff,SEEK_SET); + buflen = read(replica->repldbfd,buf,PROTO_IOBUF_LEN); + if (buflen <= 0) { + serverLog(LL_WARNING,"Read error sending DB to replica: %s", + (buflen == 0) ? "premature EOF" : strerror(errno)); ul.unlock(); aeLock.arm(nullptr); freeClient(replica); + return; + } + if ((nwritten = connWrite(conn,buf,buflen)) == -1) { + if (connGetState(conn) != CONN_STATE_CONNECTED) { + serverLog(LL_WARNING,"Write error sending DB to replica: %s", + connGetLastError(conn)); + ul.unlock(); + aeLock.arm(nullptr); + freeClient(replica); + } + return; } - return; } + replica->repldboff += nwritten; g_pserver->stat_net_output_bytes += nwritten; if (replica->repldboff == replica->repldbsize) { @@ -1936,6 +1998,10 @@ void readSyncBulkPayload(connection *conn) { nread = connRead(conn,buf,readlen); if (nread <= 0) { + if (connGetState(conn) == CONN_STATE_CONNECTED) { + /* equivalent to EAGAIN */ + return; + } serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s", (nread == -1) ? 
strerror(errno) : "connection lost"); cancelReplicationHandshake(mi); @@ -2206,7 +2272,6 @@ void readSyncBulkPayload(connection *conn) { * we are starting a new history. */ memcpy(g_pserver->replid,mi->master->replid,sizeof(g_pserver->replid)); g_pserver->master_repl_offset = mi->master->reploff; - g_pserver->master_repl_meaningful_offset = mi->master->reploff; } clearReplicationId2(); @@ -3054,11 +3119,6 @@ void replicationUnsetMaster(redisMaster *mi) { sdsfree(mi->masterhost); mi->masterhost = NULL; - /* When a slave is turned into a master, the current replication ID - * (that was inherited from the master at synchronization time) is - * used as secondary ID up to the current offset, and a new replication - * ID is created to continue with a new replication history. */ - shiftReplicationId(); if (mi->master) { if (FCorrectThread(mi->master)) freeClient(mi->master); @@ -3067,6 +3127,15 @@ void replicationUnsetMaster(redisMaster *mi) { } replicationDiscardCachedMaster(mi); cancelReplicationHandshake(mi); + /* When a slave is turned into a master, the current replication ID + * (that was inherited from the master at synchronization time) is + * used as secondary ID up to the current offset, and a new replication + * ID is created to continue with a new replication history. + * + * NOTE: this function MUST be called after we call + * freeClient(server.master), since there we adjust the replication + * offset trimming the final PINGs. See Github issue #7320. */ + shiftReplicationId(); /* Disconnecting all the slaves is required: we need to inform slaves * of the replication ID change (see shiftReplicationId() call). However * the slaves will be able to partially resync with us, so it will be @@ -3302,10 +3371,6 @@ void replicationCacheMaster(redisMaster *mi, client *c) { * pending outputs to the master. */ sdsclear(mi->master->querybuf); sdsclear(mi->master->pending_querybuf); - - /* Adjust reploff and read_reploff to the last meaningful offset we executed. 
- * this is the offset the replica will use for future PSYNC. */ - mi->master->reploff = adjustMeaningfulReplOffset(); mi->master->read_reploff = mi->master->reploff; if (c->flags & CLIENT_MULTI) discardTransaction(c); listEmpty(c->reply); @@ -3331,38 +3396,6 @@ void replicationCacheMaster(redisMaster *mi, client *c) { replicationHandleMasterDisconnection(mi); } -/* If the "meaningful" offset, that is the offset without the final PINGs - * in the stream, is different than the last offset, use it instead: - * often when the master is no longer reachable, replicas will never - * receive the PINGs, however the master will end with an incremented - * offset because of the PINGs and will not be able to incrementally - * PSYNC with the new master. - * This function trims the replication backlog when needed, and returns - * the offset to be used for future partial sync. */ -long long adjustMeaningfulReplOffset() { - if (g_pserver->master_repl_offset > g_pserver->master_repl_meaningful_offset) { - long long delta = g_pserver->master_repl_offset - - g_pserver->master_repl_meaningful_offset; - serverLog(LL_NOTICE, - "Using the meaningful offset %lld instead of %lld to exclude " - "the final PINGs (%lld bytes difference)", - g_pserver->master_repl_meaningful_offset, - g_pserver->master_repl_offset, - delta); - g_pserver->master_repl_offset = g_pserver->master_repl_meaningful_offset; - if (g_pserver->repl_backlog_histlen <= delta) { - g_pserver->repl_backlog_histlen = 0; - g_pserver->repl_backlog_idx = 0; - } else { - g_pserver->repl_backlog_histlen -= delta; - g_pserver->repl_backlog_idx = - (g_pserver->repl_backlog_idx + (g_pserver->repl_backlog_size - delta)) % - g_pserver->repl_backlog_size; - } - } - return g_pserver->master_repl_offset; -} - /* This function is called when a master is turend into a slave, in order to * create from scratch a cached master for the new client, that will allow * to PSYNC with the slave that was promoted as the new master after a @@ -3388,7 
+3421,7 @@ void replicationCacheMasterUsingMyself(redisMaster *mi) { * by replicationCreateMasterClient(). We'll later set the created * master as server.cached_master, so the replica will use such * offset for PSYNC. */ - mi->master_initial_offset = adjustMeaningfulReplOffset(); + mi->master_initial_offset = g_pserver->master_repl_offset; /* The master client we create can be set to any DBID, because * the new master will start its replication stream with SELECT. */ @@ -3730,6 +3763,18 @@ void replicationCron(void) { listIter liMaster; listNode *lnMaster; listRewind(g_pserver->masters, &liMaster); + + bool fInMasterConnection = false; + while ((lnMaster = listNext(&liMaster)) && !fInMasterConnection) + { + redisMaster *mi = (redisMaster*)listNodeValue(lnMaster); + if (mi->repl_state != REPL_STATE_NONE && mi->repl_state != REPL_STATE_CONNECTED && mi->repl_state != REPL_STATE_CONNECT) { + fInMasterConnection = true; + } + } + + bool fConnectionStarted = false; + listRewind(g_pserver->masters, &liMaster); while ((lnMaster = listNext(&liMaster))) { redisMaster *mi = (redisMaster*)listNodeValue(lnMaster); @@ -3768,12 +3813,14 @@ void replicationCron(void) { } /* Check if we should connect to a MASTER */ - if (mi->repl_state == REPL_STATE_CONNECT) { + if (mi->repl_state == REPL_STATE_CONNECT && !fInMasterConnection) { serverLog(LL_NOTICE,"Connecting to MASTER %s:%d", mi->masterhost, mi->masterport); if (connectWithMaster(mi) == C_OK) { serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started"); } + fInMasterConnection = true; + fConnectionStarted = true; } /* Send ACK to master from time to time. @@ -3784,6 +3831,11 @@ void replicationCron(void) { replicationSendAck(mi); } + if (fConnectionStarted) { + // If we cancel this handshake we want the next attempt to be a different master + listRotateHeadToTail(g_pserver->masters); + } + /* If we have attached slaves, PING them from time to time. 
* So slaves can implement an explicit timeout to masters, and will * be able to detect a link disconnection even if the TCP connection @@ -3806,18 +3858,10 @@ void replicationCron(void) { clientsArePaused(); if (!manual_failover_in_progress) { - long long before_ping = g_pserver->master_repl_meaningful_offset; ping_argv[0] = createStringObject("PING",4); replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb, ping_argv, 1); decrRefCount(ping_argv[0]); - /* The server.master_repl_meaningful_offset variable represents - * the offset of the replication stream without the pending PINGs. - * This is useful to set the right replication offset for PSYNC - * when the master is turned into a replica. Otherwise pending - * PINGs may not allow it to perform an incremental sync with the - * new master. */ - g_pserver->master_repl_meaningful_offset = before_ping; } } diff --git a/src/server.cpp b/src/server.cpp index 157655805..63e7979be 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2029,8 +2029,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { { processUnblockedClients(IDX_EVENT_LOOP_MAIN); } - - ProcessPendingAsyncWrites(); // This is really a bug, but for now catch any laggards that didn't clean up /* Software watchdog: deliver the SIGALRM that will reach the signal * handler if we don't return here fast enough. */ @@ -2236,6 +2234,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { run_with_period(30000) { checkTrialTimeout(); + + /* Tune the fastlock to CPU load */ + fastlock_auto_adjust_waits(); } /* Resize tracking keys table if needed. 
This is also done at every @@ -2273,6 +2274,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { &ei); + /* CRON functions may trigger async writes, so do this last */ + ProcessPendingAsyncWrites(); + g_pserver->cronloops++; return 1000/g_pserver->hz; } @@ -2656,7 +2660,6 @@ void initServerConfig(void) { g_pserver->enable_multimaster = CONFIG_DEFAULT_ENABLE_MULTIMASTER; g_pserver->repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT; g_pserver->master_repl_offset = 0; - g_pserver->master_repl_meaningful_offset = 0; /* Replication partial resync backlog */ g_pserver->repl_backlog = NULL; @@ -4848,8 +4851,17 @@ sds genRedisInfoString(const char *section) { listIter li; listNode *ln; listRewind(g_pserver->masters, &li); + bool fAllUp = true; + while ((ln = listNext(&li))) { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + fAllUp = fAllUp && mi->repl_state == REPL_STATE_CONNECTED; + } + + info = sdscatprintf(info, "master_global_link_status:%s\r\n", + fAllUp ? 
"up" : "down"); int cmasters = 0; + listRewind(g_pserver->masters, &li); while ((ln = listNext(&li))) { long long slave_repl_offset = 1; @@ -4961,7 +4973,6 @@ sds genRedisInfoString(const char *section) { "master_replid:%s\r\n" "master_replid2:%s\r\n" "master_repl_offset:%lld\r\n" - "master_repl_meaningful_offset:%lld\r\n" "second_repl_offset:%lld\r\n" "repl_backlog_active:%d\r\n" "repl_backlog_size:%lld\r\n" @@ -4970,7 +4981,6 @@ sds genRedisInfoString(const char *section) { g_pserver->replid, g_pserver->replid2, g_pserver->master_repl_offset, - g_pserver->master_repl_meaningful_offset, g_pserver->second_replid_offset, g_pserver->repl_backlog != NULL, g_pserver->repl_backlog_size, @@ -5390,7 +5400,6 @@ void loadDataFromDisk(void) { { memcpy(g_pserver->replid,rsi.repl_id,sizeof(g_pserver->replid)); g_pserver->master_repl_offset = rsi.repl_offset; - g_pserver->master_repl_meaningful_offset = rsi.repl_offset; listIter li; listNode *ln; diff --git a/src/server.h b/src/server.h index a04e3301c..bff867758 100644 --- a/src/server.h +++ b/src/server.h @@ -1645,6 +1645,7 @@ typedef struct client { std::atomic flags; /* Client flags: CLIENT_* macros. */ int casyncOpsPending; int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */ + int fPendingAsyncWriteHandler; int authenticated; /* Needed when the default user requires auth. */ int replstate; /* Replication state if this is a replica. */ int repl_put_online_on_ack; /* Install replica write handler on ACK. */ @@ -2228,7 +2229,6 @@ struct redisServer { char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */ char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/ long long master_repl_offset; /* My current replication offset */ - long long master_repl_meaningful_offset; /* Offset minus latest PINGs. */ long long second_replid_offset; /* Accept offsets up to this for replid2. 
*/ int replicaseldb; /* Last SELECTed DB in replication output */ int repl_ping_slave_period; /* Master pings the replica every N seconds */ @@ -2825,6 +2825,7 @@ void chopReplicationBacklog(void); void replicationCacheMasterUsingMyself(struct redisMaster *mi); void feedReplicationBacklog(const void *ptr, size_t len); void updateMasterAuth(); +void showLatestBacklog(); void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); void rdbPipeWriteHandlerConnRemoved(struct connection *conn); diff --git a/src/t_string.cpp b/src/t_string.cpp index a07f0d797..8b8d62cac 100644 --- a/src/t_string.cpp +++ b/src/t_string.cpp @@ -521,7 +521,7 @@ void stralgoLCS(client *c) { != C_OK) return; if (minmatchlen < 0) minmatchlen = 0; j++; - } else if (!strcasecmp(opt,"STRINGS")) { + } else if (!strcasecmp(opt,"STRINGS") && moreargs > 1) { if (a != NULL) { addReplyError(c,"Either use STRINGS or KEYS"); return; @@ -529,7 +529,7 @@ void stralgoLCS(client *c) { a = szFromObj(c->argv[j+1]); b = szFromObj(c->argv[j+2]); j += 2; - } else if (!strcasecmp(opt,"KEYS")) { + } else if (!strcasecmp(opt,"KEYS") && moreargs > 1) { if (a != NULL) { addReplyError(c,"Either use STRINGS or KEYS"); return; diff --git a/src/tls.cpp b/src/tls.cpp index d297695cc..4284a27bf 100644 --- a/src/tls.cpp +++ b/src/tls.cpp @@ -129,7 +129,7 @@ static void initCryptoLocks(void) { return; } nlocks = CRYPTO_num_locks(); - openssl_locks = zmalloc(sizeof(*openssl_locks) * nlocks); + openssl_locks = (pthread_mutex_t*)zmalloc(sizeof(*openssl_locks) * nlocks); for (i = 0; i < nlocks; i++) { pthread_mutex_init(openssl_locks + i, NULL); } @@ -349,6 +349,7 @@ connection *connCreateTLS(void) { void connTLSMarshalThread(connection *c) { tls_connection *conn = (tls_connection*)c; + serverAssert(conn->pending_list_node == nullptr); conn->el = serverTL->el; } @@ -458,7 +459,6 @@ void updateSSLEvent(tls_connection *conn) { void tlsHandleEvent(tls_connection *conn, int mask) { int ret; - 
serverAssert(!GlobalLocksAcquired()); serverAssert(conn->el == serverTL->el); TLSCONN_DEBUG("tlsEventHandler(): fd=%d, state=%d, mask=%d, r=%d, w=%d, flags=%d", @@ -910,6 +910,7 @@ int tlsHasPendingData() { int tlsProcessPendingData() { listIter li; listNode *ln; + serverAssert(!GlobalLocksAcquired()); int processed = listLength(pending_list); listRewind(pending_list,&li); diff --git a/src/tracking.cpp b/src/tracking.cpp index 2c0f4812b..2bb53e615 100644 --- a/src/tracking.cpp +++ b/src/tracking.cpp @@ -209,6 +209,7 @@ void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int prot if (c->client_tracking_redirection) { client *redir = lookupClientByID(c->client_tracking_redirection); if (!redir) { + c->flags |= CLIENT_TRACKING_BROKEN_REDIR; /* We need to signal to the original connection that we * are unable to send invalidation messages to the redirected * connection, because the client no longer exist. */ diff --git a/tests/cluster/tests/15-cluster-slots.tcl b/tests/cluster/tests/15-cluster-slots.tcl index dc9938ef6..1b33c57bd 100644 --- a/tests/cluster/tests/15-cluster-slots.tcl +++ b/tests/cluster/tests/15-cluster-slots.tcl @@ -41,4 +41,10 @@ test "client do not break when cluster slot" { if { [catch {R 0 cluster slots}] } { fail "output overflow when cluster slots" } -} \ No newline at end of file +} + +test "client can handle keys with hash tag" { + set cluster [redis_cluster 127.0.0.1:[get_instance_attrib redis 0 port]] + $cluster set foo{tag} bar + $cluster close +} diff --git a/tests/instances.tcl b/tests/instances.tcl index d243e9896..5e546f211 100644 --- a/tests/instances.tcl +++ b/tests/instances.tcl @@ -25,6 +25,7 @@ set ::sentinel_instances {} set ::redis_instances {} set ::sentinel_base_port 20000 set ::redis_base_port 30000 +set ::redis_port_count 1024 set ::pids {} ; # We kill everything at exit set ::dirs {} ; # We remove all the temp dirs at exit set ::run_matching {} ; # If non empty, only tests matching pattern are run. 
@@ -60,7 +61,7 @@ proc exec_instance {type cfgfile dir} {
 # Spawn a redis or sentinel instance, depending on 'type'.
 proc spawn_instance {type base_port count {conf {}}} {
     for {set j 0} {$j < $count} {incr j} {
-        set port [find_available_port $base_port]
+        set port [find_available_port $base_port $::redis_port_count]
         incr base_port
 
         puts "Starting $type #$j at port $port"
diff --git a/tests/integration/aof.tcl b/tests/integration/aof.tcl
index 192c7a84b..d1222b0e4 100644
--- a/tests/integration/aof.tcl
+++ b/tests/integration/aof.tcl
@@ -315,7 +315,7 @@ tags {"aof"} {
     }
 
     test "AOF+PEXPIREMEMBERAT: set should have 3 values" {
-        set client [redis [dict get $srv host] [dict get $srv port]]
+        set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls]
         wait_for_condition 50 100 {
             [catch {$client ping} e] == 0
         } else {
diff --git a/tests/integration/psync2-pingoff.tcl b/tests/integration/psync2-pingoff.tcl
index 420747d21..5a9a46d16 100644
--- a/tests/integration/psync2-pingoff.tcl
+++ b/tests/integration/psync2-pingoff.tcl
@@ -1,6 +1,9 @@
-# Test the meaningful offset implementation to make sure masters
-# are able to PSYNC with replicas even if the replication stream
-# has pending PINGs at the end.
+# These tests were added together with the meaningful offset implementation
+# in redis 6.0.0, which was later abandoned in 6.0.4. They used to test that
+# servers are able to PSYNC with replicas even if the replication stream has
+# PINGs at the end which are present on one server and missing on another.
+# We keep these tests just because they reproduce edge cases in the replication
+# logic, in hope they'll be able to spot some problem in the future.
start_server {tags {"psync2"}} { start_server {} { @@ -16,7 +19,7 @@ start_server {} { } # Setup replication - test "PSYNC2 meaningful offset: setup" { + test "PSYNC2 pingoff: setup" { $R(1) replicaof $R_host(0) $R_port(0) $R(0) set foo bar wait_for_condition 50 1000 { @@ -27,7 +30,7 @@ start_server {} { } } - test "PSYNC2 meaningful offset: write and wait replication" { + test "PSYNC2 pingoff: write and wait replication" { $R(0) INCR counter $R(0) INCR counter $R(0) INCR counter @@ -41,7 +44,7 @@ start_server {} { # In this test we'll make sure the replica will get stuck, but with # an active connection: this way the master will continue to send PINGs # every second (we modified the PING period earlier) - test "PSYNC2 meaningful offset: pause replica and promote it" { + test "PSYNC2 pingoff: pause replica and promote it" { $R(1) MULTI $R(1) DEBUG SLEEP 5 $R(1) SLAVEOF NO ONE @@ -50,13 +53,179 @@ start_server {} { } test "Make the old master a replica of the new one and check conditions" { - set sync_partial [status $R(1) sync_partial_ok] - assert {$sync_partial == 0} + assert_equal [status $R(1) sync_full] 0 $R(0) REPLICAOF $R_host(1) $R_port(1) wait_for_condition 50 1000 { - [status $R(1) sync_partial_ok] == 1 + [status $R(1) sync_full] == 1 } else { - fail "The new master was not able to partial sync" + fail "The new master was not able to sync" } + + # make sure replication is still alive and kicking + $R(1) incr x + wait_for_condition 50 1000 { + [$R(0) get x] == 1 + } else { + fail "replica didn't get incr" + } + assert_equal [status $R(0) master_repl_offset] [status $R(1) master_repl_offset] } }} + + +start_server {tags {"psync2"}} { +start_server {} { +start_server {} { +start_server {} { +start_server {} { + test {test various edge cases of repl topology changes with missing pings at the end} { + set master [srv -4 client] + set master_host [srv -4 host] + set master_port [srv -4 port] + set replica1 [srv -3 client] + set replica2 [srv -2 client] + set 
replica3 [srv -1 client] + set replica4 [srv -0 client] + + $replica1 replicaof $master_host $master_port + $replica2 replicaof $master_host $master_port + $replica3 replicaof $master_host $master_port + $replica4 replicaof $master_host $master_port + wait_for_condition 50 1000 { + [status $master connected_slaves] == 4 + } else { + fail "replicas didn't connect" + } + + $master incr x + wait_for_condition 50 1000 { + [$replica1 get x] == 1 && [$replica2 get x] == 1 && + [$replica3 get x] == 1 && [$replica4 get x] == 1 + } else { + fail "replicas didn't get incr" + } + + # disconnect replica1 and replica2 + # and wait for the master to send a ping to replica3 and replica4 + $replica1 replicaof no one + $replica2 replicaof 127.0.0.1 1 ;# we can't promote it to master since that will cycle the replication id + $master config set repl-ping-replica-period 1 + after 1500 + + # make everyone sync from the replica1 that didn't get the last ping from the old master + # replica4 will keep syncing from the old master which now syncs from replica1 + # and replica2 will re-connect to the old master (which went back in time) + set new_master_host [srv -3 host] + set new_master_port [srv -3 port] + $replica3 replicaof $new_master_host $new_master_port + $master replicaof $new_master_host $new_master_port + $replica2 replicaof $master_host $master_port + wait_for_condition 50 1000 { + [status $replica2 master_link_status] == "up" && + [status $replica3 master_link_status] == "up" && + [status $replica4 master_link_status] == "up" && + [status $master master_link_status] == "up" + } else { + fail "replicas didn't connect" + } + + # make sure replication is still alive and kicking + $replica1 incr x + wait_for_condition 50 1000 { + [$replica2 get x] == 2 && + [$replica3 get x] == 2 && + [$replica4 get x] == 2 && + [$master get x] == 2 + } else { + fail "replicas didn't get incr" + } + + # make sure we have the right amount of full syncs + assert_equal [status $master sync_full] 6 + 
assert_equal [status $replica1 sync_full] 2 + assert_equal [status $replica2 sync_full] 0 + assert_equal [status $replica3 sync_full] 0 + assert_equal [status $replica4 sync_full] 0 + + # force psync + $master client kill type master + $replica2 client kill type master + $replica3 client kill type master + $replica4 client kill type master + + # make sure replication is still alive and kicking + $replica1 incr x + wait_for_condition 50 1000 { + [$replica2 get x] == 3 && + [$replica3 get x] == 3 && + [$replica4 get x] == 3 && + [$master get x] == 3 + } else { + fail "replicas didn't get incr" + } + + # make sure we have the right amount of full syncs + assert_equal [status $master sync_full] 6 + assert_equal [status $replica1 sync_full] 2 + assert_equal [status $replica2 sync_full] 0 + assert_equal [status $replica3 sync_full] 0 + assert_equal [status $replica4 sync_full] 0 +} +}}}}} + +start_server {tags {"psync2"}} { +start_server {} { +start_server {} { + + for {set j 0} {$j < 3} {incr j} { + set R($j) [srv [expr 0-$j] client] + set R_host($j) [srv [expr 0-$j] host] + set R_port($j) [srv [expr 0-$j] port] + $R($j) CONFIG SET repl-ping-replica-period 1 + } + + test "Chained replicas disconnect when replica re-connect with the same master" { + # Add a second replica as a chained replica of the current replica + $R(1) replicaof $R_host(0) $R_port(0) + $R(2) replicaof $R_host(1) $R_port(1) + wait_for_condition 50 1000 { + [status $R(2) master_link_status] == "up" + } else { + fail "Chained replica not replicating from its master" + } + + # Do a write on the master, and wait for 3 seconds for the master to + # send some PINGs to its replica + $R(0) INCR counter2 + after 2000 + set sync_partial_master [status $R(0) sync_partial_ok] + set sync_partial_replica [status $R(1) sync_partial_ok] + $R(0) CONFIG SET repl-ping-replica-period 100 + + # Disconnect the master's direct replica + $R(0) client kill type replica + wait_for_condition 50 1000 { + [status $R(1) 
master_link_status] == "up" && + [status $R(2) master_link_status] == "up" && + [status $R(0) sync_partial_ok] == $sync_partial_master + 1 && + [status $R(1) sync_partial_ok] == $sync_partial_replica + } else { + fail "Disconnected replica failed to PSYNC with master" + } + + # Verify that the replica and its replica's meaningful and real + # offsets match with the master + assert_equal [status $R(0) master_repl_offset] [status $R(1) master_repl_offset] + assert_equal [status $R(0) master_repl_offset] [status $R(2) master_repl_offset] + + # make sure replication is still alive and kicking + $R(0) incr counter2 + wait_for_condition 50 1000 { + [$R(1) get counter2] == 2 && [$R(2) get counter2] == 2 + } else { + fail "replicas didn't get incr" + } + assert_equal [status $R(0) master_repl_offset] [status $R(1) master_repl_offset] + assert_equal [status $R(0) master_repl_offset] [status $R(2) master_repl_offset] + } +}}} diff --git a/tests/integration/psync2.tcl b/tests/integration/psync2.tcl index f1a3a6ea2..4dfa7cddc 100644 --- a/tests/integration/psync2.tcl +++ b/tests/integration/psync2.tcl @@ -1,3 +1,79 @@ + +proc show_cluster_status {} { + uplevel 1 { + # The following is the regexp we use to match the log line + # time info. 
Logs are in the following form:
+        #
+        # 11296:M 25 May 2020 17:37:14.652 # Server initialized
+        set log_regexp {^[0-9]+:[A-Z] [0-9]+ [A-z]+ [0-9]+ ([0-9:.]+) .*}
+        set repl_regexp {(master|repl|sync|backlog|meaningful|offset)}
+
+        puts "Master ID is $master_id"
+        for {set j 0} {$j < 5} {incr j} {
+            puts "$j: sync_full: [status $R($j) sync_full]"
+            puts "$j: id1      : [status $R($j) master_replid]:[status $R($j) master_repl_offset]"
+            puts "$j: id2      : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]"
+            puts "$j: backlog  : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]"
+            puts "$j: x var is : [$R($j) GET x]"
+            puts "---"
+        }
+
+        # Show the replication logs of every instance, interleaving
+        # them by the log date.
+        #
+        # First: load the lines as lists for each instance.
+        array set log {}
+        for {set j 0} {$j < 5} {incr j} {
+            set fd [open $R_log($j)]
+            set found 0
+            set tries 0
+            while {([gets $fd l] >= 0 || !$found) && $tries < 1000} {
+                if {[regexp $log_regexp $l] &&
+                    [regexp -nocase $repl_regexp $l]} {
+                    lappend log($j) $l
+                    set found 1
+                }
+                incr tries
+            }
+            close $fd
+        }
+
+        # To interleave the lines, at every step consume the element of
+        # the list with the lowest time and remove it. Do it until
+        # all the lists are empty.
+        #
+        # regexp {^[0-9]+:[A-Z] [0-9]+ [A-z]+ [0-9]+ ([0-9:.]+) .*} $l - logdate
+        while 1 {
+            # Find the log with smallest time.
+            set empty 0
+            set best 0
+            set bestdate {}
+            for {set j 0} {$j < 5} {incr j} {
+                if {[llength $log($j)] == 0} {
+                    incr empty
+                    continue
+                }
+                regexp $log_regexp [lindex $log($j) 0] - date
+                if {$bestdate eq {}} {
+                    set best $j
+                    set bestdate $date
+                } else {
+                    if {[string compare $bestdate $date] > 0} {
+                        set best $j
+                        set bestdate $date
+                    }
+                }
+            }
+            if {$empty == 5} break ; # Our exit condition: no more logs
+
+            # Emit the one with the smallest time (that is the first
+            # event in the time line).
+ puts "\[$best port $R_port($best)\] [lindex $log($best) 0]" + set log($best) [lrange $log($best) 1 end] + } + } +} + start_server {tags {"psync2"}} { start_server {} { start_server {} { @@ -12,7 +88,7 @@ start_server {} { set no_exit 0 ; # Do not exit at end of the test - set duration 20 ; # Total test seconds + set duration 40 ; # Total test seconds set genload 1 ; # Load master with writes at every cycle @@ -28,6 +104,7 @@ start_server {} { set R($j) [srv [expr 0-$j] client] set R_host($j) [srv [expr 0-$j] host] set R_port($j) [srv [expr 0-$j] port] + set R_log($j) [srv [expr 0-$j] stdout] if {$debug_msg} {puts "Log file: [srv [expr 0-$j] stdout]"} } @@ -74,6 +151,7 @@ start_server {} { [status $R([expr {($master_id+3)%5}]) master_link_status] == "up" && [status $R([expr {($master_id+4)%5}]) master_link_status] == "up" } else { + show_cluster_status fail "Replica not reconnecting" } @@ -85,6 +163,7 @@ start_server {} { wait_for_condition 50 2000 { [$R($j) get x] == $counter_value } else { + show_cluster_status fail "Instance #$j x variable is inconsistent" } } @@ -120,44 +199,27 @@ start_server {} { wait_for_condition 50 2000 { [$R($j) get x] == $counter_value } else { + show_cluster_status fail "Instance #$j x variable is inconsistent" } } } - # wait for all the slaves to be in sync with the master, due to pings, we have to re-sample the master constantly too + # wait for all the slaves to be in sync. 
+ set masteroff [status $R($master_id) master_repl_offset] wait_for_condition 500 100 { - [status $R($master_id) master_repl_offset] == [status $R(0) master_repl_offset] && - [status $R($master_id) master_repl_offset] == [status $R(1) master_repl_offset] && - [status $R($master_id) master_repl_offset] == [status $R(2) master_repl_offset] && - [status $R($master_id) master_repl_offset] == [status $R(3) master_repl_offset] && - [status $R($master_id) master_repl_offset] == [status $R(4) master_repl_offset] + [status $R(0) master_repl_offset] >= $masteroff && + [status $R(1) master_repl_offset] >= $masteroff && + [status $R(2) master_repl_offset] >= $masteroff && + [status $R(3) master_repl_offset] >= $masteroff && + [status $R(4) master_repl_offset] >= $masteroff } else { - for {set j 0} {$j < 5} {incr j} { - puts "$j: sync_full: [status $R($j) sync_full]" - puts "$j: id1 : [status $R($j) master_replid]:[status $R($j) master_repl_offset]" - puts "$j: id2 : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]" - puts "$j: backlog : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]" - puts "---" - } - fail "Slaves are not in sync with the master after too long time." + show_cluster_status + fail "Replicas offsets didn't catch up with the master after too long time." } - # Put down the old master so that it cannot generate more - # replication stream, this way in the next master switch, the time at - # which we move slaves away is not important, each will have full - # history (otherwise PINGs will make certain slaves have more history), - # and sometimes a full resync will be needed. - $R($master_id) slaveof 127.0.0.1 0 ;# We use port zero to make it fail. 
- if {$debug_msg} { - for {set j 0} {$j < 5} {incr j} { - puts "$j: sync_full: [status $R($j) sync_full]" - puts "$j: id1 : [status $R($j) master_replid]:[status $R($j) master_repl_offset]" - puts "$j: id2 : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]" - puts "$j: backlog : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]" - puts "---" - } + show_cluster_status } test "PSYNC2: total sum of full synchronizations is exactly 4" { @@ -165,7 +227,24 @@ start_server {} { for {set j 0} {$j < 5} {incr j} { incr sum [status $R($j) sync_full] } - assert {$sum == 4} + if {$sum != 4} { + show_cluster_status + assert {$sum == 4} + } + } + + # In absence of pings, are the instances really able to have + # the exact same offset? + $R($master_id) config set repl-ping-replica-period 3600 + wait_for_condition 500 100 { + [status $R($master_id) master_repl_offset] == [status $R(0) master_repl_offset] && + [status $R($master_id) master_repl_offset] == [status $R(1) master_repl_offset] && + [status $R($master_id) master_repl_offset] == [status $R(2) master_repl_offset] && + [status $R($master_id) master_repl_offset] == [status $R(3) master_repl_offset] && + [status $R($master_id) master_repl_offset] == [status $R(4) master_repl_offset] + } else { + show_cluster_status + fail "Replicas and master offsets were unable to match *exactly*." } # Limit anyway the maximum number of cycles. 
This is useful when the @@ -192,6 +271,7 @@ start_server {} { [status $R([expr {($master_id+3)%5}]) master_link_status] == "up" && [status $R([expr {($master_id+4)%5}]) master_link_status] == "up" } else { + show_cluster_status fail "Replica not reconnecting" } } @@ -215,6 +295,7 @@ start_server {} { puts "prev sync_partial_ok: $sync_partial" puts "prev sync_partial_err: $sync_partial_err" puts [$R($master_id) info stats] + show_cluster_status fail "Replica didn't partial sync" } set new_sync_count [status $R($master_id) sync_full] @@ -236,6 +317,7 @@ start_server {} { wait_for_condition 50 2000 { [$R($master_id) debug digest] == [$R($slave_id) debug digest] } else { + show_cluster_status fail "Replica not reconnecting" } @@ -270,6 +352,7 @@ start_server {} { wait_for_condition 50 2000 { [status $R($master_id) connected_slaves] == 4 } else { + show_cluster_status fail "Replica not reconnecting" } set new_sync_count [status $R($master_id) sync_full] @@ -280,6 +363,7 @@ start_server {} { wait_for_condition 50 2000 { [$R($master_id) debug digest] == [$R($slave_id) debug digest] } else { + show_cluster_status fail "Debug digest mismatch between master and replica in post-restart handshake" } } @@ -289,103 +373,3 @@ start_server {} { } }}}}} - -start_server {tags {"psync2"}} { -start_server {} { -start_server {} { -start_server {} { -start_server {} { - test {pings at the end of replication stream are ignored for psync} { - set master [srv -4 client] - set master_host [srv -4 host] - set master_port [srv -4 port] - set replica1 [srv -3 client] - set replica2 [srv -2 client] - set replica3 [srv -1 client] - set replica4 [srv -0 client] - - $replica1 replicaof $master_host $master_port - $replica2 replicaof $master_host $master_port - $replica3 replicaof $master_host $master_port - $replica4 replicaof $master_host $master_port - wait_for_condition 50 1000 { - [status $master connected_slaves] == 4 - } else { - fail "replicas didn't connect" - } - - $master incr x - 
wait_for_condition 50 1000 { - [$replica1 get x] == 1 && [$replica2 get x] == 1 && - [$replica3 get x] == 1 && [$replica4 get x] == 1 - } else { - fail "replicas didn't get incr" - } - - # disconnect replica1 and replica2 - # and wait for the master to send a ping to replica3 and replica4 - $replica1 replicaof no one - $replica2 replicaof 127.0.0.1 1 ;# we can't promote it to master since that will cycle the replication id - $master config set repl-ping-replica-period 1 - after 1500 - - # make everyone sync from the replica1 that didn't get the last ping from the old master - # replica4 will keep syncing from the old master which now syncs from replica1 - # and replica2 will re-connect to the old master (which went back in time) - set new_master_host [srv -3 host] - set new_master_port [srv -3 port] - $replica3 replicaof $new_master_host $new_master_port - $master replicaof $new_master_host $new_master_port - $replica2 replicaof $master_host $master_port - wait_for_condition 50 1000 { - [status $replica2 master_link_status] == "up" && - [status $replica3 master_link_status] == "up" && - [status $replica4 master_link_status] == "up" && - [status $master master_link_status] == "up" - } else { - fail "replicas didn't connect" - } - - # make sure replication is still alive and kicking - $replica1 incr x - wait_for_condition 50 1000 { - [$replica2 get x] == 2 && - [$replica3 get x] == 2 && - [$replica4 get x] == 2 && - [$master get x] == 2 - } else { - fail "replicas didn't get incr" - } - - # make sure there are full syncs other than the initial ones - assert_equal [status $master sync_full] 4 - assert_equal [status $replica1 sync_full] 0 - assert_equal [status $replica2 sync_full] 0 - assert_equal [status $replica3 sync_full] 0 - assert_equal [status $replica4 sync_full] 0 - - # force psync - $master client kill type master - $replica2 client kill type master - $replica3 client kill type master - $replica4 client kill type master - - # make sure replication is still 
alive and kicking - $replica1 incr x - wait_for_condition 50 1000 { - [$replica2 get x] == 3 && - [$replica3 get x] == 3 && - [$replica4 get x] == 3 && - [$master get x] == 3 - } else { - fail "replicas didn't get incr" - } - - # make sure there are full syncs other than the initial ones - assert_equal [status $master sync_full] 4 - assert_equal [status $replica1 sync_full] 0 - assert_equal [status $replica2 sync_full] 0 - assert_equal [status $replica3 sync_full] 0 - assert_equal [status $replica4 sync_full] 0 -} -}}}}} diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl index 92692e44d..fb49e5dc4 100644 --- a/tests/integration/replication-active.tcl +++ b/tests/integration/replication-active.tcl @@ -36,6 +36,7 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { test {Active replicas propogate} { $master set testkey foo + after 500 wait_for_condition 50 500 { [string match *foo* [$slave get testkey]] } else { diff --git a/tests/integration/replication-multimaster.tcl b/tests/integration/replication-multimaster.tcl index ff7cebac5..858cff26a 100644 --- a/tests/integration/replication-multimaster.tcl +++ b/tests/integration/replication-multimaster.tcl @@ -31,13 +31,21 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} { $R(3) replicaof $R_host(2) $R_port(2) } - after 2000 + test "$topology all nodes up" { + for {set j 0} {$j < 4} {incr j} { + wait_for_condition 50 100 { + [string match {*master_global_link_status:up*} [$R($j) info replication]] + } else { + fail "Multimaster group didn't connect up in a reasonable period of time" + } + } + } test "$topology replicates to all nodes" { $R(0) set testkey foo after 500 for {set n 0} {$n < 4} {incr n} { - wait_for_condition 50 1000 { + wait_for_condition 50 100 { [$R($n) get testkey] == "foo" } else { fail "Failed to replicate to $n" @@ -48,12 +56,17 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} { test 
"$topology replicates only once" { $R(0) set testkey 1 after 500 + #wait_for_condition 50 100 { + # [$R(1) get testkey] == 1 && [$R(2) get testkey] == 1 + #} else { + # fail "Set failed to replicate" + #} $R(1) incr testkey after 500 $R(2) incr testkey after 500 for {set n 0} {$n < 4} {incr n} { - wait_for_condition 50 1000 { + wait_for_condition 100 100 { [$R($n) get testkey] == 3 } else { fail "node $n did not replicate" @@ -69,7 +82,7 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} { $R(0) incr testkey $R(0) exec for {set n 0} {$n < 4} {incr n} { - wait_for_condition 50 1000 { + wait_for_condition 50 100 { [$R($n) get testkey] == 3 } else { fail "node $n failed to replicate" diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index f856a6307..b67f5428c 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -1,10 +1,3 @@ -proc log_file_matches {log pattern} { - set fp [open $log r] - set content [read $fp] - close $fp - string match $pattern $content -} - start_server {tags {"repl"} overrides {hz 100}} { set slave [srv 0 client] set slave_host [srv 0 host] @@ -612,7 +605,7 @@ start_server {tags {"repl"}} { # Wait that replicas acknowledge they are online so # we are sure that DBSIZE and DEBUG DIGEST will not # fail because of timing issues. - wait_for_condition 50 100 { + wait_for_condition 150 100 { [lindex [$replica role] 3] eq {connected} } else { fail "replicas still not connected after some time" @@ -637,3 +630,55 @@ start_server {tags {"repl"}} { } } } + +test {replicaof right after disconnection} { + # this is a rare race condition that was reproduced sporadically by the psync2 unit. 
+ # see details in #7205 + start_server {tags {"repl"}} { + set replica1 [srv 0 client] + set replica1_host [srv 0 host] + set replica1_port [srv 0 port] + set replica1_log [srv 0 stdout] + start_server {} { + set replica2 [srv 0 client] + set replica2_host [srv 0 host] + set replica2_port [srv 0 port] + set replica2_log [srv 0 stdout] + start_server {} { + set master [srv 0 client] + set master_host [srv 0 host] + set master_port [srv 0 port] + $replica1 replicaof $master_host $master_port + $replica2 replicaof $master_host $master_port + + wait_for_condition 50 100 { + [string match {*master_link_status:up*} [$replica1 info replication]] && + [string match {*master_link_status:up*} [$replica2 info replication]] + } else { + fail "Can't turn the instance into a replica" + } + + set rd [redis_deferring_client -1] + $rd debug sleep 1 + after 100 + + # when replica2 will wake up from the sleep it will find both disconnection + # from it's master and also a replicaof command at the same event loop + $master client kill type replica + $replica2 replicaof $replica1_host $replica1_port + $rd read + + wait_for_condition 50 100 { + [string match {*master_link_status:up*} [$replica2 info replication]] + } else { + fail "role change failed." + } + + # make sure psync succeeded, and there were no unexpected full syncs. 
+ assert_equal [status $master sync_full] 2 + assert_equal [status $replica1 sync_full] 0 + assert_equal [status $replica2 sync_full] 0 + } + } + } +} diff --git a/tests/modules/Makefile b/tests/modules/Makefile index 363231a87..39b8e6efa 100644 --- a/tests/modules/Makefile +++ b/tests/modules/Makefile @@ -28,11 +28,14 @@ TEST_MODULES = \ all: $(TEST_MODULES) +32bit: + $(MAKE) CFLAGS="-m32" LDFLAGS="-melf_i386" + %.xo: %.c ../../src/redismodule.h $(CC) -I../../src $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@ %.so: %.xo - $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc + $(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LDFLAGS) $(LIBS) -lc .PHONY: clean diff --git a/tests/support/cluster.tcl b/tests/support/cluster.tcl index 74587e1f7..64b079ff8 100644 --- a/tests/support/cluster.tcl +++ b/tests/support/cluster.tcl @@ -286,8 +286,29 @@ proc ::redis_cluster::crc16 {s} { # Hash a single key returning the slot it belongs to, Implemented hash # tags as described in the Redis Cluster specification. proc ::redis_cluster::hash {key} { - # TODO: Handle hash slots. - expr {[::redis_cluster::crc16 $key] & 16383} + set keylen [string length $key] + set s {} + set e {} + for {set s 0} {$s < $keylen} {incr s} { + if {[string index $key $s] eq "\{"} break + } + + if {[expr {$s == $keylen}]} { + set res [expr {[crc16 $key] & 16383}] + return $res + } + + for {set e [expr {$s+1}]} {$e < $keylen} {incr e} { + if {[string index $key $e] == "\}"} break + } + + if {$e == $keylen || $e == [expr {$s+1}]} { + set res [expr {[crc16 $key] & 16383}] + return $res + } + + set key_sub [string range $key [expr {$s+1}] [expr {$e-1}]] + return [expr {[crc16 $key_sub] & 16383}] } # Return the slot the specified keys hash to. 
diff --git a/tests/support/server.tcl b/tests/support/server.tcl index 9f31b5421..abb59dccb 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -214,14 +214,14 @@ proc start_server {options {code undefined}} { dict set config dir [tmpdir server] # start every server on a different port - set ::port [find_available_port [expr {$::port+1}]] + set port [find_available_port $::baseport $::portcount] if {$::tls} { dict set config "port" 0 - dict set config "tls-port" $::port + dict set config "tls-port" $port dict set config "tls-cluster" "yes" dict set config "tls-replication" "yes" } else { - dict set config port $::port + dict set config port $port } set unixsocket [file normalize [format "%s/%s" [dict get $config "dir"] "socket"]] @@ -246,10 +246,10 @@ proc start_server {options {code undefined}} { set server_started 0 while {$server_started == 0} { if {$::verbose} { - puts -nonewline "=== ($tags) Starting server ${::host}:${::port} " + puts -nonewline "=== ($tags) Starting server ${::host}:${port} " } - send_data_packet $::test_server_fd "server-spawning" "port $::port" + send_data_packet $::test_server_fd "server-spawning" "port $port" if {$::valgrind} { set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/keydb-pro-server $config_file > $stdout 2> $stderr &] @@ -294,19 +294,19 @@ proc start_server {options {code undefined}} { # for availability. Other test clients may grab the port before we # are able to do it for example. if {$port_busy} { - puts "Port $::port was already busy, trying another port..." - set ::port [find_available_port [expr {$::port+1}]] + puts "Port $port was already busy, trying another port..." 
+ set port [find_available_port $::baseport $::portcount] if {$::tls} { - dict set config "tls-port" $::port + dict set config "tls-port" $port } else { - dict set config port $::port + dict set config port $port } create_server_config_file $config_file $config continue; # Try again } if {$code ne "undefined"} { - set serverisup [server_is_up $::host $::port $retrynum] + set serverisup [server_is_up $::host $port $retrynum] } else { set serverisup 1 } @@ -327,7 +327,6 @@ proc start_server {options {code undefined}} { # setup properties to be able to initialize a client object set port_param [expr $::tls ? {"tls-port"} : {"port"}] set host $::host - set port $::port if {[dict exists $config bind]} { set host [dict get $config bind] } if {[dict exists $config $port_param]} { set port [dict get $config $port_param] } diff --git a/tests/support/util.tcl b/tests/support/util.tcl index 436631b42..1f7a9fb8d 100644 --- a/tests/support/util.tcl +++ b/tests/support/util.tcl @@ -1,3 +1,10 @@ +proc log_file_matches {log pattern} { + set fp [open $log r] + set content [read $fp] + close $fp + string match $pattern $content +} + proc randstring {min max {type binary}} { set len [expr {$min+int(rand()*($max-$min+1))}] set output {} @@ -344,21 +351,26 @@ proc roundFloat f { format "%.10g" $f } -proc find_available_port start { - for {set j $start} {$j < $start+1024} {incr j} { - if {[catch {set fd1 [socket 127.0.0.1 $j]}] && - [catch {set fd2 [socket 127.0.0.1 [expr $j+10000]]}]} { - return $j +set ::last_port_attempted 0 +proc find_available_port {start count} { + set port [expr $::last_port_attempted + 1] + for {set attempts 0} {$attempts < $count} {incr attempts} { + if {$port < $start || $port >= $start+$count} { + set port $start + } + if {[catch {set fd1 [socket 127.0.0.1 $port]}] && + [catch {set fd2 [socket 127.0.0.1 [expr $port+10000]]}]} { + set ::last_port_attempted $port + return $port } else { catch { close $fd1 close $fd2 } } + incr port } - if {$j == $start+1024} { 
- error "Can't find a non busy port in the $start-[expr {$start+1023}] range." - } + error "Can't find a non busy port in the $start-[expr {$start+$count-1}] range." } # Test if TERM looks like to support colors diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 7ed06b464..eed91c484 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -77,7 +77,9 @@ set ::all_tests { set ::next_test 0 set ::host 127.0.0.1 -set ::port 21111 +set ::port 6379; # port for external server +set ::baseport 21111; # initial port for spawned redis servers +set ::portcount 8000; # we don't wanna use more than 10000 to avoid collision with cluster bus ports set ::traceleaks 0 set ::valgrind 0 set ::tls 0 @@ -235,26 +237,26 @@ proc test_server_main {} { set tclsh [info nameofexecutable] # Open a listening socket, trying different ports in order to find a # non busy one. - set port [find_available_port 11111] + set clientport [find_available_port 11111 32] if {!$::quiet} { - puts "Starting test server at port $port" + puts "Starting test server at port $clientport" } - socket -server accept_test_clients -myaddr 127.0.0.1 $port + socket -server accept_test_clients -myaddr 127.0.0.1 $clientport # Start the client instances set ::clients_pids {} if {$::external} { set p [exec $tclsh [info script] {*}$::argv \ - --client $port --port $::port &] + --client $clientport &] lappend ::clients_pids $p } else { - set start_port [expr {$::port+100}] + set start_port $::baseport + set port_count [expr {$::portcount / $::numclients}] for {set j 0} {$j < $::numclients} {incr j} { - set start_port [find_available_port $start_port] set p [exec $tclsh [info script] {*}$::argv \ - --client $port --port $start_port &] + --client $clientport --baseport $start_port --portcount $port_count &] lappend ::clients_pids $p - incr start_port 10 + incr start_port $port_count } } @@ -427,6 +429,7 @@ proc signal_idle_client fd { lappend ::active_clients $fd incr ::next_test if {$::loop && 
$::next_test == [llength $::all_tests]} { + incr ::loop -1 set ::next_test 0 } } elseif {[llength $::run_solo_tests] != 0 && [llength $::active_clients] == 0} { @@ -517,6 +520,10 @@ proc print_help_screen {} { "--loop Execute the specified set of tests forever." "--wait-server Wait after server is started (so that you can attach a debugger)." "--tls Run tests in TLS mode." + "--host Run tests against an external host." + "--port TCP port to use against external host." + "--baseport Initial port number for spawned redis servers." + "--portcount Port range for spawned redis servers." "--help Print this help screen." } "\n"] } @@ -567,6 +574,12 @@ for {set j 0} {$j < [llength $argv]} {incr j} { } elseif {$opt eq {--port}} { set ::port $arg incr j + } elseif {$opt eq {--baseport}} { + set ::baseport $arg + incr j + } elseif {$opt eq {--portcount}} { + set ::portcount $arg + incr j } elseif {$opt eq {--accurate}} { set ::accurate 1 } elseif {$opt eq {--force-failure}} { @@ -601,7 +614,10 @@ for {set j 0} {$j < [llength $argv]} {incr j} { } elseif {$opt eq {--stop}} { set ::stop_on_failure 1 } elseif {$opt eq {--loop}} { - set ::loop 1 + set ::loop -1 + } elseif {$opt eq {--loopn}} { + set ::loop [expr $arg - 1] + incr j } elseif {$opt eq {--timeout}} { set ::timeout $arg incr j diff --git a/tests/unit/expire.tcl b/tests/unit/expire.tcl index ae4dad08b..67b2db4c2 100644 --- a/tests/unit/expire.tcl +++ b/tests/unit/expire.tcl @@ -272,6 +272,23 @@ start_server {tags {"expire"}} { r expiremember testkey foo 10000 r save r debug reload + if {[log_file_matches [srv 0 stdout] "*Corrupt subexpire*"]} { + fail "Server reported corrupt subexpire" + } assert [expr [r ttl testkey foo] > 0] } + + test {Load subkey for an expired key works} { + # Note test inherits keys from previous tests, we want more traffic in the RDB + r multi + r sadd testset val1 + r expiremember testset val1 300 + r pexpire testset 1 + r debug reload + r exec + set logerr [log_file_matches [srv 0 stdout] 
"*Corrupt subexpire*"] + if {$logerr} { + fail "Server reported corrupt subexpire" + } + } } diff --git a/tests/unit/maxmemory.tcl b/tests/unit/maxmemory.tcl index 5a82abb97..e12fedc91 100644 --- a/tests/unit/maxmemory.tcl +++ b/tests/unit/maxmemory.tcl @@ -1,3 +1,4 @@ +run_solo {maxmemory} { start_server {tags {"maxmemory"}} { test "Without maxmemory small integers are shared" { r config set maxmemory 0 @@ -144,7 +145,11 @@ start_server {tags {"maxmemory"}} { } proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline} { - start_server {tags {"maxmemory"}} { + # This is a single thread only test because there is a race in getMaxMemoryState + # between zmalloc_used_memory and getting the client outbut buffer memory usage. + # The cure would be worse than the disease, we do not want lock every replica + # simultaneously just to get more accurate memory usage. + start_server {tags {"maxmemory"} overrides { server-threads 1 }} { start_server {} { set slave_pid [s process_id] test "$test_name" { @@ -240,3 +245,4 @@ test_slave_buffers {slave buffer are counted correctly} 1000000 10 0 1 # test that slave buffer don't induce eviction # test again with fewer (and bigger) commands without pipeline, but with eviction test_slave_buffers "replica buffer don't induce eviction" 100000 100 1 0 +} ;# run_solo diff --git a/tests/unit/memefficiency.tcl b/tests/unit/memefficiency.tcl index 777693fdf..0654898ee 100644 --- a/tests/unit/memefficiency.tcl +++ b/tests/unit/memefficiency.tcl @@ -37,7 +37,7 @@ start_server {tags {"memefficiency"}} { } run_solo {defrag} { -start_server {tags {"defrag"}} { +start_server {tags {"defrag"} overrides {server-threads 1} } { if {[string match {*jemalloc*} [s mem_allocator]]} { test "Active defrag" { r config set save "" ;# prevent bgsave from interfereing with save below @@ -95,6 +95,10 @@ start_server {tags {"defrag"}} { } if {$::verbose} { puts "frag $frag" + set misses [s active_defrag_misses] + set hits [s 
active_defrag_hits] + puts "hits: $hits" + puts "misses: $misses" puts "max latency $max_latency" puts [r latency latest] puts [r latency history active-defrag-cycle] @@ -221,6 +225,10 @@ start_server {tags {"defrag"}} { } if {$::verbose} { puts "frag $frag" + set misses [s active_defrag_misses] + set hits [s active_defrag_hits] + puts "hits: $hits" + puts "misses: $misses" puts "max latency $max_latency" puts [r latency latest] puts [r latency history active-defrag-cycle] @@ -256,11 +264,12 @@ start_server {tags {"defrag"}} { set expected_frag 1.7 # add a mass of list nodes to two lists (allocations are interlaced) set val [string repeat A 100] ;# 5 items of 100 bytes puts us in the 640 bytes bin, which has 32 regs, so high potential for fragmentation - for {set j 0} {$j < 500000} {incr j} { + set elements 500000 + for {set j 0} {$j < $elements} {incr j} { $rd lpush biglist1 $val $rd lpush biglist2 $val } - for {set j 0} {$j < 500000} {incr j} { + for {set j 0} {$j < $elements} {incr j} { $rd read ; # Discard replies $rd read ; # Discard replies } @@ -302,6 +311,8 @@ start_server {tags {"defrag"}} { # test the the fragmentation is lower after 120 ;# serverCron only updates the info once in 100ms + set misses [s active_defrag_misses] + set hits [s active_defrag_hits] set frag [s allocator_frag_ratio] set max_latency 0 foreach event [r latency latest] { @@ -312,6 +323,8 @@ start_server {tags {"defrag"}} { } if {$::verbose} { puts "frag $frag" + puts "misses: $misses" + puts "hits: $hits" puts "max latency $max_latency" puts [r latency latest] puts [r latency history active-defrag-cycle] @@ -320,6 +333,10 @@ start_server {tags {"defrag"}} { # due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75, # we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher assert {$max_latency <= 30} + + # in extreme cases of stagnation, we see over 20m misses before the tests aborts with "defrag didn't stop", + # in 
normal cases we only see 100k misses out of 500k elements + assert {$misses < $elements} } # verify the data isn't corrupted or changed set newdigest [r debug digest] @@ -327,6 +344,110 @@ start_server {tags {"defrag"}} { r save ;# saving an rdb iterates over all the data / pointers r del biglist1 ;# coverage for quicklistBookmarksClear } {1} + + test "Active defrag edge case" { + # there was an edge case in defrag where all the slabs of a certain bin are exact the same + # % utilization, with the exception of the current slab from which new allocations are made + # if the current slab is lower in utilization the defragger would have ended up in stagnation, + # keept running and not move any allocation. + # this test is more consistent on a fresh server with no history + start_server {tags {"defrag"} overrides {server-threads 1}} { + r flushdb + r config resetstat + r config set save "" ;# prevent bgsave from interfereing with save below + r config set hz 100 + r config set activedefrag no + r config set active-defrag-max-scan-fields 1000 + r config set active-defrag-threshold-lower 5 + r config set active-defrag-cycle-min 65 + r config set active-defrag-cycle-max 75 + r config set active-defrag-ignore-bytes 1mb + r config set maxmemory 0 + set expected_frag 1.3 + + r debug mallctl-str thread.tcache.flush VOID + # fill the first slab containin 32 regs of 640 bytes. + for {set j 0} {$j < 32} {incr j} { + r setrange "_$j" 600 x + r debug mallctl-str thread.tcache.flush VOID + } + + # add a mass of keys with 600 bytes values, fill the bin of 640 bytes which has 32 regs per slab. 
+ set rd [redis_deferring_client] + set keys 640000 + for {set j 0} {$j < $keys} {incr j} { + $rd setrange $j 600 x + } + for {set j 0} {$j < $keys} {incr j} { + $rd read ; # Discard replies + } + + # create some fragmentation of 50% + set sent 0 + for {set j 0} {$j < $keys} {incr j 1} { + $rd del $j + incr sent + incr j 1 + } + for {set j 0} {$j < $sent} {incr j} { + $rd read ; # Discard replies + } + + # create higher fragmentation in the first slab + for {set j 10} {$j < 32} {incr j} { + r del "_$j" + } + + # start defrag + after 120 ;# serverCron only updates the info once in 100ms + set frag [s allocator_frag_ratio] + if {$::verbose} { + puts "frag $frag" + } + + assert {$frag >= $expected_frag} + + set digest [r debug digest] + catch {r config set activedefrag yes} e + if {![string match {DISABLED*} $e]} { + # wait for the active defrag to start working (decision once a second) + wait_for_condition 50 100 { + [s active_defrag_running] ne 0 + } else { + fail "defrag not started." + } + + # wait for the active defrag to stop working + wait_for_condition 500 100 { + [s active_defrag_running] eq 0 + } else { + after 120 ;# serverCron only updates the info once in 100ms + puts [r info memory] + puts [r info stats] + puts [r memory malloc-stats] + fail "defrag didn't stop." 
+ } + + # test the the fragmentation is lower + after 120 ;# serverCron only updates the info once in 100ms + set misses [s active_defrag_misses] + set hits [s active_defrag_hits] + set frag [s allocator_frag_ratio] + if {$::verbose} { + puts "frag $frag" + puts "hits: $hits" + puts "misses: $misses" + } + assert {$frag < 1.1} + assert {$misses < 10000000} ;# when defrag doesn't stop, we have some 30m misses, when it does, we have 2m misses + } + + # verify the data isn't corrupted or changed + set newdigest [r debug digest] + assert {$digest eq $newdigest} + r save ;# saving an rdb iterates over all the data / pointers + } + } } } } ;# run_solo diff --git a/tests/unit/other.tcl b/tests/unit/other.tcl index 2e90181da..43b1e1870 100644 --- a/tests/unit/other.tcl +++ b/tests/unit/other.tcl @@ -174,9 +174,9 @@ start_server {tags {"other"}} { tags {protocol} { test {PIPELINING stresser (also a regression for the old epoll bug)} { if {$::tls} { - set fd2 [::tls::socket $::host $::port] + set fd2 [::tls::socket [srv host] [srv port]] } else { - set fd2 [socket $::host $::port] + set fd2 [socket [srv host] [srv port]] } fconfigure $fd2 -encoding binary -translation binary puts -nonewline $fd2 "SELECT 9\r\n" diff --git a/tests/unit/pubsub.tcl b/tests/unit/pubsub.tcl index 9c7a43bf0..6c991ac97 100644 --- a/tests/unit/pubsub.tcl +++ b/tests/unit/pubsub.tcl @@ -107,6 +107,8 @@ start_server {tags {"pubsub"}} { set rd1 [redis_deferring_client] assert_equal {1 2 3} [subscribe $rd1 {chan1 chan2 chan3}] unsubscribe $rd1 + # Wait for a response to the unsub + __consume_subscribe_messages $rd1 unsubscribe {chan1 chan2 chan3} assert_equal 0 [r publish chan1 hello] assert_equal 0 [r publish chan2 hello] assert_equal 0 [r publish chan3 hello] @@ -180,6 +182,8 @@ start_server {tags {"pubsub"}} { set rd1 [redis_deferring_client] assert_equal {1 2 3} [psubscribe $rd1 {chan1.* chan2.* chan3.*}] punsubscribe $rd1 + # Wait for a response to the unsub + __consume_subscribe_messages $rd1 
punsubscribe {chan1.* chan2.* chan3.*} assert_equal 0 [r publish chan1.hi hello] assert_equal 0 [r publish chan2.hi hello] assert_equal 0 [r publish chan3.hi hello] diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 676896a75..9349d8bdf 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -1,7 +1,10 @@ +# Setting server-threads to 2 is really single threaded because test mode is enabled (no client allocated to thread 1) +# We do this because of the large numbers of nonblocking clients in this tests and the client races that causes start_server { tags {"list"} overrides { "list-max-ziplist-size" 5 + "server-threads 2" } } { source "tests/unit/type/list-common.tcl"