Merge tag '6.0.4' into unstable

Redis 6.0.4.


Former-commit-id: 9c31ac7925edba187e527f506e5e992946bd38a6
John Sully 2020-05-29 00:57:07 -04:00
commit ed2e0e66f6
33 changed files with 1171 additions and 382 deletions

View File

@ -3,6 +3,7 @@ name: CI
on: [push, pull_request]
jobs:
test-ubuntu-latest:
runs-on: ubuntu-latest
steps:
@ -59,3 +60,20 @@ jobs:
- uses: actions/checkout@v1
- name: make
run: make -j2
build-32bit:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: make
run: |
sudo apt-get update && sudo apt-get install libc6-dev-i386
make 32bit
build-libc-malloc:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: make
run: make MALLOC=libc

View File

@ -5,6 +5,7 @@ on:
- cron: '0 7 * * *'
jobs:
test-jemalloc:
runs-on: ubuntu-latest
timeout-minutes: 1200
@ -37,6 +38,38 @@ jobs:
- name: module api test
run: ./runtest-moduleapi --verbose
test-32bit:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: make
run: |
sudo apt-get update && sudo apt-get install libc6-dev-i386
make 32bit
- name: test
run: |
sudo apt-get install tcl8.5
./runtest --accurate --verbose
- name: module api test
run: |
make -C tests/modules 32bit # the script below doesn't have an argument, we must build manually ahead of time
./runtest-moduleapi --verbose
test-tls:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v1
- name: make
run: |
make BUILD_TLS=yes
- name: test
run: |
sudo apt-get install tcl8.5 tcl-tls
./utils/gen-test-certs.sh
./runtest --accurate --verbose --tls
- name: module api test
run: ./runtest-moduleapi --verbose --tls
test-valgrind:
runs-on: ubuntu-latest
timeout-minutes: 14400

View File

@ -11,6 +11,283 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP.
SECURITY: There are security fixes in the release.
--------------------------------------------------------------------------------
================================================================================
Redis 6.0.4 Released Thu May 28 11:36:45 CEST 2020
================================================================================
Upgrade urgency CRITICAL: this release fixes a severe replication bug.
Redis 6.0.4 fixes a critical replication bug caused by a new feature introduced
in Redis 6. The feature, called "meaningful offset" and strongly wanted by
myself (antirez), was an improvement that allowed masters, when demoted to
replicas during a failover, to partially synchronize with the new master. In
short, the feature was able to avoid full synchronizations with RDB. How did it
work? By trimming the replication backlog of the final "PING" commands the
master was sending in the replication channel: this way the replication offset
would no longer go "after" the one of the promoted replica, allowing the master
to just continue in the same replication history, receiving only a small data
difference.
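For the curious, the removed logic (adjustMeaningfulReplOffset(), still visible
in a later hunk of this commit) boiled down to rewinding the replication backlog
by the number of trailing PING bytes. A minimal standalone sketch of that
arithmetic, with invented numbers (each replicated PING is 14 bytes on the wire):

    /* Sketch only: mirrors the arithmetic of the removed code, not the server itself. */
    #include <stdio.h>

    static long long trim_pings(long long repl_offset, long long meaningful_offset,
                                long long *histlen, long long *idx, long long size) {
        if (repl_offset > meaningful_offset) {
            long long delta = repl_offset - meaningful_offset; /* trailing PING bytes */
            if (*histlen <= delta) {
                *histlen = 0;                                  /* backlog held only PINGs */
                *idx = 0;
            } else {
                *histlen -= delta;                             /* drop the PING bytes */
                *idx = (*idx + (size - delta)) % size;         /* rewind the write index */
            }
            repl_offset = meaningful_offset;                   /* continue from trimmed offset */
        }
        return repl_offset;
    }

    int main(void) {
        long long histlen = 1000, idx = 300, size = 1048576;
        /* Three PINGs (42 bytes) were fed after the last meaningful write. */
        long long off = trim_pings(5042, 5000, &histlen, &idx, size);
        printf("offset=%lld histlen=%lld idx=%lld\n", off, histlen, idx); /* 5000 958 258 */
        return 0;
    }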
However, after the introduction of the feature we (the Redis core team) quickly
understood there was something wrong: the apparently harmless feature had many
bugs, and the last bug we discovered, despite a joint effort of multiple people,
we were not able to fully understand even after fixing it. Enough was enough: we
decided that the complexity cost of this feature was too high.
So Redis 6.0.4 removes the feature entirely, and fixes the data corruption that
it was able to cause.
However there are two facts to keep in mind.
Fact 1: Setups using chained replication, that is, setups where certain replicas
replicate from other replicas, can experience data corruption up to Redis 6.0.3.
By chained replication we mean:
+--------+ +---------+ +-------------+
| master |--------->| replica |-------->| sub-replica |
+--------+ +---------+ +-------------+
People using chained replication SHOULD UPGRADE ASAP away from Redis 6.0.0,
6.0.1, 6.0.2 or 6.0.3 to Redis 6.0.4.
To be clear, people NOT using this setup, but having just replicas attached
directly to the master, SHOULD NOT be in danger of any problem. But we are no
longer confident in the complexities of the 6.0.x replication implementation,
so we suggest that everybody using an older 6.0.x release upgrade to 6.0.4.
So far we just didn't find any bug that affects Redis 6.0.3 that does not
involve chained replication.
People starting with Redis 6.0.4 are fine. People with Redis 5 are fine.
People upgrading from Redis 5 to Redis 6.0.4 are fine.
TLDR: The problem is with users of 6.0.0, 6.0.1, 6.0.2, 6.0.3.
Fact 2: Upgrading from Redis 6.0.x to Redis 6.0.4, IF AND ONLY IF you
use chained replication, requires some extra care:
1. Once you attach your new Redis 6.0.4 instance as a replica of the current
Redis 6.0.x master, you should wait for the first full synchronization and
then promote it right away, if your setup involves chained replication.
Don't give it the time to perform a new partial synchronization in case the
link between the master and the replica breaks in the meantime (see the
illustrative command sequence below).
2. As an additional precaution, you may want to set the replication ping period
to a very large value (for instance 1000000) using the following command:
CONFIG SET repl-ping-replica-period 1000000
Note that if you do "1" with care, "2" is not needed.
However if you do apply "2", make sure to later restore the default:
CONFIG SET repl-ping-replica-period 10
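For illustration only, on the new 6.0.4 instance the sequence for step "1" (with
the optional step "2") would look like the following; host and port are
placeholders:

    REPLICAOF <old-6.0.x-master-host> <port>      (attach the 6.0.4 instance as a replica)
    INFO replication                              (repeat until master_link_status:up and the
                                                   initial full synchronization has completed)
    CONFIG SET repl-ping-replica-period 1000000   (optional precaution, step "2")
    REPLICAOF NO ONE                              (promote it right away)
    CONFIG SET repl-ping-replica-period 10        (restore the default once the switch is done)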
So this is the main change in Redis 6.0.4. Later we'll find a different way to
achieve what we wanted to achieve with the Meaningful Offset feature, but
without the same complexity.
Other changes in this release:
* PSYNC2 tests improved.
* Fix a rare active defrag edge case bug leading to stagnation.
* Fix Redis 6 asserting at startup in 32 bit systems.
* Redis 6 32 bit is now added back to our testing environments.
* Fix server crash for the STRALGO command.
* Implement sendfile for RDB transfer.
* TLS fixes.
* Make replication more resistant by disconnecting the master if we
detect a protocol error. Basically we no longer accept inline protocol
from the master.
* Other improvements in the tests.
Regards,
antirez
This is the full list of commits:
antirez in commit 59cd4c9f6:
Test: take PSYNC2 test master timeout high during switch.
1 file changed, 1 deletion(-)
antirez in commit 6c1bb7b19:
Test: add the tracking unit as default.
1 file changed, 1 insertion(+)
Oran Agra in commit 1aee695e5:
tests: find_available_port start search from next port
1 file changed, 12 insertions(+), 7 deletions(-)
Oran Agra in commit a2ae46352:
tests: each test client work on a distinct port range
5 files changed, 39 insertions(+), 27 deletions(-)
Oran Agra in commit 86e562d69:
32bit CI needs to build modules correctly
2 files changed, 7 insertions(+), 2 deletions(-)
Oran Agra in commit ab2984b1e:
adjust revived meaningful offset tests
1 file changed, 39 insertions(+), 20 deletions(-)
Oran Agra in commit 1ff5a222d:
revive meaningful offset tests
2 files changed, 213 insertions(+)
antirez in commit cc549b46a:
Replication: showLatestBacklog() refactored out.
3 files changed, 36 insertions(+), 25 deletions(-)
antirez in commit 377dd0515:
Drop useless line from replicationCacheMaster().
1 file changed, 2 deletions(-)
antirez in commit 3f8d113f1:
Another meaningful offset test removed.
1 file changed, 100 deletions(-)
antirez in commit d4541349d:
Remove the PSYNC2 meaningful offset test.
2 files changed, 113 deletions(-)
antirez in commit 2112a5702:
Remove the meaningful offset feature.
4 files changed, 10 insertions(+), 93 deletions(-)
antirez in commit d2eb6e0b4:
Set a protocol error if master use the inline protocol.
1 file changed, 17 insertions(+), 2 deletions(-)
Oran Agra in commit 9c1df3b76:
daily CI test with tls
1 file changed, 15 insertions(+)
Oran Agra in commit 115ed1911:
avoid using sendfile if tls-replication is enabled
1 file changed, 34 insertions(+), 27 deletions(-)
antirez in commit 11c748aac:
Replication: log backlog creation event.
1 file changed, 3 insertions(+)
antirez in commit 8f1013722:
Test: PSYNC2 test can now show server logs.
1 file changed, 88 insertions(+), 25 deletions(-)
antirez in commit 2e591fc4a:
Clarify what is happening in PR #7320.
1 file changed, 5 insertions(+), 1 deletion(-)
zhaozhao.zz in commit cbb51fb8f:
PSYNC2: second_replid_offset should be real meaningful offset
1 file changed, 3 insertions(+), 3 deletions(-)
Oran Agra in commit e0fc88b4d:
add CI for 32bit build
2 files changed, 34 insertions(+)
antirez in commit e3f864b5f:
Make disconnectSlaves() synchronous in the base case.
3 files changed, 20 insertions(+), 9 deletions(-)
ShooterIT in commit 8af1e513f:
Implements sendfile for redis.
2 files changed, 55 insertions(+), 2 deletions(-)
antirez in commit 3c21418cd:
Fix #7306 less aggressively.
2 files changed, 29 insertions(+), 17 deletions(-)
Madelyn Olson in commit e201f83ce:
EAGAIN for tls during diskless load
1 file changed, 4 insertions(+)
Qu Chen in commit 58fc456cb:
Disconnect chained replicas when the replica performs PSYNC with the master always to avoid replication offset mismatch between master and chained replicas.
2 files changed, 60 insertions(+), 3 deletions(-)
hwware in commit 3febc5c29:
using moreargs variable
1 file changed, 2 insertions(+), 2 deletions(-)
hwware in commit 8d6738559:
fix server crash for STRALGO command
1 file changed, 2 insertions(+), 2 deletions(-)
ShooterIT in commit 7a35eec54:
Replace addDeferredMultiBulkLength with addReplyDeferredLen in comment
1 file changed, 2 insertions(+), 2 deletions(-)
Yossi Gottlieb in commit f93e1417b:
TLS: Improve tls-protocols clarity in redis.conf.
1 file changed, 3 insertions(+), 2 deletions(-)
ShooterIT in commit d0c9e4454:
Fix reply bytes calculation error
1 file changed, 1 insertion(+), 1 deletion(-)
zhaozhao.zz in commit 1cde6a060:
Tracking: flag CLIENT_TRACKING_BROKEN_REDIR when redir broken
1 file changed, 1 insertion(+)
Oran Agra in commit 436be3498:
fix a rare active defrag edge case bug leading to stagnation
4 files changed, 146 insertions(+), 23 deletions(-)
Oran Agra in commit f9d2ffdc5:
improve DEBUG MALLCTL to be able to write to write only fields.
1 file changed, 27 insertions(+), 7 deletions(-)
hujie in commit d7968ee92:
fix clear USER_FLAG_ALLCOMMANDS flag in acl
1 file changed, 5 insertions(+), 4 deletions(-)
ShooterIT in commit a902e6b25:
Redis Benchmark: generate random test data
1 file changed, 12 insertions(+), 1 deletion(-)
hwware in commit 9564ed7c3:
Redis-Benchmark: avoid potentical memmory leaking
1 file changed, 1 insertion(+), 1 deletion(-)
WuYunlong in commit 2e4182743:
Handle keys with hash tag when computing hash slot using tcl cluster client.
1 file changed, 23 insertions(+), 2 deletions(-)
WuYunlong in commit eb2c8b2c6:
Add a test to prove current tcl cluster client can not handle keys with hash tag.
1 file changed, 7 insertions(+), 1 deletion(-)
ShooterIT in commit 928e6976b:
Use dictSize to get the size of dict in dict.c
1 file changed, 2 insertions(+), 2 deletions(-)
Madelyn Olson in commit cdcf5af5a:
Converge hash validation for adding and removing
1 file changed, 21 insertions(+), 14 deletions(-)
Benjamin Sergeant in commit e8b09d220:
do not handle --cluster-yes for cluster fix mode
1 file changed, 16 insertions(+), 7 deletions(-)
Benjamin Sergeant in commit 57b4fb0d8:
fix typo ...
1 file changed, 1 insertion(+), 1 deletion(-)
Benjamin Sergeant in commit 29f25e411:
Redis-cli 6.0.1 `--cluster-yes` doesn't work (fix #7246)
1 file changed, 5 insertions(+), 1 deletion(-)
Oran Agra in commit 00d8b92b8:
fix valgrind test failure in replication test
1 file changed, 1 insertion(+), 1 deletion(-)
Oran Agra in commit 5e17e6276:
add regression test for the race in #7205
1 file changed, 52 insertions(+)
antirez in commit 96e7c011e:
Improve the PSYNC2 test reliability.
1 file changed, 33 insertions(+), 15 deletions(-)
================================================================================
Redis 6.0.3 Released Sat May 16 18:10:21 CEST 2020
================================================================================

View File

@ -216,7 +216,7 @@ ixalloc(tsdn_t *tsdn, void *ptr, size_t oldsize, size_t size, size_t extra,
}
JEMALLOC_ALWAYS_INLINE int
iget_defrag_hint(tsdn_t *tsdn, void* ptr, int *bin_util, int *run_util) {
iget_defrag_hint(tsdn_t *tsdn, void* ptr) {
int defrag = 0;
rtree_ctx_t rtree_ctx_fallback;
rtree_ctx_t *rtree_ctx = tsdn_rtree_ctx(tsdn, &rtree_ctx_fallback);
@ -232,11 +232,22 @@ iget_defrag_hint(tsdn_t *tsdn, void* ptr, int *bin_util, int *run_util) {
malloc_mutex_lock(tsdn, &bin->lock);
/* don't bother moving allocations from the slab currently used for new allocations */
if (slab != bin->slabcur) {
int free_in_slab = extent_nfree_get(slab);
if (free_in_slab) {
const bin_info_t *bin_info = &bin_infos[binind];
size_t availregs = bin_info->nregs * bin->stats.curslabs;
*bin_util = ((long long)bin->stats.curregs<<16) / availregs;
*run_util = ((long long)(bin_info->nregs - extent_nfree_get(slab))<<16) / bin_info->nregs;
defrag = 1;
int curslabs = bin->stats.curslabs;
size_t curregs = bin->stats.curregs;
if (bin->slabcur) {
/* remove slabcur from the overall utilization */
curregs -= bin_info->nregs - extent_nfree_get(bin->slabcur);
curslabs -= 1;
}
/* Compare the utilization ratio of the slab in question to the total average,
* to avoid precision lost and division, we do that by extrapolating the usage
* of the slab as if all slabs have the same usage. If this slab is less used
* than the average, we'll prefer to evict the data to hopefully more used ones */
defrag = (bin_info->nregs - free_in_slab) * curslabs <= curregs;
}
}
malloc_mutex_unlock(tsdn, &bin->lock);
}
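To illustrate the comparison above with invented numbers: 8 regions per slab;
excluding slabcur, 10 slabs hold 50 used regions in total, i.e. an average of 5
used regions per slab. A standalone C sketch of the same test:

    /* Illustration only: same heuristic as iget_defrag_hint() above. */
    #include <stdio.h>

    int main(void) {
        int nregs = 8, curslabs = 10, curregs = 50;
        for (int free_in_slab = 1; free_in_slab < nregs; free_in_slab++) {
            int used = nregs - free_in_slab;
            int defrag = used * curslabs <= curregs;    /* same comparison as above */
            printf("slab with %d used regions -> %s\n",
                   used, defrag ? "move (at or below average)" : "keep");
        }
        return 0;
    }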

View File

@ -3326,12 +3326,10 @@ jemalloc_postfork_child(void) {
/******************************************************************************/
/* Helps the application decide if a pointer is worth re-allocating in order to reduce fragmentation.
* returns 0 if the allocation is in the currently active run,
* or when it is not causing any frag issue (large or huge bin)
* returns the bin utilization and run utilization both in fixed point 16:16.
* returns 1 if the allocation should be moved, and 0 if the allocation should be kept.
* If the application decides to re-allocate it should use MALLOCX_TCACHE_NONE when doing so. */
JEMALLOC_EXPORT int JEMALLOC_NOTHROW
get_defrag_hint(void* ptr, int *bin_util, int *run_util) {
get_defrag_hint(void* ptr) {
assert(ptr != NULL);
return iget_defrag_hint(TSDN_NULL, ptr, bin_util, run_util);
return iget_defrag_hint(TSDN_NULL, ptr);
}
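A sketch of how a caller is expected to use the new single-argument hint,
following the MALLOCX_TCACHE_NONE advice in the comment above. It mirrors what
activeDefragAlloc does in a later hunk; the sketch assumes an unprefixed
jemalloc build, while the server code uses the je_ prefix (je_get_defrag_hint):

    #include <string.h>
    #include <jemalloc/jemalloc.h>   /* mallocx/dallocx/sallocx, MALLOCX_TCACHE_NONE */

    int get_defrag_hint(void *ptr);  /* exported by the patched jemalloc above */

    void *defrag_move_if_useful(void *ptr) {
        if (!get_defrag_hint(ptr))
            return NULL;                               /* keep the allocation in place */
        size_t size = sallocx(ptr, 0);                 /* usable size of the old block */
        void *newptr = mallocx(size, MALLOCX_TCACHE_NONE);
        if (newptr == NULL)
            return NULL;
        memcpy(newptr, ptr, size);                     /* copy payload to the new block */
        dallocx(ptr, MALLOCX_TCACHE_NONE);             /* free the old block, no tcache */
        return newptr;                                 /* caller must update its references */
    }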

View File

@ -176,9 +176,10 @@ tcp-keepalive 300
# tls-cluster yes
# Explicitly specify TLS versions to support. Allowed values are case insensitive
# and include "TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3" (OpenSSL >= 1.1.1)
# and include "TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3" (OpenSSL >= 1.1.1) or
# any combination. To enable only TLSv1.2 and TLSv1.3, use:
#
# tls-protocols TLSv1.2
# tls-protocols "TLSv1.2 TLSv1.3"
# Configure allowed ciphers. See the ciphers(1ssl) manpage for more information
# about the syntax of this string.

View File

@ -169,6 +169,25 @@ sds ACLHashPassword(unsigned char *cleartext, size_t len) {
return sdsnewlen(hex,HASH_PASSWORD_LEN);
}
/* Given a hash and the hash length, returns C_OK if it is a valid password
* hash, or C_ERR otherwise. */
int ACLCheckPasswordHash(unsigned char *hash, int hashlen) {
if (hashlen != HASH_PASSWORD_LEN) {
return C_ERR;
}
/* Password hashes can only be characters that represent
* hexadecimal values, which are numbers and lowercase
* characters 'a' through 'f'. */
for(int i = 0; i < HASH_PASSWORD_LEN; i++) {
char c = hash[i];
if ((c < 'a' || c > 'f') && (c < '0' || c > '9')) {
return C_ERR;
}
}
return C_OK;
}
/* =============================================================================
* Low level ACL API
* ==========================================================================*/
@ -359,12 +378,13 @@ int ACLUserCanExecuteFutureCommands(user *u) {
* to skip the command bit explicit test. */
void ACLSetUserCommandBit(user *u, unsigned long id, int value) {
uint64_t word, bit;
if (value == 0) u->flags &= ~USER_FLAG_ALLCOMMANDS;
if (ACLGetCommandBitCoordinates(id,&word,&bit) == C_ERR) return;
if (value)
if (value) {
u->allowed_commands[word] |= bit;
else
} else {
u->allowed_commands[word] &= ~bit;
u->flags &= ~USER_FLAG_ALLCOMMANDS;
}
}
/* This is like ACLSetUserCommandBit(), but instead of setting the specified
@ -756,22 +776,10 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
if (op[0] == '>') {
newpass = ACLHashPassword((unsigned char*)op+1,oplen-1);
} else {
if (oplen != HASH_PASSWORD_LEN + 1) {
if (ACLCheckPasswordHash((unsigned char*)op+1,oplen-1) == C_ERR) {
errno = EBADMSG;
return C_ERR;
}
/* Password hashes can only be characters that represent
* hexadecimal values, which are numbers and lowercase
* characters 'a' through 'f'.
*/
for(int i = 1; i < HASH_PASSWORD_LEN + 1; i++) {
char c = op[i];
if ((c < 'a' || c > 'f') && (c < '0' || c > '9')) {
errno = EBADMSG;
return C_ERR;
}
}
newpass = sdsnewlen(op+1,oplen-1);
}
@ -787,7 +795,7 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
if (op[0] == '<') {
delpass = ACLHashPassword((unsigned char*)op+1,oplen-1);
} else {
if (oplen != HASH_PASSWORD_LEN + 1) {
if (ACLCheckPasswordHash((unsigned char*)op+1,oplen-1) == C_ERR) {
errno = EBADMSG;
return C_ERR;
}
@ -841,7 +849,6 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
errno = ENOENT;
return C_ERR;
}
unsigned long id = ACLGetCommandID(copy);
/* The subcommand cannot be empty, so things like DEBUG|
* are syntax errors of course. */
@ -854,6 +861,7 @@ int ACLSetUser(user *u, const char *op, ssize_t oplen) {
/* The command should not be set right now in the command
* bitmap, because adding a subcommand of a fully added
* command is probably an error on the user side. */
unsigned long id = ACLGetCommandID(copy);
if (ACLGetUserCommandBit(u,id) == 1) {
zfree(copy);
errno = EBUSY;
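For context, the hashes validated by ACLCheckPasswordHash above are the SHA-256
digests accepted by ACL SETUSER with the '#' (add hash) and '!' (remove hash)
operators; illustrative usage, where the digest is a placeholder for a
64-character lowercase hex SHA-256:

    ACL SETUSER alice on >plaintext-password     (hashed internally, the '>' branch above)
    ACL SETUSER alice #<64 lowercase hex chars>  (add an already-hashed password)
    ACL SETUSER alice !<the same 64 hex chars>   (remove that password hash)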

View File

@ -139,6 +139,12 @@ void setproctitle(const char *fmt, ...);
/* Byte ordering detection */
#include <sys/types.h> /* This will likely define BYTE_ORDER */
/* Define redis_sendfile. */
#if defined(__linux__) || (defined(__APPLE__) && defined(MAC_OS_X_VERSION_10_5))
#define HAVE_SENDFILE 1
ssize_t redis_sendfile(int out_fd, int in_fd, off_t offset, size_t count);
#endif
#ifndef BYTE_ORDER
#if (BSD >= 199103)
# include <machine/endian.h>

View File

@ -325,6 +325,13 @@ void mallctl_int(client *c, robj **argv, int argc) {
size_t sz = sizeof(old);
while (sz > 0) {
if ((ret=je_mallctl(szFromObj(argv[0]), &old, &sz, argc > 1? &val: NULL, argc > 1?sz: 0))) {
if (ret == EPERM && argc > 1) {
/* if this option is write only, try just writing to it. */
if (!(ret=je_mallctl(szFromObj(argv[0]), NULL, 0, &val, sz))) {
addReply(c, shared.ok);
return;
}
}
if (ret==EINVAL) {
/* size might be wrong, try a smaller one */
sz /= 2;
@ -347,17 +354,30 @@ void mallctl_int(client *c, robj **argv, int argc) {
}
void mallctl_string(client *c, robj **argv, int argc) {
int ret;
int rret, wret;
char *old;
size_t sz = sizeof(old);
/* for strings, it seems we need to first get the old value, before overriding it. */
if ((ret=je_mallctl(szFromObj(argv[0]), &old, &sz, NULL, 0))) {
addReplyErrorFormat(c,"%s", strerror(ret));
if ((rret=je_mallctl(szFromObj(argv[0]), &old, &sz, NULL, 0))) {
/* return error unless this option is write only. */
if (!(rret == EPERM && argc > 1)) {
addReplyErrorFormat(c,"%s", strerror(rret));
return;
}
}
if(argc > 1) {
char *val = szFromObj(argv[1]);
char **valref = &val;
if ((!strcmp(val,"VOID")))
valref = NULL, sz = 0;
wret = je_mallctl(szFromObj(argv[0]), NULL, 0, valref, sz);
}
if (!rret)
addReplyBulkCString(c, old);
if(argc > 1)
je_mallctl(szFromObj(argv[0]), NULL, 0, &argv[1]->m_ptr, sizeof(char*));
else if (wret)
addReplyErrorFormat(c,"%s", strerror(wret));
else
addReply(c, shared.ok);
}
#endif

View File

@ -43,7 +43,7 @@
/* this method was added to jemalloc in order to help us understand which
* pointers are worthwhile moving and which aren't */
extern "C" int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util);
extern "C" int je_get_defrag_hint(void* ptr);
/* forward declarations*/
void defragDictBucketCallback(void *privdata, dictEntry **bucketref);
@ -56,18 +56,11 @@ bool replaceSateliteOSetKeyPtr(expireset &set, sds oldkey, sds newkey);
* when it returns a non-null value, the old pointer was already released
* and should NOT be accessed. */
void* activeDefragAlloc(void *ptr) {
int bin_util, run_util;
size_t size;
void *newptr;
if(!je_get_defrag_hint(ptr, &bin_util, &run_util)) {
g_pserver->stat_active_defrag_misses++;
return NULL;
}
/* if this run is more utilized than the average utilization in this bin
* (or it is full), skip it. This will eventually move all the allocations
* from relatively empty runs into relatively full runs. */
if (run_util > bin_util || run_util == 1<<16) {
if(!je_get_defrag_hint(ptr)) {
g_pserver->stat_active_defrag_misses++;
size = zmalloc_size(ptr);
return NULL;
}
/* move this allocation to a new allocation.

View File

@ -478,7 +478,7 @@ dictEntry *dictFind(dict *d, const void *key)
dictEntry *he;
uint64_t h, idx, table;
if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */
if (dictSize(d) == 0) return NULL; /* dict is empty */
if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);
for (table = 0; table <= 1; table++) {
@ -1044,7 +1044,7 @@ dictEntry **dictFindEntryRefByPtrAndHash(dict *d, const void *oldptr, uint64_t h
dictEntry *he, **heref;
unsigned long idx, table;
if (d->ht[0].used + d->ht[1].used == 0) return NULL; /* dict is empty */
if (dictSize(d) == 0) return NULL; /* dict is empty */
for (table = 0; table <= 1; table++) {
idx = hash & d->ht[table].sizemask;
heref = &d->ht[table].table[idx];

View File

@ -332,7 +332,7 @@ void _addReplyProtoToList(client *c, const char *s, size_t len) {
clientReplyBlock *tail = (clientReplyBlock*) (ln? listNodeValue(ln): NULL);
/* Note that 'tail' may be NULL even if we have a tail node, becuase when
* addDeferredMultiBulkLength() is used, it sets a dummy node to NULL just
* addReplyDeferredLen() is used, it sets a dummy node to NULL just
* fo fill it later, when the size of the bulk length is set. */
/* Append to tail string when possible. */
@ -512,31 +512,7 @@ void addReplyErrorLengthCore(client *c, const char *s, size_t len, bool fAsync)
if (ctype == CLIENT_TYPE_MASTER && g_pserver->repl_backlog &&
g_pserver->repl_backlog_histlen > 0)
{
long long dumplen = 256;
if (g_pserver->repl_backlog_histlen < dumplen)
dumplen = g_pserver->repl_backlog_histlen;
/* Identify the first byte to dump. */
long long idx =
(g_pserver->repl_backlog_idx + (g_pserver->repl_backlog_size - dumplen)) %
g_pserver->repl_backlog_size;
/* Scan the circular buffer to collect 'dumplen' bytes. */
sds dump = sdsempty();
while(dumplen) {
long long thislen =
((g_pserver->repl_backlog_size - idx) < dumplen) ?
(g_pserver->repl_backlog_size - idx) : dumplen;
dump = sdscatrepr(dump,g_pserver->repl_backlog+idx,thislen);
dumplen -= thislen;
idx = 0;
}
/* Finally log such bytes: this is vital debugging info to
* understand what happened. */
serverLog(LL_WARNING,"Latest backlog is: '%s'", dump);
sdsfree(dump);
showLatestBacklog();
}
g_pserver->stat_unexpected_error_replies++;
}
@ -598,7 +574,7 @@ void trimReplyUnusedTailSpace(client *c) {
clientReplyBlock *tail = ln? (clientReplyBlock*)listNodeValue(ln): NULL;
/* Note that 'tail' may be NULL even if we have a tail node, becuase when
* addDeferredMultiBulkLength() is used */
* addReplyDeferredLen() is used */
if (!tail) return;
/* We only try to trim the space is relatively high (more than a 1/4 of the
@ -2072,6 +2048,19 @@ int processInlineBuffer(client *c) {
if (querylen == 0 && getClientType(c) == CLIENT_TYPE_SLAVE)
c->repl_ack_time = g_pserver->unixtime;
/* Masters should never send us inline protocol to run actual
* commands. If this happens, it is likely due to a bug in Redis where
* we got some desynchronization in the protocol, for example
because of a PSYNC gone bad.
*
However there is an exception: masters may send us just a newline
* to keep the connection active. */
if (querylen != 0 && c->flags & CLIENT_MASTER) {
serverLog(LL_WARNING,"WARNING: Receiving inline protocol from master, master stream corruption? Closing the master connection and discarding the cached master.");
setProtocolError("Master using the inline protocol. Desync?",c);
return C_ERR;
}
/* Move querybuffer position to the next query in the buffer. */
c->qb_pos += querylen+linefeed_chars;
@ -2095,7 +2084,7 @@ int processInlineBuffer(client *c) {
* CLIENT_PROTOCOL_ERROR. */
#define PROTO_DUMP_LEN 128
static void setProtocolError(const char *errstr, client *c) {
if (cserver.verbosity <= LL_VERBOSE) {
if (cserver.verbosity <= LL_VERBOSE || c->flags & CLIENT_MASTER) {
sds client = catClientInfoString(sdsempty(),c);
/* Sample some protocol to given an idea about what was inside. */
@ -2114,7 +2103,9 @@ static void setProtocolError(const char *errstr, client *c) {
}
/* Log all the client and protocol info. */
serverLog(LL_VERBOSE,
int loglevel = (c->flags & CLIENT_MASTER) ? LL_WARNING :
LL_VERBOSE;
serverLog(loglevel,
"Protocol error (%s) from client: %s. %s", errstr, client, buf);
sdsfree(client);
}
@ -2273,7 +2264,6 @@ int processMultibulkBuffer(client *c) {
* 2. In the case of master clients, the replication offset is updated.
* 3. Propagate commands we got from our master to replicas down the line. */
void commandProcessed(client *c) {
int cmd_is_ping = c->cmd && c->cmd->proc == pingCommand;
long long prev_offset = c->reploff;
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
/* Update the applied replication offset of our master. */
@ -2300,7 +2290,6 @@ void commandProcessed(client *c) {
AeLocker ae;
ae.arm(c);
long long applied = c->reploff - prev_offset;
long long prev_master_repl_meaningful_offset = g_pserver->master_repl_meaningful_offset;
if (applied) {
if (!g_pserver->fActiveReplica)
{
@ -2309,10 +2298,6 @@ void commandProcessed(client *c) {
}
sdsrange(c->pending_querybuf,applied,-1);
}
/* The g_pserver->master_repl_meaningful_offset variable represents
* the offset of the replication stream without the pending PINGs. */
if (cmd_is_ping)
g_pserver->master_repl_meaningful_offset = prev_master_repl_meaningful_offset;
}
}

View File

@ -308,7 +308,7 @@ fail:
else fprintf(stderr, "%s\n", hostsocket);
freeReplyObject(reply);
redisFree(c);
zfree(cfg);
freeRedisConfig(cfg);
return NULL;
}
static void freeRedisConfig(redisConfig *cfg) {
@ -1284,6 +1284,17 @@ static void updateClusterSlotsConfiguration() {
pthread_mutex_unlock(&config.is_updating_slots_mutex);
}
/* Generate random data for redis benchmark. See #7196. */
static void genBenchmarkRandomData(char *data, int count) {
static uint32_t state = 1234;
int i = 0;
while (count--) {
state = (state*1103515245+12345);
data[i++] = '0'+((state>>16)&63);
}
}
/* Returns number of consumed options. */
int parseOptions(int argc, const char **argv) {
int i;
@ -1646,7 +1657,7 @@ int main(int argc, const char **argv) {
/* Run default benchmark suite. */
data = (char*)zmalloc(config.datasize+1, MALLOC_LOCAL);
do {
memset(data,'x',config.datasize);
genBenchmarkRandomData(data, config.datasize);
data[config.datasize] = '\0';
if (test_is_selected("ping_inline") || test_is_selected("ping"))
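Since the LCG in genBenchmarkRandomData above maps each byte to '0' + (0..63),
the generated payload is deterministic and stays within a fixed printable range
('0' through 'o'). A tiny standalone driver, for illustration only:

    #include <stdio.h>
    #include <stdint.h>

    /* Copy of the generator above, reproduced here only for a standalone demo. */
    static void genBenchmarkRandomData(char *data, int count) {
        static uint32_t state = 1234;
        int i = 0;
        while (count--) {
            state = (state*1103515245+12345);
            data[i++] = '0'+((state>>16)&63);
        }
    }

    int main(void) {
        char buf[17];
        genBenchmarkRandomData(buf, 16);
        buf[16] = '\0';
        printf("%s\n", buf);   /* 16 printable, deterministic payload bytes */
        return 0;
    }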

View File

@ -108,10 +108,41 @@ extern "C" void freeClusterManager(void) {
dictRelease(clusterManagerUncoveredSlots);
}
/* This function returns a random master node, return NULL if none */
static clusterManagerNode *clusterManagerNodeMasterRandom() {
int master_count = 0;
int idx;
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = (clusterManagerNode*) ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
master_count++;
}
srand(time(NULL));
idx = rand() % master_count;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = (clusterManagerNode*) ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
if (!idx--) {
return n;
}
}
/* Can not be reached */
return NULL;
}
static int clusterManagerFixSlotsCoverage(char *all_slots) {
dictIterator *iter = nullptr;
int force_fix = config.cluster_manager_command.flags &
CLUSTER_MANAGER_CMD_FLAG_FIX_WITH_UNREACHABLE_MASTERS;
/* we want explicit manual confirmation from users for all the fix cases */
int force = 0;
dictIterator *iter = nullptr;
if (cluster_manager.unreachable_masters > 0 && !force_fix) {
clusterManagerLogWarn("*** Fixing slots coverage with %d unreachable masters is dangerous: redis-cli will assume that slots about masters that are not reachable are not covered, and will try to reassign them to the reachable nodes. This can cause data loss and is rarely what you want to do. If you really want to proceed use the --cluster-fix-with-unreachable-masters option.\n", cluster_manager.unreachable_masters);
@ -181,7 +212,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
printf("The following uncovered slots have no keys "
"across the cluster:\n");
clusterManagerPrintSlotsList(none);
if (confirmWithYes("Fix these slots by covering with a random node?")){
if (confirmWithYes("Fix these slots by covering with a random node?",
force)) {
listIter li;
listNode *ln;
listRewind(none, &li);
@ -207,7 +239,8 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
if (listLength(single) > 0) {
printf("The following uncovered slots have keys in just one node:\n");
clusterManagerPrintSlotsList(single);
if (confirmWithYes("Fix these slots by covering with those nodes?")){
if (confirmWithYes("Fix these slots by covering with those nodes?",
force)) {
listIter li;
listNode *ln;
listRewind(single, &li);
@ -239,7 +272,7 @@ static int clusterManagerFixSlotsCoverage(char *all_slots) {
printf("The following uncovered slots have keys in multiple nodes:\n");
clusterManagerPrintSlotsList(multi);
if (confirmWithYes("Fix these slots by moving keys "
"into a single node?")) {
"into a single node?", force)) {
listIter li;
listNode *ln;
listRewind(multi, &li);
@ -539,35 +572,6 @@ dict *clusterManagerGetLinkStatus(void) {
return status;
}
/* This function returns a random master node, return NULL if none */
static clusterManagerNode *clusterManagerNodeMasterRandom() {
int master_count = 0;
int idx;
listIter li;
listNode *ln;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = (clusterManagerNode*) ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
master_count++;
}
srand(time(NULL));
idx = rand() % master_count;
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *n = (clusterManagerNode*) ln->value;
if (n->flags & CLUSTER_MANAGER_FLAG_SLAVE) continue;
if (!idx--) {
return n;
}
}
/* Can not be reached */
return NULL;
}
extern "C" int clusterManagerCheckCluster(int quiet) {
listNode *ln = listFirst(cluster_manager.nodes);
if (!ln) return 0;

View File

@ -1607,7 +1607,14 @@ static void usage(void) {
exit(1);
}
int confirmWithYes(const char *msg) {
int confirmWithYes(const char *msg, int force) {
/* if force is true and --cluster-yes option is on,
* do not prompt for an answer */
if (force &&
(config.cluster_manager_command.flags & CLUSTER_MANAGER_CMD_FLAG_YES)) {
return 1;
}
printf("%s (type 'yes' to accept): ", msg);
fflush(stdout);
char buf[4];
@ -4586,7 +4593,8 @@ assign_replicas:
}
clusterManagerOptimizeAntiAffinity(ip_nodes, ip_count);
clusterManagerShowNodes();
if (confirmWithYes("Can I set the above configuration?")) {
int force = 1;
if (confirmWithYes("Can I set the above configuration?", force)) {
listRewind(cluster_manager.nodes, &li);
while ((ln = listNext(&li)) != NULL) {
clusterManagerNode *node = ln->value;

View File

@ -273,7 +273,7 @@ int parseClusterNodeAddress(char *addr, char **ip_ptr, int *port_ptr,
int *bus_port_ptr);
int clusterManagerCheckRedisReply(clusterManagerNode *n,
redisReply *r, char **err);
int confirmWithYes(const char *msg);
int confirmWithYes(const char *msg, int force);
int clusterManagerSetSlotOwner(clusterManagerNode *owner,
int slot,
int do_clear);

View File

@ -47,7 +47,6 @@
#include <unordered_map>
#include <string>
long long adjustMeaningfulReplOffset();
void replicationDiscardCachedMaster(redisMaster *mi);
void replicationResurrectCachedMaster(redisMaster *mi, connection *conn);
void replicationSendAck(redisMaster *mi);
@ -249,7 +248,6 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
const unsigned char *p = (const unsigned char*)ptr;
g_pserver->master_repl_offset += len;
g_pserver->master_repl_meaningful_offset = g_pserver->master_repl_offset;
/* This is a circular buffer, so write as much data we can at every
* iteration and rewind the "idx" index if we reach the limit. */
@ -543,6 +541,40 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
listEmpty(fake->reply);
}
/* This is a debugging function that gets called when we detect something
* wrong with the replication protocol: the goal is to peek into the
* replication backlog and show a few final bytes to make simpler to
* guess what kind of bug it could be. */
void showLatestBacklog(void) {
if (g_pserver->repl_backlog == NULL) return;
long long dumplen = 256;
if (g_pserver->repl_backlog_histlen < dumplen)
dumplen = g_pserver->repl_backlog_histlen;
/* Identify the first byte to dump. */
long long idx =
(g_pserver->repl_backlog_idx + (g_pserver->repl_backlog_size - dumplen)) %
g_pserver->repl_backlog_size;
/* Scan the circular buffer to collect 'dumplen' bytes. */
sds dump = sdsempty();
while(dumplen) {
long long thislen =
((g_pserver->repl_backlog_size - idx) < dumplen) ?
(g_pserver->repl_backlog_size - idx) : dumplen;
dump = sdscatrepr(dump,g_pserver->repl_backlog+idx,thislen);
dumplen -= thislen;
idx = 0;
}
/* Finally log such bytes: this is vital debugging info to
* understand what happened. */
serverLog(LL_WARNING,"Latest backlog is: '%s'", dump);
sdsfree(dump);
}
/* This function is used in order to proxy what we receive from our master
* to our sub-slaves. */
#include <ctype.h>
@ -1006,6 +1038,9 @@ void syncCommand(client *c) {
changeReplicationId();
clearReplicationId2();
createReplicationBacklog();
serverLog(LL_NOTICE,"Replication backlog created, my new "
"replication IDs are '%s' and '%s'",
g_pserver->replid, g_pserver->replid2);
}
/* CASE 1: BGSAVE is in progress, with disk target. */
@ -1280,11 +1315,44 @@ void removeRDBUsedToSyncReplicas(void) {
}
}
#if HAVE_SENDFILE
/* Implements redis_sendfile to transfer data between file descriptors and
* avoid transferring data to and from user space.
*
* The function prototype is just like sendfile(2) on Linux. in_fd is a file
* descriptor opened for reading and out_fd is a descriptor opened for writing.
* offset specifies where to start reading data from in_fd. count is the number
* of bytes to copy between the file descriptors.
*
* The return value is the number of bytes written to out_fd, if the transfer
* was successful. On error, -1 is returned, and errno is set appropriately. */
#if defined(__linux__)
#include <sys/sendfile.h>
#endif
ssize_t redis_sendfile(int out_fd, int in_fd, off_t offset, size_t count) {
#if defined(__linux__)
return sendfile(out_fd, in_fd, &offset, count);
#elif defined(__APPLE__)
off_t len = count;
/* Notice that it may return -1 and errno is set to EAGAIN even if some
* bytes have been sent successfully and the len argument is set correctly
* when using a socket marked for non-blocking I/O. */
if (sendfile(in_fd, out_fd, offset, &len, NULL, 0) == -1 &&
errno != EAGAIN) return -1;
else
return (ssize_t)len;
#endif
errno = ENOSYS;
return -1;
}
#endif
void sendBulkToSlave(connection *conn) {
client *replica = (client*)connGetPrivateData(conn);
serverAssert(FCorrectThread(replica));
char buf[PROTO_IOBUF_LEN];
ssize_t nwritten, buflen;
ssize_t nwritten;
AeLocker aeLock;
std::unique_lock<fastlock> ul(replica->lock);
@ -1313,14 +1381,33 @@ void sendBulkToSlave(connection *conn) {
}
}
/* If the preamble was already transferred, send the RDB bulk data. */
/* If the preamble was already transferred, send the RDB bulk data.
* try to use sendfile system call if supported, unless tls is enabled.
* fallback to normal read+write otherwise. */
nwritten = 0;
#if HAVE_SENDFILE
if (!g_pserver->tls_replication) {
if ((nwritten = redis_sendfile(conn->fd,replica->repldbfd,
replica->repldboff,PROTO_IOBUF_LEN)) == -1)
{
if (errno != EAGAIN) {
serverLog(LL_WARNING,"Sendfile error sending DB to replica: %s",
strerror(errno));
freeClient(replica);
}
return;
}
}
#endif
if (!nwritten) {
ssize_t buflen;
char buf[PROTO_IOBUF_LEN];
lseek(replica->repldbfd,replica->repldboff,SEEK_SET);
buflen = read(replica->repldbfd,buf,PROTO_IOBUF_LEN);
if (buflen <= 0) {
serverLog(LL_WARNING,"Read error sending DB to replica: %s",
(buflen == 0) ? "premature EOF" : strerror(errno));
ul.unlock();
aeLock.arm(nullptr);
freeClient(replica);
return;
}
@ -1328,12 +1415,12 @@ void sendBulkToSlave(connection *conn) {
if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING,"Write error sending DB to replica: %s",
connGetLastError(conn));
ul.unlock();
aeLock.arm(nullptr);
freeClient(replica);
}
return;
}
}
replica->repldboff += nwritten;
g_pserver->stat_net_output_bytes += nwritten;
if (replica->repldboff == replica->repldbsize) {
@ -1919,6 +2006,10 @@ void readSyncBulkPayload(connection *conn) {
nread = connRead(conn,buf,readlen);
if (nread <= 0) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
/* equivalent to EAGAIN */
return;
}
serverLog(LL_WARNING,"I/O error trying to sync with MASTER: %s",
(nread == -1) ? strerror(errno) : "connection lost");
cancelReplicationHandshake(mi);
@ -2189,7 +2280,6 @@ void readSyncBulkPayload(connection *conn) {
* we are starting a new history. */
memcpy(g_pserver->replid,mi->master->replid,sizeof(g_pserver->replid));
g_pserver->master_repl_offset = mi->master->reploff;
g_pserver->master_repl_meaningful_offset = mi->master->reploff;
}
clearReplicationId2();
@ -3007,11 +3097,6 @@ void replicationUnsetMaster(redisMaster *mi) {
sdsfree(mi->masterhost);
mi->masterhost = NULL;
/* When a slave is turned into a master, the current replication ID
* (that was inherited from the master at synchronization time) is
* used as secondary ID up to the current offset, and a new replication
* ID is created to continue with a new replication history. */
shiftReplicationId();
if (mi->master) {
if (FCorrectThread(mi->master))
freeClient(mi->master);
@ -3020,6 +3105,15 @@ void replicationUnsetMaster(redisMaster *mi) {
}
replicationDiscardCachedMaster(mi);
cancelReplicationHandshake(mi);
/* When a slave is turned into a master, the current replication ID
* (that was inherited from the master at synchronization time) is
* used as secondary ID up to the current offset, and a new replication
* ID is created to continue with a new replication history.
*
* NOTE: this function MUST be called after we call
* freeClient(server.master), since there we adjust the replication
* offset trimming the final PINGs. See Github issue #7320. */
shiftReplicationId();
/* Disconnecting all the slaves is required: we need to inform slaves
* of the replication ID change (see shiftReplicationId() call). However
* the slaves will be able to partially resync with us, so it will be
@ -3255,10 +3349,6 @@ void replicationCacheMaster(redisMaster *mi, client *c) {
* pending outputs to the master. */
sdsclear(mi->master->querybuf);
sdsclear(mi->master->pending_querybuf);
/* Adjust reploff and read_reploff to the last meaningful offset we executed.
* this is the offset the replica will use for future PSYNC. */
mi->master->reploff = adjustMeaningfulReplOffset();
mi->master->read_reploff = mi->master->reploff;
if (c->flags & CLIENT_MULTI) discardTransaction(c);
listEmpty(c->reply);
@ -3284,38 +3374,6 @@ void replicationCacheMaster(redisMaster *mi, client *c) {
replicationHandleMasterDisconnection(mi);
}
/* If the "meaningful" offset, that is the offset without the final PINGs
* in the stream, is different than the last offset, use it instead:
* often when the master is no longer reachable, replicas will never
* receive the PINGs, however the master will end with an incremented
* offset because of the PINGs and will not be able to incrementally
* PSYNC with the new master.
* This function trims the replication backlog when needed, and returns
* the offset to be used for future partial sync. */
long long adjustMeaningfulReplOffset() {
if (g_pserver->master_repl_offset > g_pserver->master_repl_meaningful_offset) {
long long delta = g_pserver->master_repl_offset -
g_pserver->master_repl_meaningful_offset;
serverLog(LL_NOTICE,
"Using the meaningful offset %lld instead of %lld to exclude "
"the final PINGs (%lld bytes difference)",
g_pserver->master_repl_meaningful_offset,
g_pserver->master_repl_offset,
delta);
g_pserver->master_repl_offset = g_pserver->master_repl_meaningful_offset;
if (g_pserver->repl_backlog_histlen <= delta) {
g_pserver->repl_backlog_histlen = 0;
g_pserver->repl_backlog_idx = 0;
} else {
g_pserver->repl_backlog_histlen -= delta;
g_pserver->repl_backlog_idx =
(g_pserver->repl_backlog_idx + (g_pserver->repl_backlog_size - delta)) %
g_pserver->repl_backlog_size;
}
}
return g_pserver->master_repl_offset;
}
/* This function is called when a master is turend into a slave, in order to
* create from scratch a cached master for the new client, that will allow
* to PSYNC with the slave that was promoted as the new master after a
@ -3341,7 +3399,7 @@ void replicationCacheMasterUsingMyself(redisMaster *mi) {
* by replicationCreateMasterClient(). We'll later set the created
* master as server.cached_master, so the replica will use such
* offset for PSYNC. */
mi->master_initial_offset = adjustMeaningfulReplOffset();
mi->master_initial_offset = g_pserver->master_repl_offset;
/* The master client we create can be set to any DBID, because
* the new master will start its replication stream with SELECT. */
@ -3759,18 +3817,10 @@ void replicationCron(void) {
clientsArePaused();
if (!manual_failover_in_progress) {
long long before_ping = g_pserver->master_repl_meaningful_offset;
ping_argv[0] = createStringObject("PING",4);
replicationFeedSlaves(g_pserver->slaves, g_pserver->replicaseldb,
ping_argv, 1);
decrRefCount(ping_argv[0]);
/* The server.master_repl_meaningful_offset variable represents
* the offset of the replication stream without the pending PINGs.
* This is useful to set the right replication offset for PSYNC
* when the master is turned into a replica. Otherwise pending
* PINGs may not allow it to perform an incremental sync with the
* new master. */
g_pserver->master_repl_meaningful_offset = before_ping;
}
}

View File

@ -2525,7 +2525,6 @@ void initServerConfig(void) {
g_pserver->enable_multimaster = CONFIG_DEFAULT_ENABLE_MULTIMASTER;
g_pserver->repl_syncio_timeout = CONFIG_REPL_SYNCIO_TIMEOUT;
g_pserver->master_repl_offset = 0;
g_pserver->master_repl_meaningful_offset = 0;
/* Replication partial resync backlog */
g_pserver->repl_backlog = NULL;
@ -4747,7 +4746,6 @@ sds genRedisInfoString(const char *section) {
"master_replid:%s\r\n"
"master_replid2:%s\r\n"
"master_repl_offset:%lld\r\n"
"master_repl_meaningful_offset:%lld\r\n"
"second_repl_offset:%lld\r\n"
"repl_backlog_active:%d\r\n"
"repl_backlog_size:%lld\r\n"
@ -4756,7 +4754,6 @@ sds genRedisInfoString(const char *section) {
g_pserver->replid,
g_pserver->replid2,
g_pserver->master_repl_offset,
g_pserver->master_repl_meaningful_offset,
g_pserver->second_replid_offset,
g_pserver->repl_backlog != NULL,
g_pserver->repl_backlog_size,
@ -5150,7 +5147,6 @@ void loadDataFromDisk(void) {
{
memcpy(g_pserver->replid,rsi.repl_id,sizeof(g_pserver->replid));
g_pserver->master_repl_offset = rsi.repl_offset;
g_pserver->master_repl_meaningful_offset = rsi.repl_offset;
listIter li;
listNode *ln;

View File

@ -1830,7 +1830,6 @@ struct redisServer {
char replid[CONFIG_RUN_ID_SIZE+1]; /* My current replication ID. */
char replid2[CONFIG_RUN_ID_SIZE+1]; /* replid inherited from master*/
long long master_repl_offset; /* My current replication offset */
long long master_repl_meaningful_offset; /* Offset minus latest PINGs. */
long long second_replid_offset; /* Accept offsets up to this for replid2. */
int replicaseldb; /* Last SELECTed DB in replication output */
int repl_ping_slave_period; /* Master pings the replica every N seconds */
@ -2401,6 +2400,7 @@ void chopReplicationBacklog(void);
void replicationCacheMasterUsingMyself(struct redisMaster *mi);
void feedReplicationBacklog(const void *ptr, size_t len);
void updateMasterAuth();
void showLatestBacklog();
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);

View File

@ -521,7 +521,7 @@ void stralgoLCS(client *c) {
!= C_OK) return;
if (minmatchlen < 0) minmatchlen = 0;
j++;
} else if (!strcasecmp(opt,"STRINGS")) {
} else if (!strcasecmp(opt,"STRINGS") && moreargs > 1) {
if (a != NULL) {
addReplyError(c,"Either use STRINGS or KEYS");
return;
@ -529,7 +529,7 @@ void stralgoLCS(client *c) {
a = szFromObj(c->argv[j+1]);
b = szFromObj(c->argv[j+2]);
j += 2;
} else if (!strcasecmp(opt,"KEYS")) {
} else if (!strcasecmp(opt,"KEYS") && moreargs > 1) {
if (a != NULL) {
addReplyError(c,"Either use STRINGS or KEYS");
return;

View File

@ -209,6 +209,7 @@ void sendTrackingMessage(client *c, const char *keyname, size_t keylen, int prot
if (c->client_tracking_redirection) {
client *redir = lookupClientByID(c->client_tracking_redirection);
if (!redir) {
c->flags |= CLIENT_TRACKING_BROKEN_REDIR;
/* We need to signal to the original connection that we
* are unable to send invalidation messages to the redirected
* connection, because the client no longer exist. */

View File

@ -42,3 +42,9 @@ test "client do not break when cluster slot" {
fail "output overflow when cluster slots"
}
}
test "client can handle keys with hash tag" {
set cluster [redis_cluster 127.0.0.1:[get_instance_attrib redis 0 port]]
$cluster set foo{tag} bar
$cluster close
}

View File

@ -25,6 +25,7 @@ set ::sentinel_instances {}
set ::redis_instances {}
set ::sentinel_base_port 20000
set ::redis_base_port 30000
set ::redis_port_count 1024
set ::pids {} ; # We kill everything at exit
set ::dirs {} ; # We remove all the temp dirs at exit
set ::run_matching {} ; # If non empty, only tests matching pattern are run.
@ -60,7 +61,7 @@ proc exec_instance {type cfgfile dir} {
# Spawn a redis or sentinel instance, depending on 'type'.
proc spawn_instance {type base_port count {conf {}}} {
for {set j 0} {$j < $count} {incr j} {
set port [find_available_port $base_port]
set port [find_available_port $base_port $::redis_port_count]
incr base_port
puts "Starting $type #$j at port $port"

View File

@ -1,6 +1,9 @@
# Test the meaningful offset implementation to make sure masters
# are able to PSYNC with replicas even if the replication stream
# has pending PINGs at the end.
# These tests were added together with the meaningful offset implementation
# in redis 6.0.0, which was later abandoned in 6.0.4. They used to test that
# servers are able to PSYNC with replicas even if the replication stream has
# PINGs at the end which are present on one server and missing on another.
# We keep these tests just because they reproduce edge cases in the replication
# logic, in the hope they'll be able to spot some problem in the future.
start_server {tags {"psync2"}} {
start_server {} {
@ -16,7 +19,7 @@ start_server {} {
}
# Setup replication
test "PSYNC2 meaningful offset: setup" {
test "PSYNC2 pingoff: setup" {
$R(1) replicaof $R_host(0) $R_port(0)
$R(0) set foo bar
wait_for_condition 50 1000 {
@ -27,7 +30,7 @@ start_server {} {
}
}
test "PSYNC2 meaningful offset: write and wait replication" {
test "PSYNC2 pingoff: write and wait replication" {
$R(0) INCR counter
$R(0) INCR counter
$R(0) INCR counter
@ -41,7 +44,7 @@ start_server {} {
# In this test we'll make sure the replica will get stuck, but with
# an active connection: this way the master will continue to send PINGs
# every second (we modified the PING period earlier)
test "PSYNC2 meaningful offset: pause replica and promote it" {
test "PSYNC2 pingoff: pause replica and promote it" {
$R(1) MULTI
$R(1) DEBUG SLEEP 5
$R(1) SLAVEOF NO ONE
@ -50,13 +53,179 @@ start_server {} {
}
test "Make the old master a replica of the new one and check conditions" {
set sync_partial [status $R(1) sync_partial_ok]
assert {$sync_partial == 0}
assert_equal [status $R(1) sync_full] 0
$R(0) REPLICAOF $R_host(1) $R_port(1)
wait_for_condition 50 1000 {
[status $R(1) sync_partial_ok] == 1
[status $R(1) sync_full] == 1
} else {
fail "The new master was not able to partial sync"
fail "The new master was not able to sync"
}
# make sure replication is still alive and kicking
$R(1) incr x
wait_for_condition 50 1000 {
[$R(0) get x] == 1
} else {
fail "replica didn't get incr"
}
assert_equal [status $R(0) master_repl_offset] [status $R(1) master_repl_offset]
}
}}
start_server {tags {"psync2"}} {
start_server {} {
start_server {} {
start_server {} {
start_server {} {
test {test various edge cases of repl topology changes with missing pings at the end} {
set master [srv -4 client]
set master_host [srv -4 host]
set master_port [srv -4 port]
set replica1 [srv -3 client]
set replica2 [srv -2 client]
set replica3 [srv -1 client]
set replica4 [srv -0 client]
$replica1 replicaof $master_host $master_port
$replica2 replicaof $master_host $master_port
$replica3 replicaof $master_host $master_port
$replica4 replicaof $master_host $master_port
wait_for_condition 50 1000 {
[status $master connected_slaves] == 4
} else {
fail "replicas didn't connect"
}
$master incr x
wait_for_condition 50 1000 {
[$replica1 get x] == 1 && [$replica2 get x] == 1 &&
[$replica3 get x] == 1 && [$replica4 get x] == 1
} else {
fail "replicas didn't get incr"
}
# disconnect replica1 and replica2
# and wait for the master to send a ping to replica3 and replica4
$replica1 replicaof no one
$replica2 replicaof 127.0.0.1 1 ;# we can't promote it to master since that will cycle the replication id
$master config set repl-ping-replica-period 1
after 1500
# make everyone sync from the replica1 that didn't get the last ping from the old master
# replica4 will keep syncing from the old master which now syncs from replica1
# and replica2 will re-connect to the old master (which went back in time)
set new_master_host [srv -3 host]
set new_master_port [srv -3 port]
$replica3 replicaof $new_master_host $new_master_port
$master replicaof $new_master_host $new_master_port
$replica2 replicaof $master_host $master_port
wait_for_condition 50 1000 {
[status $replica2 master_link_status] == "up" &&
[status $replica3 master_link_status] == "up" &&
[status $replica4 master_link_status] == "up" &&
[status $master master_link_status] == "up"
} else {
fail "replicas didn't connect"
}
# make sure replication is still alive and kicking
$replica1 incr x
wait_for_condition 50 1000 {
[$replica2 get x] == 2 &&
[$replica3 get x] == 2 &&
[$replica4 get x] == 2 &&
[$master get x] == 2
} else {
fail "replicas didn't get incr"
}
# make sure we have the right amount of full syncs
assert_equal [status $master sync_full] 6
assert_equal [status $replica1 sync_full] 2
assert_equal [status $replica2 sync_full] 0
assert_equal [status $replica3 sync_full] 0
assert_equal [status $replica4 sync_full] 0
# force psync
$master client kill type master
$replica2 client kill type master
$replica3 client kill type master
$replica4 client kill type master
# make sure replication is still alive and kicking
$replica1 incr x
wait_for_condition 50 1000 {
[$replica2 get x] == 3 &&
[$replica3 get x] == 3 &&
[$replica4 get x] == 3 &&
[$master get x] == 3
} else {
fail "replicas didn't get incr"
}
# make sure we have the right amount of full syncs
assert_equal [status $master sync_full] 6
assert_equal [status $replica1 sync_full] 2
assert_equal [status $replica2 sync_full] 0
assert_equal [status $replica3 sync_full] 0
assert_equal [status $replica4 sync_full] 0
}
}}}}}
start_server {tags {"psync2"}} {
start_server {} {
start_server {} {
for {set j 0} {$j < 3} {incr j} {
set R($j) [srv [expr 0-$j] client]
set R_host($j) [srv [expr 0-$j] host]
set R_port($j) [srv [expr 0-$j] port]
$R($j) CONFIG SET repl-ping-replica-period 1
}
test "Chained replicas disconnect when replica re-connect with the same master" {
# Add a second replica as a chained replica of the current replica
$R(1) replicaof $R_host(0) $R_port(0)
$R(2) replicaof $R_host(1) $R_port(1)
wait_for_condition 50 1000 {
[status $R(2) master_link_status] == "up"
} else {
fail "Chained replica not replicating from its master"
}
# Do a write on the master, and wait for 3 seconds for the master to
# send some PINGs to its replica
$R(0) INCR counter2
after 2000
set sync_partial_master [status $R(0) sync_partial_ok]
set sync_partial_replica [status $R(1) sync_partial_ok]
$R(0) CONFIG SET repl-ping-replica-period 100
# Disconnect the master's direct replica
$R(0) client kill type replica
wait_for_condition 50 1000 {
[status $R(1) master_link_status] == "up" &&
[status $R(2) master_link_status] == "up" &&
[status $R(0) sync_partial_ok] == $sync_partial_master + 1 &&
[status $R(1) sync_partial_ok] == $sync_partial_replica
} else {
fail "Disconnected replica failed to PSYNC with master"
}
# Verify that the replica and its replica's meaningful and real
# offsets match with the master
assert_equal [status $R(0) master_repl_offset] [status $R(1) master_repl_offset]
assert_equal [status $R(0) master_repl_offset] [status $R(2) master_repl_offset]
# make sure replication is still alive and kicking
$R(0) incr counter2
wait_for_condition 50 1000 {
[$R(1) get counter2] == 2 && [$R(2) get counter2] == 2
} else {
fail "replicas didn't get incr"
}
assert_equal [status $R(0) master_repl_offset] [status $R(1) master_repl_offset]
assert_equal [status $R(0) master_repl_offset] [status $R(2) master_repl_offset]
}
}}}

View File

@ -1,3 +1,75 @@
proc show_cluster_status {} {
uplevel 1 {
# The following is the regexp we use to match the log line
# time info. Logs are in the following form:
#
# 11296:M 25 May 2020 17:37:14.652 # Server initialized
set log_regexp {^[0-9]+:[A-Z] [0-9]+ [A-z]+ [0-9]+ ([0-9:.]+) .*}
set repl_regexp {(master|repl|sync|backlog|meaningful|offset)}
puts "Master ID is $master_id"
for {set j 0} {$j < 5} {incr j} {
puts "$j: sync_full: [status $R($j) sync_full]"
puts "$j: id1 : [status $R($j) master_replid]:[status $R($j) master_repl_offset]"
puts "$j: id2 : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]"
puts "$j: backlog : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]"
puts "$j: x var is : [$R($j) GET x]"
puts "---"
}
# Show the replication logs of every instance, interleaving
# them by the log date.
#
# First: load the lines as lists for each instance.
array set log {}
for {set j 0} {$j < 5} {incr j} {
set fd [open $R_log($j)]
while {[gets $fd l] >= 0} {
if {[regexp $log_regexp $l] &&
[regexp -nocase $repl_regexp $l]} {
lappend log($j) $l
}
}
close $fd
}
# To interleave the lines, at every step consume the element of
# the list with the lowest time and remove it. Do it until
# all the lists are empty.
#
# regexp {^[0-9]+:[A-Z] [0-9]+ [A-z]+ [0-9]+ ([0-9:.]+) .*} $l - logdate
while 1 {
# Find the log with smallest time.
set empty 0
set best 0
set bestdate {}
for {set j 0} {$j < 5} {incr j} {
if {[llength $log($j)] == 0} {
incr empty
continue
}
regexp $log_regexp [lindex $log($j) 0] - date
if {$bestdate eq {}} {
set best $j
set bestdate $date
} else {
if {[string compare $bestdate $date] > 0} {
set best $j
set bestdate $date
}
}
}
if {$empty == 5} break ; # Our exit condition: no more logs
# Emit the one with the smallest time (that is the first
# event in the time line).
puts "\[$best port $R_port($best)\] [lindex $log($best) 0]"
set log($best) [lrange $log($best) 1 end]
}
}
}
start_server {tags {"psync2"}} {
start_server {} {
start_server {} {
@ -12,7 +84,7 @@ start_server {} {
set no_exit 0 ; # Do not exit at end of the test
set duration 20 ; # Total test seconds
set duration 40 ; # Total test seconds
set genload 1 ; # Load master with writes at every cycle
@ -28,6 +100,7 @@ start_server {} {
set R($j) [srv [expr 0-$j] client]
set R_host($j) [srv [expr 0-$j] host]
set R_port($j) [srv [expr 0-$j] port]
set R_log($j) [srv [expr 0-$j] stdout]
if {$debug_msg} {puts "Log file: [srv [expr 0-$j] stdout]"}
}
@ -74,6 +147,7 @@ start_server {} {
[status $R([expr {($master_id+3)%5}]) master_link_status] == "up" &&
[status $R([expr {($master_id+4)%5}]) master_link_status] == "up"
} else {
show_cluster_status
fail "Replica not reconnecting"
}
@ -85,6 +159,7 @@ start_server {} {
wait_for_condition 50 2000 {
[$R($j) get x] == $counter_value
} else {
show_cluster_status
fail "Instance #$j x variable is inconsistent"
}
}
@ -120,44 +195,27 @@ start_server {} {
wait_for_condition 50 2000 {
[$R($j) get x] == $counter_value
} else {
show_cluster_status
fail "Instance #$j x variable is inconsistent"
}
}
}
# wait for all the slaves to be in sync with the master, due to pings, we have to re-sample the master constantly too
# wait for all the slaves to be in sync.
set masteroff [status $R($master_id) master_repl_offset]
wait_for_condition 500 100 {
[status $R($master_id) master_repl_offset] == [status $R(0) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(1) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(2) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(3) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(4) master_repl_offset]
[status $R(0) master_repl_offset] >= $masteroff &&
[status $R(1) master_repl_offset] >= $masteroff &&
[status $R(2) master_repl_offset] >= $masteroff &&
[status $R(3) master_repl_offset] >= $masteroff &&
[status $R(4) master_repl_offset] >= $masteroff
} else {
for {set j 0} {$j < 5} {incr j} {
puts "$j: sync_full: [status $R($j) sync_full]"
puts "$j: id1 : [status $R($j) master_replid]:[status $R($j) master_repl_offset]"
puts "$j: id2 : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]"
puts "$j: backlog : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]"
puts "---"
show_cluster_status
fail "Replicas offsets didn't catch up with the master after too long time."
}
fail "Slaves are not in sync with the master after too long time."
}
# Put down the old master so that it cannot generate more
# replication stream, this way in the next master switch, the time at
# which we move slaves away is not important, each will have full
# history (otherwise PINGs will make certain slaves have more history),
# and sometimes a full resync will be needed.
$R($master_id) slaveof 127.0.0.1 0 ;# We use port zero to make it fail.
if {$debug_msg} {
for {set j 0} {$j < 5} {incr j} {
puts "$j: sync_full: [status $R($j) sync_full]"
puts "$j: id1 : [status $R($j) master_replid]:[status $R($j) master_repl_offset]"
puts "$j: id2 : [status $R($j) master_replid2]:[status $R($j) second_repl_offset]"
puts "$j: backlog : firstbyte=[status $R($j) repl_backlog_first_byte_offset] len=[status $R($j) repl_backlog_histlen]"
puts "---"
}
show_cluster_status
}
test "PSYNC2: total sum of full synchronizations is exactly 4" {
@ -165,8 +223,25 @@ start_server {} {
for {set j 0} {$j < 5} {incr j} {
incr sum [status $R($j) sync_full]
}
if {$sum != 4} {
show_cluster_status
assert {$sum == 4}
}
}
# In the absence of pings, are the instances really able to reach
# the exact same offset?
$R($master_id) config set repl-ping-replica-period 3600
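# (a period of 3600 seconds effectively disables periodic PINGs for the
#  rest of the test, so the replication stream stops growing and all the
#  offsets can converge to one exact value)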
wait_for_condition 500 100 {
[status $R($master_id) master_repl_offset] == [status $R(0) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(1) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(2) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(3) master_repl_offset] &&
[status $R($master_id) master_repl_offset] == [status $R(4) master_repl_offset]
} else {
show_cluster_status
fail "Replicas and master offsets were unable to match *exactly*."
}
# Limit anyway the maximum number of cycles. This is useful when the
# test is skipped via --only option of the test suite. In that case
@ -192,6 +267,7 @@ start_server {} {
[status $R([expr {($master_id+3)%5}]) master_link_status] == "up" &&
[status $R([expr {($master_id+4)%5}]) master_link_status] == "up"
} else {
show_cluster_status
fail "Replica not reconnecting"
}
}
@ -215,6 +291,7 @@ start_server {} {
puts "prev sync_partial_ok: $sync_partial"
puts "prev sync_partial_err: $sync_partial_err"
puts [$R($master_id) info stats]
show_cluster_status
fail "Replica didn't partial sync"
}
set new_sync_count [status $R($master_id) sync_full]
@ -236,6 +313,7 @@ start_server {} {
wait_for_condition 50 2000 {
[$R($master_id) debug digest] == [$R($slave_id) debug digest]
} else {
show_cluster_status
fail "Replica not reconnecting"
}
@ -270,6 +348,7 @@ start_server {} {
wait_for_condition 50 2000 {
[status $R($master_id) connected_slaves] == 4
} else {
show_cluster_status
fail "Replica not reconnecting"
}
set new_sync_count [status $R($master_id) sync_full]
@ -280,6 +359,7 @@ start_server {} {
wait_for_condition 50 2000 {
[$R($master_id) debug digest] == [$R($slave_id) debug digest]
} else {
show_cluster_status
fail "Debug digest mismatch between master and replica in post-restart handshake"
}
}
@ -289,103 +369,3 @@ start_server {} {
}
}}}}}
start_server {tags {"psync2"}} {
start_server {} {
start_server {} {
start_server {} {
start_server {} {
test {pings at the end of replication stream are ignored for psync} {
set master [srv -4 client]
set master_host [srv -4 host]
set master_port [srv -4 port]
set replica1 [srv -3 client]
set replica2 [srv -2 client]
set replica3 [srv -1 client]
set replica4 [srv -0 client]
$replica1 replicaof $master_host $master_port
$replica2 replicaof $master_host $master_port
$replica3 replicaof $master_host $master_port
$replica4 replicaof $master_host $master_port
wait_for_condition 50 1000 {
[status $master connected_slaves] == 4
} else {
fail "replicas didn't connect"
}
$master incr x
wait_for_condition 50 1000 {
[$replica1 get x] == 1 && [$replica2 get x] == 1 &&
[$replica3 get x] == 1 && [$replica4 get x] == 1
} else {
fail "replicas didn't get incr"
}
# disconnect replica1 and replica2
# and wait for the master to send a ping to replica3 and replica4
$replica1 replicaof no one
$replica2 replicaof 127.0.0.1 1 ;# we can't promote it to master since that will cycle the replication id
$master config set repl-ping-replica-period 1
after 1500
# make everyone sync from replica1, which didn't get the last ping from the old master
# replica4 will keep syncing from the old master which now syncs from replica1
# and replica2 will re-connect to the old master (which went back in time)
set new_master_host [srv -3 host]
set new_master_port [srv -3 port]
$replica3 replicaof $new_master_host $new_master_port
$master replicaof $new_master_host $new_master_port
$replica2 replicaof $master_host $master_port
wait_for_condition 50 1000 {
[status $replica2 master_link_status] == "up" &&
[status $replica3 master_link_status] == "up" &&
[status $replica4 master_link_status] == "up" &&
[status $master master_link_status] == "up"
} else {
fail "replicas didn't connect"
}
# make sure replication is still alive and kicking
$replica1 incr x
wait_for_condition 50 1000 {
[$replica2 get x] == 2 &&
[$replica3 get x] == 2 &&
[$replica4 get x] == 2 &&
[$master get x] == 2
} else {
fail "replicas didn't get incr"
}
# make sure there are no full syncs other than the initial ones
assert_equal [status $master sync_full] 4
assert_equal [status $replica1 sync_full] 0
assert_equal [status $replica2 sync_full] 0
assert_equal [status $replica3 sync_full] 0
assert_equal [status $replica4 sync_full] 0
# force psync
$master client kill type master
$replica2 client kill type master
$replica3 client kill type master
$replica4 client kill type master
# make sure replication is still alive and kicking
$replica1 incr x
wait_for_condition 50 1000 {
[$replica2 get x] == 3 &&
[$replica3 get x] == 3 &&
[$replica4 get x] == 3 &&
[$master get x] == 3
} else {
fail "replicas didn't get incr"
}
# make sure there are no full syncs other than the initial ones
assert_equal [status $master sync_full] 4
assert_equal [status $replica1 sync_full] 0
assert_equal [status $replica2 sync_full] 0
assert_equal [status $replica3 sync_full] 0
assert_equal [status $replica4 sync_full] 0
}
}}}}}

View File

@ -612,7 +612,7 @@ start_server {tags {"repl"}} {
# Wait that replicas acknowledge they are online so
# we are sure that DBSIZE and DEBUG DIGEST will not
# fail because of timing issues.
wait_for_condition 50 100 {
wait_for_condition 150 100 {
[lindex [$replica role] 3] eq {connected}
} else {
fail "replicas still not connected after some time"
@ -637,3 +637,55 @@ start_server {tags {"repl"}} {
}
}
}
test {replicaof right after disconnection} {
# this is a rare race condition that was reproduced sporadically by the psync2 unit.
# see details in #7205
start_server {tags {"repl"}} {
set replica1 [srv 0 client]
set replica1_host [srv 0 host]
set replica1_port [srv 0 port]
set replica1_log [srv 0 stdout]
start_server {} {
set replica2 [srv 0 client]
set replica2_host [srv 0 host]
set replica2_port [srv 0 port]
set replica2_log [srv 0 stdout]
start_server {} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
$replica1 replicaof $master_host $master_port
$replica2 replicaof $master_host $master_port
wait_for_condition 50 100 {
[string match {*master_link_status:up*} [$replica1 info replication]] &&
[string match {*master_link_status:up*} [$replica2 info replication]]
} else {
fail "Can't turn the instance into a replica"
}
set rd [redis_deferring_client -1]
$rd debug sleep 1
after 100
# when replica2 wakes up from the sleep it will find both a disconnection
# from its master and a replicaof command in the same event loop
$master client kill type replica
$replica2 replicaof $replica1_host $replica1_port
$rd read
wait_for_condition 50 100 {
[string match {*master_link_status:up*} [$replica2 info replication]]
} else {
fail "role change failed."
}
# make sure psync succeeded, and there were no unexpected full syncs.
assert_equal [status $master sync_full] 2
assert_equal [status $replica1 sync_full] 0
assert_equal [status $replica2 sync_full] 0
}
}
}
}

View File

@ -28,11 +28,14 @@ TEST_MODULES = \
all: $(TEST_MODULES)
32bit:
$(MAKE) CFLAGS="-m32" LDFLAGS="-melf_i386"
%.xo: %.c ../../src/redismodule.h
$(CC) -I../../src $(CFLAGS) $(SHOBJ_CFLAGS) -fPIC -c $< -o $@
%.so: %.xo
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LIBS) -lc
$(LD) -o $@ $< $(SHOBJ_LDFLAGS) $(LDFLAGS) $(LIBS) -lc
.PHONY: clean

View File

@ -286,8 +286,29 @@ proc ::redis_cluster::crc16 {s} {
# Hash a single key and return the slot it belongs to. Hash tags are
# implemented as described in the Redis Cluster specification.
proc ::redis_cluster::hash {key} {
# TODO: Handle hash slots.
expr {[::redis_cluster::crc16 $key] & 16383}
set keylen [string length $key]
set s {}
set e {}
for {set s 0} {$s < $keylen} {incr s} {
if {[string index $key $s] eq "\{"} break
}
if {$s == $keylen} {
set res [expr {[crc16 $key] & 16383}]
return $res
}
for {set e [expr {$s+1}]} {$e < $keylen} {incr e} {
if {[string index $key $e] == "\}"} break
}
if {$e == $keylen || $e == [expr {$s+1}]} {
set res [expr {[crc16 $key] & 16383}]
return $res
}
set key_sub [string range $key [expr {$s+1}] [expr {$e-1}]]
return [expr {[crc16 $key_sub] & 16383}]
}
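# Illustration only (not part of the patch): keys sharing the same hash
# tag map to the same slot, e.g.
#   ::redis_cluster::hash "{user1000}.following"   ;# hashes "user1000"
#   ::redis_cluster::hash "{user1000}.followers"   ;# same slot as above
# while an empty tag "{}" or an unterminated "{" falls back to hashing
# the whole key:
#   ::redis_cluster::hash "foo{}bar"  ;# hashes "foo{}bar"
#   ::redis_cluster::hash "foo{bar"   ;# hashes "foo{bar"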
# Return the slot the specified keys hash to.

View File

@ -214,14 +214,14 @@ proc start_server {options {code undefined}} {
dict set config dir [tmpdir server]
# start every server on a different port
set ::port [find_available_port [expr {$::port+1}]]
set port [find_available_port $::baseport $::portcount]
if {$::tls} {
dict set config "port" 0
dict set config "tls-port" $::port
dict set config "tls-port" $port
dict set config "tls-cluster" "yes"
dict set config "tls-replication" "yes"
} else {
dict set config port $::port
dict set config port $port
}
set unixsocket [file normalize [format "%s/%s" [dict get $config "dir"] "socket"]]
@ -246,10 +246,10 @@ proc start_server {options {code undefined}} {
set server_started 0
while {$server_started == 0} {
if {$::verbose} {
puts -nonewline "=== ($tags) Starting server ${::host}:${::port} "
puts -nonewline "=== ($tags) Starting server ${::host}:${port} "
}
send_data_packet $::test_server_fd "server-spawning" "port $::port"
send_data_packet $::test_server_fd "server-spawning" "port $port"
if {$::valgrind} {
set pid [exec valgrind --track-origins=yes --suppressions=src/valgrind.sup --show-reachable=no --show-possibly-lost=no --leak-check=full src/keydb-server $config_file > $stdout 2> $stderr &]
@ -294,19 +294,19 @@ proc start_server {options {code undefined}} {
# for availability. Other test clients may grab the port before we
# are able to do it for example.
if {$port_busy} {
puts "Port $::port was already busy, trying another port..."
set ::port [find_available_port [expr {$::port+1}]]
puts "Port $port was already busy, trying another port..."
set port [find_available_port $::baseport $::portcount]
if {$::tls} {
dict set config "tls-port" $::port
dict set config "tls-port" $port
} else {
dict set config port $::port
dict set config port $port
}
create_server_config_file $config_file $config
continue; # Try again
}
if {$code ne "undefined"} {
set serverisup [server_is_up $::host $::port $retrynum]
set serverisup [server_is_up $::host $port $retrynum]
} else {
set serverisup 1
}
@ -327,7 +327,6 @@ proc start_server {options {code undefined}} {
# setup properties to be able to initialize a client object
set port_param [expr $::tls ? {"tls-port"} : {"port"}]
set host $::host
set port $::port
if {[dict exists $config bind]} { set host [dict get $config bind] }
if {[dict exists $config $port_param]} { set port [dict get $config $port_param] }

View File

@ -344,21 +344,26 @@ proc roundFloat f {
format "%.10g" $f
}
proc find_available_port start {
for {set j $start} {$j < $start+1024} {incr j} {
if {[catch {set fd1 [socket 127.0.0.1 $j]}] &&
[catch {set fd2 [socket 127.0.0.1 [expr $j+10000]]}]} {
return $j
set ::last_port_attempted 0
proc find_available_port {start count} {
set port [expr $::last_port_attempted + 1]
for {set attempts 0} {$attempts < $count} {incr attempts} {
if {$port < $start || $port >= $start+$count} {
set port $start
}
if {[catch {set fd1 [socket 127.0.0.1 $port]}] &&
[catch {set fd2 [socket 127.0.0.1 [expr $port+10000]]}]} {
set ::last_port_attempted $port
return $port
} else {
catch {
close $fd1
close $fd2
}
}
incr port
}
if {$j == $start+1024} {
error "Can't find a non busy port in the $start-[expr {$start+1023}] range."
}
error "Can't find a non busy port in the $start-[expr {$start+$count-1}] range."
}
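# Usage sketch (values are illustrative, not taken from this hunk): pick
# a free port inside the [21111, 29110] range, also checking port+10000
# (the would-be cluster bus port); subsequent calls resume the scan from
# the last port handed out:
#   set port [find_available_port 21111 8000]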
# Test if TERM looks like to support colors

View File

@ -76,7 +76,9 @@ set ::all_tests {
set ::next_test 0
set ::host 127.0.0.1
set ::port 21111
set ::port 6379; # port for external server
set ::baseport 21111; # initial port for spawned redis servers
set ::portcount 8000; # we don't wanna use more than 10000 to avoid collision with cluster bus ports
set ::traceleaks 0
set ::valgrind 0
set ::tls 0
@ -234,26 +236,26 @@ proc test_server_main {} {
set tclsh [info nameofexecutable]
# Open a listening socket, trying different ports in order to find a
# non busy one.
set port [find_available_port 11111]
set clientport [find_available_port 11111 32]
if {!$::quiet} {
puts "Starting test server at port $port"
puts "Starting test server at port $clientport"
}
socket -server accept_test_clients -myaddr 127.0.0.1 $port
socket -server accept_test_clients -myaddr 127.0.0.1 $clientport
# Start the client instances
set ::clients_pids {}
if {$::external} {
set p [exec $tclsh [info script] {*}$::argv \
--client $port --port $::port &]
--client $clientport &]
lappend ::clients_pids $p
} else {
set start_port [expr {$::port+100}]
set start_port $::baseport
set port_count [expr {$::portcount / $::numclients}]
for {set j 0} {$j < $::numclients} {incr j} {
set start_port [find_available_port $start_port]
set p [exec $tclsh [info script] {*}$::argv \
--client $port --port $start_port &]
--client $clientport --baseport $start_port --portcount $port_count &]
lappend ::clients_pids $p
incr start_port 10
incr start_port $port_count
}
}
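# Worked example (illustrative; 16 clients is an assumption, not stated
# in this hunk): with ::baseport 21111, ::portcount 8000 and 16 test
# clients, each client gets 8000/16 = 500 ports, so client 0 spawns
# servers in 21111-21610, client 1 in 21611-22110, and so on, instead of
# every client racing on a single shared ::port counter.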
@ -516,6 +518,10 @@ proc print_help_screen {} {
"--loop Execute the specified set of tests forever."
"--wait-server Wait after server is started (so that you can attach a debugger)."
"--tls Run tests in TLS mode."
"--host <addr> Run tests against an external host."
"--port <port> TCP port to use against external host."
"--baseport <port> Initial port number for spawned redis servers."
"--portcount <num> Port range for spawned redis servers."
"--help Print this help screen."
} "\n"]
}
@ -566,6 +572,12 @@ for {set j 0} {$j < [llength $argv]} {incr j} {
} elseif {$opt eq {--port}} {
set ::port $arg
incr j
} elseif {$opt eq {--baseport}} {
set ::baseport $arg
incr j
} elseif {$opt eq {--portcount}} {
set ::portcount $arg
incr j
} elseif {$opt eq {--accurate}} {
set ::accurate 1
} elseif {$opt eq {--force-failure}} {

View File

@ -95,6 +95,10 @@ start_server {tags {"defrag"}} {
}
if {$::verbose} {
puts "frag $frag"
set misses [s active_defrag_misses]
set hits [s active_defrag_hits]
puts "hits: $hits"
puts "misses: $misses"
puts "max latency $max_latency"
puts [r latency latest]
puts [r latency history active-defrag-cycle]
@ -221,6 +225,10 @@ start_server {tags {"defrag"}} {
}
if {$::verbose} {
puts "frag $frag"
set misses [s active_defrag_misses]
set hits [s active_defrag_hits]
puts "hits: $hits"
puts "misses: $misses"
puts "max latency $max_latency"
puts [r latency latest]
puts [r latency history active-defrag-cycle]
@ -256,11 +264,12 @@ start_server {tags {"defrag"}} {
set expected_frag 1.7
# add a mass of list nodes to two lists (allocations are interlaced)
set val [string repeat A 100] ;# 5 items of 100 bytes puts us in the 640 bytes bin, which has 32 regs, so high potential for fragmentation
for {set j 0} {$j < 500000} {incr j} {
set elements 500000
for {set j 0} {$j < $elements} {incr j} {
$rd lpush biglist1 $val
$rd lpush biglist2 $val
}
for {set j 0} {$j < 500000} {incr j} {
for {set j 0} {$j < $elements} {incr j} {
$rd read ; # Discard replies
$rd read ; # Discard replies
}
@ -302,6 +311,8 @@ start_server {tags {"defrag"}} {
# test that the fragmentation is lower
after 120 ;# serverCron only updates the info once in 100ms
set misses [s active_defrag_misses]
set hits [s active_defrag_hits]
set frag [s allocator_frag_ratio]
set max_latency 0
foreach event [r latency latest] {
@ -312,6 +323,8 @@ start_server {tags {"defrag"}} {
}
if {$::verbose} {
puts "frag $frag"
puts "misses: $misses"
puts "hits: $hits"
puts "max latency $max_latency"
puts [r latency latest]
puts [r latency history active-defrag-cycle]
@ -320,6 +333,10 @@ start_server {tags {"defrag"}} {
# due to high fragmentation, 100hz, and active-defrag-cycle-max set to 75,
# we expect max latency to be not much higher than 7.5ms but due to rare slowness threshold is set higher
assert {$max_latency <= 30}
# in extreme cases of stagnation, we see over 20m misses before the test aborts with "defrag didn't stop",
# in normal cases we only see 100k misses out of 500k elements
assert {$misses < $elements}
}
# verify the data isn't corrupted or changed
set newdigest [r debug digest]
@ -327,6 +344,110 @@ start_server {tags {"defrag"}} {
r save ;# saving an rdb iterates over all the data / pointers
r del biglist1 ;# coverage for quicklistBookmarksClear
} {1}
test "Active defrag edge case" {
# there was an edge case in defrag where all the slabs of a certain bin had exactly the same
# % utilization, with the exception of the current slab from which new allocations are made.
# if the current slab was lower in utilization, the defragger would end up in stagnation,
# running forever without moving any allocation.
# this test is more consistent on a fresh server with no history
start_server {tags {"defrag"}} {
r flushdb
r config resetstat
r config set save "" ;# prevent bgsave from interfering with save below
r config set hz 100
r config set activedefrag no
r config set active-defrag-max-scan-fields 1000
r config set active-defrag-threshold-lower 5
r config set active-defrag-cycle-min 65
r config set active-defrag-cycle-max 75
r config set active-defrag-ignore-bytes 1mb
r config set maxmemory 0
set expected_frag 1.3
r debug mallctl-str thread.tcache.flush VOID
# fill the first slab, which contains 32 regs of 640 bytes.
for {set j 0} {$j < 32} {incr j} {
r setrange "_$j" 600 x
r debug mallctl-str thread.tcache.flush VOID
}
# add a mass of keys with 600 byte values, filling the 640 bytes bin which has 32 regs per slab.
set rd [redis_deferring_client]
set keys 640000
for {set j 0} {$j < $keys} {incr j} {
$rd setrange $j 600 x
}
for {set j 0} {$j < $keys} {incr j} {
$rd read ; # Discard replies
}
# create some fragmentation of 50%
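# (j is advanced twice per iteration below, so only every other key is
#  deleted, leaving roughly half of the regs in each slab free)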
set sent 0
for {set j 0} {$j < $keys} {incr j 1} {
$rd del $j
incr sent
incr j 1
}
for {set j 0} {$j < $sent} {incr j} {
$rd read ; # Discard replies
}
# create higher fragmentation in the first slab
for {set j 10} {$j < 32} {incr j} {
r del "_$j"
}
# start defrag
after 120 ;# serverCron only updates the info once in 100ms
set frag [s allocator_frag_ratio]
if {$::verbose} {
puts "frag $frag"
}
assert {$frag >= $expected_frag}
set digest [r debug digest]
catch {r config set activedefrag yes} e
if {![string match {DISABLED*} $e]} {
# wait for the active defrag to start working (decision once a second)
wait_for_condition 50 100 {
[s active_defrag_running] ne 0
} else {
fail "defrag not started."
}
# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
} else {
after 120 ;# serverCron only updates the info once in 100ms
puts [r info memory]
puts [r info stats]
puts [r memory malloc-stats]
fail "defrag didn't stop."
}
# test that the fragmentation is lower
after 120 ;# serverCron only updates the info once in 100ms
set misses [s active_defrag_misses]
set hits [s active_defrag_hits]
set frag [s allocator_frag_ratio]
if {$::verbose} {
puts "frag $frag"
puts "hits: $hits"
puts "misses: $misses"
}
assert {$frag < 1.1}
assert {$misses < 10000000} ;# when defrag doesn't stop, we have some 30m misses, when it does, we have 2m misses
}
# verify the data isn't corrupted or changed
set newdigest [r debug digest]
assert {$digest eq $newdigest}
r save ;# saving an rdb iterates over all the data / pointers
}
}
}
}
} ;# run_solo

View File

@ -174,9 +174,9 @@ start_server {tags {"other"}} {
tags {protocol} {
test {PIPELINING stresser (also a regression for the old epoll bug)} {
if {$::tls} {
set fd2 [::tls::socket $::host $::port]
set fd2 [::tls::socket [srv host] [srv port]]
} else {
set fd2 [socket $::host $::port]
set fd2 [socket [srv host] [srv port]]
}
fconfigure $fd2 -encoding binary -translation binary
puts -nonewline $fd2 "SELECT 9\r\n"