Merge branch 'unstable' into keydbpro

Former-commit-id: 08a36155e3db9918048e87c3d691b7317787c9ab
This commit is contained in:
John Sully 2020-06-01 17:41:37 -04:00
commit df3f1e8d8e
51 changed files with 1366 additions and 446 deletions

View File

@ -3,6 +3,7 @@ name: CI
on: [push, pull_request]
jobs:
test-ubuntu-latest:
runs-on: self-hosted
steps:
@ -14,19 +15,19 @@ jobs:
sudo apt-get -y remove libzstd || true
sudo apt-get -y install uuid-dev libcurl4-openssl-dev libbz2-dev zlib1g-dev libsnappy-dev liblz4-dev libzstd-dev libgflags-dev
sudo apt-get -y install uuid-dev libcurl4-openssl-dev
make -j$(nproc)
- name: test
make BUILD_TLS=yes -j2
- name: gen-cert
run: ./utils/gen-test-certs.sh
- name: test-tls
run: |
sudo apt-get -y install tcl8.5
./runtest --clients 4 --verbose
sudo apt-get -y install tcl8.5 tcl-tls
./runtest --clients 2 --verbose --tls
- name: cluster-test
run: |
./runtest-cluster
./runtest-cluster --tls
- name: sentinel test
run: |
./runtest-sentinel
- name: module tests
run: |
./runtest-moduleapi

View File

@ -5,6 +5,7 @@ on:
- cron: '0 7 * * *'
jobs:
test-jemalloc:
runs-on: ubuntu-latest
timeout-minutes: 1200
@ -37,6 +38,40 @@ jobs:
- name: module api test
run: ./runtest-moduleapi --verbose
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: make
run: |
sudo apt-get -y install uuid-dev libcurl4-openssl-dev
make BUILD_TLS=yes -j2
- name: "test (tls)"
run: |
sudo apt-get install tcl8.5 tcl-tls
./utils/gen-test-certs.sh
./runtest --accurate --verbose --tls
- name: test
run: ./runtest --accurate --verbose
- name: module api test (tls)
run: ./runtest-moduleapi --verbose --tls
test-ubuntu-arm:
runs-on: [self-hosted, linux, arm]
steps:
- uses: actions/checkout@v1
- name: make
run: |
sudo apt-get -y install uuid-dev libcurl4-openssl-dev
make -j4
- name: test
run: |
sudo apt-get -y install tcl8.5
./runtest --clients 2 --verbose
- name: module tests
run: |
./runtest-moduleapi
test-valgrind:
runs-on: ubuntu-latest
timeout-minutes: 14400

22
.github/workflows/endurance.yml vendored Normal file
View File

@ -0,0 +1,22 @@
# Hourly endurance workflow: repeatedly loops the test suite in
# multithreaded server mode to surface rare, timing-dependent failures.
name: Endurance
on:
  schedule:
    - cron: '30 * * * *'   # run every hour, at 30 minutes past
jobs:
  test-endurance:
    # NOTE(review): 'octocore' looks like a self-hosted runner label — confirm it exists.
    runs-on: octocore
    timeout-minutes: 60
    steps:
      - uses: actions/checkout@v1
      - name: make
        run: |
          sudo apt-get -y install uuid-dev libcurl4-openssl-dev
          make -j8
      - name: test-multithread (5X)
        run: |
          sudo apt-get install -y tcl8.5
          ./runtest --loopn 5 --config server-threads 3 --clients 5

View File

@ -11,6 +11,283 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP.
SECURITY: There are security fixes in the release.
--------------------------------------------------------------------------------
================================================================================
Redis 6.0.4 Released Thu May 28 11:36:45 CEST 2020
================================================================================
Upgrade urgency CRITICAL: this release fixes a severe replication bug.
Redis 6.0.4 fixes a critical replication bug caused by a new feature introduced
in Redis 6. The feature, called "meaningful offset" and strongly wanted by
myself (antirez) was an improvement that avoided that masters were no longer
able, during a failover where they were demoted to replicas, to partially
synchronize with the new master. In short the feature was able to avoid full
synchronizations with RDB. How did it work? By trimming the replication backlog
of the final "PING" commands the master was sending in the replication channel:
this way the replication offset would no longer go "after" the one of the
promoted replica, allowing the master to just continue in the same replication
history, receiving only a small data difference.
However after the introduction of the feature we (the Redis core team) quickly
understood there was something wrong: the apparently harmless feature had
many bugs, and the last bug we discovered, after a joined effort of multiple
people, we were not even able to fully understand after fixing it. Enough was
enough, we decided that the complexity cost of this feature was too high.
So Redis 6.0.4 removes the feature entirely, and fixes the data corruption that
it was able to cause.
However there are two facts to take in mind.
Fact 1: Setups using chained replication, that means that certain replicas
are replicating from other replicas, up to Redis 6.0.3 can experience data
corruption. For chained replication we mean that:
+--------+ +---------+ +-------------+
| master |--------->| replica |-------->| sub-replica |
+--------+ +---------+ +-------------+
People using chained replication SHOULD UPGRADE ASAP away from Redis 6.0.0,
6.0.1, 6.0.2 or 6.0.3 to Redis 6.0.4.
To be clear, people NOT using this setup, but having just replicas attached
directly to the master, SHOULD NOT be in danger of any problem. But we
are no longer confident on 6.0.x replication implementation complexities
so we suggest to upgrade to 6.0.4 to everybody using an older 6.0.x release.
We just so far didn't find any bug that affects Redis 6.0.3 that does not
involve chained replication.
People starting with Redis 6.0.4 are fine. People with Redis 5 are fine.
People upgrading from Redis 5 to Redis 6.0.4 are fine.
TLDR: The problem is with users of 6.0.0, 6.0.1, 6.0.2, 6.0.3.
Fact 2: Upgrading from Redis 6.0.x to Redis 6.0.4, IF AND ONLY IF you
use chained replication, requires some extra care:
1. Once you attach your new Redis 6.0.4 instance as a replica of the current
Redis 6.0.x master, you should wait for the first full synchronization,
then you should promote it right away, if your setup involves chained
replication. Don't give it the time to do a new partial synchronization
in the case the link between the master and the replica will break in
the mean time.
2. As an additional care, you may want to set the replication ping period
to a very large value (for instance 1000000) using the following command:
CONFIG SET repl-ping-replica-period 1000000
Note that if you do "1" with care, "2" is not needed.
However if you do it, make sure to later restore it to its default:
CONFIG SET repl-ping-replica-period 10
So this is the main change in Redis 6.0.4. Later we'll find a different way in
order to achieve what we wanted to achieve with the Meaningful Offset feature,
but without the same complexity.
Other changes in this release:
* PSYNC2 tests improved.
* Fix a rare active defrag edge case bug leading to stagnation
* Fix Redis 6 asserting at startup in 32 bit systems.
* Redis 6 32 bit is now added back to our testing environments.
* Fix server crash for STRALGO command.
* Implement sendfile for RDB transfer.
* TLS fixes.
* Make replication more resistant by disconnecting the master if we
detect a protocol error. Basically we no longer accept inline protocol
from the master.
* Other improvements in the tests.
Regards,
antirez
This is the full list of commits:
antirez in commit 59cd4c9f6:
Test: take PSYNC2 test master timeout high during switch.
1 file changed, 1 deletion(-)
antirez in commit 6c1bb7b19:
Test: add the tracking unit as default.
1 file changed, 1 insertion(+)
Oran Agra in commit 1aee695e5:
tests: find_available_port start search from next port
1 file changed, 12 insertions(+), 7 deletions(-)
Oran Agra in commit a2ae46352:
tests: each test client work on a distinct port range
5 files changed, 39 insertions(+), 27 deletions(-)
Oran Agra in commit 86e562d69:
32bit CI needs to build modules correctly
2 files changed, 7 insertions(+), 2 deletions(-)
Oran Agra in commit ab2984b1e:
adjust revived meaningful offset tests
1 file changed, 39 insertions(+), 20 deletions(-)
Oran Agra in commit 1ff5a222d:
revive meaningful offset tests
2 files changed, 213 insertions(+)
antirez in commit cc549b46a:
Replication: showLatestBacklog() refactored out.
3 files changed, 36 insertions(+), 25 deletions(-)
antirez in commit 377dd0515:
Drop useless line from replicationCacheMaster().
1 file changed, 2 deletions(-)
antirez in commit 3f8d113f1:
Another meaningful offset test removed.
1 file changed, 100 deletions(-)
antirez in commit d4541349d:
Remove the PSYNC2 meaningful offset test.
2 files changed, 113 deletions(-)
antirez in commit 2112a5702:
Remove the meaningful offset feature.
4 files changed, 10 insertions(+), 93 deletions(-)
antirez in commit d2eb6e0b4:
Set a protocol error if master use the inline protocol.
1 file changed, 17 insertions(+), 2 deletions(-)
Oran Agra in commit 9c1df3b76:
daily CI test with tls
1 file changed, 15 insertions(+)
Oran Agra in commit 115ed1911:
avoid using sendfile if tls-replication is enabled
1 file changed, 34 insertions(+), 27 deletions(-)
antirez in commit 11c748aac:
Replication: log backlog creation event.
1 file changed, 3 insertions(+)
antirez in commit 8f1013722:
Test: PSYNC2 test can now show server logs.
1 file changed, 88 insertions(+), 25 deletions(-)
antirez in commit 2e591fc4a:
Clarify what is happening in PR #7320.
1 file changed, 5 insertions(+), 1 deletion(-)
zhaozhao.zz in commit cbb51fb8f:
PSYNC2: second_replid_offset should be real meaningful offset
1 file changed, 3 insertions(+), 3 deletions(-)
Oran Agra in commit e0fc88b4d:
add CI for 32bit build
2 files changed, 34 insertions(+)
antirez in commit e3f864b5f:
Make disconnectSlaves() synchronous in the base case.
3 files changed, 20 insertions(+), 9 deletions(-)
ShooterIT in commit 8af1e513f:
Implements sendfile for redis.
2 files changed, 55 insertions(+), 2 deletions(-)
antirez in commit 3c21418cd:
Fix #7306 less aggressively.
2 files changed, 29 insertions(+), 17 deletions(-)
Madelyn Olson in commit e201f83ce:
EAGAIN for tls during diskless load
1 file changed, 4 insertions(+)
Qu Chen in commit 58fc456cb:
Disconnect chained replicas when the replica performs PSYNC with the master always to avoid replication offset mismatch between master and chained replicas.
2 files changed, 60 insertions(+), 3 deletions(-)
hwware in commit 3febc5c29:
using moreargs variable
1 file changed, 2 insertions(+), 2 deletions(-)
hwware in commit 8d6738559:
fix server crash for STRALGO command
1 file changed, 2 insertions(+), 2 deletions(-)
ShooterIT in commit 7a35eec54:
Replace addDeferredMultiBulkLength with addReplyDeferredLen in comment
1 file changed, 2 insertions(+), 2 deletions(-)
Yossi Gottlieb in commit f93e1417b:
TLS: Improve tls-protocols clarity in redis.conf.
1 file changed, 3 insertions(+), 2 deletions(-)
ShooterIT in commit d0c9e4454:
Fix reply bytes calculation error
1 file changed, 1 insertion(+), 1 deletion(-)
zhaozhao.zz in commit 1cde6a060:
Tracking: flag CLIENT_TRACKING_BROKEN_REDIR when redir broken
1 file changed, 1 insertion(+)
Oran Agra in commit 436be3498:
fix a rare active defrag edge case bug leading to stagnation
4 files changed, 146 insertions(+), 23 deletions(-)
Oran Agra in commit f9d2ffdc5:
improve DEBUG MALLCTL to be able to write to write only fields.
1 file changed, 27 insertions(+), 7 deletions(-)
hujie in commit d7968ee92:
fix clear USER_FLAG_ALLCOMMANDS flag in acl
1 file changed, 5 insertions(+), 4 deletions(-)
ShooterIT in commit a902e6b25:
Redis Benchmark: generate random test data
1 file changed, 12 insertions(+), 1 deletion(-)
hwware in commit 9564ed7c3:
Redis-Benchmark: avoid potentical memmory leaking
1 file changed, 1 insertion(+), 1 deletion(-)
WuYunlong in commit 2e4182743:
Handle keys with hash tag when computing hash slot using tcl cluster client.
1 file changed, 23 insertions(+), 2 deletions(-)
WuYunlong in commit eb2c8b2c6:
Add a test to prove current tcl cluster client can not handle keys with hash tag.
1 file changed, 7 insertions(+), 1 deletion(-)
ShooterIT in commit 928e6976b:
Use dictSize to get the size of dict in dict.c
1 file changed, 2 insertions(+), 2 deletions(-)
Madelyn Olson in commit cdcf5af5a:
Converge hash validation for adding and removing
1 file changed, 21 insertions(+), 14 deletions(-)
Benjamin Sergeant in commit e8b09d220:
do not handle --cluster-yes for cluster fix mode
1 file changed, 16 insertions(+), 7 deletions(-)
Benjamin Sergeant in commit 57b4fb0d8:
fix typo ...
1 file changed, 1 insertion(+), 1 deletion(-)
Benjamin Sergeant in commit 29f25e411:
Redis-cli 6.0.1 `--cluster-yes` doesn't work (fix #7246)
1 file changed, 5 insertions(+), 1 deletion(-)
Oran Agra in commit 00d8b92b8:
fix valgrind test failure in replication test
1 file changed, 1 insertion(+), 1 deletion(-)
Oran Agra in commit 5e17e6276:
add regression test for the race in #7205
1 file changed, 52 insertions(+)
antirez in commit 96e7c011e:
Improve the PSYNC2 test reliability.
1 file changed, 33 insertions(+), 15 deletions(-)
================================================================================
Redis 6.0.3 Released Sat May 16 18:10:21 CEST 2020
================================================================================

1
deps/depot_tools vendored

@ -1 +0,0 @@
Subproject commit aaf566999558aa8ead38811228cd539a6e6e2fda

View File

@ -216,7 +216,7 @@ ixalloc(tsdn_t *tsdn, void *ptr, size_t oldsize, size_t size, size_t extra,
}
JEMALLOC_ALWAYS_INLINE int
iget_defrag_hint(tsdn_t *tsdn, void* ptr, int *bin_util, int *run_util) {
iget_defrag_hint(tsdn_t *tsdn, void* ptr) {
int defrag = 0;
rtree_ctx_t rtree_ctx_fallback;
rtree_ctx_t *rtree_ctx = tsdn_rtree_ctx(tsdn, &rtree_ctx_fallback);
@ -232,11 +232,22 @@ iget_defrag_hint(tsdn_t *tsdn, void* ptr, int *bin_util, int *run_util) {
malloc_mutex_lock(tsdn, &bin->lock);
/* don't bother moving allocations from the slab currently used for new allocations */
if (slab != bin->slabcur) {
int free_in_slab = extent_nfree_get(slab);
if (free_in_slab) {
const bin_info_t *bin_info = &bin_infos[binind];
size_t availregs = bin_info->nregs * bin->stats.curslabs;
*bin_util = ((long long)bin->stats.curregs<<16) / availregs;
*run_util = ((long long)(bin_info->nregs - extent_nfree_get(slab))<<16) / bin_info->nregs;
defrag = 1;
int curslabs = bin->stats.curslabs;
size_t curregs = bin->stats.curregs;
if (bin->slabcur) {
/* remove slabcur from the overall utilization */
curregs -= bin_info->nregs - extent_nfree_get(bin->slabcur);
curslabs -= 1;
}
/* Compare the utilization ratio of the slab in question to the total average,
* to avoid precision lost and division, we do that by extrapolating the usage
* of the slab as if all slabs have the same usage. If this slab is less used
* than the average, we'll prefer to evict the data to hopefully more used ones */
defrag = (bin_info->nregs - free_in_slab) * curslabs <= curregs;
}
}
malloc_mutex_unlock(tsdn, &bin->lock);
}

View File

@ -3326,12 +3326,10 @@ jemalloc_postfork_child(void) {
/******************************************************************************/
/* Helps the application decide if a pointer is worth re-allocating in order to reduce fragmentation.
* returns 0 if the allocation is in the currently active run,
* or when it is not causing any frag issue (large or huge bin)
* returns the bin utilization and run utilization both in fixed point 16:16.
* returns 1 if the allocation should be moved, and 0 if the allocation be kept.
* If the application decides to re-allocate it should use MALLOCX_TCACHE_NONE when doing so. */
JEMALLOC_EXPORT int JEMALLOC_NOTHROW
get_defrag_hint(void* ptr, int *bin_util, int *run_util) {
get_defrag_hint(void* ptr) {
assert(ptr != NULL);
return iget_defrag_hint(TSDN_NULL, ptr, bin_util, run_util);
return iget_defrag_hint(TSDN_NULL, ptr);
}

View File

@ -176,9 +176,10 @@ tcp-keepalive 300
# tls-cluster yes
# Explicitly specify TLS versions to support. Allowed values are case insensitive
# and include "TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3" (OpenSSL >= 1.1.1)
# and include "TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3" (OpenSSL >= 1.1.1) or
# any combination. To enable only TLSv1.2 and TLSv1.3, use:
#
# tls-protocols TLSv1.2
# tls-protocols "TLSv1.2 TLSv1.3"
# Configure allowed ciphers. See the ciphers(1ssl) manpage for more information
# about the syntax of this string.

View File

@ -169,6 +169,25 @@ sds ACLHashPassword(unsigned char *cleartext, size_t len) {
return sdsnewlen(hex,HASH_PASSWORD_LEN);
}
/* Given a hash and the hash length, returns C_OK if it is a valid password
* hash, or C_ERR otherwise. */
int ACLCheckPasswordHash(unsigned char *hash, int hashlen) {
if (hashlen != HASH_PASSWORD_LEN) {
return C_ERR;
}
/* Password hashes can only be characters that represent
* hexadecimal values, which are numbers and lowercase
* characters 'a' through 'f'. */
for(int i = 0; i < HASH_PASSWORD_LEN; i++) {
char c = hash[i];
if ((c < 'a' || c > 'f') && (c < '0' || c > '9')) {
return C_ERR;
}
}
return C_OK;
}
/* =============================================================================
* Low level ACL API
* ==========================================================================*/
@ -359,12 +378,13 @@ int ACLUserCanExecuteFutureCommands(user *u) {
* to skip the command bit explicit test. */
void ACLSetUserCommandBit(user *u, unsigned long id, int value) {
uint64_t word, bit;
if (value == 0) u->flags &= ~USER_FLAG_ALLCOMMANDS;
if (ACLGetCommandBitCoordinates(id,&word,&bit) == C_ERR) return;
if (value)
if (value) {
u->allowed_commands[word] |= bit;
else
} else {
u->allowed_commands[word] &= ~bit;
u->flags &= ~USER_FLAG_ALLCOMMANDS;
}
}
/* This is like ACLSetUserCommandBit(), but instead of setting the specified
@ -756,22 +776,10 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
if (op[0] == '>') {
newpass = ACLHashPassword((unsigned char*)op+1,oplen-1);
} else {
if (oplen != HASH_PASSWORD_LEN + 1) {
if (ACLCheckPasswordHash((unsigned char*)op+1,oplen-1) == C_ERR) {
errno = EBADMSG;
return C_ERR;
}
/* Password hashes can only be characters that represent
* hexadecimal values, which are numbers and lowercase
* characters 'a' through 'f'.
*/
for(int i = 1; i < HASH_PASSWORD_LEN + 1; i++) {
char c = op[i];
if ((c < 'a' || c > 'f') && (c < '0' || c > '9')) {
errno = EBADMSG;
return C_ERR;
}
}
newpass = sdsnewlen(op+1,oplen-1);
}
@ -787,7 +795,7 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
if (op[0] == '<') {
delpass = ACLHashPassword((unsigned char*)op+1,oplen-1);
} else {
if (oplen != HASH_PASSWORD_LEN + 1) {
if (ACLCheckPasswordHash((unsigned char*)op+1,oplen-1) == C_ERR) {
errno = EBADMSG;
return C_ERR;
}
@ -841,7 +849,6 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
errno = ENOENT;
return C_ERR;
}
unsigned long id = ACLGetCommandID(copy);
/* The subcommand cannot be empty, so things like DEBUG|
* are syntax errors of course. */
@ -854,6 +861,7 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
/* The command should not be set right now in the command
* bitmap, because adding a subcommand of a fully added
* command is probably an error on the user side. */
unsigned long id = ACLGetCommandID(copy);
if (ACLGetUserCommandBit(u,id) == 1) {
zfree(copy);
errno = EBUSY;

View File

@ -122,6 +122,7 @@ struct aeCommand
AE_ASYNC_OP op;
int fd;
int mask;
bool fLock = true;
union {
aePostFunctionProc *proc;
aeFileProc *fproc;
@ -169,7 +170,9 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
case AE_ASYNC_OP::PostFunction:
{
std::unique_lock<decltype(g_lock)> ulock(g_lock);
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
if (cmd.fLock)
ulock.lock();
((aePostFunctionProc*)cmd.proc)(cmd.clientData);
break;
}
@ -179,7 +182,9 @@ void aeProcessCmd(aeEventLoop *eventLoop, int fd, void *, int )
if (cmd.pctl != nullptr)
cmd.pctl->mutexcv.lock();
std::unique_lock<decltype(g_lock)> ulock(g_lock);
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
if (cmd.fLock)
ulock.lock();
(*cmd.pfn)();
if (cmd.pctl != nullptr)
@ -239,6 +244,7 @@ int aeCreateRemoteFileEvent(aeEventLoop *eventLoop, int fd, int mask,
cmd.fproc = proc;
cmd.clientData = clientData;
cmd.pctl = nullptr;
cmd.fLock = true;
if (fSynchronous)
{
cmd.pctl = new (MALLOC_LOCAL) aeCommandControl();
@ -275,13 +281,14 @@ int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg)
cmd.op = AE_ASYNC_OP::PostFunction;
cmd.proc = proc;
cmd.clientData = arg;
cmd.fLock = true;
auto size = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
if (size != sizeof(cmd))
return AE_ERR;
return AE_OK;
}
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous)
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous, bool fLock)
{
if (eventLoop == g_eventLoopThisThread)
{
@ -293,6 +300,7 @@ int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynch
cmd.op = AE_ASYNC_OP::PostCppFunction;
cmd.pfn = new (MALLOC_LOCAL) std::function<void()>(fn);
cmd.pctl = nullptr;
cmd.fLock = fLock;
if (fSynchronous)
{
cmd.pctl = new (MALLOC_LOCAL) aeCommandControl();
@ -453,6 +461,7 @@ void aeDeleteFileEventAsync(aeEventLoop *eventLoop, int fd, int mask)
cmd.op = AE_ASYNC_OP::DeleteFileEvent;
cmd.fd = fd;
cmd.mask = mask;
cmd.fLock = true;
auto cb = write(eventLoop->fdCmdWrite, &cmd, sizeof(cmd));
AE_ASSERT(cb == sizeof(cmd));
}

View File

@ -135,7 +135,7 @@ aeEventLoop *aeCreateEventLoop(int setsize);
int aePostFunction(aeEventLoop *eventLoop, aePostFunctionProc *proc, void *arg);
#ifdef __cplusplus
} // EXTERN C
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous = false);
int aePostFunction(aeEventLoop *eventLoop, std::function<void()> fn, bool fSynchronous = false, bool fLock = true);
extern "C" {
#endif
void aeDeleteEventLoop(aeEventLoop *eventLoop);

View File

@ -751,6 +751,7 @@ struct client *createAOFClient(void) {
c->bufpos = 0;
c->flags = 0;
c->fPendingAsyncWrite = FALSE;
c->fPendingAsyncWriteHandler = FALSE;
c->btype = BLOCKED_NONE;
/* We set the fake client as a replica waiting for the synchronization
* so that Redis will not try to send replies to this client. */

View File

@ -2235,6 +2235,8 @@ void clusterWriteHandler(connection *conn) {
void clusterLinkConnectHandler(connection *conn) {
clusterLink *link = (clusterLink*)connGetPrivateData(conn);
clusterNode *node = link->node;
if (node == nullptr)
return; // we're about to be freed
/* Check if connection succeeded */
if (connGetState(conn) != CONN_STATE_CONNECTED) {
@ -5289,6 +5291,7 @@ try_again:
zfree(ov); zfree(kv);
return; /* error sent to the client by migrateGetSocket() */
}
connMarshalThread(cs->conn);
rioInitWithBuffer(&cmd,sdsempty());

View File

@ -319,6 +319,13 @@ void mallctl_int(client *c, robj **argv, int argc) {
size_t sz = sizeof(old);
while (sz > 0) {
if ((ret=je_mallctl(szFromObj(argv[0]), &old, &sz, argc > 1? &val: NULL, argc > 1?sz: 0))) {
if (ret == EPERM && argc > 1) {
/* if this option is write only, try just writing to it. */
if (!(ret=je_mallctl(szFromObj(argv[0]), NULL, 0, &val, sz))) {
addReply(c, shared.ok);
return;
}
}
if (ret==EINVAL) {
/* size might be wrong, try a smaller one */
sz /= 2;
@ -341,17 +348,30 @@ void mallctl_int(client *c, robj **argv, int argc) {
}
void mallctl_string(client *c, robj **argv, int argc) {
int ret;
int rret, wret;
char *old;
size_t sz = sizeof(old);
/* for strings, it seems we need to first get the old value, before overriding it. */
if ((ret=je_mallctl(szFromObj(argv[0]), &old, &sz, NULL, 0))) {
addReplyErrorFormat(c,"%s", strerror(ret));
if ((rret=je_mallctl(szFromObj(argv[0]), &old, &sz, NULL, 0))) {
/* return error unless this option is write only. */
if (!(rret == EPERM && argc > 1)) {
addReplyErrorFormat(c,"%s", strerror(rret));
return;
}
}
if(argc > 1) {
char *val = szFromObj(argv[1]);
char **valref = &val;
if ((!strcmp(val,"VOID")))
valref = NULL, sz = 0;
wret = je_mallctl(szFromObj(argv[0]), NULL, 0, valref, sz);
}
if (!rret)
addReplyBulkCString(c, old);
if(argc > 1)
je_mallctl(szFromObj(argv[0]), NULL, 0, &argv[1]->m_ptr, sizeof(char*));
else if (wret)
addReplyErrorFormat(c,"%s", strerror(wret));
else
addReply(c, shared.ok);
}
#endif

View File

@ -43,7 +43,7 @@
/* this method was added to jemalloc in order to help us understand which
* pointers are worthwhile moving and which aren't */
extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util);
extern "C" int je_get_defrag_hint(void* ptr);
/* forward declarations*/
void defragDictBucketCallback(void *privdata, dictEntry **bucketref);
@ -56,18 +56,11 @@ bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey);
* when it returns a non-null value, the old pointer was already released
* and should NOT be accessed. */
void* activeDefragAlloc(void *ptr) {
int bin_util, run_util;
size_t size;
void *newptr;
if(!je_get_defrag_hint(ptr, &bin_util, &run_util)) {
g_pserver->stat_active_defrag_misses++;
return NULL;
}
/* if this run is more utilized than the average utilization in this bin
* (or it is full), skip it. This will eventually move all the allocations
* from relatively empty runs into relatively full runs. */
if (run_util > bin_util || run_util == 1<<16) {
if(!je_get_defrag_hint(ptr)) {
g_pserver->stat_active_defrag_misses++;
size = zmalloc_size(ptr);
return NULL;
}
/* move this allocation to a new allocation.

View File

@ -536,7 +536,7 @@ dictEntry *dictFind(dict *d, const void *key)
dictEntry *he;
uint64_t h, idx, table;
if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */
if (dictSize(d) == 0) return NULL; /* dict is empty */
if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);
for (table = 0; table <= 1; table++) {
@ -1102,7 +1102,7 @@ dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t h
dictEntry *he, **heref;
unsigned long idx, table;
if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */
if (dictSize(d) == 0) return NULL; /* dict is empty */
for (table = 0; table <= 1; table++) {
idx = hash & d->ht[table].sizemask;
heref = &d->ht[table].table[idx];

View File

@ -40,6 +40,7 @@
#include <map>
#ifdef __linux__
#include <linux/futex.h>
#include <sys/sysinfo.h>
#endif
#include <string.h>
#include <stdarg.h>
@ -69,6 +70,8 @@ __attribute__((weak)) void logStackTrace(ucontext_t *) {}
#endif
extern int g_fInCrash;
extern int g_fTestMode;
int g_fHighCpuPressure = false;
/****************************************************
*
@ -290,12 +293,21 @@ uint64_t fastlock_getlongwaitcount()
return rval;
}
extern "C" void fastlock_sleep(fastlock *lock, pid_t pid, unsigned wake, unsigned mask)
extern "C" void fastlock_sleep(fastlock *lock, pid_t pid, unsigned wake, unsigned myticket)
{
#ifdef __linux__
g_dlock.registerwait(lock, pid);
unsigned mask = (1U << (myticket % 32));
__atomic_fetch_or(&lock->futex, mask, __ATOMIC_ACQUIRE);
// double check the lock wasn't release between the last check and us setting the futex mask
uint32_t u;
__atomic_load(&lock->m_ticket.u, &u, __ATOMIC_ACQUIRE);
if ((u & 0xffff) != myticket)
{
futex(&lock->m_ticket.u, FUTEX_WAIT_BITSET_PRIVATE, wake, nullptr, mask);
}
__atomic_fetch_and(&lock->futex, ~mask, __ATOMIC_RELEASE);
g_dlock.clearwait(lock, pid);
#endif
@ -329,9 +341,9 @@ extern "C" void fastlock_lock(struct fastlock *lock)
int tid = gettid();
unsigned myticket = __atomic_fetch_add(&lock->m_ticket.m_avail, 1, __ATOMIC_RELEASE);
unsigned mask = (1U << (myticket % 32));
unsigned cloops = 0;
ticket ticketT;
unsigned loopLimit = g_fHighCpuPressure ? 0x10000 : 0x100000;
for (;;)
{
@ -344,9 +356,9 @@ extern "C" void fastlock_lock(struct fastlock *lock)
#elif defined(__aarch64__)
__asm__ __volatile__ ("yield");
#endif
if ((++cloops % 0x100000) == 0)
if ((++cloops % loopLimit) == 0)
{
fastlock_sleep(lock, tid, ticketT.u, mask);
fastlock_sleep(lock, tid, ticketT.u, myticket);
}
}
@ -461,3 +473,24 @@ void fastlock_lock_recursive(struct fastlock *lock, int nesting)
fastlock_lock(lock);
lock->m_depth = nesting;
}
/* Sample system load and update the global g_fHighCpuPressure flag so the
 * fastlock spin loop can shorten its spin-before-sleep threshold when the
 * machine is saturated. Logs once on each transition into or out of the
 * high-pressure state. Intended to be called periodically. */
void fastlock_auto_adjust_waits()
{
#ifdef __linux__
    struct sysinfo sysinf;
    auto fHighPressurePrev = g_fHighCpuPressure; // remember prior state so we only log transitions
    memset(&sysinf, 0, sizeof sysinf);
    if (!sysinfo(&sysinf)) {
        /* loads[0] is the 1-minute load average as a fixed-point value with
         * SI_LOAD_SHIFT fractional bits; dividing by the processor count
         * gives the average load per core. */
        auto avgCoreLoad = sysinf.loads[0] / get_nprocs();
        /* High pressure when average per-core load exceeds 90%. */
        g_fHighCpuPressure = (avgCoreLoad > ((1 << SI_LOAD_SHIFT) * 0.9));
        if (g_fHighCpuPressure)
            /* WARNING level on the first detection, VERBOSE while it persists. */
            serverLog(!fHighPressurePrev ? 3 /*LL_WARNING*/ : 1 /* LL_VERBOSE */, "NOTICE: Detuning locks due to high load per core: %.2f%%", avgCoreLoad / (double)(1 << SI_LOAD_SHIFT)*100.0);
    }
    if (!g_fHighCpuPressure && fHighPressurePrev) {
        serverLog(3 /*LL_WARNING*/, "NOTICE: CPU pressure reduced");
    }
#else
    /* No sysinfo() available: force the detuned path only in test mode. */
    g_fHighCpuPressure = g_fTestMode;
#endif
}

View File

@ -15,6 +15,7 @@ void fastlock_unlock(struct fastlock *lock);
void fastlock_free(struct fastlock *lock);
int fastlock_unlock_recursive(struct fastlock *lock);
void fastlock_lock_recursive(struct fastlock *lock, int nesting);
void fastlock_auto_adjust_waits();
uint64_t fastlock_getlongwaitcount(); // this is a global value

View File

@ -3,6 +3,7 @@
.extern gettid
.extern fastlock_sleep
.extern g_fHighCpuPressure
# This is the first use of assembly in this codebase, a valid question is WHY?
# The spinlock we implement here is performance critical, and simply put GCC
@ -33,6 +34,13 @@ fastlock_lock:
cmp [rdi], esi # Is the TID we got back the owner of the lock?
je .LLocked # Don't spin in that case
mov r9d, 0x1000 # 1000h is set so we overflow on the 1024*1024'th iteration (like the C code)
mov eax, [rip+g_fHighCpuPressure]
test eax, eax
jz .LNoTestMode
mov r9d, 0x10000
.LNoTestMode:
xor eax, eax # eliminate partial register dependency
inc eax # we want to add one
lock xadd [rdi+66], ax # do the xadd, ax contains the value before the addition
@ -45,8 +53,7 @@ fastlock_lock:
cmp dx, ax # is our ticket up?
je .LLocked # leave the loop
pause
add ecx, 0x1000 # Have we been waiting a long time? (oflow if we have)
# 1000h is set so we overflow on the 1024*1024'th iteration (like the C code)
add ecx, r9d # Have we been waiting a long time? (oflow if we have)
jnc .LLoop # If so, give up our timeslice to someone who's doing real work
# Like the compiler, you're probably thinking: "Hey! I should take these pushs out of the loop"
# But the compiler doesn't know that we rarely hit this, and when we do we know the lock is
@ -62,7 +69,7 @@ fastlock_lock:
# rdi ARG1 futex (already in rdi)
# rsi ARG2 tid (already in esi)
# rdx ARG3 ticketT.u (already in edx)
bts ecx, eax # rcx ARG4 mask
mov ecx, eax # rcx ARG4 myticket
call fastlock_sleep
# cleanup and continue
pop rax

View File

@ -5439,7 +5439,9 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
if (memcmp(ri.key,&key,sizeof(key)) == 0) {
/* This is the first key, we need to re-install the timer according
* to the just added event. */
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{
aeDeleteTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,aeTimer);
}, true /* synchronous */, false /* fLock */);
aeTimer = -1;
}
raxStop(&ri);
@ -5447,8 +5449,11 @@ RedisModuleTimerID RM_CreateTimer(RedisModuleCtx *ctx, mstime_t period, RedisMod
/* If we have no main timer (the old one was invalidated, or this is the
* first module timer we have), install one. */
if (aeTimer == -1)
if (aeTimer == -1) {
aePostFunction(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el, [&]{
aeTimer = aeCreateTimeEvent(g_pserver->rgthreadvar[IDX_EVENT_LOOP_MAIN].el,period,moduleTimerHandler,NULL,NULL);
}, true /* synchronous */, false /* fLock */);
}
return key;
}

View File

@ -135,6 +135,7 @@ client *createClient(connection *conn, int iel) {
c->sentlenAsync = 0;
c->flags = 0;
c->fPendingAsyncWrite = FALSE;
c->fPendingAsyncWriteHandler = FALSE;
c->ctime = c->lastinteraction = g_pserver->unixtime;
/* If the default user does not require authentication, the user is
* directly authenticated. */
@ -333,7 +334,7 @@ void _addReplyProtoToList(client *c, const char *s, size_t len) {
clientReplyBlock *tail = (clientReplyBlock*) (ln? listNodeValue(ln): NULL);
/* Note that 'tail' may be NULL even if we have a tail node, becuase when
* addDeferredMultiBulkLength() is used, it sets a dummy node to NULL just
* addReplyDeferredLen() is used, it sets a dummy node to NULL just
* fo fill it later, when the size of the bulk length is set. */
/* Append to tail string when possible. */
@ -513,31 +514,7 @@ void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync)
if (ctype == CLIENT_TYPE_MASTER && g_pserver->repl_backlog &&
g_pserver->repl_backlog_histlen > 0)
{
long long dumplen = 256;
if (g_pserver->repl_backlog_histlen < dumplen)
dumplen = g_pserver->repl_backlog_histlen;
/* Identify the first byte to dump. */
long long idx =
(g_pserver->repl_backlog_idx + (g_pserver->repl_backlog_size - dumplen)) %
g_pserver->repl_backlog_size;
/* Scan the circular buffer to collect 'dumplen' bytes. */
sds dump = sdsempty();
while(dumplen) {
long long thislen =
((g_pserver->repl_backlog_size - idx) < dumplen) ?
(g_pserver->repl_backlog_size - idx) : dumplen;
dump = sdscatrepr(dump,g_pserver->repl_backlog+idx,thislen);
dumplen -= thislen;
idx = 0;
}
/* Finally log such bytes: this is vital debugging info to
* understand what happened. */
serverLog(LL_WARNING,"Latest backlog is: '%s'", dump);
sdsfree(dump);
showLatestBacklog();
}
g_pserver->stat_unexpected_error_replies++;
}
@ -599,7 +576,7 @@ void trimReplyUnusedTailSpace(client *c) {
clientReplyBlock *tail = ln? (clientReplyBlock*)listNodeValue(ln): NULL;
/* Note that 'tail' may be NULL even if we have a tail node, becuase when
* addDeferredMultiBulkLength() is used */
* addReplyDeferredLen() is used */
if (!tail) return;
/* We only try to trim the space is relatively high (more than a 1/4 of the
@ -1881,29 +1858,21 @@ void ProcessPendingAsyncWrites()
std::atomic_thread_fence(std::memory_order_seq_cst);
if (c->casyncOpsPending == 0 || c->btype == BLOCKED_ASYNC) // It's ok to send data if we're in a bgthread op
{
if (FCorrectThread(c))
{
prepareClientToWrite(c, false); // queue an event
}
else
{
// We need to start the write on the client's thread
if (aePostFunction(g_pserver->rgthreadvar[c->iel].el, [c]{
// Install a write handler. Don't do the actual write here since we don't want
// to duplicate the throttling and safety mechanisms of the normal write code
std::lock_guard<decltype(c->lock)> lock(c->lock);
serverAssert(c->casyncOpsPending > 0);
c->casyncOpsPending--;
if (!c->fPendingAsyncWriteHandler) {
c->fPendingAsyncWriteHandler = true;
bool fResult = c->postFunction([](client *c) {
c->fPendingAsyncWriteHandler = false;
connSetWriteHandler(c->conn, sendReplyToClient, true);
}, false) == AE_ERR
)
{
// Posting the function failed
continue; // We can retry later in the cron
}
++c->casyncOpsPending; // race is handled by the client lock in the lambda
});
if (!fResult)
c->fPendingAsyncWriteHandler = false; // if we failed to set the handler then prevent this from never being reset
}
}
}
@ -2079,6 +2048,19 @@ int processInlineBuffer(client *c) {
if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE)
c->repl_ack_time = g_pserver->unixtime;
/* Masters should never send us inline protocol to run actual
 * commands. If this happens, it is likely due to a bug in Redis where
 * we got some desynchronization in the protocol, for example
 * because of a PSYNC gone bad.
 *
 * However there is an exception: masters may send us just a newline
 * to keep the connection active. */
if (querylen != 0 && c->flags & CLIENT_MASTER) {
serverLog(LL_WARNING,"WARNING: Receiving inline protocol from master, master stream corruption? Closing the master connection and discarding the cached master.");
setProtocolError("Master using the inline protocol. Desync?",c);
return C_ERR;
}
/* Move querybuffer position to the next query in the buffer. */
c->qb_pos += querylen+linefeed_chars;
@ -2102,7 +2084,7 @@ int processInlineBuffer(client *c) {
* CLIENT_PROTOCOL_ERROR. */
#define PROTO_DUMP_LEN 128
static void setProtocolError(const char *errstr, client *c) {
if (cserver.verbosity <= LL_VERBOSE) {
if (cserver.verbosity <= LL_VERBOSE || c->flags & CLIENT_MASTER) {
sds client = catClientInfoString(sdsempty(),c);
/* Sample some protocol to given an idea about what was inside. */
@ -2121,7 +2103,9 @@ static void setProtocolError(const char *errstr, client *c) {
}
/* Log all the client and protocol info. */
serverLog(LL_VERBOSE,
int loglevel = (c->flags & CLIENT_MASTER) ? LL_WARNING :
LL_VERBOSE;
serverLog(loglevel,
"Protocol error (%s) from client: %s. %s", errstr, client, buf);
sdsfree(client);
}
@ -2280,7 +2264,6 @@ int processMultibulkBuffer(client *c) {
* 2. In the case of master clients, the replication offset is updated.
* 3. Propagate commands we got from our master to replicas down the line. */
void commandProcessed(client *c) {
int cmd_is_ping = c->cmd && c->cmd->proc == pingCommand;
long long prev_offset = c->reploff;
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */
@ -2307,7 +2290,6 @@ void commandProcessed(client *c) {
AeLocker ae;
ae.arm(c);
long long applied = c->reploff - prev_offset;
long long prev_master_repl_meaningful_offset = g_pserver->master_repl_meaningful_offset;
if (applied) {
if (!g_pserver->fActiveReplica)
{
@ -2316,10 +2298,6 @@ void commandProcessed(client *c) {
}
sdsrange(c->pending_querybuf,applied,-1);
}
/* The g_pserver->master_repl_meaningful_offset variable represents
* the offset of the replication stream without the pending PINGs. */
if (cmd_is_ping)
g_pserver->master_repl_meaningful_offset = prev_master_repl_meaningful_offset;
}
}
@ -2852,7 +2830,7 @@ NULL
if (target && target->flags & CLIENT_BLOCKED) {
std::unique_lock<fastlock> ul(target->lock);
if (unblock_error)
addReplyError(target,
addReplyErrorAsync(target,
"-UNBLOCKED client unblocked via CLIENT UNBLOCK");
else
replyToBlockedClientTimedOut(target);

View File

@ -2243,6 +2243,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
size_t ckeysLoaded = 0;
robj *subexpireKey = nullptr;
sds key = nullptr;
bool fLastKeyExpired = false;
for (int idb = 0; idb < cserver.dbnum; ++idb)
{
@ -2380,12 +2381,23 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
static_assert(sizeof(unsigned long long) == sizeof(uint64_t), "Ensure long long is 64-bits");
mvcc_tstamp = strtoull(szFromObj(auxval), nullptr, 10);
} else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-key")) {
if (subexpireKey != nullptr) {
serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping. key: %s subkey: %s", key != nullptr ? key : "(null)", subexpireKey != nullptr ? szFromObj(subexpireKey) : "(null)");
decrRefCount(subexpireKey);
subexpireKey = nullptr;
}
subexpireKey = auxval;
incrRefCount(subexpireKey);
} else if (!strcasecmp(szFromObj(auxkey), "keydb-subexpire-when")) {
if (key == nullptr || subexpireKey == nullptr) {
if (!fLastKeyExpired) { // This is not an error if we just expired the key associated with this subexpire
serverLog(LL_WARNING, "Corrupt subexpire entry in RDB skipping. key: %s subkey: %s", key != nullptr ? key : "(null)", subexpireKey != nullptr ? szFromObj(subexpireKey) : "(null)");
}
if (subexpireKey) {
decrRefCount(subexpireKey);
subexpireKey = nullptr;
}
}
else {
redisObject keyobj;
initStaticStringObject(keyobj,key);
@ -2489,6 +2501,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
rsi->mi->staleKeyMap->operator[](dbid).push_back(objKeyDup);
decrRefCount(objKeyDup);
}
fLastKeyExpired = true;
sdsfree(key);
key = nullptr;
decrRefCount(val);
@ -2512,6 +2525,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
/* Add the new object in the hash table */
int fInserted = dbMerge(db, &keyobj, val, (rsi && rsi->fForceSetKey) || (rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef
fLastKeyExpired = false;
if (fInserted)
{

View File

@ -66,6 +66,8 @@ struct benchmarkThread;
struct clusterNode;
struct redisConfig;
int g_fTestMode = false;
static struct config {
aeEventLoop *el;
const char *hostip;
@ -308,7 +310,7 @@ fail:
else fprintf(stderr, "%s\n", hostsocket);
freeReplyObject(reply);
redisFree(c);
zfree(cfg);
freeRedisConfig(cfg);
return NULL;
}
static void freeRedisConfig(redisConfig *cfg) {
@ -1284,6 +1286,17 @@ static void updateClusterSlotsConfiguration() {
pthread_mutex_unlock(&config.is_updating_slots_mutex);
}
/* Generate pseudo-random payload bytes for redis-benchmark. See #7196.
 * A fixed-seed linear congruential generator is used so that runs are
 * repeatable; each output byte is mapped into the printable range
 * '0' .. '0'+63. */
static void genBenchmarkRandomData(char *data, int count) {
    static uint32_t state = 1234; /* fixed seed: deterministic across runs */
    for (int i = 0; i < count; i++) {
        state = state * 1103515245 + 12345; /* classic LCG step */
        data[i] = '0' + ((state >> 16) & 63);
    }
}
/* Returns number of consumed options. */
int parseOptions(int argc, const char **argv) {
int i;
@ -1646,7 +1659,7 @@ int main(int argc, const char **argv) {
/* Run default benchmark suite. */
data = (char*)zmalloc(config.datasize+1, MALLOC_LOCAL);
do {
memset(data,'x',config.datasize);
genBenchmarkRandomData(data, config.datasize);
data[config.datasize] = '\0';
if (test_is_selected("ping_inline") || test_is_selected("ping"))

View File

@ -108,10 +108,41 @@ extern "C" void freeClusterManager(void) {
dictRelease(clusterManagerUncoveredSlots);
}
/* This function returns a random master node, or NULL if the cluster
 * contains no master at all.
 *
 * Two passes over cluster_manager.nodes are made: the first counts the
 * masters so a uniform random index can be drawn, the second walks the
 * list again to return the node at that index. */
static clusterManagerNode *clusterManagerNodeMasterRandom() {
    int master_count = 0;
    int idx;
    listIter li;
    listNode *ln;

    /* First pass: count the masters. */
    listRewind(cluster_manager.nodes, &li);
    while ((ln = listNext(&li)) != NULL) {
        clusterManagerNode *n = (clusterManagerNode*) ln->value;
        if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
        master_count++;
    }

    /* Guard against an empty master set: 'rand() % 0' is undefined
     * behavior, and the documented contract is to return NULL when
     * there are no masters. */
    if (master_count == 0) return NULL;

    srand(time(NULL));
    idx = rand() % master_count;

    /* Second pass: return the idx-th master. */
    listRewind(cluster_manager.nodes, &li);
    while ((ln = listNext(&li)) != NULL) {
        clusterManagerNode *n = (clusterManagerNode*) ln->value;
        if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
        if (!idx--) {
            return n;
        }
    }
    /* Can not be reached */
    return NULL;
}
static int clusterManagerFixSlotsCoverage(char *all_slots) {
dictIterator *iter = nullptr;
int force_fix = config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_FIX_WITH_UNREACHABLE_MASTERS;
/* we want explicit manual confirmation from users for all the fix cases */
int force = 0;
dictIterator *iter = nullptr;
if (cluster_manager.unreachable_masters > 0 && !force_fix) {
clusterManagerLogWarn("*** Fixing slots coverage with %d unreachable masters is dangerous: redis-cli will assume that slots about masters that are not reachable are not covered, and will try to reassign them to the reachable nodes. This can cause data loss and is rarely what you want to do. If you really want to proceed use the --cluster-fix-with-unreachable-masters option.\n", cluster_manager.unreachable_masters);
@ -181,7 +212,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
printf("The following uncovered slots have no keys "
"across the cluster:\n");
clusterManagerPrintSlotsList(none);
if (confirmWithYes("Fix these slots by covering with a random node?")){
if (confirmWithYes("Fix these slots by covering with a random node?",
force)) {
listIter li;
listNode *ln;
listRewind(none, &li);
@ -207,7 +239,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
if (listLength(single) > 0) {
printf("The following uncovered slots have keys in just one node:\n");
clusterManagerPrintSlotsList(single);
if (confirmWithYes("Fix these slots by covering with those nodes?")){
if (confirmWithYes("Fix these slots by covering with those nodes?",
force)) {
listIter li;
listNode *ln;
listRewind(single, &li);
@ -239,7 +272,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
printf("The following uncovered slots have keys in multiple nodes:\n");
clusterManagerPrintSlotsList(multi);
if (confirmWithYes("Fix these slots by moving keys "
"into a single node?")) {
"into a single node?", force)) {
listIter li;
listNode *ln;
listRewind(multi, &li);
@ -539,35 +572,6 @@ dict *clusterManagerGetLinkStatus(void) {
return status;
}
/* This function returns a random master node, return NULL if none */
static clusterManagerNode *clusterManagerNodeMasterRandom() {
int master_count = 0;
int idx;
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = (clusterManagerNode*) ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
master_count++;
}
srand(time(NULL));
idx = rand() % master_count;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = (clusterManagerNode*) ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
if (!idx--) {
return n;
}
}
/* Can not be reached */
return NULL;
}
extern "C" int clusterManagerCheckCluster(int quiet) {
listNode *ln = listFirst(cluster_manager.nodes);
if (!ln) return 0;

View File

@ -69,6 +69,8 @@
redisContext *context;
struct config config;
int g_fTestMode = 0;
/* User preferences. */
static struct pref {
int hints;
@ -1607,7 +1609,14 @@ static void usage(void) {
exit(1);
}
int confirmWithYes(const char *msg) {
int confirmWithYes(const char *msg, int force) {
/* if force is true and --cluster-yes option is on,
* do not prompt for an answer */
if (force &&
(config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_YES)) {
return 1;
}
printf("%s (type 'yes' to accept): ", msg);
fflush(stdout);
char buf[4];
@ -4586,7 +4595,8 @@ assign_replicas:
}
clusterManagerOptimizeAntiAffinity(ip_nodes, ip_count);
clusterManagerShowNodes();
if (confirmWithYes("Can I set the above configuration?")) {
int force = 1;
if (confirmWithYes("Can I set the above configuration?", force)) {
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *node = ln->value;

View File

@ -273,7 +273,7 @@ int parseClusterNodeAddress(char *addr, char **ip_ptr, int *port_ptr,
int *bus_port_ptr);
int clusterManagerCheckRedisReply(clusterManagerNode *n,
redisReply *r, char **err);
int confirmWithYes(const char *msg);
int confirmWithYes(const char *msg, int force);
int clusterManagerSetSlotOwner(clusterManagerNode *owner,
int slot,
int do_clear);

View File

@ -47,7 +47,6 @@
#include <unordered_map>
#include <string>
long long adjustMeaningfulReplOffset();
void replicationDiscardCachedMaster(redisMaster *mi);
void replicationResurrectCachedMaster(redisMaster *mi, connection *conn);
void replicationSendAck(redisMaster *mi);
@ -249,7 +248,6 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
const unsigned char *p = (const unsigned char*)ptr;
g_pserver->master_repl_offset += len;
g_pserver->master_repl_meaningful_offset = g_pserver->master_repl_offset;
/* This is a circular buffer, so write as much data we can at every
* iteration and rewind the "idx" index if we reach the limit. */
@ -543,6 +541,40 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listEmpty(fake->reply);
}
/* This is a debugging function that gets called when we detect something
* wrong with the replication protocol: the goal is to peek into the
* replication backlog and show a few final bytes to make simpler to
* guess what kind of bug it could be. */
void showLatestBacklog(void) {
if (g_pserver->repl_backlog == NULL) return;
long long dumplen = 256;
if (g_pserver->repl_backlog_histlen < dumplen)
dumplen = g_pserver->repl_backlog_histlen;
/* Identify the first byte to dump. */
long long idx =
(g_pserver->repl_backlog_idx + (g_pserver->repl_backlog_size - dumplen)) %
g_pserver->repl_backlog_size;
/* Scan the circular buffer to collect 'dumplen' bytes. */
sds dump = sdsempty();
while(dumplen) {
long long thislen =
((g_pserver->repl_backlog_size - idx) < dumplen) ?
(g_pserver->repl_backlog_size - idx) : dumplen;
dump = sdscatrepr(dump,g_pserver->repl_backlog+idx,thislen);
dumplen -= thislen;
idx = 0;
}
/* Finally log such bytes: this is vital debugging info to
* understand what happened. */
serverLog(LL_WARNING,"Latest backlog is: '%s'", dump);
sdsfree(dump);
}
/* This function is used in order to proxy what we receive from our master
* to our sub-slaves. */
#include <ctype.h>
@ -1006,6 +1038,9 @@ void syncCommand(client *c) {
changeReplicationId();
clearReplicationId2();
createReplicationBacklog();
serverLog(LL_NOTICE,"Replication backlog created, my new "
"replication IDs are '%s' and '%s'",
g_pserver->replid, g_pserver->replid2);
}
/* CASE 1: BGSAVE is in progress, with disk target. */
@ -1086,6 +1121,25 @@ void processReplconfUuid(client *c, robj *arg)
if (uuid_parse(remoteUUID, c->uuid) != 0)
goto LError;
listIter liMi;
listNode *lnMi;
listRewind(g_pserver->masters, &liMi);
// Enforce a fair ordering for connection, if they attempt to connect before us close them out
// This must be consistent so that both make the same decision of who should proceed first
while ((lnMi = listNext(&liMi))) {
redisMaster *mi = (redisMaster*)listNodeValue(lnMi);
if (mi->repl_state == REPL_STATE_CONNECTED)
continue;
if (FSameUuidNoNil(mi->master_uuid, c->uuid)) {
// Decide based on UUID so both clients make the same decision of which host loses
// otherwise we may entere a loop where neither client can proceed
if (memcmp(mi->master_uuid, c->uuid, UUID_BINARY_LEN) < 0) {
freeClientAsync(c);
}
}
}
char szServerUUID[36 + 2]; // 1 for the '+', another for '\0'
szServerUUID[0] = '+';
uuid_unparse(cserver.uuid, szServerUUID+1);
@ -1301,8 +1355,7 @@ void sendBulkToSlave(connection *conn) {
client *replica = (client*)connGetPrivateData(conn);
serverAssert(FCorrectThread(replica));
char buf[PROTO_IOBUF_LEN];
ssize_t nwritten, buflen;
ssize_t nwritten;
AeLocker aeLock;
std::unique_lock<fastlock> ul(replica->lock);
@ -1331,7 +1384,14 @@ void sendBulkToSlave(connection *conn) {
}
}
/* If the preamble was already transferred, send the RDB bulk data. */
/* If the preamble was already transferred, send the RDB bulk data.
* try to use sendfile system call if supported, unless tls is enabled.
* fallback to normal read+write otherwise. */
nwritten = 0;
if (!nwritten) {
ssize_t buflen;
char buf[PROTO_IOBUF_LEN];
lseek(replica->repldbfd,replica->repldboff,SEEK_SET);
buflen = read(replica->repldbfd,buf,PROTO_IOBUF_LEN);
if (buflen <= 0) {
@ -1352,6 +1412,8 @@ void sendBulkToSlave(connection *conn) {
}
return;
}
}
replica->repldboff += nwritten;
g_pserver->stat_net_output_bytes += nwritten;
if (replica->repldboff == replica->repldbsize) {
@ -1936,6 +1998,10 @@ void readSyncBulkPayload(connection *conn) {
nread = connRead(conn,buf,readlen);
if (nread <= 0) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
/* equivalent to EAGAIN */
return;
}
serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
cancelReplicationHandshake(mi);
@ -2206,7 +2272,6 @@ void readSyncBulkPayload(connection *conn) {
* we are starting a new history. */
memcpy(g_pserver->replid,mi->master->replid,sizeof(g_pserver->replid));
g_pserver->master_repl_offset = mi->master->reploff;
g_pserver->master_repl_meaningful_offset = mi->master->reploff;
}
clearReplicationId2();
@ -3054,11 +3119,6 @@ void replicationUnsetMaster(redisMaster *mi) {
sdsfree(mi->masterhost);
mi->masterhost = NULL;
/* When a slave is turned into a master, the current replication ID
* (that was inherited from the master at synchronization time) is
* used as secondary ID up to the current offset, and a new replication
* ID is created to continue with a new replication history. */
shiftReplicationId();
if (mi->master) {
if (FCorrectThread(mi->master))
freeClient(mi->master);
@ -3067,6 +3127,15 @@ void replicationUnsetMaster(redisMaster *mi) {
}
replicationDiscardCachedMaster(mi);
cancelReplicationHandshake(mi);
/* When a slave is turned into a master, the current replication ID
* (that was inherited from the master at synchronization time) is
* used as secondary ID up to the current offset, and a new replication
* ID is created to continue with a new replication history.
*
* NOTE: this function MUST be called after we call
* freeClient(server.master), since there we adjust the replication
* offset trimming the final PINGs. See Github issue #7320. */
shiftReplicationId();
/* Disconnecting all the slaves is required: we need to inform slaves
* of the replication ID change (see shiftReplicationId() call). However
* the slaves will be able to partially resync with us, so it will be
@ -3302,10 +3371,6 @@ void replicationCacheMaster(redisMaster *mi, client *c) {
* pending outputs to the master. */
sdsclear(mi->master->querybuf);
sdsclear(mi->master->pending_querybuf);
/* Adjust reploff and read_reploff to the last meaningful offset we executed.
* this is the offset the replica will use for future PSYNC. */
mi->master->reploff = adjustMeaningfulReplOffset();
mi->master->read_reploff = mi->master->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c);
listEmpty(c->reply);
@ -3331,38 +3396,6 @@ void replicationCacheMaster(redisMaster *mi, client *c) {
replicationHandleMasterDisconnection(mi);
}
/* If the "meaningful" offset, that is the offset without the final PINGs
 * in the stream, is different than the last offset, use it instead:
 * often when the master is no longer reachable, replicas will never
 * receive the PINGs, however the master will end with an incremented
 * offset because of the PINGs and will not be able to incrementally
 * PSYNC with the new master.
 * This function trims the replication backlog when needed, and returns
 * the offset to be used for future partial sync.
 *
 * Returns the (possibly rewound) g_pserver->master_repl_offset.
 * Side effects: may shrink master_repl_offset and repl_backlog_histlen,
 * and move repl_backlog_idx backwards to drop the trailing PING bytes. */
long long adjustMeaningfulReplOffset() {
    if (g_pserver->master_repl_offset > g_pserver->master_repl_meaningful_offset) {
        /* 'delta' is the number of trailing PING bytes to discard. */
        long long delta = g_pserver->master_repl_offset -
                          g_pserver->master_repl_meaningful_offset;
        serverLog(LL_NOTICE,
            "Using the meaningful offset %lld instead of %lld to exclude "
            "the final PINGs (%lld bytes difference)",
            g_pserver->master_repl_meaningful_offset,
            g_pserver->master_repl_offset,
            delta);
        g_pserver->master_repl_offset = g_pserver->master_repl_meaningful_offset;
        if (g_pserver->repl_backlog_histlen <= delta) {
            /* The entire retained backlog is PING bytes: empty it. */
            g_pserver->repl_backlog_histlen = 0;
            g_pserver->repl_backlog_idx = 0;
        } else {
            /* Rewind the circular-buffer write index by 'delta' bytes,
             * wrapping around the backlog size. */
            g_pserver->repl_backlog_histlen -= delta;
            g_pserver->repl_backlog_idx =
                (g_pserver->repl_backlog_idx + (g_pserver->repl_backlog_size - delta)) %
                g_pserver->repl_backlog_size;
        }
    }
    return g_pserver->master_repl_offset;
}
/* This function is called when a master is turend into a slave, in order to
* create from scratch a cached master for the new client, that will allow
* to PSYNC with the slave that was promoted as the new master after a
@ -3388,7 +3421,7 @@ void replicationCacheMasterUsingMyself(redisMaster *mi) {
* by replicationCreateMasterClient(). We'll later set the created
* master as server.cached_master, so the replica will use such
* offset for PSYNC. */
mi->master_initial_offset = adjustMeaningfulReplOffset();
mi->master_initial_offset = g_pserver->master_repl_offset;
/* The master client we create can be set to any DBID, because
* the new master will start its replication stream with SELECT. */
@ -3730,6 +3763,18 @@ void replicationCron(void) {
listIter liMaster;
listNode *lnMaster;
listRewind(g_pserver->masters, &liMaster);
bool fInMasterConnection = false;
while ((lnMaster = listNext(&liMaster)) && !fInMasterConnection)
{
redisMaster *mi = (redisMaster*)listNodeValue(lnMaster);
if (mi->repl_state != REPL_STATE_NONE && mi->repl_state != REPL_STATE_CONNECTED && mi->repl_state != REPL_STATE_CONNECT) {
fInMasterConnection = true;
}
}
bool fConnectionStarted = false;
listRewind(g_pserver->masters, &liMaster);
while ((lnMaster = listNext(&liMaster)))
{
redisMaster *mi = (redisMaster*)listNodeValue(lnMaster);
@ -3768,12 +3813,14 @@ void replicationCron(void) {
}
/* Check if we should connect to a MASTER */
if (mi->repl_state == REPL_STATE_CONNECT) {
if (mi->repl_state == REPL_STATE_CONNECT && !fInMasterConnection) {
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
mi->masterhost, mi->masterport);
if (connectWithMaster(mi) == C_OK) {
serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
}
fInMasterConnection = true;
fConnectionStarted = true;
}
/* Send ACK to master from time to time.
@ -3784,6 +3831,11 @@ void replicationCron(void) {
replicationSendAck(mi);
}
if (fConnectionStarted) {
// If we cancel this handshake we want the next attempt to be a different master
listRotateHeadToTail(g_pserver->masters);
}
/* If we have attached slaves, PING them from time to time.
* So slaves can implement an explicit timeout to masters, and will
* be able to detect a link disconnection even if the TCP connection
@ -3806,18 +3858,10 @@ void replicationCron(void) {
clientsArePaused();
if (!manual_failover_in_progress) {
long long before_ping = g_pserver->master_repl_meaningful_offset;
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb,
ping_argv, 1);
decrRefCount(ping_argv[0]);
/* The server.master_repl_meaningful_offset variable represents
* the offset of the replication stream without the pending PINGs.
* This is useful to set the right replication offset for PSYNC
* when the master is turned into a replica. Otherwise pending
* PINGs may not allow it to perform an incremental sync with the
* new master. */
g_pserver->master_repl_meaningful_offset = before_ping;
}
}

View File

@ -2030,8 +2030,6 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
processUnblockedClients(IDX_EVENT_LOOP_MAIN);
}
ProcessPendingAsyncWrites(); // This is really a bug, but for now catch any laggards that didn't clean up
/* Software watchdog: deliver the SIGALRM that will reach the signal
* handler if we don't return here fast enough. */
if (g_pserver->watchdog_period) watchdogScheduleSignal(g_pserver->watchdog_period);
@ -2236,6 +2234,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
run_with_period(30000) {
checkTrialTimeout();
/* Tune the fastlock to CPU load */
fastlock_auto_adjust_waits();
}
/* Resize tracking keys table if needed. This is also done at every
@ -2273,6 +2274,9 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
&ei);
/* CRON functions may trigger async writes, so do this last */
ProcessPendingAsyncWrites();
g_pserver->cronloops++;
return 1000/g_pserver->hz;
}
@ -2656,7 +2660,6 @@ void initServerConfig(void) {
g_pserver->enable_multimaster = CONFIG_DEFAULT_ENABLE_MULTIMASTER;
g_pserver->repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
g_pserver->master_repl_offset = 0;
g_pserver->master_repl_meaningful_offset = 0;
/* Replication partial resync backlog */
g_pserver->repl_backlog = NULL;
@ -4848,8 +4851,17 @@ sds genRedisInfoString(const char *section) {
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
bool fAllUp = true;
while ((ln = listNext(&li))) {
redisMaster *mi = (redisMaster*)listNodeValue(ln);
fAllUp = fAllUp && mi->repl_state == REPL_STATE_CONNECTED;
}
info = sdscatprintf(info, "master_global_link_status:%s\r\n",
fAllUp ? "up" : "down");
int cmasters = 0;
listRewind(g_pserver->masters, &li);
while ((ln = listNext(&li)))
{
long long slave_repl_offset = 1;
@ -4961,7 +4973,6 @@ sds genRedisInfoString(const char *section) {
"master_replid:%s\r\n"
"master_replid2:%s\r\n"
"master_repl_offset:%lld\r\n"
"master_repl_meaningful_offset:%lld\r\n"
"second_repl_offset:%lld\r\n"
"repl_backlog_active:%d\r\n"
"repl_backlog_size:%lld\r\n"
@ -4970,7 +4981,6 @@ sds genRedisInfoString(const char *section) {
g_pserver->replid,
g_pserver->replid2,
g_pserver->master_repl_offset,
g_pserver->master_repl_meaningful_offset,
g_pserver->second_replid_offset,
g_pserver->repl_backlog != NULL,
g_pserver->repl_backlog_size,
@ -5390,7 +5400,6 @@ void loadDataFromDisk(void) {
{
memcpy(g_pserver->replid,rsi.repl_id,sizeof(g_pserver->replid));
g_pserver->master_repl_offset = rsi.repl_offset;
g_pserver->master_repl_meaningful_offset = rsi.repl_offset;
listIter li;
listNode *ln;

View File

@ -1645,6 +1645,7 @@ typedef struct client {
std::atomic<uint64_t> flags; /* Client flags: CLIENT_* macros. */
int casyncOpsPending;
int fPendingAsyncWrite; /* NOTE: Not a flag because it is written to outside of the client lock (locked by the global lock instead) */
int fPendingAsyncWriteHandler;
int authenticated; /* Needed when the default user requires auth. */
int replstate; /* Replication state if this is a replica. */
int repl_put_online_on_ack; /* Install replica write handler on ACK. */
@ -2228,7 +2229,6 @@ struct redisServer {
char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */
char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/
long long master_repl_offset; /* My current replication offset */
long long master_repl_meaningful_offset; /* Offset minus latest PINGs. */
long long second_replid_offset; /* Accept offsets up to this for replid2. */
int replicaseldb; /* Last SELECTed DB in replication output */
int repl_ping_slave_period; /* Master pings the replica every N seconds */
@ -2825,6 +2825,7 @@ void chopReplicationBacklog(void);
void replicationCacheMasterUsingMyself(struct redisMaster *mi);
void feedReplicationBacklog(const void *ptr, size_t len);
void updateMasterAuth();
void showLatestBacklog();
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);

View File

@ -521,7 +521,7 @@ void stralgoLCS(client *c) {
!= C_OK) return;
if (minmatchlen < 0) minmatchlen = 0;
j++;
} else if (!strcasecmp(opt,"STRINGS")) {
} else if (!strcasecmp(opt,"STRINGS") && moreargs > 1) {
if (a != NULL) {
addReplyError(c,"Either use STRINGS or KEYS");
return;
@ -529,7 +529,7 @@ void stralgoLCS(client *c) {
a = szFromObj(c->argv[j+1]);
b = szFromObj(c->argv[j+2]);
j += 2;
} else if (!strcasecmp(opt,"KEYS")) {
} else if (!strcasecmp(opt,"KEYS") && moreargs > 1) {
if (a != NULL) {
addReplyError(c,"Either use STRINGS or KEYS");
return;

View File

@ -129,7 +129,7 @@ static void initCryptoLocks(void) {
return;
}
nlocks = CRYPTO_num_locks();
openssl_locks = zmalloc(sizeof(*openssl_locks) * nlocks);
openssl_locks = (pthread_mutex_t*)zmalloc(sizeof(*openssl_locks) * nlocks);
for (i = 0; i < nlocks; i++) {
pthread_mutex_init(openssl_locks + i, NULL);
}
@ -349,6 +349,7 @@ connection *connCreateTLS(void) {
void connTLSMarshalThread(connection *c) {
tls_connection *conn = (tls_connection*)c;
serverAssert(conn->pending_list_node == nullptr);
conn->el = serverTL->el;
}
@ -458,7 +459,6 @@ void updateSSLEvent(tls_connection *conn) {
void tlsHandleEvent(tls_connection *conn, int mask) {
int ret;
serverAssert(!GlobalLocksAcquired());
serverAssert(conn->el == serverTL->el);
TLSCONN_DEBUG("tlsEventHandler(): fd=%d, state=%d, mask=%d, r=%d, w=%d, flags=%d",
@ -910,6 +910,7 @@ int tlsHasPendingData() {
int tlsProcessPendingData() {
listIter li;
listNode *ln;
serverAssert(!GlobalLocksAcquired());
int processed = listLength(pending_list);
listRewind(pending_list,&li);

View File

@ -209,6 +209,7 @@ void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int prot
if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection);
if (!redir) {
c->flags |= CLIENT_TRACKING_BROKEN_REDIR;
/* We need to signal to the original connection that we
* are unable to send invalidation messages to the redirected
* connection, because the client no longer exist. */

View File

@ -42,3 +42,9 @@ test "client do not break when cluster slot" {
fail "output overflow when cluster slots"
}
}
# Regression check: the Tcl cluster client must be able to SET a key whose
# name contains a hash tag (the '{tag}' portion) without raising an error.
test "client can handle keys with hash tag" {
    set cluster [redis_cluster 127.0.0.1:[get_instance_attrib redis 0 port]]
    $cluster set foo{tag} bar
    $cluster close
}

View File

@ -25,6 +25,7 @@ set ::sentinel_instances {}
set ::redis_instances {}
set ::sentinel_base_port 20000
set ::redis_base_port 30000
set ::redis_port_count 1024
set ::pids {} ; # We kill everything at exit
set ::dirs {} ; # We remove all the temp dirs at exit
set ::run_matching {} ; # If non empty, only tests matching pattern are run.
@ -60,7 +61,7 @@ proc exec_instance {type cfgfile dir} {
# Spawn a redis or sentinel instance, depending on 'type'.
proc spawn_instance {type base_port count {conf {}}} {
for {set j 0} {$j < $count} {incr j} {
set port [find_available_port $base_port]
set port [find_available_port $base_port $::redis_port_count]
incr base_port
puts "Starting $type #$j at port $port"

View File

@ -315,7 +315,7 @@ tags {"aof"} {
}
test "AOF+PEXPIREMEMBERAT: set should have 3 values" {
set client [redis [dict get $srv host] [dict get $srv port]]
set client [redis [dict get $srv host] [dict get $srv port] 0 $::tls]
wait_for_condition 50 100 {
[catch {$client ping} e] == 0
} else {

View File

@ -1,6 +1,9 @@
# Test the meaningful offset implementation to make sure masters
# are able to PSYNC with replicas even if the replication stream
# has pending PINGs at the end.
# These tests were added together with the meaningful offset implementation
# in redis 6.0.0, which was later abandoned in 6.0.4, they used to test that
# servers are able to PSYNC with replicas even if the replication stream has
# PINGs at the end which present in one sever and missing on another.
# We keep these tests just because they reproduce edge cases in the replication
# logic in hope they'll be able to spot some problem in the future.
start_server {tags {"psync2"}} {
start_server {} {
@ -16,7 +19,7 @@ start_server {} {
}
# Setup replication
test "PSYNC2 meaningful offset: setup" {
test "PSYNC2 pingoff: setup" {
$R(1) replicaof $R_host(0) $R_port(0)
$R(0) set foo bar
wait_for_condition 50 1000 {
@ -27,7 +30,7 @@ start_server {} {
}
}
test "PSYNC2 meaningful offset: write and wait replication" {
test "PSYNC2 pingoff: write and wait replication" {
$R(0) INCR counter
$R(0) INCR counter
$R(0) INCR counter
@ -41,7 +44,7 @@ start_server {} {
# In this test we'll make sure the replica will get stuck, but with
# an active connection: this way the master will continue to send PINGs
# every second (we modified the PING period earlier)
test "PSYNC2 meaningful offset: pause replica and promote it" {
test "PSYNC2 pingoff: pause replica and promote it" {
$R(1) MULTI
$R(1) DEBUG SLEEP 5
$R(1) SLAVEOF NO ONE
@ -50,13 +53,179 @@ start_server {} {
}
test "Make the old master a replica of the new one and check conditions" {
set sync_partial [status $R(1) sync_partial_ok]
assert {$sync_partial == 0}
assert_equal [status $R(1) sync_full] 0
$R(0) REPLICAOF $R_host(1) $R_port(1)
wait_for_condition 50 1000 {
[status $R(1) sync_partial_ok] == 1
[status $R(1) sync_full] == 1
} else {
fail "The new master was not able to partial sync"
fail "The new master was not able to sync"
}
# make sure replication is still alive and kicking
$R(1) incr x
wait_for_condition 50 1000 {
[$R(0) get x] == 1
} else {
fail "replica didn't get incr"
}
assert_equal [status $R(0) master_repl_offset] [status $R(1) master_repl_offset]
}
}}
start_server {tags {"psync2"}} {
start_server {} {
start_server {} {
start_server {} {
start_server {} {
test {test various edge cases of repl topology changes with missing pings at the end} {
set master [srv -4 client]
set master_host [srv -4 host]
set master_port [srv -4 port]
set replica1 [srv -3 client]
set replica2 [srv -2 client]
set replica3 [srv -1 client]
set replica4 [srv -0 client]
$replica1 replicaof $master_host $master_port
$replica2 replicaof $master_host $master_port
$replica3 replicaof $master_host $master_port
$replica4 replicaof $master_host $master_port
wait_for_condition 50 1000 {
[status $master connected_slaves] == 4
} else {
fail "replicas didn't connect"
}
$master incr x
wait_for_condition 50 1000 {
[$replica1 get x] == 1 && [$replica2 get x] == 1 &&
[$replica3 get x] == 1 && [$replica4 get x] == 1
} else {
fail "replicas didn't get incr"
}
# disconnect replica1 and replica2
# and wait for the master to send a ping to replica3 and replica4
$replica1 replicaof no one
$replica2 replicaof 127.0.0.1 1 ;# we can't promote it to master since that will cycle the replication id
$master config set repl-ping-replica-period 1
after 1500
# make everyone sync from the replica1 that didn't get the last ping from the old master
# replica4 will keep syncing from the old master which now syncs from replica1
# and replica2 will re-connect to the old master (which went back in time)
set new_master_host [srv -3 host]
set new_master_port [srv -3 port]
$replica3 replicaof $new_master_host $new_master_port
$master replicaof $new_master_host $new_master_port
$replica2 replicaof $master_host $master_port
wait_for_condition 50 1000 {
[status $replica2 master_link_status] == "up" &&
[status $replica3 master_link_status] == "up" &&
[status $replica4 master_link_status] == "up" &&
[status $master master_link_status] == "up"
} else {
fail "replicas didn't connect"
}
# make sure replication is still alive and kicking
$replica1 incr x
wait_for_condition 50 1000 {
[$replica2 get x] == 2 &&
[$replica3 get x] == 2 &&
[$replica4 get x] == 2 &&
[$master get x] == 2
} else {
fail "replicas didn't get incr"
}
# make sure we have the right amount of full syncs
assert_equal [status $master sync_full] 6
assert_equal [status $replica1 sync_full] 2
assert_equal [status $replica2 sync_full] 0
assert_equal [status $replica3 sync_full] 0
assert_equal [status $replica4 sync_full] 0
# force psync
$master client kill type master
$replica2 client kill type master
$replica3 client kill type master
$replica4 client kill type master
# make sure replication is still alive and kicking
$replica1 incr x
wait_for_condition 50 1000 {
[$replica2 get x] == 3 &&
[$replica3 get x] == 3 &&
[$replica4 get x] == 3 &&
[$master get x] == 3
} else {
fail "replicas didn't get incr"
}
# make sure we have the right amount of full syncs
assert_equal [status $master sync_full] 6
assert_equal [status $replica1 sync_full] 2
assert_equal [status $replica2 sync_full] 0
assert_equal [status $replica3 sync_full] 0
assert_equal [status $replica4 sync_full] 0
}
}}}}}
start_server {tags {"psync2"}} {
start_server {} {
start_server {} {
for {set j 0} {$j < 3} {incr j} {
set R($j) [srv [expr 0-$j] client]
set R_host($j) [srv [expr 0-$j] host]
set R_port($j) [srv [expr 0-$j] port]
$R($j) CONFIG SET repl-ping-replica-period 1
}
test "Chained replicas disconnect when replica re-connect with the same master" {
# Add a second replica as a chained replica of the current replica
$R(1) replicaof $R_host(0) $R_port(0)
$R(2) replicaof $R_host(1) $R_port(1)
wait_for_condition 50 1000 {
[status $R(2) master_link_status] == "up"
} else {
fail "Chained replica not replicating from its master"
}
# Do a write on the master, and wait for 3 seconds for the master to
# send some PINGs to its replica
$R(0) INCR counter2
after 2000
set sync_partial_master [status $R(0) sync_partial_ok]
set sync_partial_replica [status $R(1) sync_partial_ok]
$R(0) CONFIG SET repl-ping-replica-period 100
# Disconnect the master's direct replica
$R(0) client kill type replica
wait_for_condition 50 1000 {
[status $R(1) master_link_status] == "up" &&
[status $R(2) master_link_status] == "up" &&
[status $R(0) sync_partial_ok] == $sync_partial_master + 1 &&
[status $R(1) sync_partial_ok] == $sync_partial_replica
} else {
fail "Disconnected replica failed to PSYNC with master"
}
# Verify that the replica and its replica's meaningful and real
# offsets match with the master
assert_equal [status $R(0) master_repl_offset] [status $R(1) master_repl_offset]
assert_equal [status $R(0) master_repl_offset] [status $R(2) master_repl_offset]
# make sure replication is still alive and kicking
$R(0) incr counter2
wait_for_condition 50 1000 {
[$R(1) get counter2] == 2 && [$R(2) get counter2] == 2
} else {
fail "replicas didn't get incr"
}
assert_equal [status $R(0) master_repl_offset] [status $R(1) master_repl_offset]
assert_equal [status $R(0) master_repl_offset] [status $R(2) master_repl_offset]
}
}}}

View File

@ -1,3 +1,79 @@
proc show_cluster_status {} {
uplevel 1 {
# The following is the regexp we use to match the log line
# time info. Logs are in the following form:
#
# 11296:M 25 May 2020 17:37:14.652 # Server initialized
set log_regexp {^[0-9]+:[A-Z] [0-9]+ [A-z]+ [0-9]+ ([0-9:.]+) .*}
set repl_regexp {(master|repl|sync|backlog|meaningful|offset)}
puts "Master ID is $master_id"
for {set j 0} {$j < 5} {incr j} {
puts "$j: sync_full: [status $R($j) sync_full]"
puts "$j: id1 : [status $R($j) master_replid]:[status $R($j) master_repl_offset]"
puts "$j: id2 : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]"
puts "$j: backlog : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]"
puts "$j: x var is : [$R($j) GET x]"
puts "---"
}
# Show the replication logs of every instance, interleaving
# them by the log date.
#
# First: load the lines as lists for each instance.
array set log {}
for {set j 0} {$j < 5} {incr j} {
set fd [open $R_log($j)]
set found 0
set tries 0
while {([gets $fd l] >= 0 || !$found) && $tries < 1000} {
if {[regexp $log_regexp $l] &&
[regexp -nocase $repl_regexp $l]} {
lappend log($j) $l
set found 1
}
incr $tries
}
close $fd
}
# To interleave the lines, at every step consume the element of
# the list with the lowest time and remove it. Do it until
# all the lists are empty.
#
# regexp {^[0-9]+:[A-Z] [0-9]+ [A-z]+ [0-9]+ ([0-9:.]+) .*} $l - logdate
while 1 {
# Find the log with smallest time.
set empty 0
set best 0
set bestdate {}
for {set j 0} {$j < 5} {incr j} {
if {[llength $log($j)] == 0} {
incr empty
continue
}
regexp $log_regexp [lindex $log($j) 0] - date
if {$bestdate eq {}} {
set best $j
set bestdate $date
} else {
if {[string compare $bestdate $date] > 0} {
set best $j
set bestdate $date
}
}
}
if {$empty == 5} break ; # Our exit condition: no more logs
# Emit the one with the smallest time (that is the first
# event in the time line).
puts "\[$best port $R_port($best)\] [lindex $log($best) 0]"
set log($best) [lrange $log($best) 1 end]
}
}
}
start_server {tags {"psync2"}} {
start_server {} {
start_server {} {
@ -12,7 +88,7 @@ start_server {} {
set no_exit 0 ; # Do not exit at end of the test
set duration 20 ; # Total test seconds
set duration 40 ; # Total test seconds
set genload 1 ; # Load master with writes at every cycle
@ -28,6 +104,7 @@ start_server {} {
set R($j) [srv [expr 0-$j] client]
set R_host($j) [srv [expr 0-$j] host]
set R_port($j) [srv [expr 0-$j] port]
set R_log($j) [srv [expr 0-$j] stdout]
if {$debug_msg} {puts "Log file: [srv [expr 0-$j] stdout]"}
}
@ -74,6 +151,7 @@ start_server {} {
[status $R([expr {($master_id+3)%5}]) master_link_status] == "up" &&
[status $R([expr {($master_id+4)%5}]) master_link_status] == "up"
} else {
show_cluster_status
fail "Replica not reconnecting"
}
@ -85,6 +163,7 @@ start_server {} {
wait_for_condition 50 2000 {
[$R($j) get x] == $counter_value
} else {
show_cluster_status
fail "Instance #$j x variable is inconsistent"
}
}
@ -120,44 +199,27 @@ start_server {} {
wait_for_condition 50 2000 {
[$R($j) get x] == $counter_value
} else {
show_cluster_status
fail "Instance #$j x variable is inconsistent"
}
}
}
# wait for all the slaves to be in sync with the master, due to pings, we have to re-sample the master constantly too
# wait for all the slaves to be in sync.
set masteroff [status $R($master_id) master_repl_offset]
wait_for_condition 500 100 {
[status $R($master_id) master_repl_offset] == [status $R(0) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(1) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(2) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(3) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(4) master_repl_offset]
[status $R(0) master_repl_offset] >= $masteroff &&
[status $R(1) master_repl_offset] >= $masteroff &&
[status $R(2) master_repl_offset] >= $masteroff &&
[status $R(3) master_repl_offset] >= $masteroff &&
[status $R(4) master_repl_offset] >= $masteroff
} else {
for {set j 0} {$j < 5} {incr j} {
puts "$j: sync_full: [status $R($j) sync_full]"
puts "$j: id1 : [status $R($j) master_replid]:[status $R($j) master_repl_offset]"
puts "$j: id2 : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]"
puts "$j: backlog : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]"
puts "---"
show_cluster_status
fail "Replicas offsets didn't catch up with the master after too long time."
}
fail "Slaves are not in sync with the master after too long time."
}
# Put down the old master so that it cannot generate more
# replication stream, this way in the next master switch, the time at
# which we move slaves away is not important, each will have full
# history (otherwise PINGs will make certain slaves have more history),
# and sometimes a full resync will be needed.
$R($master_id) slaveof 127.0.0.1 0 ;# We use port zero to make it fail.
if {$debug_msg} {
for {set j 0} {$j < 5} {incr j} {
puts "$j: sync_full: [status $R($j) sync_full]"
puts "$j: id1 : [status $R($j) master_replid]:[status $R($j) master_repl_offset]"
puts "$j: id2 : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]"
puts "$j: backlog : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]"
puts "---"
}
show_cluster_status
}
test "PSYNC2: total sum of full synchronizations is exactly 4" {
@ -165,8 +227,25 @@ start_server {} {
for {set j 0} {$j < 5} {incr j} {
incr sum [status $R($j) sync_full]
}
if {$sum != 4} {
show_cluster_status
assert {$sum == 4}
}
}
# In absence of pings, are the instances really able to have
# the exact same offset?
$R($master_id) config set repl-ping-replica-period 3600
wait_for_condition 500 100 {
[status $R($master_id) master_repl_offset] == [status $R(0) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(1) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(2) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(3) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(4) master_repl_offset]
} else {
show_cluster_status
fail "Replicas and master offsets were unable to match *exactly*."
}
# Limit anyway the maximum number of cycles. This is useful when the
# test is skipped via --only option of the test suite. In that case
@ -192,6 +271,7 @@ start_server {} {
[status $R([expr {($master_id+3)%5}]) master_link_status] == "up" &&
[status $R([expr {($master_id+4)%5}]) master_link_status] == "up"
} else {
show_cluster_status
fail "Replica not reconnecting"
}
}
@ -215,6 +295,7 @@ start_server {} {
puts "prev sync_partial_ok: $sync_partial"
puts "prev sync_partial_err: $sync_partial_err"
puts [$R($master_id) info stats]
show_cluster_status
fail "Replica didn't partial sync"
}
set new_sync_count [status $R($master_id) sync_full]
@ -236,6 +317,7 @@ start_server {} {
wait_for_condition 50 2000 {
[$R($master_id) debug digest] == [$R($slave_id) debug digest]
} else {
show_cluster_status
fail "Replica not reconnecting"
}
@ -270,6 +352,7 @@ start_server {} {
wait_for_condition 50 2000 {
[status $R($master_id) connected_slaves] == 4
} else {
show_cluster_status
fail "Replica not reconnecting"
}
set new_sync_count [status $R($master_id) sync_full]
@ -280,6 +363,7 @@ start_server {} {
wait_for_condition 50 2000 {
[$R($master_id) debug digest] == [$R($slave_id) debug digest]
} else {
show_cluster_status
fail "Debug digest mismatch between master and replica in post-restart handshake"
}
}
@ -289,103 +373,3 @@ start_server {} {
}
}}}}}
start_server {tags {"psync2"}} {
start_server {} {
start_server {} {
start_server {} {
start_server {} {
test {pings at the end of replication stream are ignored for psync} {
set master [srv -4 client]
set master_host [srv -4 host]
set master_port [srv -4 port]
set replica1 [srv -3 client]
set replica2 [srv -2 client]
set replica3 [srv -1 client]
set replica4 [srv -0 client]
$replica1 replicaof $master_host $master_port
$replica2 replicaof $master_host $master_port
$replica3 replicaof $master_host $master_port
$replica4 replicaof $master_host $master_port
wait_for_condition 50 1000 {
[status $master connected_slaves] == 4
} else {
fail "replicas didn't connect"
}
$master incr x
wait_for_condition 50 1000 {
[$replica1 get x] == 1 && [$replica2 get x] == 1 &&
[$replica3 get x] == 1 && [$replica4 get x] == 1
} else {
fail "replicas didn't get incr"
}
# disconnect replica1 and replica2
# and wait for the master to send a ping to replica3 and replica4
$replica1 replicaof no one
$replica2 replicaof 127.0.0.1 1 ;# we can't promote it to master since that will cycle the replication id
$master config set repl-ping-replica-period 1
after 1500
# make everyone sync from the replica1 that didn't get the last ping from the old master
# replica4 will keep syncing from the old master which now syncs from replica1
# and replica2 will re-connect to the old master (which went back in time)
set new_master_host [srv -3 host]
set new_master_port [srv -3 port]
$replica3 replicaof $new_master_host $new_master_port
$master replicaof $new_master_host $new_master_port
$replica2 replicaof $master_host $master_port
wait_for_condition 50 1000 {
[status $replica2 master_link_status] == "up" &&
[status $replica3 master_link_status] == "up" &&
[status $replica4 master_link_status] == "up" &&
[status $master master_link_status] == "up"
} else {
fail "replicas didn't connect"
}
# make sure replication is still alive and kicking
$replica1 incr x
wait_for_condition 50 1000 {
[$replica2 get x] == 2 &&
[$replica3 get x] == 2 &&
[$replica4 get x] == 2 &&
[$master get x] == 2
} else {
fail "replicas didn't get incr"
}
# make sure there are full syncs other than the initial ones
assert_equal [status $master sync_full] 4
assert_equal [status $replica1 sync_full] 0
assert_equal [status $replica2 sync_full] 0
assert_equal [status $replica3 sync_full] 0
assert_equal [status $replica4 sync_full] 0
# force psync
$master client kill type master
$replica2 client kill type master
$replica3 client kill type master
$replica4 client kill type master
# make sure replication is still alive and kicking
$replica1 incr x
wait_for_condition 50 1000 {
[$replica2 get x] == 3 &&
[$replica3 get x] == 3 &&
[$replica4 get x] == 3 &&
[$master get x] == 3
} else {
fail "replicas didn't get incr"
}
# make sure there are full syncs other than the initial ones
assert_equal [status $master sync_full] 4
assert_equal [status $replica1 sync_full] 0
assert_equal [status $replica2 sync_full] 0
assert_equal [status $replica3 sync_full] 0
assert_equal [status $replica4 sync_full] 0
}
}}}}}

View File

@ -36,6 +36,7 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
test {Active replicas propogate} {
$master set testkey foo
after 500
wait_for_condition 50 500 {
[string match *foo* [$slave get testkey]]
} else {

View File

@ -31,13 +31,21 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} {
$R(3) replicaof $R_host(2) $R_port(2)
}
after 2000
test "$topology all nodes up" {
for {set j 0} {$j < 4} {incr j} {
wait_for_condition 50 100 {
[string match {*master_global_link_status:up*} [$R($j) info replication]]
} else {
fail "Multimaster group didn't connect up in a reasonable period of time"
}
}
}
test "$topology replicates to all nodes" {
$R(0) set testkey foo
after 500
for {set n 0} {$n < 4} {incr n} {
wait_for_condition 50 1000 {
wait_for_condition 50 100 {
[$R($n) get testkey] == "foo"
} else {
fail "Failed to replicate to $n"
@ -48,12 +56,17 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} {
test "$topology replicates only once" {
$R(0) set testkey 1
after 500
#wait_for_condition 50 100 {
# [$R(1) get testkey] == 1 && [$R(2) get testkey] == 1
#} else {
# fail "Set failed to replicate"
#}
$R(1) incr testkey
after 500
$R(2) incr testkey
after 500
for {set n 0} {$n < 4} {incr n} {
wait_for_condition 50 1000 {
wait_for_condition 100 100 {
[$R($n) get testkey] == 3
} else {
fail "node $n did not replicate"
@ -69,7 +82,7 @@ start_server {overrides {hz 500 active-replica yes multi-master yes}} {
$R(0) incr testkey
$R(0) exec
for {set n 0} {$n < 4} {incr n} {
wait_for_condition 50 1000 {
wait_for_condition 50 100 {
[$R($n) get testkey] == 3
} else {
fail "node $n failed to replicate"

View File

@ -1,10 +1,3 @@
proc log_file_matches {log pattern} {
set fp [open $log r]
set content [read $fp]
close $fp
string match $pattern $content
}
start_server {tags {"repl"} overrides {hz 100}} {
set slave [srv 0 client]
set slave_host [srv 0 host]
@ -612,7 +605,7 @@ start_server {tags {"repl"}} {
# Wait that replicas acknowledge they are online so
# we are sure that DBSIZE and DEBUG DIGEST will not
# fail because of timing issues.
wait_for_condition 50 100 {
wait_for_condition 150 100 {
[lindex [$replica role] 3] eq {connected}
} else {
fail "replicas still not connected after some time"
@ -637,3 +630,55 @@ start_server {tags {"repl"}} {
}
}
}
test {replicaof right after disconnection} {
# this is a rare race condition that was reproduced sporadically by the psync2 unit.
# see details in #7205
start_server {tags {"repl"}} {
set replica1 [srv 0 client]
set replica1_host [srv 0 host]
set replica1_port [srv 0 port]
set replica1_log [srv 0 stdout]
start_server {} {
set replica2 [srv 0 client]
set replica2_host [srv 0 host]
set replica2_port [srv 0 port]
set replica2_log [srv 0 stdout]
start_server {} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
$replica1 replicaof $master_host $master_port
$replica2 replicaof $master_host $master_port
wait_for_condition 50 100 {
[string match {*master_link_status:up*} [$replica1 info replication]] &&
[string match {*master_link_status:up*} [$replica2 info replication]]
} else {
fail "Can't turn the instance into a replica"
}
set rd [redis_deferring_client -1]
$rd debug sleep 1
after 100
# when replica2 will wake up from the sleep it will find both disconnection
# from it's master and also a replicaof command at the same event loop
$master client kill type replica
$replica2 replicaof $replica1_host $replica1_port
$rd read
wait_for_condition 50 100 {
[string match {*master_link_status:up*} [$replica2 info replication]]
} else {
fail "role change failed."
}
# make sure psync succeeded, and there were no unexpected full syncs.
assert_equal [status $master sync_full] 2
assert_equal [status $replica1 sync_full] 0
assert_equal [status $replica2 sync_full] 0
}
}
}
}

View File

@ -28,11 +28,14 @@ TEST_MODULES = \
all: $(TEST_MODULES)
32bit:
$(MAKE) CFLAGS="-m32" LDFLAGS="-melf_i386"
%.xo: %.c ../../src/redismodule.h
$(CC) -I../../src $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@
%.so: %.xo
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LDFLAGS) $(LIBS) -lc
.PHONY: clean

View File

@ -286,8 +286,29 @@ proc ::redis_cluster::crc16 {s} {
# Hash a single key returning the slot it belongs to, Implemented hash
# tags as described in the Redis Cluster specification.
proc ::redis_cluster::hash {key} {
# TODO: Handle hash slots.
expr {[::redis_cluster::crc16 $key] & 16383}
set keylen [string length $key]
set s {}
set e {}
for {set s 0} {$s < $keylen} {incr s} {
if {[string index $key $s] eq "\{"} break
}
if {[expr {$s == $keylen}]} {
set res [expr {[crc16 $key] & 16383}]
return $res
}
for {set e [expr {$s+1}]} {$e < $keylen} {incr e} {
if {[string index $key $e] == "\}"} break
}
if {$e == $keylen || $e == [expr {$s+1}]} {
set res [expr {[crc16 $key] & 16383}]
return $res
}
set key_sub [string range $key [expr {$s+1}] [expr {$e-1}]]
return [expr {[crc16 $key_sub] & 16383}]
}
# Return the slot the specified keys hash to.

View File

@ -214,14 +214,14 @@ proc start_server {options {code undefined}} {
dict set config dir [tmpdir server]
# start every server on a different port
set ::port [find_available_port [expr {$::port+1}]]
set port [find_available_port $::baseport $::portcount]
if {$::tls} {
dict set config "port" 0
dict set config "tls-port" $::port
dict set config "tls-port" $port
dict set config "tls-cluster" "yes"
dict set config "tls-replication" "yes"
} else {
dict set config port $::port
dict set config port $port
}
set unixsocket [file normalize [format "%s/%s" [dict get $config "dir"] "socket"]]
@ -246,10 +246,10 @@ proc start_server {options {code undefined}} {
set server_started 0
while {$server_started == 0} {
if {$::verbose} {
puts -nonewline "=== ($tags) Starting server ${::host}:${::port} "
puts -nonewline "=== ($tags) Starting server ${::host}:${port} "
}
send_data_packet $::test_server_fd "server-spawning" "port $::port"
send_data_packet $::test_server_fd "server-spawning" "port $port"
if {$::valgrind} {
set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/keydb-pro-server $config_file > $stdout 2> $stderr &]
@ -294,19 +294,19 @@ proc start_server {options {code undefined}} {
# for availability. Other test clients may grab the port before we
# are able to do it for example.
if {$port_busy} {
puts "Port $::port was already busy, trying another port..."
set ::port [find_available_port [expr {$::port+1}]]
puts "Port $port was already busy, trying another port..."
set port [find_available_port $::baseport $::portcount]
if {$::tls} {
dict set config "tls-port" $::port
dict set config "tls-port" $port
} else {
dict set config port $::port
dict set config port $port
}
create_server_config_file $config_file $config
continue; # Try again
}
if {$code ne "undefined"} {
set serverisup [server_is_up $::host $::port $retrynum]
set serverisup [server_is_up $::host $port $retrynum]
} else {
set serverisup 1
}
@ -327,7 +327,6 @@ proc start_server {options {code undefined}} {
# setup properties to be able to initialize a client object
set port_param [expr $::tls ? {"tls-port"} : {"port"}]
set host $::host
set port $::port
if {[dict exists $config bind]} { set host [dict get $config bind] }
if {[dict exists $config $port_param]} { set port [dict get $config $port_param] }

View File

@ -1,3 +1,10 @@
proc log_file_matches {log pattern} {
set fp [open $log r]
set content [read $fp]
close $fp
string match $pattern $content
}
proc randstring {min max {type binary}} {
set len [expr {$min+int(rand()*($max-$min+1))}]
set output {}
@ -344,21 +351,26 @@ proc roundFloat f {
format "%.10g" $f
}
proc find_available_port start {
for {set j $start} {$j < $start+1024} {incr j} {
if {[catch {set fd1 [socket 127.0.0.1 $j]}] &&
[catch {set fd2 [socket 127.0.0.1 [expr $j+10000]]}]} {
return $j
set ::last_port_attempted 0
proc find_available_port {start count} {
set port [expr $::last_port_attempted + 1]
for {set attempts 0} {$attempts < $count} {incr attempts} {
if {$port < $start || $port >= $start+$count} {
set port $start
}
if {[catch {set fd1 [socket 127.0.0.1 $port]}] &&
[catch {set fd2 [socket 127.0.0.1 [expr $port+10000]]}]} {
set ::last_port_attempted $port
return $port
} else {
catch {
close $fd1
close $fd2
}
}
incr port
}
if {$j == $start+1024} {
error "Can't find a non busy port in the $start-[expr {$start+1023}] range."
}
error "Can't find a non busy port in the $start-[expr {$start+$count-1}] range."
}
# Test if TERM looks like to support colors

View File

@ -77,7 +77,9 @@ set ::all_tests {
set ::next_test 0
set ::host 127.0.0.1
set ::port 21111
set ::port 6379; # port for external server
set ::baseport 21111; # initial port for spawned redis servers
set ::portcount 8000; # we don't wanna use more than 10000 to avoid collision with cluster bus ports
set ::traceleaks 0
set ::valgrind 0
set ::tls 0
@ -235,26 +237,26 @@ proc test_server_main {} {
set tclsh [info nameofexecutable]
# Open a listening socket, trying different ports in order to find a
# non busy one.
set port [find_available_port 11111]
set clientport [find_available_port 11111 32]
if {!$::quiet} {
puts "Starting test server at port $port"
puts "Starting test server at port $clientport"
}
socket -server accept_test_clients -myaddr 127.0.0.1 $port
socket -server accept_test_clients -myaddr 127.0.0.1 $clientport
# Start the client instances
set ::clients_pids {}
if {$::external} {
set p [exec $tclsh [info script] {*}$::argv \
--client $port --port $::port &]
--client $clientport &]
lappend ::clients_pids $p
} else {
set start_port [expr {$::port+100}]
set start_port $::baseport
set port_count [expr {$::portcount / $::numclients}]
for {set j 0} {$j < $::numclients} {incr j} {
set start_port [find_available_port $start_port]
set p [exec $tclsh [info script] {*}$::argv \
--client $port --port $start_port &]
--client $clientport --baseport $start_port --portcount $port_count &]
lappend ::clients_pids $p
incr start_port 10
incr start_port $port_count
}
}
@ -427,6 +429,7 @@ proc signal_idle_client fd {
lappend ::active_clients $fd
incr ::next_test
if {$::loop && $::next_test == [llength $::all_tests]} {
incr ::loop -1
set ::next_test 0
}
} elseif {[llength $::run_solo_tests] != 0 && [llength $::active_clients] == 0} {
@ -517,6 +520,10 @@ proc print_help_screen {} {
"--loop Execute the specified set of tests forever."
"--wait-server Wait after server is started (so that you can attach a debugger)."
"--tls Run tests in TLS mode."
"--host <addr> Run tests against an external host."
"--port <port> TCP port to use against external host."
"--baseport <port> Initial port number for spawned redis servers."
"--portcount <num> Port range for spawned redis servers."
"--help Print this help screen."
} "\n"]
}
@ -567,6 +574,12 @@ for {set j 0} {$j < [llength $argv]} {incr j} {
} elseif {$opt eq {--port}} {
set ::port $arg
incr j
} elseif {$opt eq {--baseport}} {
set ::baseport $arg
incr j
} elseif {$opt eq {--portcount}} {
set ::portcount $arg
incr j
} elseif {$opt eq {--accurate}} {
set ::accurate 1
} elseif {$opt eq {--force-failure}} {
@ -601,7 +614,10 @@ for {set j 0} {$j < [llength $argv]} {incr j} {
} elseif {$opt eq {--stop}} {
set ::stop_on_failure 1
} elseif {$opt eq {--loop}} {
set ::loop 1
set ::loop -1
} elseif {$opt eq {--loopn}} {
set ::loop [expr $arg - 1]
incr j
} elseif {$opt eq {--timeout}} {
set ::timeout $arg
incr j

View File

@ -272,6 +272,23 @@ start_server {tags {"expire"}} {
r expiremember testkey foo 10000
r save
r debug reload
if {[log_file_matches [srv 0 stdout] "*Corrupt subexpire*"]} {
fail "Server reported corrupt subexpire"
}
assert [expr [r ttl testkey foo] > 0]
}
test {Load subkey for an expired key works} {
# Note test inherits keys from previous tests, we want more traffic in the RDB
r multi
r sadd testset val1
r expiremember testset val1 300
r pexpire testset 1
r debug reload
r exec
set logerr [log_file_matches [srv 0 stdout] "*Corrupt subexpire*"]
if {$logerr} {
fail "Server reported corrupt subexpire"
}
}
}

View File

@ -1,3 +1,4 @@
run_solo {maxmemory} {
start_server {tags {"maxmemory"}} {
test "Without maxmemory small integers are shared" {
r config set maxmemory 0
@ -144,7 +145,11 @@ start_server {tags {"maxmemory"}} {
}
proc test_slave_buffers {test_name cmd_count payload_len limit_memory pipeline} {
start_server {tags {"maxmemory"}} {
# This is a single thread only test because there is a race in getMaxMemoryState
# between zmalloc_used_memory and getting the client outbut buffer memory usage.
# The cure would be worse than the disease, we do not want lock every replica
# simultaneously just to get more accurate memory usage.
start_server {tags {"maxmemory"} overrides { server-threads 1 }} {
start_server {} {
set slave_pid [s process_id]
test "$test_name" {
@ -240,3 +245,4 @@ test_slave_buffers {slave buffer are counted correctly} 1000000 10 0 1
# test that slave buffer don't induce eviction
# test again with fewer (and bigger) commands without pipeline, but with eviction
test_slave_buffers "replica buffer don't induce eviction" 100000 100 1 0
} ;# run_solo

View File

@ -37,7 +37,7 @@ start_server {tags {"memefficiency"}} {
}
run_solo {defrag} {
start_server {tags {"defrag"}} {
start_server {tags {"defrag"} overrides {server-threads 1} } {
if {[string match {*jemalloc*} [s mem_allocator]]} {
test "Active defrag" {
r config set save "" ;# prevent bgsave from interfereing with save below
@ -95,6 +95,10 @@ start_server {tags {"defrag"}} {
}
if {$::verbose} {
puts "frag $frag"
set misses [s active_defrag_misses]
set hits [s active_defrag_hits]
puts "hits: $hits"
puts "misses: $misses"
puts "max latency $max_latency"
puts [r latency latest]
puts [r latency history active-defrag-cycle]
@ -221,6 +225,10 @@ start_server {tags {"defrag"}} {
}
if {$::verbose} {
puts "frag $frag"
set misses [s active_defrag_misses]
set hits [s active_defrag_hits]
puts "hits: $hits"
puts "misses: $misses"
puts "max latency $max_latency"
puts [r latency latest]
puts [r latency history active-defrag-cycle]
@ -256,11 +264,12 @@ start_server {tags {"defrag"}} {
set expected_frag 1.7
# add a mass of list nodes to two lists (allocations are interlaced)
set val [string repeat A 100] ;# 5 items of 100 bytes puts us in the 640 bytes bin, which has 32 regs, so high potential for fragmentation
for {set j 0} {$j < 500000} {incr j} {
set elements 500000
for {set j 0} {$j < $elements} {incr j} {
$rd lpush biglist1 $val
$rd lpush biglist2 $val
}
for {set j 0} {$j < 500000} {incr j} {
for {set j 0} {$j < $elements} {incr j} {
$rd read ; # Discard replies
$rd read ; # Discard replies
}
@ -302,6 +311,8 @@ start_server {tags {"defrag"}} {
# test that the fragmentation is lower
after 120 ;# serverCron only updates the info once in 100ms
set misses [s active_defrag_misses]
set hits [s active_defrag_hits]
set frag [s allocator_frag_ratio]
set max_latency 0
foreach event [r latency latest] {
@ -312,6 +323,8 @@ start_server {tags {"defrag"}} {
}
if {$::verbose} {
puts "frag $frag"
puts "misses: $misses"
puts "hits: $hits"
puts "max latency $max_latency"
puts [r latency latest]
puts [r latency history active-defrag-cycle]
@ -320,6 +333,10 @@ start_server {tags {"defrag"}} {
# due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
# we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher
assert {$max_latency <= 30}
# in extreme cases of stagnation, we see over 20m misses before the tests aborts with "defrag didn't stop",
# in normal cases we only see 100k misses out of 500k elements
assert {$misses < $elements}
}
# verify the data isn't corrupted or changed
set newdigest [r debug digest]
@ -327,6 +344,110 @@ start_server {tags {"defrag"}} {
r save ;# saving an rdb iterates over all the data / pointers
r del biglist1 ;# coverage for quicklistBookmarksClear
} {1}
test "Active defrag edge case" {
# There was an edge case in defrag where all the slabs of a certain bin had exactly the same
# % utilization, with the exception of the current slab from which new allocations are made.
# If the current slab was lower in utilization the defragger would have ended up in stagnation:
# kept running without moving any allocation.
# This test is more consistent on a fresh server with no allocation history.
start_server {tags {"defrag"} overrides {server-threads 1}} {
r flushdb
r config resetstat
r config set save "" ;# prevent bgsave from interfering with save below
r config set hz 100
r config set activedefrag no
r config set active-defrag-max-scan-fields 1000
r config set active-defrag-threshold-lower 5
r config set active-defrag-cycle-min 65
r config set active-defrag-cycle-max 75
r config set active-defrag-ignore-bytes 1mb
r config set maxmemory 0
set expected_frag 1.3
r debug mallctl-str thread.tcache.flush VOID
# fill the first slab containing 32 regs of 640 bytes
# (flush the tcache after each allocation so each one really goes to the current slab)
for {set j 0} {$j < 32} {incr j} {
r setrange "_$j" 600 x
r debug mallctl-str thread.tcache.flush VOID
}
# add a mass of keys with 600 bytes values, fill the bin of 640 bytes which has 32 regs per slab.
set rd [redis_deferring_client]
set keys 640000
for {set j 0} {$j < $keys} {incr j} {
$rd setrange $j 600 x
}
for {set j 0} {$j < $keys} {incr j} {
$rd read ; # Discard replies
}
# create some fragmentation of 50% by deleting every other key
set sent 0
for {set j 0} {$j < $keys} {incr j 1} {
$rd del $j
incr sent
incr j 1
}
for {set j 0} {$j < $sent} {incr j} {
$rd read ; # Discard replies
}
# create higher fragmentation in the first slab (keep only 10 of its 32 regs)
for {set j 10} {$j < 32} {incr j} {
r del "_$j"
}
# start defrag
after 120 ;# serverCron only updates the info once in 100ms
set frag [s allocator_frag_ratio]
if {$::verbose} {
puts "frag $frag"
}
assert {$frag >= $expected_frag}
set digest [r debug digest]
# activedefrag is rejected with a DISABLED error when the allocator has no defrag
# support (e.g. not jemalloc); in that case skip the defrag portion of the test
catch {r config set activedefrag yes} e
if {![string match {DISABLED*} $e]} {
# wait for the active defrag to start working (decision once a second)
wait_for_condition 50 100 {
[s active_defrag_running] ne 0
} else {
fail "defrag not started."
}
# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
} else {
after 120 ;# serverCron only updates the info once in 100ms
puts [r info memory]
puts [r info stats]
puts [r memory malloc-stats]
fail "defrag didn't stop."
}
# test that the fragmentation is lower
after 120 ;# serverCron only updates the info once in 100ms
set misses [s active_defrag_misses]
set hits [s active_defrag_hits]
set frag [s allocator_frag_ratio]
if {$::verbose} {
puts "frag $frag"
puts "hits: $hits"
puts "misses: $misses"
}
assert {$frag < 1.1}
assert {$misses < 10000000} ;# when defrag doesn't stop, we have some 30m misses, when it does, we have 2m misses
}
# verify the data isn't corrupted or changed
set newdigest [r debug digest]
assert {$digest eq $newdigest}
r save ;# saving an rdb iterates over all the data / pointers
}
}
}
}
} ;# run_solo

View File

@ -174,9 +174,9 @@ start_server {tags {"other"}} {
tags {protocol} {
test {PIPELINING stresser (also a regression for the old epoll bug)} {
if {$::tls} {
set fd2 [::tls::socket $::host $::port]
set fd2 [::tls::socket [srv host] [srv port]]
} else {
set fd2 [socket $::host $::port]
set fd2 [socket [srv host] [srv port]]
}
fconfigure $fd2 -encoding binary -translation binary
puts -nonewline $fd2 "SELECT 9\r\n"

View File

@ -107,6 +107,8 @@ start_server {tags {"pubsub"}} {
set rd1 [redis_deferring_client]
assert_equal {1 2 3} [subscribe $rd1 {chan1 chan2 chan3}]
unsubscribe $rd1
# Wait for a response to the unsub
__consume_subscribe_messages $rd1 unsubscribe {chan1 chan2 chan3}
assert_equal 0 [r publish chan1 hello]
assert_equal 0 [r publish chan2 hello]
assert_equal 0 [r publish chan3 hello]
@ -180,6 +182,8 @@ start_server {tags {"pubsub"}} {
set rd1 [redis_deferring_client]
assert_equal {1 2 3} [psubscribe $rd1 {chan1.* chan2.* chan3.*}]
punsubscribe $rd1
# Wait for a response to the unsub
__consume_subscribe_messages $rd1 punsubscribe {chan1.* chan2.* chan3.*}
assert_equal 0 [r publish chan1.hi hello]
assert_equal 0 [r publish chan2.hi hello]
assert_equal 0 [r publish chan3.hi hello]

View File

@ -1,7 +1,10 @@
# Setting server-threads to 2 is really single threaded because test mode is enabled (no client allocated to thread 1)
# We do this because of the large numbers of nonblocking clients in this tests and the client races that causes
start_server {
tags {"list"}
overrides {
"list-max-ziplist-size" 5
"server-threads 2"
}
} {
source "tests/unit/type/list-common.tcl"