diff --git a/00-RELEASENOTES b/00-RELEASENOTES index 1d5486ae8..1fa620bd8 100644 --- a/00-RELEASENOTES +++ b/00-RELEASENOTES @@ -11,6 +11,185 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP. SECURITY: There are security fixes in the release. -------------------------------------------------------------------------------- +================================================================================ +Redis 6.0.2 Released Fri May 15 22:24:36 CEST 2020 +================================================================================ + +Upgrade urgency MODERATE: many not critical bugfixes in different areas. + Critical fix to client side caching when + keys are evicted from the tracking table but + no notifications are sent. + +The following are the most serious fix: + +* XPENDING should not update consumer's seen-time +* optimize memory usage of deferred replies - fixed +* Fix CRC64 initialization outside the Redis server itself. +* stringmatchlen() should not expect null terminated strings. +* Cluster nodes availability checks improved when there is + high Pub/Sub load on the cluster bus. +* Redis Benchmark: Fix coredump because of double free +* Tracking: send eviction messages when evicting entries. +* rax.c updated from upstream antirez/rax. +* fix redis 6.0 not freeing closed connections during loading. + +New features: + +* Support setcpuaffinity on linux/bsd +* Client Side Caching: Add Tracking Prefix Number Stats in Server Info +* Add --user argument to redis-benchmark.c (ACL) + +Full list of commits: + +Yossi Gottlieb in commit 16ba33c05: + TLS: Fix test failures on recent Debian/Ubuntu. + 1 file changed, 20 deletions(-) + +Yossi Gottlieb in commit 77ae66930: + TLS: Add crypto locks for older OpenSSL support. + 1 file changed, 45 insertions(+) + +David Carlier in commit 389697988: + NetBSD build update. + 3 files changed, 30 insertions(+), 1 deletion(-) + +Madelyn Olson in commit 2435341d7: + Added a refcount on timer events to prevent deletion of recursive timer calls + 2 files changed, 12 insertions(+) + +antirez in commit 80c906bd3: + Cache master without checking of deferred close flags. + 3 files changed, 11 insertions(+), 8 deletions(-) + +antirez in commit 74249be4a: + Track events processed while blocked globally. + 5 files changed, 32 insertions(+), 17 deletions(-) + +antirez in commit 8bf660af9: + Some rework of #7234. + 4 files changed, 77 insertions(+), 65 deletions(-) + +Oran Agra in commit 9da134cd8: + fix redis 6.0 not freeing closed connections during loading. + 3 files changed, 133 insertions(+), 58 deletions(-) + +antirez in commit f7f219a13: + Regression test for #7249. + 1 file changed, 22 insertions(+) + +antirez in commit 693629585: + rax.c updated from upstream antirez/rax. + 1 file changed, 4 insertions(+), 2 deletions(-) + +antirez in commit e3b5648df: + Tracking: send eviction messages when evicting entries. + 2 files changed, 29 insertions(+), 12 deletions(-) + +Oran Agra in commit 5c41802d5: + fix unstable replication test + 1 file changed, 2 insertions(+), 2 deletions(-) + +ShooterIT in commit a23cdbb94: + Redis Benchmark: Fix coredump because of double free + 1 file changed, 1 insertion(+), 1 deletion(-) + +antirez in commit 1276058ea: + Cluster: clarify we always resolve the sender. + 1 file changed, 3 insertions(+), 1 deletion(-) + +antirez in commit 002fcde3d: + Cluster: refactor ping/data delay handling. + 1 file changed, 13 insertions(+), 11 deletions(-) + +antirez in commit 960186a71: + Cluster: introduce data_received field. + 2 files changed, 27 insertions(+), 10 deletions(-) + +antirez in commit 3672875b4: + stringmatchlen() should not expect null terminated strings. + 1 file changed, 2 insertions(+), 2 deletions(-) + +Brad Dunbar in commit 24e12641d: + Remove unreachable branch. + 1 file changed, 2 deletions(-) + +hwware in commit c7edffbd5: + add jemalloc-bg-thread config in redis conf + 1 file changed, 3 insertions(+) + +hwware in commit 8a9c84f4a: + add include guard for lolwut.h + 1 file changed, 6 insertions(+) + +antirez in commit cb683a84f: + Don't propagate spurious MULTI on DEBUG LOADAOF. + 2 files changed, 6 insertions(+), 3 deletions(-) + +antirez in commit 84d9766d6: + Dump recent backlog on master query generating errors. + 1 file changed, 29 insertions(+) + +Titouan Christophe in commit ec1e106ec: + make struct user anonymous (only typedefed) + 1 file changed, 1 insertion(+), 1 deletion(-) + +antirez in commit e48c37316: + Test: --dont-clean should do first cleanup. + 1 file changed, 2 insertions(+), 5 deletions(-) + +Benjamin Sergeant in commit 1e561cfaa: + Add --user argument to redis-benchmark.c (ACL) + 1 file changed, 15 insertions(+), 2 deletions(-) + +antirez in commit d1af82a88: + Drop not needed part from #7194. + 1 file changed, 1 insertion(+), 1 deletion(-) + +Muhammad Zahalqa in commit 897a360d0: + Fix compiler warnings on function rev(unsigned long) + 1 file changed, 3 insertions(+), 3 deletions(-) + +antirez in commit ac316d8cc: + Move CRC64 initialization in main(). + 2 files changed, 1 insertion(+), 4 deletions(-) + +antirez in commit fc7bc3204: + Fix CRC64 initialization outside the Redis server itself. + 1 file changed, 3 insertions(+) + +hwware in commit a6e55c096: + Client Side Caching: Add Tracking Prefix Number Stats in Server Info + 3 files changed, 8 insertions(+) + +antirez in commit b062fd523: + Fix NetBSD build by fixing redis_set_thread_title() support. + 1 file changed, 4 insertions(+), 1 deletion(-) + +antirez in commit 4efb25d9c: + Rework a bit the documentation for CPU pinning. + 2 files changed, 18 insertions(+), 8 deletions(-) + +zhenwei pi in commit d6436eb7c: + Support setcpuaffinity on linux/bsd + 12 files changed, 180 insertions(+), 1 deletion(-) + +Guy Benoish in commit 3a441c7d9: + XPENDING should not update consumer's seen-time + 4 files changed, 33 insertions(+), 20 deletions(-) + +Oran Agra in commit 75addb4fe: + optimize memory usage of deferred replies - fixed + 1 file changed, 29 insertions(+) + +Deliang Yang in commit c57d9146f: + reformat code + 1 file changed, 1 insertion(+), 1 deletion(-) + +Oran Agra in commit 3d3861dd8: + add daily github actions with libc malloc and valgrind + 5 files changed, 106 insertions(+), 18 deletions(-) + + ================================================================================ Redis 6.0.1 Released Sat May 02 00:06:07 CEST 2020 ================================================================================ diff --git a/keydb.conf b/keydb.conf index 4ee6da736..30e12a5e0 100644 --- a/keydb.conf +++ b/keydb.conf @@ -1782,6 +1782,9 @@ rdb-save-incremental-fsync yes # the main dictionary scan # active-defrag-max-scan-fields 1000 +# Jemalloc background thread for purging will be enabled by default +jemalloc-bg-thread yes + # It is possible to pin different threads and processes of Redis to specific # CPUs in your system, in order to maximize the performances of the server. # This is useful both in order to pin different Redis threads in different diff --git a/src/Makefile b/src/Makefile index f8d018b13..da848d474 100644 --- a/src/Makefile +++ b/src/Makefile @@ -159,6 +159,14 @@ else ifeq ($(uname_S),DragonFly) # FreeBSD FINAL_LIBS+= -lpthread -lexecinfo +else +ifeq ($(uname_S),OpenBSD) + # OpenBSD + FINAL_LIBS+= -lpthread -lexecinfo +else +ifeq ($(uname_S),NetBSD) + # NetBSD + FINAL_LIBS+= -lpthread -lexecinfo else # All the other OSes (notably Linux) FINAL_LDFLAGS+= -rdynamic @@ -171,6 +179,8 @@ endif endif endif endif +endif +endif # Include paths to dependencies FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src FINAL_CXXFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src diff --git a/src/ae.cpp b/src/ae.cpp index c4d32b3e0..5eba26b8c 100644 --- a/src/ae.cpp +++ b/src/ae.cpp @@ -526,6 +526,7 @@ extern "C" long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long millise te->clientData = clientData; te->prev = NULL; te->next = eventLoop->timeEventHead; + te->refcount = 0; if (te->next) te->next->prev = te; eventLoop->timeEventHead = te; @@ -607,6 +608,13 @@ static int processTimeEvents(aeEventLoop *eventLoop) { /* Remove events scheduled for deletion. */ if (te->id == AE_DELETED_EVENT_ID) { aeTimeEvent *next = te->next; + /* If a reference exists for this timer event, + * don't free it. This is currently incremented + * for recursive timerProc calls */ + if (te->refcount) { + te = next; + continue; + } if (te->prev) te->prev->next = te->next; else @@ -636,7 +644,9 @@ static int processTimeEvents(aeEventLoop *eventLoop) { int retval; id = te->id; + te->refcount++; retval = te->timeProc(eventLoop, id, te->clientData); + te->refcount--; processed++; if (retval != AE_NOMORE) { aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); @@ -718,6 +728,7 @@ extern "C" void ProcessEventCore(aeEventLoop *eventLoop, aeFileEvent *fe, int ma * if flags has AE_DONT_WAIT set the function returns ASAP until all * the events that's possible to process without to wait are processed. * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called. + * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called. * * The function returns the number of events processed. */ int aeProcessEvents(aeEventLoop *eventLoop, int flags) @@ -777,6 +788,13 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) tvp = &tv; } + if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) { + std::unique_lock ulock(g_lock, std::defer_lock); + if (!(eventLoop->beforesleepFlags & AE_SLEEP_THREADSAFE)) + ulock.lock(); + eventLoop->beforesleep(eventLoop); + } + /* Call the multiplexing API, will return only on timeout or when * some event fires. */ numevents = aeApiPoll(eventLoop, tvp); diff --git a/src/ae.h b/src/ae.h index e2dbcb754..156c219ef 100644 --- a/src/ae.h +++ b/src/ae.h @@ -58,11 +58,12 @@ extern "C" { #define AE_WRITE_THREADSAFE 16 #define AE_SLEEP_THREADSAFE 32 -#define AE_FILE_EVENTS 1 -#define AE_TIME_EVENTS 2 +#define AE_FILE_EVENTS (1<<0) +#define AE_TIME_EVENTS (1<<1) #define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) -#define AE_DONT_WAIT 4 -#define AE_CALL_AFTER_SLEEP 8 +#define AE_DONT_WAIT (1<<2) +#define AE_CALL_BEFORE_SLEEP (1<<3) +#define AE_CALL_AFTER_SLEEP (1<<4) #define AE_NOMORE -1 #define AE_DELETED_EVENT_ID -1 @@ -97,6 +98,8 @@ typedef struct aeTimeEvent { void *clientData; struct aeTimeEvent *prev; struct aeTimeEvent *next; + int refcount; /* refcount to prevent timer events from being + * freed in recursive time event calls. */ } aeTimeEvent; /* A fired event */ diff --git a/src/cluster.cpp b/src/cluster.cpp index 544658347..6d6f2be2e 100644 --- a/src/cluster.cpp +++ b/src/cluster.cpp @@ -787,6 +787,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { node->slaves = NULL; node->slaveof = NULL; node->ping_sent = node->pong_received = 0; + node->data_received = 0; node->fail_time = 0; node->link = NULL; memset(node->ip,0,sizeof(node->ip)); @@ -1720,6 +1721,7 @@ int clusterProcessPacket(clusterLink *link) { clusterMsg *hdr = (clusterMsg*) link->rcvbuf; uint32_t totlen = ntohl(hdr->totlen); uint16_t type = ntohs(hdr->type); + mstime_t now = mstime(); if (type < CLUSTERMSG_TYPE_COUNT) g_pserver->cluster->stats_bus_messages_received[type]++; @@ -1781,8 +1783,17 @@ int clusterProcessPacket(clusterLink *link) { if (totlen != explen) return 1; } - /* Check if the sender is a known node. */ + /* Check if the sender is a known node. Note that for incoming connections + * we don't store link->node information, but resolve the node by the + * ID in the header each time in the current implementation. */ sender = clusterLookupNode(hdr->sender); + + /* Update the last time we saw any data from this node. We + * use this in order to avoid detecting a timeout from a node that + * is just sending a lot of data in the cluster bus, for instance + * because of Pub/Sub. */ + if (sender) sender->data_received = now; + if (sender && !nodeInHandshake(sender)) { /* Update our curretEpoch if we see a newer epoch in the cluster. */ senderCurrentEpoch = ntohu64(hdr->currentEpoch); @@ -1797,7 +1808,7 @@ int clusterProcessPacket(clusterLink *link) { } /* Update the replication offset info for this node. */ sender->repl_offset = ntohu64(hdr->offset); - sender->repl_offset_time = mstime(); + sender->repl_offset_time = now; /* If we are a slave performing a manual failover and our master * sent its offset while already paused, populate the MF state. */ if (g_pserver->cluster->mf_end && @@ -1911,7 +1922,7 @@ int clusterProcessPacket(clusterLink *link) { * address. */ serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d", link->node->name, - (int)(mstime()-(link->node->ctime)), + (int)(now-(link->node->ctime)), link->node->flags); link->node->flags |= CLUSTER_NODE_NOADDR; link->node->ip[0] = '\0'; @@ -1946,7 +1957,7 @@ int clusterProcessPacket(clusterLink *link) { /* Update our info about the node */ if (link->node && type == CLUSTERMSG_TYPE_PONG) { - link->node->pong_received = mstime(); + link->node->pong_received = now; link->node->ping_sent = 0; /* The PFAIL condition can be reversed without external @@ -2093,7 +2104,7 @@ int clusterProcessPacket(clusterLink *link) { "FAIL message received from %.40s about %.40s", hdr->sender, hdr->data.fail.about.nodename); failing->flags |= CLUSTER_NODE_FAIL; - failing->fail_time = mstime(); + failing->fail_time = now; failing->flags &= ~CLUSTER_NODE_PFAIL; clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE); @@ -2146,9 +2157,9 @@ int clusterProcessPacket(clusterLink *link) { /* Manual failover requested from slaves. Initialize the state * accordingly. */ resetManualFailover(); - g_pserver->cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT; + g_pserver->cluster->mf_end = now + CLUSTER_MF_TIMEOUT; g_pserver->cluster->mf_slave = sender; - pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT)); + pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT)); serverLog(LL_WARNING,"Manual failover requested by replica %.40s.", sender->name); } else if (type == CLUSTERMSG_TYPE_UPDATE) { @@ -3577,7 +3588,6 @@ void clusterCron(void) { while((de = dictNext(di)) != NULL) { clusterNode *node = (clusterNode*)dictGetVal(de); now = mstime(); /* Use an updated time at every iteration. */ - mstime_t delay; if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)) @@ -3601,16 +3611,20 @@ void clusterCron(void) { this_slaves = okslaves; } - /* If we are waiting for the PONG more than half the cluster + /* If we are not receiving any data for more than half the cluster * timeout, reconnect the link: maybe there is a connection * issue even if the node is alive. */ + mstime_t ping_delay = now - node->ping_sent; + mstime_t data_delay = now - node->data_received; if (node->link && /* is connected */ now - node->link->ctime > g_pserver->cluster_node_timeout && /* was not already reconnected */ node->ping_sent && /* we already sent a ping */ node->pong_received < node->ping_sent && /* still waiting pong */ /* and we are waiting for the pong more than timeout/2 */ - now - node->ping_sent > g_pserver->cluster_node_timeout/2) + ping_delay > g_pserver->cluster_node_timeout/2 && + /* and in such interval we are not seeing any traffic at all. */ + data_delay > g_pserver->cluster_node_timeout/2) { /* Disconnect the link, it will be reconnected automatically. */ freeClusterLink(node->link); @@ -3642,12 +3656,18 @@ void clusterCron(void) { /* Check only if we have an active ping for this instance. */ if (node->ping_sent == 0) continue; - /* Compute the delay of the PONG. Note that if we already received - * the PONG, then node->ping_sent is zero, so can't reach this - * code at all. */ - delay = now - node->ping_sent; + /* Check if this node looks unreachable. + * Note that if we already received the PONG, then node->ping_sent + * is zero, so can't reach this code at all, so we don't risk of + * checking for a PONG delay if we didn't sent the PING. + * + * We also consider every incoming data as proof of liveness, since + * our cluster bus link is also used for data: under heavy data + * load pong delays are possible. */ + mstime_t node_delay = (ping_delay < data_delay) ? ping_delay : + data_delay; - if (delay > g_pserver->cluster_node_timeout) { + if (node_delay > g_pserver->cluster_node_timeout) { /* Timeout reached. Set the node as possibly failing if it is * not already in this state. */ if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) { diff --git a/src/cluster.h b/src/cluster.h index 59b3dc343..c6d714cf3 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -128,6 +128,7 @@ typedef struct clusterNode { tables. */ mstime_t ping_sent; /* Unix time we sent latest ping */ mstime_t pong_received; /* Unix time we received the pong */ + mstime_t data_received; /* Unix time we received any data */ mstime_t fail_time; /* Unix time when FAIL flag was set */ mstime_t voted_time; /* Last time we voted for a slave of this master */ mstime_t repl_offset_time; /* Unix time we received offset for this node */ diff --git a/src/config.h b/src/config.h index 82f745ac4..022cb0033 100644 --- a/src/config.h +++ b/src/config.h @@ -257,7 +257,7 @@ int pthread_setname_np(const char *name); #endif /* Check if we can use setcpuaffinity(). */ -#if (defined __linux || defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__) +#if (defined __linux || defined __NetBSD__ || defined __FreeBSD__) #define USE_SETCPUAFFINITY #ifdef __cplusplus extern "C" diff --git a/src/connection.h b/src/connection.h index ff8010f5c..10f9e322b 100644 --- a/src/connection.h +++ b/src/connection.h @@ -225,6 +225,6 @@ const char *connGetInfo(connection *conn, char *buf, size_t buf_len); /* Helpers for tls special considerations */ int tlsHasPendingData(); -void tlsProcessPendingData(); +int tlsProcessPendingData(); #endif /* __REDIS_CONNECTION_H */ diff --git a/src/lolwut.h b/src/lolwut.h index 38c0de423..682d00531 100644 --- a/src/lolwut.h +++ b/src/lolwut.h @@ -34,6 +34,10 @@ /* This represents a very simple generic canvas in order to draw stuff. * It's up to each LOLWUT versions to translate what they draw to the * screen, depending on the result to accomplish. */ + +#ifndef __LOLWUT_H +#define __LOLWUT_H + typedef struct lwCanvas { int width; int height; @@ -47,3 +51,5 @@ void lwDrawPixel(lwCanvas *canvas, int x, int y, int color); int lwGetPixel(lwCanvas *canvas, int x, int y); void lwDrawLine(lwCanvas *canvas, int x1, int y1, int x2, int y2, int color); void lwDrawSquare(lwCanvas *canvas, int x, int y, float size, float angle, int color); + +#endif diff --git a/src/networking.cpp b/src/networking.cpp index fe3e99d15..54c0d35ee 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1552,8 +1552,7 @@ bool freeClient(client *c) { * some unexpected state, by checking its flags. */ if (FActiveMaster(c)) { serverLog(LL_WARNING,"Connection with master lost."); - if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY| - CLIENT_CLOSE_ASAP| + if (!(c->flags & (CLIENT_PROTOCOL_ERROR| CLIENT_BLOCKED))) { replicationCacheMaster(MasterInfoFromClient(c), c); @@ -1671,7 +1670,7 @@ void freeClientAsync(client *c) { listAddNodeTail(g_pserver->clients_to_close,c); } -void freeClientsInAsyncFreeQueue(int iel) { +int freeClientsInAsyncFreeQueue(int iel) { serverAssert(GlobalLocksAcquired()); listIter li; listNode *ln; @@ -1696,6 +1695,7 @@ void freeClientsInAsyncFreeQueue(int iel) { c->flags &= ~CLIENT_CLOSE_ASAP; freeClient(c); } + return (int)vecclientsFree.size(); } /* Return a client by ID, or NULL if the client ID is not in the set @@ -2108,7 +2108,8 @@ int processInlineBuffer(client *c) { } /* Helper function. Record protocol erro details in server log, - * and set the client as CLIENT_CLOSE_AFTER_REPLY. */ + * and set the client as CLIENT_CLOSE_AFTER_REPLY and + * CLIENT_PROTOCOL_ERROR. */ #define PROTO_DUMP_LEN 128 static void setProtocolError(const char *errstr, client *c) { if (cserver.verbosity <= LL_VERBOSE) { @@ -2134,7 +2135,7 @@ static void setProtocolError(const char *errstr, client *c) { "Protocol error (%s) from client: %s. %s", errstr, client, buf); sdsfree(client); } - c->flags |= CLIENT_CLOSE_AFTER_REPLY; + c->flags |= (CLIENT_CLOSE_AFTER_REPLY|CLIENT_PROTOCOL_ERROR); } /* Process the query buffer for client 'c', setting up the client argument @@ -3436,10 +3437,9 @@ void unpauseClientsIfNecessary() * write, close sequence needed to serve a client. * * The function returns the total number of events processed. */ -int processEventsWhileBlocked(int iel) { +void processEventsWhileBlocked(int iel) { serverAssert(GlobalLocksAcquired()); int iterations = 4; /* See the function top-comment. */ - int count = 0; std::vector vecclients; listIter li; @@ -3465,11 +3465,15 @@ int processEventsWhileBlocked(int iel) { try { while (iterations--) { - int events = 0; - events += aeProcessEvents(g_pserver->rgthreadvar[iel].el, AE_FILE_EVENTS|AE_DONT_WAIT); - events += handleClientsWithPendingWrites(iel, aof_state); + long long startval = g_pserver->events_processed_while_blocked; + long long ae_events = aeProcessEvents(g_pserver->rgthreadvar[iel].el, + AE_FILE_EVENTS|AE_DONT_WAIT| + AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP); + /* Note that server.events_processed_while_blocked will also get + * incremeted by callbacks called by the event loop handlers. */ + g_pserver->events_processed_while_blocked += ae_events; + long long events = g_pserver->events_processed_while_blocked - startval; if (!events) break; - count += events; } } catch (...) @@ -3488,6 +3492,5 @@ int processEventsWhileBlocked(int iel) { locker.release(); for (client *c : vecclients) c->lock.lock(); - return count; } diff --git a/src/rax.c b/src/rax.c index a560dde02..7dcf04582 100644 --- a/src/rax.c +++ b/src/rax.c @@ -1,6 +1,8 @@ /* Rax -- A radix tree implementation. * - * Copyright (c) 2017-2018, Salvatore Sanfilippo + * Version 1.2 -- 7 February 2019 + * + * Copyright (c) 2017-2019, Salvatore Sanfilippo * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -1737,7 +1739,7 @@ int raxRandomWalk(raxIterator *it, size_t steps) { } if (steps == 0) { - size_t fle = floor(log(it->rt->numele)); + size_t fle = 1+floor(log(it->rt->numele)); fle *= 2; steps = 1 + rand() % fle; } diff --git a/src/rdb.cpp b/src/rdb.cpp index 9e69be93a..abaa3d608 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2409,8 +2409,6 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { decrRefCount(val); val = nullptr; } else { - - /* Add the new object in the hash table */ int fInserted = dbMerge(db, &keyobj, val, (rsi && rsi->fForceSetKey) || (rdbflags & RDBFLAGS_ALLOW_DUP)); // Note: dbMerge will incrRef @@ -2431,6 +2429,7 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { val = nullptr; } } + if (g_pserver->key_load_delay) usleep(g_pserver->key_load_delay); diff --git a/src/redis-benchmark.cpp b/src/redis-benchmark.cpp index 1a396c98d..37f6dca3c 100644 --- a/src/redis-benchmark.cpp +++ b/src/redis-benchmark.cpp @@ -284,7 +284,7 @@ static redisConfig *getRedisConfig(const char *ip, int port, for (; i < 2; i++) { int res = redisGetReply(c, &r); if (reply) freeReplyObject(reply); - reply = ((redisReply *) r); + reply = res == REDIS_OK ? ((redisReply *) r) : NULL; if (res != REDIS_OK || !r) goto fail; if (reply->type == REDIS_REPLY_ERROR) { fprintf(stderr, "ERROR: %s\n", reply->str); diff --git a/src/replication.cpp b/src/replication.cpp index 33d143023..792ac16d1 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3244,12 +3244,16 @@ void replicationCacheMaster(redisMaster *mi, client *c) { /* Unlink the client from the server structures. */ unlinkClient(c); + /* Clear flags that can create issues once we reconnect the client. */ + c->flags &= ~(CLIENT_CLOSE_ASAP|CLIENT_CLOSE_AFTER_REPLY); + /* Reset the master client so that's ready to accept new commands: * we want to discard te non processed query buffers and non processed * offsets, including pending transactions, already populated arguments, * pending outputs to the master. */ sdsclear(mi->master->querybuf); sdsclear(mi->master->pending_querybuf); + /* Adjust reploff and read_reploff to the last meaningful offset we executed. * this is the offset the replica will use for future PSYNC. */ mi->master->reploff = adjustMeaningfulReplOffset(); diff --git a/src/server.cpp b/src/server.cpp index 26fde6e69..d7dcdaebf 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2112,6 +2112,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { migrateCloseTimedoutSockets(); } + /* Resize tracking keys table if needed. This is also done at every + * command execution, but we want to be sure that if the last command + * executed changes the value via CONFIG SET, the server will perform + * the operation even if completely idle. */ + if (g_pserver->tracking_clients) trackingLimitUsedSlots(); + /* Start a scheduled BGSAVE if the corresponding flag is set. This is * useful when we are forced to postpone a BGSAVE because an AOF * rewrite is in progress. @@ -2168,9 +2174,22 @@ int serverCronLite(struct aeEventLoop *eventLoop, long long id, void *clientData return 1000/g_pserver->hz; } +extern int ProcessingEventsWhileBlocked; + /* This function gets called every time Redis is entering the * main loop of the event driven library, that is, before to sleep - * for ready file descriptors. */ + * for ready file descriptors. + * + * Note: This function is (currently) called from two functions: + * 1. aeMain - The main server loop + * 2. processEventsWhileBlocked - Process clients during RDB/AOF load + * + * If it was called from processEventsWhileBlocked we don't want + * to perform all actions (For example, we don't want to expire + * keys), but we do need to perform some actions. + * + * The most important is freeClientsInAsyncFreeQueue but we also + * call some other low-risk functions. */ void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); @@ -2448,6 +2467,7 @@ void initServerConfig(void) { g_pserver->slaves = listCreate(); g_pserver->monitors = listCreate(); g_pserver->clients_timeout_table = raxNew(); + g_pserver->events_processed_while_blocked = 0; g_pserver->timezone = getTimeZone(); /* Initialized by tzset(). */ cserver.configfile = NULL; cserver.executable = NULL; @@ -2937,6 +2957,8 @@ static void initServerThread(struct redisServerThreadVars *pvar, int fMain) pvar->tlsfd_count = 0; pvar->cclients = 0; pvar->el = aeCreateEventLoop(g_pserver->maxclients+CONFIG_FDSET_INCR); + aeSetBeforeSleepProc(pvar->el, fMain ? beforeSleep : beforeSleepLite, fMain ? 0 : AE_SLEEP_THREADSAFE); + aeSetAfterSleepProc(pvar->el, afterSleep, AE_SLEEP_THREADSAFE); pvar->current_client = nullptr; pvar->clients_paused = 0; pvar->fRetrySetAofEvent = false; @@ -5277,10 +5299,7 @@ void *workerThreadMain(void *parg) serverTL = g_pserver->rgthreadvar+iel; // set the TLS threadsafe global moduleAcquireGIL(true); // Normally afterSleep acquires this, but that won't be called on the first run - int isMainThread = (iel == IDX_EVENT_LOOP_MAIN); aeEventLoop *el = g_pserver->rgthreadvar[iel].el; - aeSetBeforeSleepProc(el, isMainThread ? beforeSleep : beforeSleepLite, isMainThread ? 0 : AE_SLEEP_THREADSAFE); - aeSetAfterSleepProc(el, afterSleep, AE_SLEEP_THREADSAFE); try { aeMain(el); diff --git a/src/server.h b/src/server.h index e3515f14a..753c641c0 100644 --- a/src/server.h +++ b/src/server.h @@ -430,7 +430,8 @@ public: #define CLIENT_TRACKING_NOLOOP (1ULL<<37) /* Don't send invalidation messages about writes performed by myself.*/ #define CLIENT_IN_TO_TABLE (1ULL<<38) /* This client is in the timeout table. */ -#define CLIENT_FORCE_REPLY (1ULL<<39) /* Should addReply be forced to write the text? */ +#define CLIENT_PROTOCOL_ERROR (1ULL<<39) /* Protocol error chatting with it. */ +#define CLIENT_FORCE_REPLY (1ULL<<40) /* Should addReply be forced to write the text? */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ @@ -1680,6 +1681,8 @@ struct redisServer { dict *migrate_cached_sockets;/* MIGRATE cached sockets */ std::atomic next_client_id; /* Next client unique ID. Incremental. */ int protected_mode; /* Don't accept external connections. */ + long long events_processed_while_blocked; /* processEventsWhileBlocked() */ + /* RDB / AOF loading information */ std::atomic loading; /* We are loading data from disk if true */ off_t loading_total_bytes; @@ -2213,7 +2216,7 @@ void rewriteClientCommandVector(client *c, int argc, ...); void rewriteClientCommandArgument(client *c, int i, robj *newval); void replaceClientCommandVector(client *c, int argc, robj **argv); unsigned long getClientOutputBufferMemoryUsage(client *c); -void freeClientsInAsyncFreeQueue(int iel); +int freeClientsInAsyncFreeQueue(int iel); void asyncCloseClientOnOutputBufferLimitReached(client *c); int getClientType(client *c); int getClientTypeByName(const char *name); @@ -2225,7 +2228,7 @@ int listenToPort(int port, int *fds, int *count, int fReusePort, int fFirstListe void pauseClients(mstime_t duration); int clientsArePaused(void); void unpauseClientsIfNecessary(); -int processEventsWhileBlocked(int iel); +void processEventsWhileBlocked(int iel); int handleClientsWithPendingWrites(int iel, int aof_state); int clientHasPendingReplies(client *c); void unlinkClient(client *c); diff --git a/src/setcpuaffinity.c b/src/setcpuaffinity.c index dcae81c71..13594113c 100644 --- a/src/setcpuaffinity.c +++ b/src/setcpuaffinity.c @@ -36,6 +36,10 @@ #include #include #endif +#ifdef __NetBSD__ +#include +#include +#endif #include "config.h" #ifdef USE_SETCPUAFFINITY @@ -71,11 +75,18 @@ void setcpuaffinity(const char *cpulist) { #ifdef __FreeBSD__ cpuset_t cpuset; #endif +#ifdef __NetBSD__ + cpuset_t *cpuset; +#endif if (!cpulist) return; +#ifndef __NetBSD__ CPU_ZERO(&cpuset); +#else + cpuset = cpuset_create(); +#endif q = cpulist; while (p = q, q = next_token(q, ','), p) { @@ -110,7 +121,11 @@ void setcpuaffinity(const char *cpulist) { return; while (a <= b) { +#ifndef __NetBSD__ CPU_SET(a, &cpuset); +#else + cpuset_set(a, cpuset); +#endif a += s; } } @@ -124,6 +139,10 @@ void setcpuaffinity(const char *cpulist) { #ifdef __FreeBSD__ cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID, -1, sizeof(cpuset), &cpuset); #endif +#ifdef __NetBSD__ + pthread_setaffinity_np(pthread_self(), cpuset_size(cpuset), cpuset); + cpuset_destroy(cpuset); +#endif } #endif /* USE_SETCPUAFFINITY */ diff --git a/src/tls.cpp b/src/tls.cpp index c18aafebe..28a74df9a 100644 --- a/src/tls.cpp +++ b/src/tls.cpp @@ -93,11 +93,56 @@ static int parseProtocolsConfig(const char *str) { * served to the reader yet. */ static list *pending_list = NULL; +/** + * OpenSSL global initialization and locking handling callbacks. + * Note that this is only required for OpenSSL < 1.1.0. + */ + +#if OPENSSL_VERSION_NUMBER < 0x10100000L +#define USE_CRYPTO_LOCKS +#endif + +#ifdef USE_CRYPTO_LOCKS + +static pthread_mutex_t *openssl_locks; + +static void sslLockingCallback(int mode, int lock_id, const char *f, int line) { + pthread_mutex_t *mt = openssl_locks + lock_id; + + if (mode & CRYPTO_LOCK) { + pthread_mutex_lock(mt); + } else { + pthread_mutex_unlock(mt); + } + + (void)f; + (void)line; +} + +static void initCryptoLocks(void) { + unsigned i, nlocks; + if (CRYPTO_get_locking_callback() != NULL) { + /* Someone already set the callback before us. Don't destroy it! */ + return; + } + nlocks = CRYPTO_num_locks(); + openssl_locks = zmalloc(sizeof(*openssl_locks) * nlocks); + for (i = 0; i < nlocks; i++) { + pthread_mutex_init(openssl_locks + i, NULL); + } + CRYPTO_set_locking_callback(sslLockingCallback); +} +#endif /* USE_CRYPTO_LOCKS */ + void tlsInit(void) { ERR_load_crypto_strings(); SSL_load_error_strings(); SSL_library_init(); +#ifdef USE_CRYPTO_LOCKS + initCryptoLocks(); +#endif + if (!RAND_poll()) { serverLog(LL_WARNING, "OpenSSL: Failed to seed random number generator."); } @@ -768,15 +813,17 @@ int tlsHasPendingData() { return listLength(pending_list) > 0; } -void tlsProcessPendingData() { +int tlsProcessPendingData() { listIter li; listNode *ln; + int processed = listLength(pending_list); listRewind(pending_list,&li); while((ln = listNext(&li))) { tls_connection *conn = listNodeValue(ln); tlsHandleEvent(conn, AE_READABLE); } + return processed; } #else /* USE_OPENSSL */ @@ -804,7 +851,8 @@ int tlsHasPendingData() { return 0; } -void tlsProcessPendingData() { +int tlsProcessPendingData() { + return 0; } #endif diff --git a/src/tracking.cpp b/src/tracking.cpp index 274c098ee..d0eee35ff 100644 --- a/src/tracking.cpp +++ b/src/tracking.cpp @@ -279,15 +279,22 @@ void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) { * * Note that 'c' may be NULL in case the operation was performed outside the * context of a client modifying the database (for instance when we delete a - * key because of expire). */ -void trackingInvalidateKey(client *c, robj *keyobj) { + * key because of expire). + * + * The last argument 'bcast' tells the function if it should also schedule + * the key for broadcasting to clients in BCAST mode. This is the case when + * the function is called from the Redis core once a key is modified, however + * we also call the function in order to evict keys in the key table in case + * of memory pressure: in that case the key didn't really change, so we want + * just to notify the clients that are in the table for this key, that would + * otherwise miss the fact we are no longer tracking the key for them. */ +void trackingInvalidateKeyRaw(client *c, char *key, size_t keylen, int bcast) { if (TrackingTable == NULL) return; - sds sdskey = szFromObj(keyobj); - if (raxSize(PrefixTable) > 0) - trackingRememberKeyToBroadcast(c,sdskey,sdslen(sdskey)); + if (bcast && raxSize(PrefixTable) > 0) + trackingRememberKeyToBroadcast(c,key,keylen); - rax *ids = (rax*)raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); + rax *ids = (rax*)raxFind(TrackingTable,(unsigned char*)key,keylen); if (ids == raxNotFound) return; raxIterator ri; @@ -317,7 +324,7 @@ void trackingInvalidateKey(client *c, robj *keyobj) { continue; } - sendTrackingMessage(target,sdskey,sdslen(sdskey),0); + sendTrackingMessage(target,key,keylen,0); } raxStop(&ri); @@ -325,7 +332,13 @@ void trackingInvalidateKey(client *c, robj *keyobj) { * again if more keys will be modified in this caching slot. */ TrackingTableTotalItems -= raxSize(ids); raxFree(ids); - raxRemove(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),NULL); + raxRemove(TrackingTable,(unsigned char*)key,keylen,NULL); +} + +/* Wrapper (the one actually called across the core) to pass the key + * as object. */ +void trackingInvalidateKey(client *c, robj *keyobj) { + trackingInvalidateKeyRaw(c,szFromObj(keyobj),sdslen(szFromObj(keyobj)),1); } /* This function is called when one or all the Redis databases are flushed @@ -392,10 +405,8 @@ void trackingLimitUsedSlots(void) { effort--; raxSeek(&ri,"^",NULL,0); raxRandomWalk(&ri,0); - rax *ids = (rax*)ri.data; - TrackingTableTotalItems -= raxSize(ids); - raxFree(ids); - raxRemove(TrackingTable,ri.key,ri.key_len,NULL); + if (raxEOF(&ri)) break; + trackingInvalidateKeyRaw(NULL,(char*)ri.key,ri.key_len,0); if (raxSize(TrackingTable) <= max_keys) { timeout_counter = 0; raxStop(&ri); diff --git a/src/util.c b/src/util.c index d173f776f..0d48f5701 100644 --- a/src/util.c +++ b/src/util.c @@ -51,7 +51,7 @@ int stringmatchlen(const char *pattern, int patternLen, while(patternLen && stringLen) { switch(pattern[0]) { case '*': - while (pattern[1] == '*') { + while (patternLen && pattern[1] == '*') { pattern++; patternLen--; } @@ -67,8 +67,6 @@ int stringmatchlen(const char *pattern, int patternLen, return 0; /* no match */ break; case '?': - if (stringLen == 0) - return 0; /* no match */ string++; stringLen--; break; @@ -96,7 +94,7 @@ int stringmatchlen(const char *pattern, int patternLen, pattern--; patternLen++; break; - } else if (pattern[1] == '-' && patternLen >= 3) { + } else if (patternLen >= 3 && pattern[1] == '-') { int start = pattern[0]; int end = pattern[2]; int c = string[0]; diff --git a/tests/integration/rdb.tcl b/tests/integration/rdb.tcl index b364291ee..123e9c8b6 100644 --- a/tests/integration/rdb.tcl +++ b/tests/integration/rdb.tcl @@ -128,4 +128,56 @@ start_server {} { # make sure the server is still writable r set x xx } +} + +test {client freed during loading} { + start_server [list overrides [list key-load-delay 10 rdbcompression no]] { + # create a big rdb that will take long to load. it is important + # for keys to be big since the server processes events only once in 2mb. + # 100mb of rdb, 100k keys will load in more than 1 second + r debug populate 100000 key 1000 + + catch { + r debug restart + } + + set stdout [srv 0 stdout] + while 1 { + # check that the new server actually started and is ready for connections + if {[exec grep -i "Server initialized" | wc -l < $stdout] > 1} { + break + } + after 10 + } + # make sure it's still loading + assert_equal [s loading] 1 + + # connect and disconnect 10 clients + set clients {} + for {set j 0} {$j < 10} {incr j} { + lappend clients [redis_deferring_client] + } + foreach rd $clients { + $rd debug log bla + } + foreach rd $clients { + $rd read + } + foreach rd $clients { + $rd close + } + + # make sure the server freed the clients + wait_for_condition 100 100 { + [s connected_clients] < 3 + } else { + fail "clients didn't disconnect" + } + + # make sure it's still loading + assert_equal [s loading] 1 + + # no need to keep waiting for loading to complete + exec kill [srv 0 pid] + } } \ No newline at end of file diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index 2cbd7284b..f856a6307 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -513,9 +513,9 @@ start_server {tags {"repl"}} { set master_port [srv 0 port] set master_pid [srv 0 pid] # put enough data in the db that the rdb file will be bigger than the socket buffers - # and since we'll have key-load-delay of 100, 10000 keys will take at least 1 second + # and since we'll have key-load-delay of 100, 20000 keys will take at least 2 seconds # we also need the replica to process requests during transfer (which it does only once in 2mb) - $master debug populate 10000 test 10000 + $master debug populate 20000 test 10000 $master config set rdbcompression no # If running on Linux, we also measure utime/stime to detect possible I/O handling issues set os [catch {exec unamee}] diff --git a/tests/unit/tls.tcl b/tests/unit/tls.tcl index 950f65557..2b04590cd 100644 --- a/tests/unit/tls.tcl +++ b/tests/unit/tls.tcl @@ -25,26 +25,6 @@ start_server {tags {"tls"}} { } test {TLS: Verify tls-protocols behaves as expected} { - r CONFIG SET tls-protocols TLSv1 - - set s [redis [srv 0 host] [srv 0 port] 0 1 {-tls1 0}] - catch {$s PING} e - assert_match {*I/O error*} $e - - set s [redis [srv 0 host] [srv 0 port] 0 1 {-tls1 1}] - catch {$s PING} e - assert_match {PONG} $e - - r CONFIG SET tls-protocols TLSv1.1 - - set s [redis [srv 0 host] [srv 0 port] 0 1 {-tls1.1 0}] - catch {$s PING} e - assert_match {*I/O error*} $e - - set s [redis [srv 0 host] [srv 0 port] 0 1 {-tls1.1 1}] - catch {$s PING} e - assert_match {PONG} $e - r CONFIG SET tls-protocols TLSv1.2 set s [redis [srv 0 host] [srv 0 port] 0 1 {-tls1.2 0}] diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl index 43bb5f864..0332fa726 100644 --- a/tests/unit/tracking.tcl +++ b/tests/unit/tracking.tcl @@ -107,5 +107,27 @@ start_server {tags {"tracking"}} { assert {$keys eq {mykey}} } + test {Tracking gets notification on tracking table key eviction} { + r CLIENT TRACKING off + r CLIENT TRACKING on REDIRECT $redir NOLOOP + r MSET key1 1 key2 2 + # Let the server track the two keys for us + r MGET key1 key2 + # Force the eviction of all the keys but one: + r config set tracking-table-max-keys 1 + # Note that we may have other keys in the table for this client, + # since we disabled/enabled tracking multiple time with the same + # ID, and tracking does not do ID cleanups for performance reasons. + # So we check that eventually we'll receive one or the other key, + # otherwise the test will die for timeout. + while 1 { + set keys [lindex [$rd1 read] 2] + if {$keys eq {key1} || $keys eq {key2}} break + } + # We should receive an expire notification for one of + # the two keys (only one must remain) + assert {$keys eq {key1} || $keys eq {key2}} + } + $rd1 close }