Merge tag '6.0.2' into unstable

Redis 6.0.2


Former-commit-id: a010e4a4b2cc2bcad1cb14604b7ebc596c35b05e
This commit is contained in:
John Sully 2020-05-22 16:45:18 -04:00
commit 2d783a3cbf
25 changed files with 485 additions and 85 deletions

View File

@ -11,6 +11,185 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP.
SECURITY: There are security fixes in the release.
--------------------------------------------------------------------------------
================================================================================
Redis 6.0.2 Released Fri May 15 22:24:36 CEST 2020
================================================================================
Upgrade urgency MODERATE: many not critical bugfixes in different areas.
Critical fix to client side caching when
keys are evicted from the tracking table but
no notifications are sent.
The following are the most serious fix:
* XPENDING should not update consumer's seen-time
* optimize memory usage of deferred replies - fixed
* Fix CRC64 initialization outside the Redis server itself.
* stringmatchlen() should not expect null terminated strings.
* Cluster nodes availability checks improved when there is
high Pub/Sub load on the cluster bus.
* Redis Benchmark: Fix coredump because of double free
* Tracking: send eviction messages when evicting entries.
* rax.c updated from upstream antirez/rax.
* fix redis 6.0 not freeing closed connections during loading.
New features:
* Support setcpuaffinity on linux/bsd
* Client Side Caching: Add Tracking Prefix Number Stats in Server Info
* Add --user argument to redis-benchmark.c (ACL)
Full list of commits:
Yossi Gottlieb in commit 16ba33c05:
TLS: Fix test failures on recent Debian/Ubuntu.
1 file changed, 20 deletions(-)
Yossi Gottlieb in commit 77ae66930:
TLS: Add crypto locks for older OpenSSL support.
1 file changed, 45 insertions(+)
David Carlier in commit 389697988:
NetBSD build update.
3 files changed, 30 insertions(+), 1 deletion(-)
Madelyn Olson in commit 2435341d7:
Added a refcount on timer events to prevent deletion of recursive timer calls
2 files changed, 12 insertions(+)
antirez in commit 80c906bd3:
Cache master without checking of deferred close flags.
3 files changed, 11 insertions(+), 8 deletions(-)
antirez in commit 74249be4a:
Track events processed while blocked globally.
5 files changed, 32 insertions(+), 17 deletions(-)
antirez in commit 8bf660af9:
Some rework of #7234.
4 files changed, 77 insertions(+), 65 deletions(-)
Oran Agra in commit 9da134cd8:
fix redis 6.0 not freeing closed connections during loading.
3 files changed, 133 insertions(+), 58 deletions(-)
antirez in commit f7f219a13:
Regression test for #7249.
1 file changed, 22 insertions(+)
antirez in commit 693629585:
rax.c updated from upstream antirez/rax.
1 file changed, 4 insertions(+), 2 deletions(-)
antirez in commit e3b5648df:
Tracking: send eviction messages when evicting entries.
2 files changed, 29 insertions(+), 12 deletions(-)
Oran Agra in commit 5c41802d5:
fix unstable replication test
1 file changed, 2 insertions(+), 2 deletions(-)
ShooterIT in commit a23cdbb94:
Redis Benchmark: Fix coredump because of double free
1 file changed, 1 insertion(+), 1 deletion(-)
antirez in commit 1276058ea:
Cluster: clarify we always resolve the sender.
1 file changed, 3 insertions(+), 1 deletion(-)
antirez in commit 002fcde3d:
Cluster: refactor ping/data delay handling.
1 file changed, 13 insertions(+), 11 deletions(-)
antirez in commit 960186a71:
Cluster: introduce data_received field.
2 files changed, 27 insertions(+), 10 deletions(-)
antirez in commit 3672875b4:
stringmatchlen() should not expect null terminated strings.
1 file changed, 2 insertions(+), 2 deletions(-)
Brad Dunbar in commit 24e12641d:
Remove unreachable branch.
1 file changed, 2 deletions(-)
hwware in commit c7edffbd5:
add jemalloc-bg-thread config in redis conf
1 file changed, 3 insertions(+)
hwware in commit 8a9c84f4a:
add include guard for lolwut.h
1 file changed, 6 insertions(+)
antirez in commit cb683a84f:
Don't propagate spurious MULTI on DEBUG LOADAOF.
2 files changed, 6 insertions(+), 3 deletions(-)
antirez in commit 84d9766d6:
Dump recent backlog on master query generating errors.
1 file changed, 29 insertions(+)
Titouan Christophe in commit ec1e106ec:
make struct user anonymous (only typedefed)
1 file changed, 1 insertion(+), 1 deletion(-)
antirez in commit e48c37316:
Test: --dont-clean should do first cleanup.
1 file changed, 2 insertions(+), 5 deletions(-)
Benjamin Sergeant in commit 1e561cfaa:
Add --user argument to redis-benchmark.c (ACL)
1 file changed, 15 insertions(+), 2 deletions(-)
antirez in commit d1af82a88:
Drop not needed part from #7194.
1 file changed, 1 insertion(+), 1 deletion(-)
Muhammad Zahalqa in commit 897a360d0:
Fix compiler warnings on function rev(unsigned long)
1 file changed, 3 insertions(+), 3 deletions(-)
antirez in commit ac316d8cc:
Move CRC64 initialization in main().
2 files changed, 1 insertion(+), 4 deletions(-)
antirez in commit fc7bc3204:
Fix CRC64 initialization outside the Redis server itself.
1 file changed, 3 insertions(+)
hwware in commit a6e55c096:
Client Side Caching: Add Tracking Prefix Number Stats in Server Info
3 files changed, 8 insertions(+)
antirez in commit b062fd523:
Fix NetBSD build by fixing redis_set_thread_title() support.
1 file changed, 4 insertions(+), 1 deletion(-)
antirez in commit 4efb25d9c:
Rework a bit the documentation for CPU pinning.
2 files changed, 18 insertions(+), 8 deletions(-)
zhenwei pi in commit d6436eb7c:
Support setcpuaffinity on linux/bsd
12 files changed, 180 insertions(+), 1 deletion(-)
Guy Benoish in commit 3a441c7d9:
XPENDING should not update consumer's seen-time
4 files changed, 33 insertions(+), 20 deletions(-)
Oran Agra in commit 75addb4fe:
optimize memory usage of deferred replies - fixed
1 file changed, 29 insertions(+)
Deliang Yang in commit c57d9146f:
reformat code
1 file changed, 1 insertion(+), 1 deletion(-)
Oran Agra in commit 3d3861dd8:
add daily github actions with libc malloc and valgrind
5 files changed, 106 insertions(+), 18 deletions(-)
================================================================================
Redis 6.0.1 Released Sat May 02 00:06:07 CEST 2020
================================================================================

View File

@ -1782,6 +1782,9 @@ rdb-save-incremental-fsync yes
# the main dictionary scan
# active-defrag-max-scan-fields 1000
# Jemalloc background thread for purging will be enabled by default
jemalloc-bg-thread yes
# It is possible to pin different threads and processes of Redis to specific
# CPUs in your system, in order to maximize the performances of the server.
# This is useful both in order to pin different Redis threads in different

View File

@ -159,6 +159,14 @@ else
ifeq ($(uname_S),DragonFly)
# FreeBSD
FINAL_LIBS+= -lpthread -lexecinfo
else
ifeq ($(uname_S),OpenBSD)
# OpenBSD
FINAL_LIBS+= -lpthread -lexecinfo
else
ifeq ($(uname_S),NetBSD)
# NetBSD
FINAL_LIBS+= -lpthread -lexecinfo
else
# All the other OSes (notably Linux)
FINAL_LDFLAGS+= -rdynamic
@ -171,6 +179,8 @@ endif
endif
endif
endif
endif
endif
# Include paths to dependencies
FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src
FINAL_CXXFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src

View File

@ -526,6 +526,7 @@ extern "C" long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long millise
te->clientData = clientData;
te->prev = NULL;
te->next = eventLoop->timeEventHead;
te->refcount = 0;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
@ -607,6 +608,13 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
/* Remove events scheduled for deletion. */
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
/* If a reference exists for this timer event,
* don't free it. This is currently incremented
* for recursive timerProc calls */
if (te->refcount) {
te = next;
continue;
}
if (te->prev)
te->prev->next = te->next;
else
@ -636,7 +644,9 @@ static int processTimeEvents(aeEventLoop *eventLoop) {
int retval;
id = te->id;
te->refcount++;
retval = te->timeProc(eventLoop, id, te->clientData);
te->refcount--;
processed++;
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
@ -718,6 +728,7 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma
* if flags has AE_DONT_WAIT set the function returns ASAP until all
* the events that's possible to process without to wait are processed.
* if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
* if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called.
*
* The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
@ -777,6 +788,13 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags)
tvp = &tv;
}
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) {
std::unique_lock<decltype(g_lock)> ulock(g_lock, std::defer_lock);
if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE))
ulock.lock();
eventLoop->beforesleep(eventLoop);
}
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);

View File

@ -58,11 +58,12 @@ extern "C" {
#define AE_WRITE_THREADSAFE 16
#define AE_SLEEP_THREADSAFE 32
#define AE_FILE_EVENTS 1
#define AE_TIME_EVENTS 2
#define AE_FILE_EVENTS (1<<0)
#define AE_TIME_EVENTS (1<<1)
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
#define AE_DONT_WAIT 4
#define AE_CALL_AFTER_SLEEP 8
#define AE_DONT_WAIT (1<<2)
#define AE_CALL_BEFORE_SLEEP (1<<3)
#define AE_CALL_AFTER_SLEEP (1<<4)
#define AE_NOMORE -1
#define AE_DELETED_EVENT_ID -1
@ -97,6 +98,8 @@ typedef struct aeTimeEvent {
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
int refcount; /* refcount to prevent timer events from being
* freed in recursive time event calls. */
} aeTimeEvent;
/* A fired event */

View File

@ -787,6 +787,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
node->slaves = NULL;
node->slaveof = NULL;
node->ping_sent = node->pong_received = 0;
node->data_received = 0;
node->fail_time = 0;
node->link = NULL;
memset(node->ip,0,sizeof(node->ip));
@ -1720,6 +1721,7 @@ int clusterProcessPacket(clusterLink *link) {
clusterMsg *hdr = (clusterMsg*) link->rcvbuf;
uint32_t totlen = ntohl(hdr->totlen);
uint16_t type = ntohs(hdr->type);
mstime_t now = mstime();
if (type < CLUSTERMSG_TYPE_COUNT)
g_pserver->cluster->stats_bus_messages_received[type]++;
@ -1781,8 +1783,17 @@ int clusterProcessPacket(clusterLink *link) {
if (totlen != explen) return 1;
}
/* Check if the sender is a known node. */
/* Check if the sender is a known node. Note that for incoming connections
* we don't store link->node information, but resolve the node by the
* ID in the header each time in the current implementation. */
sender = clusterLookupNode(hdr->sender);
/* Update the last time we saw any data from this node. We
* use this in order to avoid detecting a timeout from a node that
* is just sending a lot of data in the cluster bus, for instance
* because of Pub/Sub. */
if (sender) sender->data_received = now;
if (sender && !nodeInHandshake(sender)) {
/* Update our curretEpoch if we see a newer epoch in the cluster. */
senderCurrentEpoch = ntohu64(hdr->currentEpoch);
@ -1797,7 +1808,7 @@ int clusterProcessPacket(clusterLink *link) {
}
/* Update the replication offset info for this node. */
sender->repl_offset = ntohu64(hdr->offset);
sender->repl_offset_time = mstime();
sender->repl_offset_time = now;
/* If we are a slave performing a manual failover and our master
* sent its offset while already paused, populate the MF state. */
if (g_pserver->cluster->mf_end &&
@ -1911,7 +1922,7 @@ int clusterProcessPacket(clusterLink *link) {
* address. */
serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d",
link->node->name,
(int)(mstime()-(link->node->ctime)),
(int)(now-(link->node->ctime)),
link->node->flags);
link->node->flags |= CLUSTER_NODE_NOADDR;
link->node->ip[0] = '\0';
@ -1946,7 +1957,7 @@ int clusterProcessPacket(clusterLink *link) {
/* Update our info about the node */
if (link->node && type == CLUSTERMSG_TYPE_PONG) {
link->node->pong_received = mstime();
link->node->pong_received = now;
link->node->ping_sent = 0;
/* The PFAIL condition can be reversed without external
@ -2093,7 +2104,7 @@ int clusterProcessPacket(clusterLink *link) {
"FAIL message received from %.40s about %.40s",
hdr->sender, hdr->data.fail.about.nodename);
failing->flags |= CLUSTER_NODE_FAIL;
failing->fail_time = mstime();
failing->fail_time = now;
failing->flags &= ~CLUSTER_NODE_PFAIL;
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|
CLUSTER_TODO_UPDATE_STATE);
@ -2146,9 +2157,9 @@ int clusterProcessPacket(clusterLink *link) {
/* Manual failover requested from slaves. Initialize the state
* accordingly. */
resetManualFailover();
g_pserver->cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT;
g_pserver->cluster->mf_end = now + CLUSTER_MF_TIMEOUT;
g_pserver->cluster->mf_slave = sender;
pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT));
pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT));
serverLog(LL_WARNING,"Manual failover requested by replica %.40s.",
sender->name);
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
@ -3577,7 +3588,6 @@ void clusterCron(void) {
while((de = dictNext(di)) != NULL) {
clusterNode *node = (clusterNode*)dictGetVal(de);
now = mstime(); /* Use an updated time at every iteration. */
mstime_t delay;
if (node->flags &
(CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE))
@ -3601,16 +3611,20 @@ void clusterCron(void) {
this_slaves = okslaves;
}
/* If we are waiting for the PONG more than half the cluster
/* If we are not receiving any data for more than half the cluster
* timeout, reconnect the link: maybe there is a connection
* issue even if the node is alive. */
mstime_t ping_delay = now - node->ping_sent;
mstime_t data_delay = now - node->data_received;
if (node->link && /* is connected */
now - node->link->ctime >
g_pserver->cluster_node_timeout && /* was not already reconnected */
node->ping_sent && /* we already sent a ping */
node->pong_received < node->ping_sent && /* still waiting pong */
/* and we are waiting for the pong more than timeout/2 */
now - node->ping_sent > g_pserver->cluster_node_timeout/2)
ping_delay > g_pserver->cluster_node_timeout/2 &&
/* and in such interval we are not seeing any traffic at all. */
data_delay > g_pserver->cluster_node_timeout/2)
{
/* Disconnect the link, it will be reconnected automatically. */
freeClusterLink(node->link);
@ -3642,12 +3656,18 @@ void clusterCron(void) {
/* Check only if we have an active ping for this instance. */
if (node->ping_sent == 0) continue;
/* Compute the delay of the PONG. Note that if we already received
* the PONG, then node->ping_sent is zero, so can't reach this
* code at all. */
delay = now - node->ping_sent;
/* Check if this node looks unreachable.
* Note that if we already received the PONG, then node->ping_sent
* is zero, so can't reach this code at all, so we don't risk of
* checking for a PONG delay if we didn't sent the PING.
*
* We also consider every incoming data as proof of liveness, since
* our cluster bus link is also used for data: under heavy data
* load pong delays are possible. */
mstime_t node_delay = (ping_delay < data_delay) ? ping_delay :
data_delay;
if (delay > g_pserver->cluster_node_timeout) {
if (node_delay > g_pserver->cluster_node_timeout) {
/* Timeout reached. Set the node as possibly failing if it is
* not already in this state. */
if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) {

View File

@ -128,6 +128,7 @@ typedef struct clusterNode {
tables. */
mstime_t ping_sent; /* Unix time we sent latest ping */
mstime_t pong_received; /* Unix time we received the pong */
mstime_t data_received; /* Unix time we received any data */
mstime_t fail_time; /* Unix time when FAIL flag was set */
mstime_t voted_time; /* Last time we voted for a slave of this master */
mstime_t repl_offset_time; /* Unix time we received offset for this node */

View File

@ -257,7 +257,7 @@ int pthread_setname_np(const char *name);
#endif
/* Check if we can use setcpuaffinity(). */
#if (defined __linux || defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__)
#if (defined __linux || defined __NetBSD__ || defined __FreeBSD__)
#define USE_SETCPUAFFINITY
#ifdef __cplusplus
extern "C"

View File

@ -225,6 +225,6 @@ const char *connGetInfo(connection *conn, char *buf, size_t buf_len);
/* Helpers for tls special considerations */
int tlsHasPendingData();
void tlsProcessPendingData();
int tlsProcessPendingData();
#endif /* __REDIS_CONNECTION_H */

View File

@ -34,6 +34,10 @@
/* This represents a very simple generic canvas in order to draw stuff.
* It's up to each LOLWUT versions to translate what they draw to the
* screen, depending on the result to accomplish. */
#ifndef __LOLWUT_H
#define __LOLWUT_H
typedef struct lwCanvas {
int width;
int height;
@ -47,3 +51,5 @@ void lwDrawPixel(lwCanvas *canvas, int x, int y, int color);
int lwGetPixel(lwCanvas *canvas, int x, int y);
void lwDrawLine(lwCanvas *canvas, int x1, int y1, int x2, int y2, int color);
void lwDrawSquare(lwCanvas *canvas, int x, int y, float size, float angle, int color);
#endif

View File

@ -1552,8 +1552,7 @@ bool freeClient(client *c) {
* some unexpected state, by checking its flags. */
if (FActiveMaster(c)) {
serverLog(LL_WARNING,"Connection with master lost.");
if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY|
CLIENT_CLOSE_ASAP|
if (!(c->flags & (CLIENT_PROTOCOL_ERROR|
CLIENT_BLOCKED)))
{
replicationCacheMaster(MasterInfoFromClient(c), c);
@ -1671,7 +1670,7 @@ void freeClientAsync(client *c) {
listAddNodeTail(g_pserver->clients_to_close,c);
}
void freeClientsInAsyncFreeQueue(int iel) {
int freeClientsInAsyncFreeQueue(int iel) {
serverAssert(GlobalLocksAcquired());
listIter li;
listNode *ln;
@ -1696,6 +1695,7 @@ void freeClientsInAsyncFreeQueue(int iel) {
c->flags &= ~CLIENT_CLOSE_ASAP;
freeClient(c);
}
return (int)vecclientsFree.size();
}
/* Return a client by ID, or NULL if the client ID is not in the set
@ -2108,7 +2108,8 @@ int processInlineBuffer(client *c) {
}
/* Helper function. Record protocol erro details in server log,
* and set the client as CLIENT_CLOSE_AFTER_REPLY. */
* and set the client as CLIENT_CLOSE_AFTER_REPLY and
* CLIENT_PROTOCOL_ERROR. */
#define PROTO_DUMP_LEN 128
static void setProtocolError(const char *errstr, client *c) {
if (cserver.verbosity <= LL_VERBOSE) {
@ -2134,7 +2135,7 @@ static void setProtocolError(const char *errstr, client *c) {
"Protocol error (%s) from client: %s. %s", errstr, client, buf);
sdsfree(client);
}
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
c->flags |= (CLIENT_CLOSE_AFTER_REPLY|CLIENT_PROTOCOL_ERROR);
}
/* Process the query buffer for client 'c', setting up the client argument
@ -3436,10 +3437,9 @@ void unpauseClientsIfNecessary()
* write, close sequence needed to serve a client.
*
* The function returns the total number of events processed. */
int processEventsWhileBlocked(int iel) {
void processEventsWhileBlocked(int iel) {
serverAssert(GlobalLocksAcquired());
int iterations = 4; /* See the function top-comment. */
int count = 0;
std::vector<client*> vecclients;
listIter li;
@ -3465,11 +3465,15 @@ int processEventsWhileBlocked(int iel) {
try
{
while (iterations--) {
int events = 0;
events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT);
events += handleClientsWithPendingWrites(iel, aof_state);
long long startval = g_pserver->events_processed_while_blocked;
long long ae_events = aeProcessEvents(g_pserver->rgthreadvar[iel].el,
AE_FILE_EVENTS|AE_DONT_WAIT|
AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP);
/* Note that server.events_processed_while_blocked will also get
* incremeted by callbacks called by the event loop handlers. */
g_pserver->events_processed_while_blocked += ae_events;
long long events = g_pserver->events_processed_while_blocked - startval;
if (!events) break;
count += events;
}
}
catch (...)
@ -3488,6 +3492,5 @@ int processEventsWhileBlocked(int iel) {
locker.release();
for (client *c : vecclients)
c->lock.lock();
return count;
}

View File

@ -1,6 +1,8 @@
/* Rax -- A radix tree implementation.
*
* Copyright (c) 2017-2018, Salvatore Sanfilippo <antirez at gmail dot com>
* Version 1.2 -- 7 February 2019
*
* Copyright (c) 2017-2019, Salvatore Sanfilippo <antirez at gmail dot com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
@ -1737,7 +1739,7 @@ int raxRandomWalk(raxIterator *it, size_t steps) {
}
if (steps == 0) {
size_t fle = floor(log(it->rt->numele));
size_t fle = 1+floor(log(it->rt->numele));
fle *= 2;
steps = 1 + rand() % fle;
}

View File

@ -2409,8 +2409,6 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
decrRefCount(val);
val = nullptr;
} else {
/* Add the new object in the hash table */
int fInserted = dbMerge(db, &keyobj, val, (rsi && rsi->fForceSetKey) || (rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef
@ -2431,6 +2429,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
val = nullptr;
}
}
if (g_pserver->key_load_delay)
usleep(g_pserver->key_load_delay);

View File

@ -284,7 +284,7 @@ static redisConfig *getRedisConfig(const char *ip, int port,
for (; i < 2; i++) {
int res = redisGetReply(c, &r);
if (reply) freeReplyObject(reply);
reply = ((redisReply *) r);
reply = res == REDIS_OK ? ((redisReply *) r) : NULL;
if (res != REDIS_OK || !r) goto fail;
if (reply->type == REDIS_REPLY_ERROR) {
fprintf(stderr, "ERROR: %s\n", reply->str);

View File

@ -3244,12 +3244,16 @@ void replicationCacheMaster(redisMaster *mi, client *c) {
/* Unlink the client from the server structures. */
unlinkClient(c);
/* Clear flags that can create issues once we reconnect the client. */
c->flags &= ~(CLIENT_CLOSE_ASAP|CLIENT_CLOSE_AFTER_REPLY);
/* Reset the master client so that's ready to accept new commands:
* we want to discard te non processed query buffers and non processed
* offsets, including pending transactions, already populated arguments,
* pending outputs to the master. */
sdsclear(mi->master->querybuf);
sdsclear(mi->master->pending_querybuf);
/* Adjust reploff and read_reploff to the last meaningful offset we executed.
* this is the offset the replica will use for future PSYNC. */
mi->master->reploff = adjustMeaningfulReplOffset();

View File

@ -2112,6 +2112,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
migrateCloseTimedoutSockets();
}
/* Resize tracking keys table if needed. This is also done at every
* command execution, but we want to be sure that if the last command
* executed changes the value via CONFIG SET, the server will perform
* the operation even if completely idle. */
if (g_pserver->tracking_clients) trackingLimitUsedSlots();
/* Start a scheduled BGSAVE if the corresponding flag is set. This is
* useful when we are forced to postpone a BGSAVE because an AOF
* rewrite is in progress.
@ -2168,9 +2174,22 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData
return 1000/g_pserver->hz;
}
extern int ProcessingEventsWhileBlocked;
/* This function gets called every time Redis is entering the
* main loop of the event driven library, that is, before to sleep
* for ready file descriptors. */
* for ready file descriptors.
*
* Note: This function is (currently) called from two functions:
* 1. aeMain - The main server loop
* 2. processEventsWhileBlocked - Process clients during RDB/AOF load
*
* If it was called from processEventsWhileBlocked we don't want
* to perform all actions (For example, we don't want to expire
* keys), but we do need to perform some actions.
*
* The most important is freeClientsInAsyncFreeQueue but we also
* call some other low-risk functions. */
void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
@ -2448,6 +2467,7 @@ void initServerConfig(void) {
g_pserver->slaves = listCreate();
g_pserver->monitors = listCreate();
g_pserver->clients_timeout_table = raxNew();
g_pserver->events_processed_while_blocked = 0;
g_pserver->timezone = getTimeZone(); /* Initialized by tzset(). */
cserver.configfile = NULL;
cserver.executable = NULL;
@ -2937,6 +2957,8 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain)
pvar->tlsfd_count = 0;
pvar->cclients = 0;
pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR);
aeSetBeforeSleepProc(pvar->el, fMain ? beforeSleep : beforeSleepLite, fMain ? 0 : AE_SLEEP_THREADSAFE);
aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE);
pvar->current_client = nullptr;
pvar->clients_paused = 0;
pvar->fRetrySetAofEvent = false;
@ -5277,10 +5299,7 @@ void *workerThreadMain(void *parg)
serverTL = g_pserver->rgthreadvar+iel; // set the TLS threadsafe global
moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run
int isMainThread = (iel == IDX_EVENT_LOOP_MAIN);
aeEventLoop *el = g_pserver->rgthreadvar[iel].el;
aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE);
aeSetAfterSleepProc(el, afterSleep, AE_SLEEP_THREADSAFE);
try
{
aeMain(el);

View File

@ -430,7 +430,8 @@ public:
#define CLIENT_TRACKING_NOLOOP (1ULL<<37) /* Don't send invalidation messages
about writes performed by myself.*/
#define CLIENT_IN_TO_TABLE (1ULL<<38) /* This client is in the timeout table. */
#define CLIENT_FORCE_REPLY (1ULL<<39) /* Should addReply be forced to write the text? */
#define CLIENT_PROTOCOL_ERROR (1ULL<<39) /* Protocol error chatting with it. */
#define CLIENT_FORCE_REPLY (1ULL<<40) /* Should addReply be forced to write the text? */
/* Client block type (btype field in client structure)
* if CLIENT_BLOCKED flag is set. */
@ -1680,6 +1681,8 @@ struct redisServer {
dict *migrate_cached_sockets;/* MIGRATE cached sockets */
std::atomic<uint64_t> next_client_id; /* Next client unique ID. Incremental. */
int protected_mode; /* Don't accept external connections. */
long long events_processed_while_blocked; /* processEventsWhileBlocked() */
/* RDB / AOF loading information */
std::atomic<int> loading; /* We are loading data from disk if true */
off_t loading_total_bytes;
@ -2213,7 +2216,7 @@ void rewriteClientCommandVector(client *c, int argc, ...);
void rewriteClientCommandArgument(client *c, int i, robj *newval);
void replaceClientCommandVector(client *c, int argc, robj **argv);
unsigned long getClientOutputBufferMemoryUsage(client *c);
void freeClientsInAsyncFreeQueue(int iel);
int freeClientsInAsyncFreeQueue(int iel);
void asyncCloseClientOnOutputBufferLimitReached(client *c);
int getClientType(client *c);
int getClientTypeByName(const char *name);
@ -2225,7 +2228,7 @@ int listenToPort(int port, int *fds, int *count, int fReusePort, int fFirstListe
void pauseClients(mstime_t duration);
int clientsArePaused(void);
void unpauseClientsIfNecessary();
int processEventsWhileBlocked(int iel);
void processEventsWhileBlocked(int iel);
int handleClientsWithPendingWrites(int iel, int aof_state);
int clientHasPendingReplies(client *c);
void unlinkClient(client *c);

View File

@ -36,6 +36,10 @@
#include <sys/param.h>
#include <sys/cpuset.h>
#endif
#ifdef __NetBSD__
#include <pthread.h>
#include <sched.h>
#endif
#include "config.h"
#ifdef USE_SETCPUAFFINITY
@ -71,11 +75,18 @@ void setcpuaffinity(const char *cpulist) {
#ifdef __FreeBSD__
cpuset_t cpuset;
#endif
#ifdef __NetBSD__
cpuset_t *cpuset;
#endif
if (!cpulist)
return;
#ifndef __NetBSD__
CPU_ZERO(&cpuset);
#else
cpuset = cpuset_create();
#endif
q = cpulist;
while (p = q, q = next_token(q, ','), p) {
@ -110,7 +121,11 @@ void setcpuaffinity(const char *cpulist) {
return;
while (a <= b) {
#ifndef __NetBSD__
CPU_SET(a, &cpuset);
#else
cpuset_set(a, cpuset);
#endif
a += s;
}
}
@ -124,6 +139,10 @@ void setcpuaffinity(const char *cpulist) {
#ifdef __FreeBSD__
cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID, -1, sizeof(cpuset), &cpuset);
#endif
#ifdef __NetBSD__
pthread_setaffinity_np(pthread_self(), cpuset_size(cpuset), cpuset);
cpuset_destroy(cpuset);
#endif
}
#endif /* USE_SETCPUAFFINITY */

View File

@ -93,11 +93,56 @@ static int parseProtocolsConfig(const char *str) {
* served to the reader yet. */
static list *pending_list = NULL;
/**
* OpenSSL global initialization and locking handling callbacks.
* Note that this is only required for OpenSSL < 1.1.0.
*/
#if OPENSSL_VERSION_NUMBER < 0x10100000L
#define USE_CRYPTO_LOCKS
#endif
#ifdef USE_CRYPTO_LOCKS
static pthread_mutex_t *openssl_locks;
static void sslLockingCallback(int mode, int lock_id, const char *f, int line) {
pthread_mutex_t *mt = openssl_locks + lock_id;
if (mode & CRYPTO_LOCK) {
pthread_mutex_lock(mt);
} else {
pthread_mutex_unlock(mt);
}
(void)f;
(void)line;
}
static void initCryptoLocks(void) {
unsigned i, nlocks;
if (CRYPTO_get_locking_callback() != NULL) {
/* Someone already set the callback before us. Don't destroy it! */
return;
}
nlocks = CRYPTO_num_locks();
openssl_locks = zmalloc(sizeof(*openssl_locks) * nlocks);
for (i = 0; i < nlocks; i++) {
pthread_mutex_init(openssl_locks + i, NULL);
}
CRYPTO_set_locking_callback(sslLockingCallback);
}
#endif /* USE_CRYPTO_LOCKS */
void tlsInit(void) {
ERR_load_crypto_strings();
SSL_load_error_strings();
SSL_library_init();
#ifdef USE_CRYPTO_LOCKS
initCryptoLocks();
#endif
if (!RAND_poll()) {
serverLog(LL_WARNING, "OpenSSL: Failed to seed random number generator.");
}
@ -768,15 +813,17 @@ int tlsHasPendingData() {
return listLength(pending_list) > 0;
}
void tlsProcessPendingData() {
int tlsProcessPendingData() {
listIter li;
listNode *ln;
int processed = listLength(pending_list);
listRewind(pending_list,&li);
while((ln = listNext(&li))) {
tls_connection *conn = listNodeValue(ln);
tlsHandleEvent(conn, AE_READABLE);
}
return processed;
}
#else /* USE_OPENSSL */
@ -804,7 +851,8 @@ int tlsHasPendingData() {
return 0;
}
void tlsProcessPendingData() {
int tlsProcessPendingData() {
return 0;
}
#endif

View File

@ -279,15 +279,22 @@ void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) {
*
* Note that 'c' may be NULL in case the operation was performed outside the
* context of a client modifying the database (for instance when we delete a
* key because of expire). */
void trackingInvalidateKey(client *c, robj *keyobj) {
* key because of expire).
*
* The last argument 'bcast' tells the function if it should also schedule
* the key for broadcasting to clients in BCAST mode. This is the case when
* the function is called from the Redis core once a key is modified, however
* we also call the function in order to evict keys in the key table in case
* of memory pressure: in that case the key didn't really change, so we want
* just to notify the clients that are in the table for this key, that would
* otherwise miss the fact we are no longer tracking the key for them. */
void trackingInvalidateKeyRaw(client *c, char *key, size_t keylen, int bcast) {
if (TrackingTable == NULL) return;
sds sdskey = szFromObj(keyobj);
if (raxSize(PrefixTable) > 0)
trackingRememberKeyToBroadcast(c,sdskey,sdslen(sdskey));
if (bcast && raxSize(PrefixTable) > 0)
trackingRememberKeyToBroadcast(c,key,keylen);
rax *ids = (rax*)raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey));
rax *ids = (rax*)raxFind(TrackingTable,(unsigned char*)key,keylen);
if (ids == raxNotFound) return;
raxIterator ri;
@ -317,7 +324,7 @@ void trackingInvalidateKey(client *c, robj *keyobj) {
continue;
}
sendTrackingMessage(target,sdskey,sdslen(sdskey),0);
sendTrackingMessage(target,key,keylen,0);
}
raxStop(&ri);
@ -325,7 +332,13 @@ void trackingInvalidateKey(client *c, robj *keyobj) {
* again if more keys will be modified in this caching slot. */
TrackingTableTotalItems -= raxSize(ids);
raxFree(ids);
raxRemove(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),NULL);
raxRemove(TrackingTable,(unsigned char*)key,keylen,NULL);
}
/* Wrapper (the one actually called across the core) to pass the key
* as object. */
void trackingInvalidateKey(client *c, robj *keyobj) {
trackingInvalidateKeyRaw(c,szFromObj(keyobj),sdslen(szFromObj(keyobj)),1);
}
/* This function is called when one or all the Redis databases are flushed
@ -392,10 +405,8 @@ void trackingLimitUsedSlots(void) {
effort--;
raxSeek(&ri,"^",NULL,0);
raxRandomWalk(&ri,0);
rax *ids = (rax*)ri.data;
TrackingTableTotalItems -= raxSize(ids);
raxFree(ids);
raxRemove(TrackingTable,ri.key,ri.key_len,NULL);
if (raxEOF(&ri)) break;
trackingInvalidateKeyRaw(NULL,(char*)ri.key,ri.key_len,0);
if (raxSize(TrackingTable) <= max_keys) {
timeout_counter = 0;
raxStop(&ri);

View File

@ -51,7 +51,7 @@ int stringmatchlen(const char *pattern, int patternLen,
while(patternLen && stringLen) {
switch(pattern[0]) {
case '*':
while (pattern[1] == '*') {
while (patternLen && pattern[1] == '*') {
pattern++;
patternLen--;
}
@ -67,8 +67,6 @@ int stringmatchlen(const char *pattern, int patternLen,
return 0; /* no match */
break;
case '?':
if (stringLen == 0)
return 0; /* no match */
string++;
stringLen--;
break;
@ -96,7 +94,7 @@ int stringmatchlen(const char *pattern, int patternLen,
pattern--;
patternLen++;
break;
} else if (pattern[1] == '-' && patternLen >= 3) {
} else if (patternLen >= 3 && pattern[1] == '-') {
int start = pattern[0];
int end = pattern[2];
int c = string[0];

View File

@ -128,4 +128,56 @@ start_server {} {
# make sure the server is still writable
r set x xx
}
}
test {client freed during loading} {
start_server [list overrides [list key-load-delay 10 rdbcompression no]] {
# create a big rdb that will take long to load. it is important
# for keys to be big since the server processes events only once in 2mb.
# 100mb of rdb, 100k keys will load in more than 1 second
r debug populate 100000 key 1000
catch {
r debug restart
}
set stdout [srv 0 stdout]
while 1 {
# check that the new server actually started and is ready for connections
if {[exec grep -i "Server initialized" | wc -l < $stdout] > 1} {
break
}
after 10
}
# make sure it's still loading
assert_equal [s loading] 1
# connect and disconnect 10 clients
set clients {}
for {set j 0} {$j < 10} {incr j} {
lappend clients [redis_deferring_client]
}
foreach rd $clients {
$rd debug log bla
}
foreach rd $clients {
$rd read
}
foreach rd $clients {
$rd close
}
# make sure the server freed the clients
wait_for_condition 100 100 {
[s connected_clients] < 3
} else {
fail "clients didn't disconnect"
}
# make sure it's still loading
assert_equal [s loading] 1
# no need to keep waiting for loading to complete
exec kill [srv 0 pid]
}
}

View File

@ -513,9 +513,9 @@ start_server {tags {"repl"}} {
set master_port [srv 0 port]
set master_pid [srv 0 pid]
# put enough data in the db that the rdb file will be bigger than the socket buffers
# and since we'll have key-load-delay of 100, 10000 keys will take at least 1 second
# and since we'll have key-load-delay of 100, 20000 keys will take at least 2 seconds
# we also need the replica to process requests during transfer (which it does only once in 2mb)
$master debug populate 10000 test 10000
$master debug populate 20000 test 10000
$master config set rdbcompression no
# If running on Linux, we also measure utime/stime to detect possible I/O handling issues
set os [catch {exec unamee}]

View File

@ -25,26 +25,6 @@ start_server {tags {"tls"}} {
}
test {TLS: Verify tls-protocols behaves as expected} {
r CONFIG SET tls-protocols TLSv1
set s [redis [srv 0 host] [srv 0 port] 0 1 {-tls1 0}]
catch {$s PING} e
assert_match {*I/O error*} $e
set s [redis [srv 0 host] [srv 0 port] 0 1 {-tls1 1}]
catch {$s PING} e
assert_match {PONG} $e
r CONFIG SET tls-protocols TLSv1.1
set s [redis [srv 0 host] [srv 0 port] 0 1 {-tls1.1 0}]
catch {$s PING} e
assert_match {*I/O error*} $e
set s [redis [srv 0 host] [srv 0 port] 0 1 {-tls1.1 1}]
catch {$s PING} e
assert_match {PONG} $e
r CONFIG SET tls-protocols TLSv1.2
set s [redis [srv 0 host] [srv 0 port] 0 1 {-tls1.2 0}]

View File

@ -107,5 +107,27 @@ start_server {tags {"tracking"}} {
assert {$keys eq {mykey}}
}
test {Tracking gets notification on tracking table key eviction} {
r CLIENT TRACKING off
r CLIENT TRACKING on REDIRECT $redir NOLOOP
r MSET key1 1 key2 2
# Let the server track the two keys for us
r MGET key1 key2
# Force the eviction of all the keys but one:
r config set tracking-table-max-keys 1
# Note that we may have other keys in the table for this client,
# since we disabled/enabled tracking multiple time with the same
# ID, and tracking does not do ID cleanups for performance reasons.
# So we check that eventually we'll receive one or the other key,
# otherwise the test will die for timeout.
while 1 {
set keys [lindex [$rd1 read] 2]
if {$keys eq {key1} || $keys eq {key2}} break
}
# We should receive an expire notification for one of
# the two keys (only one must remain)
assert {$keys eq {key1} || $keys eq {key2}}
}
$rd1 close
}