From 8a9c84f4a5737499bd630fb8de6c7359637d3987 Mon Sep 17 00:00:00 2001 From: hwware Date: Tue, 5 May 2020 23:35:08 -0400 Subject: [PATCH 01/21] add include guard for lolwut.h --- src/lolwut.h | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/lolwut.h b/src/lolwut.h index 38c0de423..682d00531 100644 --- a/src/lolwut.h +++ b/src/lolwut.h @@ -34,6 +34,10 @@ /* This represents a very simple generic canvas in order to draw stuff. * It's up to each LOLWUT versions to translate what they draw to the * screen, depending on the result to accomplish. */ + +#ifndef __LOLWUT_H +#define __LOLWUT_H + typedef struct lwCanvas { int width; int height; @@ -47,3 +51,5 @@ void lwDrawPixel(lwCanvas *canvas, int x, int y, int color); int lwGetPixel(lwCanvas *canvas, int x, int y); void lwDrawLine(lwCanvas *canvas, int x1, int y1, int x2, int y2, int color); void lwDrawSquare(lwCanvas *canvas, int x, int y, float size, float angle, int color); + +#endif From c7edffbd56c7a2c95c2483485fe86dcb43077fa4 Mon Sep 17 00:00:00 2001 From: hwware Date: Wed, 6 May 2020 01:07:17 -0400 Subject: [PATCH 02/21] add jemalloc-bg-thread config in redis conf --- redis.conf | 3 +++ 1 file changed, 3 insertions(+) diff --git a/redis.conf b/redis.conf index bed906058..bab7cff47 100644 --- a/redis.conf +++ b/redis.conf @@ -1782,6 +1782,9 @@ rdb-save-incremental-fsync yes # the main dictionary scan # active-defrag-max-scan-fields 1000 +# Jemalloc background thread for purging will be enabled by default +jemalloc-bg-thread yes + # It is possible to pin different threads and processes of Redis to specific # CPUs in your system, in order to maximize the performances of the server. # This is useful both in order to pin different Redis threads in different From 24e12641d579d31f5a0fd9b8ed2ccf8e10806596 Mon Sep 17 00:00:00 2001 From: Brad Dunbar Date: Tue, 5 May 2020 09:57:01 -0400 Subject: [PATCH 03/21] Remove unreachable branch. --- src/util.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/util.c b/src/util.c index d173f776f..537c9313c 100644 --- a/src/util.c +++ b/src/util.c @@ -67,8 +67,6 @@ int stringmatchlen(const char *pattern, int patternLen, return 0; /* no match */ break; case '?': - if (stringLen == 0) - return 0; /* no match */ string++; stringLen--; break; From 3672875b4cb7bca525bbc2da42f49153fcb7e0eb Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 6 May 2020 16:18:21 +0200 Subject: [PATCH 04/21] stringmatchlen() should not expect null terminated strings. --- src/util.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/util.c b/src/util.c index 537c9313c..0d48f5701 100644 --- a/src/util.c +++ b/src/util.c @@ -51,7 +51,7 @@ int stringmatchlen(const char *pattern, int patternLen, while(patternLen && stringLen) { switch(pattern[0]) { case '*': - while (pattern[1] == '*') { + while (patternLen && pattern[1] == '*') { pattern++; patternLen--; } @@ -94,7 +94,7 @@ int stringmatchlen(const char *pattern, int patternLen, pattern--; patternLen++; break; - } else if (pattern[1] == '-' && patternLen >= 3) { + } else if (patternLen >= 3 && pattern[1] == '-') { int start = pattern[0]; int end = pattern[2]; int c = string[0]; From 960186a71fee12b0e64b109c7b2066d2dae0ffe5 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 8 May 2020 11:38:07 +0200 Subject: [PATCH 05/21] Cluster: introduce data_received field. We want to send pings and pongs at specific intervals, since our packets also contain information about the configuration of the cluster and are used for gossip. 
However since our cluster bus is used in a mixed way for data (such as Pub/Sub or modules cluster messages) and metadata, sometimes a very busy channel may delay the reception of pong packets. So after discussing it in #7216, this commit introduces a new field that is not exposed in the cluster, is only an internal information about the last time we received any data from a given node: we use this field in order to avoid detecting failures, claiming data reception of new data from the node is a proof of liveness. --- src/cluster.c | 36 ++++++++++++++++++++++++++---------- src/cluster.h | 1 + 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index d8f83f08d..78524bd34 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -749,6 +749,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { node->slaves = NULL; node->slaveof = NULL; node->ping_sent = node->pong_received = 0; + node->data_received = 0; node->fail_time = 0; node->link = NULL; memset(node->ip,0,sizeof(node->ip)); @@ -1678,6 +1679,7 @@ int clusterProcessPacket(clusterLink *link) { clusterMsg *hdr = (clusterMsg*) link->rcvbuf; uint32_t totlen = ntohl(hdr->totlen); uint16_t type = ntohs(hdr->type); + mstime_t now = mstime(); if (type < CLUSTERMSG_TYPE_COUNT) server.cluster->stats_bus_messages_received[type]++; @@ -1741,6 +1743,13 @@ int clusterProcessPacket(clusterLink *link) { /* Check if the sender is a known node. */ sender = clusterLookupNode(hdr->sender); + + /* Update the last time we saw any data from this node. We + * use this in order to avoid detecting a timeout from a node that + * is just sending a lot of data in the cluster bus, for instance + * because of Pub/Sub. */ + if (sender) sender->data_received = now; + if (sender && !nodeInHandshake(sender)) { /* Update our curretEpoch if we see a newer epoch in the cluster. */ senderCurrentEpoch = ntohu64(hdr->currentEpoch); @@ -1755,7 +1764,7 @@ int clusterProcessPacket(clusterLink *link) { } /* Update the replication offset info for this node. */ sender->repl_offset = ntohu64(hdr->offset); - sender->repl_offset_time = mstime(); + sender->repl_offset_time = now; /* If we are a slave performing a manual failover and our master * sent its offset while already paused, populate the MF state. */ if (server.cluster->mf_end && @@ -1869,7 +1878,7 @@ int clusterProcessPacket(clusterLink *link) { * address. */ serverLog(LL_DEBUG,"PONG contains mismatching sender ID. About node %.40s added %d ms ago, having flags %d", link->node->name, - (int)(mstime()-(link->node->ctime)), + (int)(now-(link->node->ctime)), link->node->flags); link->node->flags |= CLUSTER_NODE_NOADDR; link->node->ip[0] = '\0'; @@ -1904,7 +1913,7 @@ int clusterProcessPacket(clusterLink *link) { /* Update our info about the node */ if (link->node && type == CLUSTERMSG_TYPE_PONG) { - link->node->pong_received = mstime(); + link->node->pong_received = now; link->node->ping_sent = 0; /* The PFAIL condition can be reversed without external @@ -2051,7 +2060,7 @@ int clusterProcessPacket(clusterLink *link) { "FAIL message received from %.40s about %.40s", hdr->sender, hdr->data.fail.about.nodename); failing->flags |= CLUSTER_NODE_FAIL; - failing->fail_time = mstime(); + failing->fail_time = now; failing->flags &= ~CLUSTER_NODE_PFAIL; clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| CLUSTER_TODO_UPDATE_STATE); @@ -2104,9 +2113,9 @@ int clusterProcessPacket(clusterLink *link) { /* Manual failover requested from slaves. Initialize the state * accordingly. 
*/ resetManualFailover(); - server.cluster->mf_end = mstime() + CLUSTER_MF_TIMEOUT; + server.cluster->mf_end = now + CLUSTER_MF_TIMEOUT; server.cluster->mf_slave = sender; - pauseClients(mstime()+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT)); + pauseClients(now+(CLUSTER_MF_TIMEOUT*CLUSTER_MF_PAUSE_MULT)); serverLog(LL_WARNING,"Manual failover requested by replica %.40s.", sender->name); } else if (type == CLUSTERMSG_TYPE_UPDATE) { @@ -3529,7 +3538,6 @@ void clusterCron(void) { while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); now = mstime(); /* Use an updated time at every iteration. */ - mstime_t delay; if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR|CLUSTER_NODE_HANDSHAKE)) @@ -3553,7 +3561,7 @@ void clusterCron(void) { this_slaves = okslaves; } - /* If we are waiting for the PONG more than half the cluster + /* If we are not receiving any data for more than half the cluster * timeout, reconnect the link: maybe there is a connection * issue even if the node is alive. */ if (node->link && /* is connected */ @@ -3562,7 +3570,9 @@ void clusterCron(void) { node->ping_sent && /* we already sent a ping */ node->pong_received < node->ping_sent && /* still waiting pong */ /* and we are waiting for the pong more than timeout/2 */ - now - node->ping_sent > server.cluster_node_timeout/2) + now - node->ping_sent > server.cluster_node_timeout/2 && + /* and in such interval we are not seeing any traffic at all. */ + now - node->data_received > server.cluster_node_timeout/2) { /* Disconnect the link, it will be reconnected automatically. */ freeClusterLink(node->link); @@ -3597,7 +3607,13 @@ void clusterCron(void) { /* Compute the delay of the PONG. Note that if we already received * the PONG, then node->ping_sent is zero, so can't reach this * code at all. */ - delay = now - node->ping_sent; + mstime_t delay = now - node->ping_sent; + + /* We consider every incoming data as proof of liveness, since + * our cluster bus link is also used for data: under heavy data + * load pong delays are possible. */ + mstime_t data_delay = now - node->data_received; + if (data_delay < delay) delay = data_delay; if (delay > server.cluster_node_timeout) { /* Timeout reached. Set the node as possibly failing if it is diff --git a/src/cluster.h b/src/cluster.h index 35fc0cbfa..d3af4a355 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -124,6 +124,7 @@ typedef struct clusterNode { tables. */ mstime_t ping_sent; /* Unix time we sent latest ping */ mstime_t pong_received; /* Unix time we received the pong */ + mstime_t data_received; /* Unix time we received any data */ mstime_t fail_time; /* Unix time when FAIL flag was set */ mstime_t voted_time; /* Last time we voted for a slave of this master */ mstime_t repl_offset_time; /* Unix time we received offset for this node */ From 002fcde3da5413cc0d75d0eb313f5f28e2a401ea Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 8 May 2020 18:10:16 +0200 Subject: [PATCH 06/21] Cluster: refactor ping/data delay handling. --- src/cluster.c | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 78524bd34..50c79f4f6 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -3564,15 +3564,17 @@ void clusterCron(void) { /* If we are not receiving any data for more than half the cluster * timeout, reconnect the link: maybe there is a connection * issue even if the node is alive. 
*/ + mstime_t ping_delay = now - node->ping_sent; + mstime_t data_delay = now - node->data_received; if (node->link && /* is connected */ now - node->link->ctime > server.cluster_node_timeout && /* was not already reconnected */ node->ping_sent && /* we already sent a ping */ node->pong_received < node->ping_sent && /* still waiting pong */ /* and we are waiting for the pong more than timeout/2 */ - now - node->ping_sent > server.cluster_node_timeout/2 && + ping_delay > server.cluster_node_timeout/2 && /* and in such interval we are not seeing any traffic at all. */ - now - node->data_received > server.cluster_node_timeout/2) + data_delay > server.cluster_node_timeout/2) { /* Disconnect the link, it will be reconnected automatically. */ freeClusterLink(node->link); @@ -3604,18 +3606,18 @@ void clusterCron(void) { /* Check only if we have an active ping for this instance. */ if (node->ping_sent == 0) continue; - /* Compute the delay of the PONG. Note that if we already received - * the PONG, then node->ping_sent is zero, so can't reach this - * code at all. */ - mstime_t delay = now - node->ping_sent; - - /* We consider every incoming data as proof of liveness, since + /* Check if this node looks unreachable. + * Note that if we already received the PONG, then node->ping_sent + * is zero, so can't reach this code at all, so we don't risk of + * checking for a PONG delay if we didn't sent the PING. + * + * We also consider every incoming data as proof of liveness, since * our cluster bus link is also used for data: under heavy data * load pong delays are possible. */ - mstime_t data_delay = now - node->data_received; - if (data_delay < delay) delay = data_delay; + mstime_t node_delay = (ping_delay < data_delay) ? ping_delay : + data_delay; - if (delay > server.cluster_node_timeout) { + if (node_delay > server.cluster_node_timeout) { /* Timeout reached. Set the node as possibly failing if it is * not already in this state. */ if (!(node->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) { From 1276058ea8dfb840db77538e0b35197643d5d680 Mon Sep 17 00:00:00 2001 From: antirez Date: Sat, 9 May 2020 11:12:51 +0200 Subject: [PATCH 07/21] Cluster: clarify we always resolve the sender. --- src/cluster.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 50c79f4f6..a2fab323a 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1741,7 +1741,9 @@ int clusterProcessPacket(clusterLink *link) { if (totlen != explen) return 1; } - /* Check if the sender is a known node. */ + /* Check if the sender is a known node. Note that for incoming connections + * we don't store link->node information, but resolve the node by the + * ID in the header each time in the current implementation. */ sender = clusterLookupNode(hdr->sender); /* Update the last time we saw any data from this node. We From a23cdbb94b25fda7f49c421c94c85830d0a526c6 Mon Sep 17 00:00:00 2001 From: ShooterIT Date: Tue, 5 May 2020 23:09:45 +0800 Subject: [PATCH 08/21] Redis Benchmark: Fix coredump because of double free --- src/redis-benchmark.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/redis-benchmark.c b/src/redis-benchmark.c index 77daf981c..34295147d 100644 --- a/src/redis-benchmark.c +++ b/src/redis-benchmark.c @@ -279,7 +279,7 @@ static redisConfig *getRedisConfig(const char *ip, int port, for (; i < 2; i++) { int res = redisGetReply(c, &r); if (reply) freeReplyObject(reply); - reply = ((redisReply *) r); + reply = res == REDIS_OK ? 
((redisReply *) r) : NULL; if (res != REDIS_OK || !r) goto fail; if (reply->type == REDIS_REPLY_ERROR) { fprintf(stderr, "ERROR: %s\n", reply->str); From 5c41802d551477e21df2c568bd0c488a86bcbd0b Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Tue, 12 May 2020 08:59:09 +0300 Subject: [PATCH 09/21] fix unstable replication test this test which has coverage for varoius flows of diskless master was failing randomly from time to time. the failure was: [err]: diskless all replicas drop during rdb pipe in tests/integration/replication.tcl log message of '*Diskless rdb transfer, last replica dropped, killing fork child*' not found what seemed to have happened is that the master didn't detect that all replicas dropped by the time the replication ended, it thought that one replica is still connected. now the test takes a few seconds longer but it seems stable. --- tests/integration/replication.tcl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/integration/replication.tcl b/tests/integration/replication.tcl index f69002b36..22b0bfeaf 100644 --- a/tests/integration/replication.tcl +++ b/tests/integration/replication.tcl @@ -513,9 +513,9 @@ start_server {tags {"repl"}} { set master_port [srv 0 port] set master_pid [srv 0 pid] # put enough data in the db that the rdb file will be bigger than the socket buffers - # and since we'll have key-load-delay of 100, 10000 keys will take at least 1 second + # and since we'll have key-load-delay of 100, 20000 keys will take at least 2 seconds # we also need the replica to process requests during transfer (which it does only once in 2mb) - $master debug populate 10000 test 10000 + $master debug populate 20000 test 10000 $master config set rdbcompression no # If running on Linux, we also measure utime/stime to detect possible I/O handling issues set os [catch {exec unamee}] From e3b5648df083d12c4f2b635deb827732db02607d Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 14 May 2020 10:41:28 +0200 Subject: [PATCH 10/21] Tracking: send eviction messages when evicting entries. A fix for #7249. --- src/server.c | 6 ++++++ src/tracking.c | 35 +++++++++++++++++++++++------------ 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/src/server.c b/src/server.c index e2b4b6f3d..847a6a95a 100644 --- a/src/server.c +++ b/src/server.c @@ -2053,6 +2053,12 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { /* Stop the I/O threads if we don't have enough pending work. */ stopThreadedIOIfNeeded(); + /* Resize tracking keys table if needed. This is also done at every + * command execution, but we want to be sure that if the last command + * executed changes the value via CONFIG SET, the server will perform + * the operation even if completely idle. */ + if (server.tracking_clients) trackingLimitUsedSlots(); + /* Start a scheduled BGSAVE if the corresponding flag is set. This is * useful when we are forced to postpone a BGSAVE because an AOF * rewrite is in progress. diff --git a/src/tracking.c b/src/tracking.c index a995817e2..cfde26fc9 100644 --- a/src/tracking.c +++ b/src/tracking.c @@ -279,15 +279,22 @@ void trackingRememberKeyToBroadcast(client *c, char *keyname, size_t keylen) { * * Note that 'c' may be NULL in case the operation was performed outside the * context of a client modifying the database (for instance when we delete a - * key because of expire). */ -void trackingInvalidateKey(client *c, robj *keyobj) { + * key because of expire). 
+ * + * The last argument 'bcast' tells the function if it should also schedule + * the key for broadcasting to clients in BCAST mode. This is the case when + * the function is called from the Redis core once a key is modified, however + * we also call the function in order to evict keys in the key table in case + * of memory pressure: in that case the key didn't really change, so we want + * just to notify the clients that are in the table for this key, that would + * otherwise miss the fact we are no longer tracking the key for them. */ +void trackingInvalidateKeyRaw(client *c, char *key, size_t keylen, int bcast) { if (TrackingTable == NULL) return; - sds sdskey = keyobj->ptr; - if (raxSize(PrefixTable) > 0) - trackingRememberKeyToBroadcast(c,sdskey,sdslen(sdskey)); + if (bcast && raxSize(PrefixTable) > 0) + trackingRememberKeyToBroadcast(c,key,keylen); - rax *ids = raxFind(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey)); + rax *ids = raxFind(TrackingTable,(unsigned char*)key,keylen); if (ids == raxNotFound) return; raxIterator ri; @@ -317,7 +324,7 @@ void trackingInvalidateKey(client *c, robj *keyobj) { continue; } - sendTrackingMessage(target,sdskey,sdslen(sdskey),0); + sendTrackingMessage(target,key,keylen,0); } raxStop(&ri); @@ -325,7 +332,13 @@ void trackingInvalidateKey(client *c, robj *keyobj) { * again if more keys will be modified in this caching slot. */ TrackingTableTotalItems -= raxSize(ids); raxFree(ids); - raxRemove(TrackingTable,(unsigned char*)sdskey,sdslen(sdskey),NULL); + raxRemove(TrackingTable,(unsigned char*)key,keylen,NULL); +} + +/* Wrapper (the one actually called across the core) to pass the key + * as object. */ +void trackingInvalidateKey(client *c, robj *keyobj) { + trackingInvalidateKeyRaw(c,keyobj->ptr,sdslen(keyobj->ptr),1); } /* This function is called when one or all the Redis databases are flushed @@ -392,10 +405,8 @@ void trackingLimitUsedSlots(void) { effort--; raxSeek(&ri,"^",NULL,0); raxRandomWalk(&ri,0); - rax *ids = ri.data; - TrackingTableTotalItems -= raxSize(ids); - raxFree(ids); - raxRemove(TrackingTable,ri.key,ri.key_len,NULL); + if (raxEOF(&ri)) break; + trackingInvalidateKeyRaw(NULL,(char*)ri.key,ri.key_len,0); if (raxSize(TrackingTable) <= max_keys) { timeout_counter = 0; raxStop(&ri); From 693629585b7a2c2f13890fe7001662545436b9a3 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 14 May 2020 11:17:47 +0200 Subject: [PATCH 11/21] rax.c updated from upstream antirez/rax. --- src/rax.c | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/rax.c b/src/rax.c index a560dde02..7dcf04582 100644 --- a/src/rax.c +++ b/src/rax.c @@ -1,6 +1,8 @@ /* Rax -- A radix tree implementation. * - * Copyright (c) 2017-2018, Salvatore Sanfilippo + * Version 1.2 -- 7 February 2019 + * + * Copyright (c) 2017-2019, Salvatore Sanfilippo * All rights reserved. * * Redistribution and use in source and binary forms, with or without @@ -1737,7 +1739,7 @@ int raxRandomWalk(raxIterator *it, size_t steps) { } if (steps == 0) { - size_t fle = floor(log(it->rt->numele)); + size_t fle = 1+floor(log(it->rt->numele)); fle *= 2; steps = 1 + rand() % fle; } From f7f219a137fcd04ebd79c06fbe8c71e89b638635 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 14 May 2020 11:27:31 +0200 Subject: [PATCH 12/21] Regression test for #7249. 
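
The scenario exercised by the new test can also be reproduced by hand with two
connections; a rough sketch follows (the client ID 7 is illustrative, use
whatever CLIENT ID returns on the first connection):

    # connection A: receives the invalidation messages
    CLIENT ID                        # suppose it returns 7
    SUBSCRIBE __redis__:invalidate

    # connection B: enable tracking redirected to connection A
    CLIENT TRACKING on REDIRECT 7 NOLOOP
    MSET key1 1 key2 2
    MGET key1 key2                   # the server now tracks key1 and key2 for B

    # shrink the tracking table so that entries get evicted
    CONFIG SET tracking-table-max-keys 1

Connection A should then receive an invalidation message for key1 or key2,
which is what the test below asserts.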
--- tests/unit/tracking.tcl | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tests/unit/tracking.tcl b/tests/unit/tracking.tcl index 43bb5f864..0332fa726 100644 --- a/tests/unit/tracking.tcl +++ b/tests/unit/tracking.tcl @@ -107,5 +107,27 @@ start_server {tags {"tracking"}} { assert {$keys eq {mykey}} } + test {Tracking gets notification on tracking table key eviction} { + r CLIENT TRACKING off + r CLIENT TRACKING on REDIRECT $redir NOLOOP + r MSET key1 1 key2 2 + # Let the server track the two keys for us + r MGET key1 key2 + # Force the eviction of all the keys but one: + r config set tracking-table-max-keys 1 + # Note that we may have other keys in the table for this client, + # since we disabled/enabled tracking multiple time with the same + # ID, and tracking does not do ID cleanups for performance reasons. + # So we check that eventually we'll receive one or the other key, + # otherwise the test will die for timeout. + while 1 { + set keys [lindex [$rd1 read] 2] + if {$keys eq {key1} || $keys eq {key2}} break + } + # We should receive an expire notification for one of + # the two keys (only one must remain) + assert {$keys eq {key1} || $keys eq {key2}} + } + $rd1 close } From 9da134cd88996f5d072451d56b0201b423d984d1 Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Sun, 10 May 2020 19:13:47 +0300 Subject: [PATCH 13/21] fix redis 6.0 not freeing closed connections during loading. This bug was introduced by a recent change in which readQueryFromClient is using freeClientAsync, and despite the fact that now freeClientsInAsyncFreeQueue is in beforeSleep, that's not enough since it's not called during loading in processEventsWhileBlocked. furthermore, afterSleep was called in that case but beforeSleep wasn't. This bug also caused slowness sine the level-triggered mode of epoll kept signaling these connections as readable causing us to keep doing connRead again and again for ll of these, which keep accumulating. now both before and after sleep are called, but not all of their actions are performed during loading, some are only reserved for the main loop. fixes issue #7215 --- src/ae.c | 5 +- src/server.c | 124 ++++++++++++++++++++++---------------- tests/integration/rdb.tcl | 52 ++++++++++++++++ 3 files changed, 128 insertions(+), 53 deletions(-) diff --git a/src/ae.c b/src/ae.c index 1bf6cbfbf..7aa204250 100644 --- a/src/ae.c +++ b/src/ae.c @@ -428,6 +428,9 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) tvp = &tv; } + if (eventLoop->beforesleep != NULL) + eventLoop->beforesleep(eventLoop); + /* Call the multiplexing API, will return only on timeout or when * some event fires. */ numevents = aeApiPoll(eventLoop, tvp); @@ -522,8 +525,6 @@ int aeWait(int fd, int mask, long long milliseconds) { void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { - if (eventLoop->beforesleep != NULL) - eventLoop->beforesleep(eventLoop); aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); } } diff --git a/src/server.c b/src/server.c index 847a6a95a..d272a6fe0 100644 --- a/src/server.c +++ b/src/server.c @@ -2087,14 +2087,26 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { return 1000/server.hz; } +extern int ProcessingEventsWhileBlocked; + /* This function gets called every time Redis is entering the * main loop of the event driven library, that is, before to sleep - * for ready file descriptors. */ + * for ready file descriptors. 
+ * Note: This function is (currently) called from two functions: + * 1. aeMain - The main server loop + * 2. processEventsWhileBlocked - Process clients during RDB/AOF load + * If it was called from processEventsWhileBlocked we don't want + * to perform all actions (For example, we don't want to expire + * keys), but we do need to perform some actions. + * The most important is freeClientsInAsyncFreeQueue but we also + * call some other low-risk functions. */ void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); - /* Handle precise timeouts of blocked clients. */ - handleBlockedClientsTimeout(); + if (!ProcessingEventsWhileBlocked) { + /* Handle precise timeouts of blocked clients. */ + handleBlockedClientsTimeout(); + } /* We should handle pending reads clients ASAP after event loop. */ handleClientsWithPendingReadsUsingThreads(); @@ -2105,65 +2117,69 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* If tls still has pending unread data don't sleep at all. */ aeSetDontWait(server.el, tlsHasPendingData()); - /* Call the Redis Cluster before sleep function. Note that this function - * may change the state of Redis Cluster (from ok to fail or vice versa), - * so it's a good idea to call it before serving the unblocked clients - * later in this function. */ - if (server.cluster_enabled) clusterBeforeSleep(); + if (!ProcessingEventsWhileBlocked) { + /* Call the Redis Cluster before sleep function. Note that this function + * may change the state of Redis Cluster (from ok to fail or vice versa), + * so it's a good idea to call it before serving the unblocked clients + * later in this function. */ + if (server.cluster_enabled) clusterBeforeSleep(); - /* Run a fast expire cycle (the called function will return - * ASAP if a fast cycle is not needed). */ - if (server.active_expire_enabled && server.masterhost == NULL) - activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); + /* Run a fast expire cycle (the called function will return + * ASAP if a fast cycle is not needed). */ + if (server.active_expire_enabled && server.masterhost == NULL) + activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); - /* Unblock all the clients blocked for synchronous replication - * in WAIT. */ - if (listLength(server.clients_waiting_acks)) - processClientsWaitingReplicas(); + /* Unblock all the clients blocked for synchronous replication + * in WAIT. */ + if (listLength(server.clients_waiting_acks)) + processClientsWaitingReplicas(); - /* Check if there are clients unblocked by modules that implement - * blocking commands. */ - if (moduleCount()) moduleHandleBlockedClients(); + /* Check if there are clients unblocked by modules that implement + * blocking commands. */ + if (moduleCount()) moduleHandleBlockedClients(); - /* Try to process pending commands for clients that were just unblocked. */ - if (listLength(server.unblocked_clients)) - processUnblockedClients(); + /* Try to process pending commands for clients that were just unblocked. */ + if (listLength(server.unblocked_clients)) + processUnblockedClients(); - /* Send all the slaves an ACK request if at least one client blocked - * during the previous event loop iteration. Note that we do this after - * processUnblockedClients(), so if there are multiple pipelined WAITs - * and the just unblocked WAIT gets blocked again, we don't have to wait - * a server cron cycle in absence of other event loop events. See #6623. 
*/ - if (server.get_ack_from_slaves) { - robj *argv[3]; + /* Send all the slaves an ACK request if at least one client blocked + * during the previous event loop iteration. Note that we do this after + * processUnblockedClients(), so if there are multiple pipelined WAITs + * and the just unblocked WAIT gets blocked again, we don't have to wait + * a server cron cycle in absence of other event loop events. See #6623. */ + if (server.get_ack_from_slaves) { + robj *argv[3]; - argv[0] = createStringObject("REPLCONF",8); - argv[1] = createStringObject("GETACK",6); - argv[2] = createStringObject("*",1); /* Not used argument. */ - replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); - decrRefCount(argv[0]); - decrRefCount(argv[1]); - decrRefCount(argv[2]); - server.get_ack_from_slaves = 0; + argv[0] = createStringObject("REPLCONF",8); + argv[1] = createStringObject("GETACK",6); + argv[2] = createStringObject("*",1); /* Not used argument. */ + replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); + decrRefCount(argv[0]); + decrRefCount(argv[1]); + decrRefCount(argv[2]); + server.get_ack_from_slaves = 0; + } + + /* Send the invalidation messages to clients participating to the + * client side caching protocol in broadcasting (BCAST) mode. */ + trackingBroadcastInvalidationMessages(); + + /* Write the AOF buffer on disk */ + flushAppendOnlyFile(0); } - /* Send the invalidation messages to clients participating to the - * client side caching protocol in broadcasting (BCAST) mode. */ - trackingBroadcastInvalidationMessages(); - - /* Write the AOF buffer on disk */ - flushAppendOnlyFile(0); - /* Handle writes with pending output buffers. */ handleClientsWithPendingWritesUsingThreads(); /* Close clients that need to be closed asynchronous */ freeClientsInAsyncFreeQueue(); - /* Before we are going to sleep, let the threads access the dataset by - * releasing the GIL. Redis main thread will not touch anything at this - * time. */ - if (moduleCount()) moduleReleaseGIL(); + if (!ProcessingEventsWhileBlocked) { + /* Before we are going to sleep, let the threads access the dataset by + * releasing the GIL. Redis main thread will not touch anything at this + * time. */ + if (moduleCount()) moduleReleaseGIL(); + } } /* This function is called immadiately after the event loop multiplexing @@ -2171,7 +2187,10 @@ void beforeSleep(struct aeEventLoop *eventLoop) { * the different events callbacks. */ void afterSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); - if (moduleCount()) moduleAcquireGIL(); + + if (!ProcessingEventsWhileBlocked) { + if (moduleCount()) moduleAcquireGIL(); + } } /* =========================== Server initialization ======================== */ @@ -2879,6 +2898,11 @@ void initServer(void) { "blocked clients subsystem."); } + /* Register before and after sleep handlers (note this needs to be done + * before loading persistence since it is used by processEventsWhileBlocked. */ + aeSetBeforeSleepProc(server.el,beforeSleep); + aeSetAfterSleepProc(server.el,afterSleep); + /* Open the AOF file if needed. 
*/ if (server.aof_state == AOF_ON) { server.aof_fd = open(server.aof_filename, @@ -5135,8 +5159,6 @@ int main(int argc, char **argv) { } redisSetCpuAffinity(server.server_cpulist); - aeSetBeforeSleepProc(server.el,beforeSleep); - aeSetAfterSleepProc(server.el,afterSleep); aeMain(server.el); aeDeleteEventLoop(server.el); return 0; diff --git a/tests/integration/rdb.tcl b/tests/integration/rdb.tcl index b364291ee..123e9c8b6 100644 --- a/tests/integration/rdb.tcl +++ b/tests/integration/rdb.tcl @@ -128,4 +128,56 @@ start_server {} { # make sure the server is still writable r set x xx } +} + +test {client freed during loading} { + start_server [list overrides [list key-load-delay 10 rdbcompression no]] { + # create a big rdb that will take long to load. it is important + # for keys to be big since the server processes events only once in 2mb. + # 100mb of rdb, 100k keys will load in more than 1 second + r debug populate 100000 key 1000 + + catch { + r debug restart + } + + set stdout [srv 0 stdout] + while 1 { + # check that the new server actually started and is ready for connections + if {[exec grep -i "Server initialized" | wc -l < $stdout] > 1} { + break + } + after 10 + } + # make sure it's still loading + assert_equal [s loading] 1 + + # connect and disconnect 10 clients + set clients {} + for {set j 0} {$j < 10} {incr j} { + lappend clients [redis_deferring_client] + } + foreach rd $clients { + $rd debug log bla + } + foreach rd $clients { + $rd read + } + foreach rd $clients { + $rd close + } + + # make sure the server freed the clients + wait_for_condition 100 100 { + [s connected_clients] < 3 + } else { + fail "clients didn't disconnect" + } + + # make sure it's still loading + assert_equal [s loading] 1 + + # no need to keep waiting for loading to complete + exec kill [srv 0 pid] + } } \ No newline at end of file From 8bf660af90d1a5e33653f18111769f89669ecefc Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 12 May 2020 13:07:44 +0200 Subject: [PATCH 14/21] Some rework of #7234. --- src/ae.c | 7 ++- src/ae.h | 9 ++-- src/networking.c | 5 ++- src/server.c | 111 +++++++++++++++++++++++++---------------------- 4 files changed, 72 insertions(+), 60 deletions(-) diff --git a/src/ae.c b/src/ae.c index 7aa204250..379cfd1e6 100644 --- a/src/ae.c +++ b/src/ae.c @@ -370,6 +370,7 @@ static int processTimeEvents(aeEventLoop *eventLoop) { * if flags has AE_DONT_WAIT set the function returns ASAP until all * the events that's possible to process without to wait are processed. * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called. + * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called. * * The function returns the number of events processed. 
*/ int aeProcessEvents(aeEventLoop *eventLoop, int flags) @@ -428,7 +429,7 @@ int aeProcessEvents(aeEventLoop *eventLoop, int flags) tvp = &tv; } - if (eventLoop->beforesleep != NULL) + if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP) eventLoop->beforesleep(eventLoop); /* Call the multiplexing API, will return only on timeout or when @@ -525,7 +526,9 @@ int aeWait(int fd, int mask, long long milliseconds) { void aeMain(aeEventLoop *eventLoop) { eventLoop->stop = 0; while (!eventLoop->stop) { - aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); + aeProcessEvents(eventLoop, AE_ALL_EVENTS| + AE_CALL_BEFORE_SLEEP| + AE_CALL_AFTER_SLEEP); } } diff --git a/src/ae.h b/src/ae.h index 9acd72434..63b306615 100644 --- a/src/ae.h +++ b/src/ae.h @@ -47,11 +47,12 @@ things to disk before sending replies, and want to do that in a group fashion. */ -#define AE_FILE_EVENTS 1 -#define AE_TIME_EVENTS 2 +#define AE_FILE_EVENTS (1<<0) +#define AE_TIME_EVENTS (1<<1) #define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS) -#define AE_DONT_WAIT 4 -#define AE_CALL_AFTER_SLEEP 8 +#define AE_DONT_WAIT (1<<2) +#define AE_CALL_BEFORE_SLEEP (1<<3) +#define AE_CALL_AFTER_SLEEP (1<<4) #define AE_NOMORE -1 #define AE_DELETED_EVENT_ID -1 diff --git a/src/networking.c b/src/networking.c index 75c0c16b1..ce5b0ae38 100644 --- a/src/networking.c +++ b/src/networking.c @@ -2863,8 +2863,9 @@ int processEventsWhileBlocked(void) { ProcessingEventsWhileBlocked = 1; while (iterations--) { int events = 0; - events += aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT); - events += handleClientsWithPendingWrites(); + events += aeProcessEvents(server.el, + AE_FILE_EVENTS|AE_DONT_WAIT| + AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP); if (!events) break; count += events; } diff --git a/src/server.c b/src/server.c index d272a6fe0..73a4a9f55 100644 --- a/src/server.c +++ b/src/server.c @@ -2092,22 +2092,33 @@ extern int ProcessingEventsWhileBlocked; /* This function gets called every time Redis is entering the * main loop of the event driven library, that is, before to sleep * for ready file descriptors. + * * Note: This function is (currently) called from two functions: * 1. aeMain - The main server loop * 2. processEventsWhileBlocked - Process clients during RDB/AOF load + * * If it was called from processEventsWhileBlocked we don't want * to perform all actions (For example, we don't want to expire * keys), but we do need to perform some actions. + * * The most important is freeClientsInAsyncFreeQueue but we also * call some other low-risk functions. */ void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); - if (!ProcessingEventsWhileBlocked) { - /* Handle precise timeouts of blocked clients. */ - handleBlockedClientsTimeout(); + /* Just call a subset of vital functions in case we are re-entering + * the event loop from processEventsWhileBlocked(). */ + if (ProcessingEventsWhileBlocked) { + handleClientsWithPendingReadsUsingThreads(); + tlsProcessPendingData(); + handleClientsWithPendingWrites(); + freeClientsInAsyncFreeQueue(); + return; } + /* Handle precise timeouts of blocked clients. */ + handleBlockedClientsTimeout(); + /* We should handle pending reads clients ASAP after event loop. */ handleClientsWithPendingReadsUsingThreads(); @@ -2117,69 +2128,65 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* If tls still has pending unread data don't sleep at all. 
*/ aeSetDontWait(server.el, tlsHasPendingData()); - if (!ProcessingEventsWhileBlocked) { - /* Call the Redis Cluster before sleep function. Note that this function - * may change the state of Redis Cluster (from ok to fail or vice versa), - * so it's a good idea to call it before serving the unblocked clients - * later in this function. */ - if (server.cluster_enabled) clusterBeforeSleep(); + /* Call the Redis Cluster before sleep function. Note that this function + * may change the state of Redis Cluster (from ok to fail or vice versa), + * so it's a good idea to call it before serving the unblocked clients + * later in this function. */ + if (server.cluster_enabled) clusterBeforeSleep(); - /* Run a fast expire cycle (the called function will return - * ASAP if a fast cycle is not needed). */ - if (server.active_expire_enabled && server.masterhost == NULL) - activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); + /* Run a fast expire cycle (the called function will return + * ASAP if a fast cycle is not needed). */ + if (server.active_expire_enabled && server.masterhost == NULL) + activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); - /* Unblock all the clients blocked for synchronous replication - * in WAIT. */ - if (listLength(server.clients_waiting_acks)) - processClientsWaitingReplicas(); + /* Unblock all the clients blocked for synchronous replication + * in WAIT. */ + if (listLength(server.clients_waiting_acks)) + processClientsWaitingReplicas(); - /* Check if there are clients unblocked by modules that implement - * blocking commands. */ - if (moduleCount()) moduleHandleBlockedClients(); + /* Check if there are clients unblocked by modules that implement + * blocking commands. */ + if (moduleCount()) moduleHandleBlockedClients(); - /* Try to process pending commands for clients that were just unblocked. */ - if (listLength(server.unblocked_clients)) - processUnblockedClients(); + /* Try to process pending commands for clients that were just unblocked. */ + if (listLength(server.unblocked_clients)) + processUnblockedClients(); - /* Send all the slaves an ACK request if at least one client blocked - * during the previous event loop iteration. Note that we do this after - * processUnblockedClients(), so if there are multiple pipelined WAITs - * and the just unblocked WAIT gets blocked again, we don't have to wait - * a server cron cycle in absence of other event loop events. See #6623. */ - if (server.get_ack_from_slaves) { - robj *argv[3]; + /* Send all the slaves an ACK request if at least one client blocked + * during the previous event loop iteration. Note that we do this after + * processUnblockedClients(), so if there are multiple pipelined WAITs + * and the just unblocked WAIT gets blocked again, we don't have to wait + * a server cron cycle in absence of other event loop events. See #6623. */ + if (server.get_ack_from_slaves) { + robj *argv[3]; - argv[0] = createStringObject("REPLCONF",8); - argv[1] = createStringObject("GETACK",6); - argv[2] = createStringObject("*",1); /* Not used argument. */ - replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); - decrRefCount(argv[0]); - decrRefCount(argv[1]); - decrRefCount(argv[2]); - server.get_ack_from_slaves = 0; - } - - /* Send the invalidation messages to clients participating to the - * client side caching protocol in broadcasting (BCAST) mode. 
*/ - trackingBroadcastInvalidationMessages(); - - /* Write the AOF buffer on disk */ - flushAppendOnlyFile(0); + argv[0] = createStringObject("REPLCONF",8); + argv[1] = createStringObject("GETACK",6); + argv[2] = createStringObject("*",1); /* Not used argument. */ + replicationFeedSlaves(server.slaves, server.slaveseldb, argv, 3); + decrRefCount(argv[0]); + decrRefCount(argv[1]); + decrRefCount(argv[2]); + server.get_ack_from_slaves = 0; } + /* Send the invalidation messages to clients participating to the + * client side caching protocol in broadcasting (BCAST) mode. */ + trackingBroadcastInvalidationMessages(); + + /* Write the AOF buffer on disk */ + flushAppendOnlyFile(0); + /* Handle writes with pending output buffers. */ handleClientsWithPendingWritesUsingThreads(); /* Close clients that need to be closed asynchronous */ freeClientsInAsyncFreeQueue(); - if (!ProcessingEventsWhileBlocked) { - /* Before we are going to sleep, let the threads access the dataset by - * releasing the GIL. Redis main thread will not touch anything at this - * time. */ - if (moduleCount()) moduleReleaseGIL(); - } + /* Before we are going to sleep, let the threads access the dataset by + * releasing the GIL. Redis main thread will not touch anything at this + * time. */ + if (moduleCount()) moduleReleaseGIL(); } /* This function is called immadiately after the event loop multiplexing From 74249be4a24a8da90f87e9898fb456a49141c57c Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 14 May 2020 10:02:57 +0200 Subject: [PATCH 15/21] Track events processed while blocked globally. Related to #7234. --- src/connection.h | 2 +- src/networking.c | 19 ++++++++++++------- src/server.c | 16 +++++++++++----- src/server.h | 5 +++-- src/tls.c | 7 +++++-- 5 files changed, 32 insertions(+), 17 deletions(-) diff --git a/src/connection.h b/src/connection.h index db09dfd83..0fd6c5f24 100644 --- a/src/connection.h +++ b/src/connection.h @@ -222,6 +222,6 @@ const char *connGetInfo(connection *conn, char *buf, size_t buf_len); /* Helpers for tls special considerations */ int tlsHasPendingData(); -void tlsProcessPendingData(); +int tlsProcessPendingData(); #endif /* __REDIS_CONNECTION_H */ diff --git a/src/networking.c b/src/networking.c index ce5b0ae38..671e374f4 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1257,7 +1257,10 @@ void freeClientAsync(client *c) { pthread_mutex_unlock(&async_free_queue_mutex); } -void freeClientsInAsyncFreeQueue(void) { +/* Free the clietns marked as CLOSE_ASAP, return the number of clients + * freed. */ +int freeClientsInAsyncFreeQueue(void) { + int freed = listLength(server.clients_to_close); while (listLength(server.clients_to_close)) { listNode *ln = listFirst(server.clients_to_close); client *c = listNodeValue(ln); @@ -1266,6 +1269,7 @@ void freeClientsInAsyncFreeQueue(void) { freeClient(c); listDelNode(server.clients_to_close,ln); } + return freed; } /* Return a client by ID, or NULL if the client ID is not in the set @@ -2852,9 +2856,8 @@ int clientsArePaused(void) { * write, close sequence needed to serve a client. * * The function returns the total number of events processed. */ -int processEventsWhileBlocked(void) { +void processEventsWhileBlocked(void) { int iterations = 4; /* See the function top-comment. */ - int count = 0; /* Note: when we are processing events while blocked (for instance during * busy Lua scripts), we set a global flag. 
When such flag is set, we @@ -2862,15 +2865,17 @@ int processEventsWhileBlocked(void) { * See https://github.com/antirez/redis/issues/6988 for more info. */ ProcessingEventsWhileBlocked = 1; while (iterations--) { - int events = 0; - events += aeProcessEvents(server.el, + long long startval = server.events_processed_while_blocked; + long long ae_events = aeProcessEvents(server.el, AE_FILE_EVENTS|AE_DONT_WAIT| AE_CALL_BEFORE_SLEEP|AE_CALL_AFTER_SLEEP); + /* Note that server.events_processed_while_blocked will also get + * incremeted by callbacks called by the event loop handlers. */ + server.events_processed_while_blocked += ae_events; + long long events = server.events_processed_while_blocked - startval; if (!events) break; - count += events; } ProcessingEventsWhileBlocked = 0; - return count; } /* ========================================================================== diff --git a/src/server.c b/src/server.c index 73a4a9f55..5bc4666ee 100644 --- a/src/server.c +++ b/src/server.c @@ -2107,12 +2107,17 @@ void beforeSleep(struct aeEventLoop *eventLoop) { UNUSED(eventLoop); /* Just call a subset of vital functions in case we are re-entering - * the event loop from processEventsWhileBlocked(). */ + * the event loop from processEventsWhileBlocked(). Note that in this + * case we keep track of the number of events we are processing, since + * processEventsWhileBlocked() wants to stop ASAP if there are no longer + * events to handle. */ if (ProcessingEventsWhileBlocked) { - handleClientsWithPendingReadsUsingThreads(); - tlsProcessPendingData(); - handleClientsWithPendingWrites(); - freeClientsInAsyncFreeQueue(); + uint64_t processed = 0; + processed += handleClientsWithPendingReadsUsingThreads(); + processed += tlsProcessPendingData(); + processed += handleClientsWithPendingWrites(); + processed += freeClientsInAsyncFreeQueue(); + server.events_processed_while_blocked += processed; return; } @@ -2763,6 +2768,7 @@ void initServer(void) { server.clients_waiting_acks = listCreate(); server.get_ack_from_slaves = 0; server.clients_paused = 0; + server.events_processed_while_blocked = 0; server.system_memory_size = zmalloc_get_memory_size(); if (server.tls_port && tlsConfigure(&server.tls_ctx_config) == C_ERR) { diff --git a/src/server.h b/src/server.h index 59cf1370e..55ee2d300 100644 --- a/src/server.h +++ b/src/server.h @@ -1095,6 +1095,7 @@ struct redisServer { queries. Will still serve RESP2 queries. */ int io_threads_num; /* Number of IO threads to use. */ int io_threads_do_reads; /* Read and parse from IO threads? 
*/ + long long events_processed_while_blocked; /* processEventsWhileBlocked() */ /* RDB / AOF loading information */ int loading; /* We are loading data from disk if true */ @@ -1652,7 +1653,7 @@ void rewriteClientCommandVector(client *c, int argc, ...); void rewriteClientCommandArgument(client *c, int i, robj *newval); void replaceClientCommandVector(client *c, int argc, robj **argv); unsigned long getClientOutputBufferMemoryUsage(client *c); -void freeClientsInAsyncFreeQueue(void); +int freeClientsInAsyncFreeQueue(void); void asyncCloseClientOnOutputBufferLimitReached(client *c); int getClientType(client *c); int getClientTypeByName(char *name); @@ -1662,7 +1663,7 @@ void disconnectSlaves(void); int listenToPort(int port, int *fds, int *count); void pauseClients(mstime_t duration); int clientsArePaused(void); -int processEventsWhileBlocked(void); +void processEventsWhileBlocked(void); int handleClientsWithPendingWrites(void); int handleClientsWithPendingWritesUsingThreads(void); int handleClientsWithPendingReadsUsingThreads(void); diff --git a/src/tls.c b/src/tls.c index c18aafebe..ee85bd302 100644 --- a/src/tls.c +++ b/src/tls.c @@ -768,15 +768,17 @@ int tlsHasPendingData() { return listLength(pending_list) > 0; } -void tlsProcessPendingData() { +int tlsProcessPendingData() { listIter li; listNode *ln; + int processed = listLength(pending_list); listRewind(pending_list,&li); while((ln = listNext(&li))) { tls_connection *conn = listNodeValue(ln); tlsHandleEvent(conn, AE_READABLE); } + return processed; } #else /* USE_OPENSSL */ @@ -804,7 +806,8 @@ int tlsHasPendingData() { return 0; } -void tlsProcessPendingData() { +int tlsProcessPendingData() { + return 0; } #endif From 80c906bd30117b02f72b4b9251c34bb85e3c4703 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 15 May 2020 10:19:13 +0200 Subject: [PATCH 16/21] Cache master without checking of deferred close flags. The context is issue #7205: since the introduction of threaded I/O we close clients asynchronously by default from readQueryFromClient(). So we should no longer prevent the caching of the master client, to later PSYNC incrementally, if such flags are set. However we also don't want the master client to be cached with such flags (would be closed immediately after being restored). And yet we want a way to understand if a master was closed because of a protocol error, and in that case prevent the caching. --- src/networking.c | 10 ++++------ src/replication.c | 8 ++++++-- src/server.h | 1 + 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/src/networking.c b/src/networking.c index 671e374f4..6de00be09 100644 --- a/src/networking.c +++ b/src/networking.c @@ -1141,10 +1141,7 @@ void freeClient(client *c) { * some unexpected state, by checking its flags. */ if (server.master && c->flags & CLIENT_MASTER) { serverLog(LL_WARNING,"Connection with master lost."); - if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY| - CLIENT_CLOSE_ASAP| - CLIENT_BLOCKED))) - { + if (!(c->flags & (CLIENT_PROTOCOL_ERROR|CLIENT_BLOCKED))) { replicationCacheMaster(c); return; } @@ -1558,7 +1555,8 @@ int processInlineBuffer(client *c) { } /* Helper function. Record protocol erro details in server log, - * and set the client as CLIENT_CLOSE_AFTER_REPLY. */ + * and set the client as CLIENT_CLOSE_AFTER_REPLY and + * CLIENT_PROTOCOL_ERROR. 
*/ #define PROTO_DUMP_LEN 128 static void setProtocolError(const char *errstr, client *c) { if (server.verbosity <= LL_VERBOSE) { @@ -1584,7 +1582,7 @@ static void setProtocolError(const char *errstr, client *c) { "Protocol error (%s) from client: %s. %s", errstr, client, buf); sdsfree(client); } - c->flags |= CLIENT_CLOSE_AFTER_REPLY; + c->flags |= (CLIENT_CLOSE_AFTER_REPLY|CLIENT_PROTOCOL_ERROR); } /* Process the query buffer for client 'c', setting up the client argument diff --git a/src/replication.c b/src/replication.c index c59639cd1..f433d6413 100644 --- a/src/replication.c +++ b/src/replication.c @@ -2688,14 +2688,18 @@ void replicationCacheMaster(client *c) { /* Unlink the client from the server structures. */ unlinkClient(c); + /* Clear flags that can create issues once we reconnect the client. */ + c->flags &= ~(CLIENT_CLOSE_ASAP|CLIENT_CLOSE_AFTER_REPLY); + /* Reset the master client so that's ready to accept new commands: * we want to discard te non processed query buffers and non processed * offsets, including pending transactions, already populated arguments, * pending outputs to the master. */ sdsclear(server.master->querybuf); sdsclear(server.master->pending_querybuf); - /* Adjust reploff and read_reploff to the last meaningful offset we executed. - * this is the offset the replica will use for future PSYNC. */ + + /* Adjust reploff and read_reploff to the last meaningful offset we + * executed. This is the offset the replica will use for future PSYNC. */ server.master->reploff = adjustMeaningfulReplOffset(); server.master->read_reploff = server.master->reploff; if (c->flags & CLIENT_MULTI) discardTransaction(c); diff --git a/src/server.h b/src/server.h index 55ee2d300..f835bf5e9 100644 --- a/src/server.h +++ b/src/server.h @@ -255,6 +255,7 @@ typedef long long ustime_t; /* microsecond time type. */ #define CLIENT_TRACKING_NOLOOP (1ULL<<37) /* Don't send invalidation messages about writes performed by myself.*/ #define CLIENT_IN_TO_TABLE (1ULL<<38) /* This client is in the timeout table. */ +#define CLIENT_PROTOCOL_ERROR (1ULL<<39) /* Protocol error chatting with it. */ /* Client block type (btype field in client structure) * if CLIENT_BLOCKED flag is set. */ From 2435341d7450c7e81e2facc4aebe981e1c900cc4 Mon Sep 17 00:00:00 2001 From: Madelyn Olson Date: Thu, 14 May 2020 08:37:24 -0700 Subject: [PATCH 17/21] Added a refcount on timer events to prevent deletion of recursive timer calls --- src/ae.c | 10 ++++++++++ src/ae.h | 2 ++ 2 files changed, 12 insertions(+) diff --git a/src/ae.c b/src/ae.c index 379cfd1e6..689a27d16 100644 --- a/src/ae.c +++ b/src/ae.c @@ -238,6 +238,7 @@ long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, te->clientData = clientData; te->prev = NULL; te->next = eventLoop->timeEventHead; + te->refcount = 0; if (te->next) te->next->prev = te; eventLoop->timeEventHead = te; @@ -316,6 +317,13 @@ static int processTimeEvents(aeEventLoop *eventLoop) { /* Remove events scheduled for deletion. */ if (te->id == AE_DELETED_EVENT_ID) { aeTimeEvent *next = te->next; + /* If a reference exists for this timer event, + * don't free it. 
This is currently incremented + * for recursive timerProc calls */ + if (te->refcount) { + te = next; + continue; + } if (te->prev) te->prev->next = te->next; else @@ -345,7 +353,9 @@ static int processTimeEvents(aeEventLoop *eventLoop) { int retval; id = te->id; + te->refcount++; retval = te->timeProc(eventLoop, id, te->clientData); + te->refcount--; processed++; if (retval != AE_NOMORE) { aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms); diff --git a/src/ae.h b/src/ae.h index 63b306615..d1b7f34bf 100644 --- a/src/ae.h +++ b/src/ae.h @@ -86,6 +86,8 @@ typedef struct aeTimeEvent { void *clientData; struct aeTimeEvent *prev; struct aeTimeEvent *next; + int refcount; /* refcount to prevent timer events from being + * freed in recursive time event calls. */ } aeTimeEvent; /* A fired event */ From 389697988a78e131e326b1b02005b0deec4a8a43 Mon Sep 17 00:00:00 2001 From: David Carlier Date: Tue, 12 May 2020 21:19:12 +0100 Subject: [PATCH 18/21] NetBSD build update. This platform supports CPU affinity (but not OpenBSD). --- src/Makefile | 10 ++++++++++ src/config.h | 2 +- src/setcpuaffinity.c | 19 +++++++++++++++++++ 3 files changed, 30 insertions(+), 1 deletion(-) diff --git a/src/Makefile b/src/Makefile index 55f862cfc..b8c05c32b 100644 --- a/src/Makefile +++ b/src/Makefile @@ -128,6 +128,14 @@ else ifeq ($(uname_S),DragonFly) # FreeBSD FINAL_LIBS+= -lpthread -lexecinfo +else +ifeq ($(uname_S),OpenBSD) + # OpenBSD + FINAL_LIBS+= -lpthread -lexecinfo +else +ifeq ($(uname_S),NetBSD) + # NetBSD + FINAL_LIBS+= -lpthread -lexecinfo else # All the other OSes (notably Linux) FINAL_LDFLAGS+= -rdynamic @@ -138,6 +146,8 @@ endif endif endif endif +endif +endif # Include paths to dependencies FINAL_CFLAGS+= -I../deps/hiredis -I../deps/linenoise -I../deps/lua/src diff --git a/src/config.h b/src/config.h index 6025d4e96..0fcc42972 100644 --- a/src/config.h +++ b/src/config.h @@ -248,7 +248,7 @@ int pthread_setname_np(const char *name); #endif /* Check if we can use setcpuaffinity(). */ -#if (defined __linux || defined __NetBSD__ || defined __FreeBSD__ || defined __OpenBSD__) +#if (defined __linux || defined __NetBSD__ || defined __FreeBSD__) #define USE_SETCPUAFFINITY void setcpuaffinity(const char *cpulist); #endif diff --git a/src/setcpuaffinity.c b/src/setcpuaffinity.c index dcae81c71..13594113c 100644 --- a/src/setcpuaffinity.c +++ b/src/setcpuaffinity.c @@ -36,6 +36,10 @@ #include #include #endif +#ifdef __NetBSD__ +#include +#include +#endif #include "config.h" #ifdef USE_SETCPUAFFINITY @@ -71,11 +75,18 @@ void setcpuaffinity(const char *cpulist) { #ifdef __FreeBSD__ cpuset_t cpuset; #endif +#ifdef __NetBSD__ + cpuset_t *cpuset; +#endif if (!cpulist) return; +#ifndef __NetBSD__ CPU_ZERO(&cpuset); +#else + cpuset = cpuset_create(); +#endif q = cpulist; while (p = q, q = next_token(q, ','), p) { @@ -110,7 +121,11 @@ void setcpuaffinity(const char *cpulist) { return; while (a <= b) { +#ifndef __NetBSD__ CPU_SET(a, &cpuset); +#else + cpuset_set(a, cpuset); +#endif a += s; } } @@ -124,6 +139,10 @@ void setcpuaffinity(const char *cpulist) { #ifdef __FreeBSD__ cpuset_setaffinity(CPU_LEVEL_WHICH, CPU_WHICH_TID, -1, sizeof(cpuset), &cpuset); #endif +#ifdef __NetBSD__ + pthread_setaffinity_np(pthread_self(), cpuset_size(cpuset), cpuset); + cpuset_destroy(cpuset); +#endif } #endif /* USE_SETCPUAFFINITY */ From 77ae66930c47b4b3c685bd534a043b86d9df5900 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Sun, 10 May 2020 17:35:27 +0300 Subject: [PATCH 19/21] TLS: Add crypto locks for older OpenSSL support. 
This is really required only for older OpenSSL versions. Also, at the moment Redis does not use OpenSSL from multiple threads so this will only be useful if modules end up doing that. --- src/tls.c | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/src/tls.c b/src/tls.c index ee85bd302..28a74df9a 100644 --- a/src/tls.c +++ b/src/tls.c @@ -93,11 +93,56 @@ static int parseProtocolsConfig(const char *str) { * served to the reader yet. */ static list *pending_list = NULL; +/** + * OpenSSL global initialization and locking handling callbacks. + * Note that this is only required for OpenSSL < 1.1.0. + */ + +#if OPENSSL_VERSION_NUMBER < 0x10100000L +#define USE_CRYPTO_LOCKS +#endif + +#ifdef USE_CRYPTO_LOCKS + +static pthread_mutex_t *openssl_locks; + +static void sslLockingCallback(int mode, int lock_id, const char *f, int line) { + pthread_mutex_t *mt = openssl_locks + lock_id; + + if (mode & CRYPTO_LOCK) { + pthread_mutex_lock(mt); + } else { + pthread_mutex_unlock(mt); + } + + (void)f; + (void)line; +} + +static void initCryptoLocks(void) { + unsigned i, nlocks; + if (CRYPTO_get_locking_callback() != NULL) { + /* Someone already set the callback before us. Don't destroy it! */ + return; + } + nlocks = CRYPTO_num_locks(); + openssl_locks = zmalloc(sizeof(*openssl_locks) * nlocks); + for (i = 0; i < nlocks; i++) { + pthread_mutex_init(openssl_locks + i, NULL); + } + CRYPTO_set_locking_callback(sslLockingCallback); +} +#endif /* USE_CRYPTO_LOCKS */ + void tlsInit(void) { ERR_load_crypto_strings(); SSL_load_error_strings(); SSL_library_init(); +#ifdef USE_CRYPTO_LOCKS + initCryptoLocks(); +#endif + if (!RAND_poll()) { serverLog(LL_WARNING, "OpenSSL: Failed to seed random number generator."); } From 16ba33c05bf94b94ec8217e82aa50a1aeef62573 Mon Sep 17 00:00:00 2001 From: Yossi Gottlieb Date: Sun, 10 May 2020 17:38:04 +0300 Subject: [PATCH 20/21] TLS: Fix test failures on recent Debian/Ubuntu. Seems like on some systems choosing specific TLS v1/v1.1 versions no longer works as expected. Test is reduced for v1.2 now which is still good enough to test the mechansim, and matters most anyway. --- tests/unit/tls.tcl | 20 -------------------- 1 file changed, 20 deletions(-) diff --git a/tests/unit/tls.tcl b/tests/unit/tls.tcl index 950f65557..2b04590cd 100644 --- a/tests/unit/tls.tcl +++ b/tests/unit/tls.tcl @@ -25,26 +25,6 @@ start_server {tags {"tls"}} { } test {TLS: Verify tls-protocols behaves as expected} { - r CONFIG SET tls-protocols TLSv1 - - set s [redis [srv 0 host] [srv 0 port] 0 1 {-tls1 0}] - catch {$s PING} e - assert_match {*I/O error*} $e - - set s [redis [srv 0 host] [srv 0 port] 0 1 {-tls1 1}] - catch {$s PING} e - assert_match {PONG} $e - - r CONFIG SET tls-protocols TLSv1.1 - - set s [redis [srv 0 host] [srv 0 port] 0 1 {-tls1.1 0}] - catch {$s PING} e - assert_match {*I/O error*} $e - - set s [redis [srv 0 host] [srv 0 port] 0 1 {-tls1.1 1}] - catch {$s PING} e - assert_match {PONG} $e - r CONFIG SET tls-protocols TLSv1.2 set s [redis [srv 0 host] [srv 0 port] 0 1 {-tls1.2 0}] From ae306a3df6cf63b31a0814cb5393a9df59947d2e Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 15 May 2020 22:29:52 +0200 Subject: [PATCH 21/21] Redis 6.0.2. 
---
 00-RELEASENOTES | 179 ++++++++++++++++++++++++++++++++++++++++++++++++
 src/version.h | 2 +-
 2 files changed, 180 insertions(+), 1 deletion(-)

diff --git a/00-RELEASENOTES b/00-RELEASENOTES
index 1d5486ae8..1fa620bd8 100644
--- a/00-RELEASENOTES
+++ b/00-RELEASENOTES
@@ -11,6 +11,185 @@ CRITICAL: There is a critical bug affecting MOST USERS. Upgrade ASAP.
 SECURITY: There are security fixes in the release.
 --------------------------------------------------------------------------------
 
+================================================================================
+Redis 6.0.2 Released Fri May 15 22:24:36 CEST 2020
+================================================================================
+
+Upgrade urgency MODERATE: many non-critical bugfixes in different areas.
+                          Critical fix to client side caching when
+                          keys are evicted from the tracking table but
+                          no notifications are sent.
+
+The following are the most serious fixes:
+
+* XPENDING should not update consumer's seen-time
+* optimize memory usage of deferred replies - fixed
+* Fix CRC64 initialization outside the Redis server itself.
+* stringmatchlen() should not expect null terminated strings.
+* Cluster nodes availability checks improved when there is
+  high Pub/Sub load on the cluster bus.
+* Redis Benchmark: Fix coredump because of double free
+* Tracking: send eviction messages when evicting entries.
+* rax.c updated from upstream antirez/rax.
+* fix redis 6.0 not freeing closed connections during loading.
+
+New features:
+
+* Support setcpuaffinity on linux/bsd
+* Client Side Caching: Add Tracking Prefix Number Stats in Server Info
+* Add --user argument to redis-benchmark.c (ACL)
+
+Full list of commits:
+
+Yossi Gottlieb in commit 16ba33c05:
+ TLS: Fix test failures on recent Debian/Ubuntu.
+ 1 file changed, 20 deletions(-)
+
+Yossi Gottlieb in commit 77ae66930:
+ TLS: Add crypto locks for older OpenSSL support.
+ 1 file changed, 45 insertions(+)
+
+David Carlier in commit 389697988:
+ NetBSD build update.
+ 3 files changed, 30 insertions(+), 1 deletion(-)
+
+Madelyn Olson in commit 2435341d7:
+ Added a refcount on timer events to prevent deletion of recursive timer calls
+ 2 files changed, 12 insertions(+)
+
+antirez in commit 80c906bd3:
+ Cache master without checking of deferred close flags.
+ 3 files changed, 11 insertions(+), 8 deletions(-)
+
+antirez in commit 74249be4a:
+ Track events processed while blocked globally.
+ 5 files changed, 32 insertions(+), 17 deletions(-)
+
+antirez in commit 8bf660af9:
+ Some rework of #7234.
+ 4 files changed, 77 insertions(+), 65 deletions(-)
+
+Oran Agra in commit 9da134cd8:
+ fix redis 6.0 not freeing closed connections during loading.
+ 3 files changed, 133 insertions(+), 58 deletions(-)
+
+antirez in commit f7f219a13:
+ Regression test for #7249.
+ 1 file changed, 22 insertions(+)
+
+antirez in commit 693629585:
+ rax.c updated from upstream antirez/rax.
+ 1 file changed, 4 insertions(+), 2 deletions(-)
+
+antirez in commit e3b5648df:
+ Tracking: send eviction messages when evicting entries.
+ 2 files changed, 29 insertions(+), 12 deletions(-)
+
+Oran Agra in commit 5c41802d5:
+ fix unstable replication test
+ 1 file changed, 2 insertions(+), 2 deletions(-)
+
+ShooterIT in commit a23cdbb94:
+ Redis Benchmark: Fix coredump because of double free
+ 1 file changed, 1 insertion(+), 1 deletion(-)
+
+antirez in commit 1276058ea:
+ Cluster: clarify we always resolve the sender.
+ 1 file changed, 3 insertions(+), 1 deletion(-)
+
+antirez in commit 002fcde3d:
+ Cluster: refactor ping/data delay handling.
+ 1 file changed, 13 insertions(+), 11 deletions(-)
+
+antirez in commit 960186a71:
+ Cluster: introduce data_received field.
+ 2 files changed, 27 insertions(+), 10 deletions(-)
+
+antirez in commit 3672875b4:
+ stringmatchlen() should not expect null terminated strings.
+ 1 file changed, 2 insertions(+), 2 deletions(-)
+
+Brad Dunbar in commit 24e12641d:
+ Remove unreachable branch.
+ 1 file changed, 2 deletions(-)
+
+hwware in commit c7edffbd5:
+ add jemalloc-bg-thread config in redis conf
+ 1 file changed, 3 insertions(+)
+
+hwware in commit 8a9c84f4a:
+ add include guard for lolwut.h
+ 1 file changed, 6 insertions(+)
+
+antirez in commit cb683a84f:
+ Don't propagate spurious MULTI on DEBUG LOADAOF.
+ 2 files changed, 6 insertions(+), 3 deletions(-)
+
+antirez in commit 84d9766d6:
+ Dump recent backlog on master query generating errors.
+ 1 file changed, 29 insertions(+)
+
+Titouan Christophe in commit ec1e106ec:
+ make struct user anonymous (only typedefed)
+ 1 file changed, 1 insertion(+), 1 deletion(-)
+
+antirez in commit e48c37316:
+ Test: --dont-clean should do first cleanup.
+ 1 file changed, 2 insertions(+), 5 deletions(-)
+
+Benjamin Sergeant in commit 1e561cfaa:
+ Add --user argument to redis-benchmark.c (ACL)
+ 1 file changed, 15 insertions(+), 2 deletions(-)
+
+antirez in commit d1af82a88:
+ Drop not needed part from #7194.
+ 1 file changed, 1 insertion(+), 1 deletion(-)
+
+Muhammad Zahalqa in commit 897a360d0:
+ Fix compiler warnings on function rev(unsigned long)
+ 1 file changed, 3 insertions(+), 3 deletions(-)
+
+antirez in commit ac316d8cc:
+ Move CRC64 initialization in main().
+ 2 files changed, 1 insertion(+), 4 deletions(-)
+
+antirez in commit fc7bc3204:
+ Fix CRC64 initialization outside the Redis server itself.
+ 1 file changed, 3 insertions(+)
+
+hwware in commit a6e55c096:
+ Client Side Caching: Add Tracking Prefix Number Stats in Server Info
+ 3 files changed, 8 insertions(+)
+
+antirez in commit b062fd523:
+ Fix NetBSD build by fixing redis_set_thread_title() support.
+ 1 file changed, 4 insertions(+), 1 deletion(-)
+
+antirez in commit 4efb25d9c:
+ Rework a bit the documentation for CPU pinning.
+ 2 files changed, 18 insertions(+), 8 deletions(-)
+
+zhenwei pi in commit d6436eb7c:
+ Support setcpuaffinity on linux/bsd
+ 12 files changed, 180 insertions(+), 1 deletion(-)
+
+Guy Benoish in commit 3a441c7d9:
+ XPENDING should not update consumer's seen-time
+ 4 files changed, 33 insertions(+), 20 deletions(-)
+
+Oran Agra in commit 75addb4fe:
+ optimize memory usage of deferred replies - fixed
+ 1 file changed, 29 insertions(+)
+
+Deliang Yang in commit c57d9146f:
+ reformat code
+ 1 file changed, 1 insertion(+), 1 deletion(-)
+
+Oran Agra in commit 3d3861dd8:
+ add daily github actions with libc malloc and valgrind
+ 5 files changed, 106 insertions(+), 18 deletions(-)
+
+
 ================================================================================
 Redis 6.0.1 Released Sat May 02 00:06:07 CEST 2020
 ================================================================================
diff --git a/src/version.h b/src/version.h
index 91d3566d3..ed67debe1 100644
--- a/src/version.h
+++ b/src/version.h
@@ -1 +1 @@
-#define REDIS_VERSION "6.0.1"
+#define REDIS_VERSION "6.0.2"
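
A note on the NetBSD branch added to setcpuaffinity.c in patch 18 above: unlike the fixed-size cpu_set_t manipulated with CPU_ZERO()/CPU_SET() on Linux and FreeBSD, NetBSD cpuset(3) sets are allocated on the heap, which is why the patch pairs cpuset_create() with cpuset_destroy(). Below is a minimal standalone sketch of that call sequence, pinning the calling thread to CPU 0. It is illustrative only and not part of the patches; the choice of <sched.h> and <pthread.h> as the providing headers is an assumption based on the NetBSD manual pages rather than something stated in the patch itself.

/* Sketch only: NetBSD thread affinity via cpuset(3).
 * Build on NetBSD with: cc -o pin pin.c -lpthread */
#include <stdio.h>
#include <sched.h>      /* cpuset_create(), cpuset_set(), cpuset_size(), cpuset_destroy() */
#include <pthread.h>    /* pthread_self(), pthread_setaffinity_np() */

int main(void) {
    cpuset_t *set = cpuset_create();    /* dynamically sized set, must be freed by the caller */
    if (set == NULL) {
        perror("cpuset_create");
        return 1;
    }
    cpuset_set(0, set);                 /* add CPU 0 to the set */
    int err = pthread_setaffinity_np(pthread_self(), cpuset_size(set), set);
    if (err != 0)
        fprintf(stderr, "pthread_setaffinity_np: %d\n", err);
    cpuset_destroy(set);                /* safe once the affinity call has returned */
    return err != 0;
}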