From 792afb443211f190b3f8bea15e945661453fbddf Mon Sep 17 00:00:00 2001
From: ny0312 <49037844+ny0312@users.noreply.github.com>
Date: Thu, 16 Dec 2021 21:56:59 -0800
Subject: [PATCH] Introduce memory management on cluster link buffers (#9774)

Introduce memory management on cluster link buffers:

* Introduce a new `cluster-link-sendbuf-limit` config that caps memory usage
  of cluster bus link send buffers.
* Introduce a new `CLUSTER LINKS` command that displays current TCP links to/from peers.
* Introduce a new `mem_cluster_links` field under `INFO` command output, which displays
  the overall memory usage by all current cluster links.
* Introduce a new `total_cluster_links_buffer_limit_exceeded` field under `CLUSTER INFO`
  command output, which displays the accumulated count of cluster links freed due to
  `cluster-link-sendbuf-limit`.
---
 redis.conf                       |  11 ++
 src/cluster.c                    | 216 +++++++++++++++++++++++++++----
 src/cluster.h                    |   8 +-
 src/commands.c                   |   9 ++
 src/commands/cluster-links.json  |  15 +++
 src/config.c                     |   1 +
 src/object.c                     |   3 +
 src/server.c                     |   3 +
 src/server.h                     |   3 +
 tests/cluster/cluster.tcl        |  69 ++++++++++
 tests/cluster/tests/24-links.tcl |  99 ++++++++++++++
 tests/support/util.tcl           |   8 ++
 tests/unit/pendingquerybuf.tcl   |   8 --
 13 files changed, 421 insertions(+), 32 deletions(-)
 create mode 100644 src/commands/cluster-links.json
 create mode 100644 tests/cluster/tests/24-links.tcl

diff --git a/redis.conf b/redis.conf
index b2810996d..1350ac96d 100644
--- a/redis.conf
+++ b/redis.conf
@@ -1576,6 +1576,17 @@ lua-time-limit 5000
 #
 # cluster-allow-reads-when-down no
 
+# The cluster link send buffer limit is the limit on the memory usage of an individual
+# cluster bus link's send buffer, in bytes. Links that exceed this limit are freed.
+# This primarily prevents send buffers from growing unbounded on links toward
+# slow peers (e.g. PubSub messages piling up).
+# The limit is disabled by default. Enable it when the 'mem_cluster_links' INFO field
+# and/or the 'send-buffer-allocated' entries in the 'CLUSTER LINKS' command output continuously increase.
+# A minimum limit of 1gb is recommended so that a cluster link buffer can fit at least
+# a single PubSub message (client-query-buffer-limit defaults to 1gb).
+#
+# cluster-link-sendbuf-limit 0
+
 # In order to setup your cluster make sure to read the documentation
 # available at https://redis.io web site.
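For illustration, here is roughly how the surface added by this patch looks from redis-cli. This is a hedged sketch: the command, config, and field names below are the ones introduced by the patch, but the node ID, timestamp, and byte counts are invented for the example, and the exact reply framing depends on the RESP version in use.

    127.0.0.1:6379> CONFIG SET cluster-link-sendbuf-limit 1gb
    OK
    127.0.0.1:6379> CLUSTER LINKS
    1)  1) "direction"
        2) "to"
        3) "node"
        4) "9dd5e05a5ba4cd4f0f81c7165e4c4b4eb866db6c"
        5) "create-time"
        6) (integer) 1639712284276
        7) "events"
        8) "rw"
        9) "send-buffer-allocated"
       10) (integer) 4096
       11) "send-buffer-used"
       12) (integer) 0
    127.0.0.1:6379> INFO memory
    ...
    mem_cluster_links:10560
    127.0.0.1:6379> CLUSTER INFO
    ...
    total_cluster_links_buffer_limit_exceeded:0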
diff --git a/src/cluster.c b/src/cluster.c
index cbc4cd184..6538ceb0b 100644
--- a/src/cluster.c
+++ b/src/cluster.c
@@ -568,11 +568,15 @@ void clusterInit(void) {
     server.cluster->failover_auth_epoch = 0;
     server.cluster->cant_failover_reason = CLUSTER_CANT_FAILOVER_NONE;
     server.cluster->lastVoteEpoch = 0;
+
+    /* Initialize stats */
     for (int i = 0; i < CLUSTERMSG_TYPE_COUNT; i++) {
         server.cluster->stats_bus_messages_sent[i] = 0;
         server.cluster->stats_bus_messages_received[i] = 0;
     }
     server.cluster->stats_pfail_nodes = 0;
+    server.cluster->stat_cluster_links_buffer_limit_exceeded = 0;
+
     memset(server.cluster->slots,0, sizeof(server.cluster->slots));
     clusterCloseAllSlots();
@@ -711,8 +715,13 @@ clusterLink *createClusterLink(clusterNode *node) {
     link->sndbuf = sdsempty();
     link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
     link->rcvbuf_len = 0;
-    link->node = node;
     link->conn = NULL;
+    link->node = node;
+    /* The related node can only be known at link creation time if this is an outbound link */
+    link->inbound = (node == NULL);
+    if (!link->inbound) {
+        node->link = link;
+    }
     return link;
 }
@@ -726,11 +735,33 @@ void freeClusterLink(clusterLink *link) {
     }
     sdsfree(link->sndbuf);
     zfree(link->rcvbuf);
-    if (link->node)
-        link->node->link = NULL;
+    if (link->node) {
+        if (link->node->link == link) {
+            serverAssert(!link->inbound);
+            link->node->link = NULL;
+        } else if (link->node->inbound_link == link) {
+            serverAssert(link->inbound);
+            link->node->inbound_link = NULL;
+        }
+    }
     zfree(link);
 }
+
+void setClusterNodeToInboundClusterLink(clusterNode *node, clusterLink *link) {
+    serverAssert(!link->node);
+    serverAssert(link->inbound);
+    if (node->inbound_link) {
+        /* A peer may disconnect and then reconnect with us, and it's not guaranteed that
+         * we would always process the disconnection of the existing inbound link before
+         * accepting a new inbound link. Therefore, it's possible to have more than
+         * one inbound link from the same node at the same time. */
+        serverLog(LL_DEBUG, "Replacing inbound link fd %d from node %s with fd %d",
+                node->inbound_link->conn->fd, node->name, link->conn->fd);
+    }
+    node->inbound_link = link;
+    link->node = node;
+}
+
 static void clusterConnAcceptHandler(connection *conn) {
     clusterLink *link;
@@ -879,6 +910,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
     node->data_received = 0;
     node->fail_time = 0;
     node->link = NULL;
+    node->inbound_link = NULL;
     memset(node->ip,0,sizeof(node->ip));
     node->port = 0;
     node->cport = 0;
@@ -1046,8 +1078,9 @@ void freeClusterNode(clusterNode *n) {
     serverAssert(dictDelete(server.cluster->nodes,nodename) == DICT_OK);
     sdsfree(nodename);
 
-    /* Release link and associated data structures. */
+    /* Release links and associated data structures. */
     if (n->link) freeClusterLink(n->link);
+    if (n->inbound_link) freeClusterLink(n->inbound_link);
     listRelease(n->fail_reports);
     zfree(n->slaves);
     zfree(n);
@@ -1821,6 +1854,26 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
     }
 }
 
+static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) {
+    clusterNode *sender;
+    if (link->node && !nodeInHandshake(link->node)) {
+        /* If the link has an associated node, use that so that we don't have to look it
+         * up every time. The exception is when the node is still in handshake: it still
+         * has a random name and is thus not truly "known". */
+        sender = link->node;
+    } else {
+        /* Otherwise, fetch the sender based on the message */
+        sender = clusterLookupNode(hdr->sender);
+        /* We know the sender node but haven't associated it with the link. This must
+         * be an inbound link, because inbound links are the only links whose node
+         * was unknown when they were created. */
+        if (sender && !link->node) {
+            setClusterNodeToInboundClusterLink(sender, link);
+        }
+    }
+    return sender;
+}
+
 /* When this function is called, there is a packet to process starting
  * at link->rcvbuf. Releasing the buffer is up to the caller, so this
  * function should just handle the higher level stuff of processing the
@@ -1896,10 +1949,7 @@ int clusterProcessPacket(clusterLink *link) {
         if (totlen != explen) return 1;
     }
 
-    /* Check if the sender is a known node. Note that for incoming connections
-     * we don't store link->node information, but resolve the node by the
-     * ID in the header each time in the current implementation. */
-    sender = clusterLookupNode(hdr->sender);
+    sender = getNodeFromLinkAndMsg(link, hdr);
 
     /* Update the last time we saw any data from this node. We
      * use this in order to avoid detecting a timeout from a node that
@@ -2000,7 +2050,7 @@ int clusterProcessPacket(clusterLink *link) {
         serverLog(LL_DEBUG,"%s packet received: %s",
             clusterGetMessageTypeString(type),
             link->node ? link->node->name : "NULL");
-        if (link->node) {
+        if (!link->inbound) {
             if (nodeInHandshake(link->node)) {
                 /* If we already have this node, try to change the
                  * IP/port of the node with the new one. */
@@ -2070,7 +2120,7 @@ int clusterProcessPacket(clusterLink *link) {
     }
 
     /* Update our info about the node */
-    if (link->node && type == CLUSTERMSG_TYPE_PONG) {
+    if (!link->inbound && type == CLUSTERMSG_TYPE_PONG) {
         link->node->pong_received = now;
         link->node->ping_sent = 0;
@@ -2673,7 +2723,7 @@ void clusterSendPing(clusterLink *link, int type) {
     hdr = (clusterMsg*) buf;
 
     /* Populate the header. */
-    if (link->node && type == CLUSTERMSG_TYPE_PING)
+    if (!link->inbound && type == CLUSTERMSG_TYPE_PING)
         link->node->ping_sent = mstime();
     clusterBuildMessageHdr(hdr,type);
@@ -3588,7 +3638,7 @@ void clusterHandleManualFailover(void) {
 /* Check if the node is disconnected and re-establish the connection.
  * Also update a few stats while we are here, that can be used to make
  * better decisions in other part of the code. */
-int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_timeout, mstime_t now) {
+static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_timeout, mstime_t now) {
     /* Not interested in reconnecting the link with myself or nodes
      * for which we have no address. */
     if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) return 1;
@@ -3622,20 +3672,57 @@
             freeClusterLink(link);
             return 0;
         }
-        node->link = link;
     }
     return 0;
 }
 
-/* Resize the send buffer of a node if it is wasting
- * enough space. */
-int clusterNodeCronResizeBuffers(clusterNode *node) {
+static void resizeClusterLinkBuffer(clusterLink *link) {
     /* If unused space is a lot bigger than the used portion of the buffer then free up unused space.
      * We use a factor of 4 because of the greediness of sdsMakeRoomFor (used by sdscatlen). */
-    if (node->link != NULL && sdsavail(node->link->sndbuf) / 4 > sdslen(node->link->sndbuf)) {
-        node->link->sndbuf = sdsRemoveFreeSpace(node->link->sndbuf);
+    if (link != NULL && sdsavail(link->sndbuf) / 4 > sdslen(link->sndbuf)) {
+        link->sndbuf = sdsRemoveFreeSpace(link->sndbuf);
     }
-    return 0;
+}
+
+/* Resize the send buffers of a node if they are wasting
+ * enough space. */
+static void clusterNodeCronResizeBuffers(clusterNode *node) {
+    resizeClusterLinkBuffer(node->link);
+    resizeClusterLinkBuffer(node->inbound_link);
+}
+
+static void freeClusterLinkOnBufferLimitReached(clusterLink *link) {
+    if (link == NULL || server.cluster_link_sendbuf_limit_bytes == 0) {
+        return;
+    }
+    unsigned long long mem_link = sdsalloc(link->sndbuf);
+    if (mem_link > server.cluster_link_sendbuf_limit_bytes) {
+        serverLog(LL_WARNING, "Freeing cluster link(%s node %s, used memory: %llu) due to "
+                "exceeding send buffer memory limit.", link->inbound ? "from" : "to",
+                link->node ? link->node->name : "", mem_link);
+        freeClusterLink(link);
+        server.cluster->stat_cluster_links_buffer_limit_exceeded++;
+    }
+}
+
+/* Free a node's outbound and inbound links if their send buffers exceeded the limit. */
+static void clusterNodeCronFreeLinkOnBufferLimitReached(clusterNode *node) {
+    freeClusterLinkOnBufferLimitReached(node->link);
+    freeClusterLinkOnBufferLimitReached(node->inbound_link);
+}
+
+static size_t getClusterLinkMemUsage(clusterLink *link) {
+    if (link != NULL) {
+        return sizeof(clusterLink) + sdsalloc(link->sndbuf) + link->rcvbuf_alloc;
+    } else {
+        return 0;
+    }
+}
+
+/* Update the memory usage statistics of all current cluster links */
+static void clusterNodeCronUpdateClusterLinksMemUsage(clusterNode *node) {
+    server.stat_cluster_links_memory += getClusterLinkMemUsage(node->link);
+    server.stat_cluster_links_memory += getClusterLinkMemUsage(node->inbound_link);
 }
 
 /* This is executed 10 times every second */
@@ -3662,14 +3749,25 @@ void clusterCron(void) {
 
     /* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */
     server.cluster->stats_pfail_nodes = 0;
+    /* Clear so clusterNodeCronUpdateClusterLinksMemUsage can count the current memory usage of all cluster links. */
+    server.stat_cluster_links_memory = 0;
     /* Run through some of the operations we want to do on each cluster node. */
     di = dictGetSafeIterator(server.cluster->nodes);
     while((de = dictNext(di)) != NULL) {
         clusterNode *node = dictGetVal(de);
-        /* The protocol is that they return non-zero if the node was
-         * terminated. */
+        /* The sequence goes:
+         * 1. We try to shrink link buffers if possible.
+         * 2. We free the links whose buffers are still oversized after possible shrinking.
+         * 3. We update the latest memory usage of cluster links.
+         * 4. We immediately attempt reconnecting after freeing links.
+         */
+        clusterNodeCronResizeBuffers(node);
+        clusterNodeCronFreeLinkOnBufferLimitReached(node);
+        clusterNodeCronUpdateClusterLinksMemUsage(node);
+        /* The protocol is that the function(s) below return non-zero if the node was
+         * terminated.
+         */
         if(clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue;
-        if(clusterNodeCronResizeBuffers(node)) continue;
     }
     dictReleaseIterator(di);
@@ -4399,6 +4497,70 @@ sds clusterGenNodesDescription(int filter, int use_pport) {
     return ci;
 }
 
+/* Add to the output buffer of the given client the description of the given cluster link.
+ * The description is a map with each entry being an attribute of the link. */
+void addReplyClusterLinkDescription(client *c, clusterLink *link) {
+    addReplyMapLen(c, 6);
+
+    addReplyBulkCString(c, "direction");
+    addReplyBulkCString(c, link->inbound ? "from" : "to");
+
+    /* addReplyClusterLinkDescription is only called for links that have been
+     * associated with nodes. The association is always bi-directional, so
+     * link->node should never be NULL here. */
+    serverAssert(link->node);
+    sds node_name = sdsnewlen(link->node->name, CLUSTER_NAMELEN);
+    addReplyBulkCString(c, "node");
+    addReplyBulkCString(c, node_name);
+    sdsfree(node_name);
+
+    addReplyBulkCString(c, "create-time");
+    addReplyLongLong(c, link->ctime);
+
+    char events[3], *p;
+    p = events;
+    if (link->conn) {
+        if (connHasReadHandler(link->conn)) *p++ = 'r';
+        if (connHasWriteHandler(link->conn)) *p++ = 'w';
+    }
+    *p = '\0';
+    addReplyBulkCString(c, "events");
+    addReplyBulkCString(c, events);
+
+    addReplyBulkCString(c, "send-buffer-allocated");
+    addReplyLongLong(c, sdsalloc(link->sndbuf));
+
+    addReplyBulkCString(c, "send-buffer-used");
+    addReplyLongLong(c, sdslen(link->sndbuf));
+}
+
+/* Add to the output buffer of the given client an array of cluster link descriptions,
+ * with each array entry being a description of a single current cluster link. */
+void addReplyClusterLinksDescription(client *c) {
+    dictIterator *di;
+    dictEntry *de;
+    void *arraylen_ptr = NULL;
+    int num_links = 0;
+
+    arraylen_ptr = addReplyDeferredLen(c);
+
+    di = dictGetSafeIterator(server.cluster->nodes);
+    while((de = dictNext(di)) != NULL) {
+        clusterNode *node = dictGetVal(de);
+        if (node->link) {
+            num_links++;
+            addReplyClusterLinkDescription(c, node->link);
+        }
+        if (node->inbound_link) {
+            num_links++;
+            addReplyClusterLinkDescription(c, node->inbound_link);
+        }
+    }
+    dictReleaseIterator(di);
+
+    setDeferredArrayLen(c, arraylen_ptr, num_links);
+}
+
 /* -----------------------------------------------------------------------------
  * CLUSTER command
  * -------------------------------------------------------------------------- */
@@ -4608,6 +4770,9 @@ void clusterCommand(client *c) {
 "SLOTS",
 "    Return information about slots range mappings. Each range is made of:",
 "    start, end, master and replicas IP addresses, ports and ids",
+"LINKS",
+"    Return information about all network links between this node and its peers.",
+"    Output format is an array where each array element is a map containing attributes of a link",
 NULL
         };
         addReplyHelp(c, help);
@@ -4919,6 +5084,10 @@ NULL
         info = sdscatprintf(info,
             "cluster_stats_messages_received:%lld\r\n", tot_msg_received);
 
+        info = sdscatprintf(info,
+            "total_cluster_links_buffer_limit_exceeded:%llu\r\n",
+            server.cluster->stat_cluster_links_buffer_limit_exceeded);
+
         /* Produce the reply protocol. */
         addReplyVerbatim(c,info,sdslen(info),"txt");
         sdsfree(info);
@@ -5182,6 +5351,9 @@ NULL
         }
         clusterReset(hard);
         addReply(c,shared.ok);
+    } else if (!strcasecmp(c->argv[1]->ptr,"links") && c->argc == 2) {
+        /* CLUSTER LINKS */
+        addReplyClusterLinksDescription(c);
     } else {
         addReplySubcommandSyntaxError(c);
         return;
diff --git a/src/cluster.h b/src/cluster.h
index e0cf5c4dd..d64e2a5b9 100644
--- a/src/cluster.h
+++ b/src/cluster.h
@@ -39,7 +39,8 @@ typedef struct clusterLink {
     char *rcvbuf;                   /* Packet reception buffer */
     size_t rcvbuf_len;              /* Used size of rcvbuf */
    size_t rcvbuf_alloc;            /* Allocated size of rcvbuf */
-    struct clusterNode *node;       /* Node related to this link if any, or NULL */
+    struct clusterNode *node;       /* Node related to this link.
Initialized to NULL when unknown */ + int inbound; /* 1 if this link is an inbound link accepted from the related node */ } clusterLink; /* Cluster node flags and macros. */ @@ -137,7 +138,8 @@ typedef struct clusterNode { int pport; /* Latest known clients plaintext port. Only used if the main clients port is for TLS. */ int cport; /* Latest known cluster port of this node. */ - clusterLink *link; /* TCP/IP link with this node */ + clusterLink *link; /* TCP/IP link established toward this node */ + clusterLink *inbound_link; /* TCP/IP link accepted from this node */ list *fail_reports; /* List of nodes signaling this as failing */ } clusterNode; @@ -192,11 +194,13 @@ typedef struct clusterState { /* The following fields are used by masters to take state on elections. */ uint64_t lastVoteEpoch; /* Epoch of the last vote granted. */ int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */ + /* Stats */ /* Messages received and sent by type. */ long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT]; long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT]; long long stats_pfail_nodes; /* Number of nodes in PFAIL status, excluding nodes without address. */ + unsigned long long stat_cluster_links_buffer_limit_exceeded; /* Total number of cluster links freed due to exceeding buffer limit */ } clusterState; /* Redis cluster messages header */ diff --git a/src/commands.c b/src/commands.c index c88072072..4232ba8b5 100644 --- a/src/commands.c +++ b/src/commands.c @@ -399,6 +399,14 @@ struct redisCommandArg CLUSTER_KEYSLOT_Args[] = { {0} }; +/********** CLUSTER LINKS ********************/ + +/* CLUSTER LINKS history */ +#define CLUSTER_LINKS_History NULL + +/* CLUSTER LINKS hints */ +#define CLUSTER_LINKS_Hints NULL + /********** CLUSTER MEET ********************/ /* CLUSTER MEET history */ @@ -552,6 +560,7 @@ struct redisCommand CLUSTER_Subcommands[] = { {"help","Show helpful text about the different subcommands","O(1)","5.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_HELP_History,CLUSTER_HELP_Hints,clusterCommand,2,CMD_LOADING|CMD_STALE,0}, {"info","Provides info about Redis Cluster node state","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_INFO_History,CLUSTER_INFO_Hints,clusterCommand,2,CMD_RANDOM|CMD_STALE,0}, {"keyslot","Returns the hash slot of the specified key","O(N) where N is the number of bytes in the key","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_KEYSLOT_History,CLUSTER_KEYSLOT_Hints,clusterCommand,3,CMD_RANDOM|CMD_STALE,0,.args=CLUSTER_KEYSLOT_Args}, +{"links","Returns a list of all TCP links to and from peer nodes in cluster","O(N) where N is the total number of Cluster nodes","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_LINKS_History,CLUSTER_LINKS_Hints,clusterCommand,2,CMD_RANDOM|CMD_STALE,0}, {"meet","Force a node cluster to handshake with another node","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_MEET_History,CLUSTER_MEET_Hints,clusterCommand,-4,CMD_ADMIN|CMD_RANDOM|CMD_STALE,0,.args=CLUSTER_MEET_Args}, {"myid","Return the node id","O(1)","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_MYID_History,CLUSTER_MYID_Hints,clusterCommand,2,CMD_RANDOM|CMD_STALE,0}, {"nodes","Get Cluster config for the node","O(N) where N is the total number of Cluster nodes","3.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_CLUSTER,CLUSTER_NODES_History,CLUSTER_NODES_Hints,clusterCommand,2,CMD_RANDOM|CMD_STALE,0}, diff --git a/src/commands/cluster-links.json 
b/src/commands/cluster-links.json new file mode 100644 index 000000000..cc53d3384 --- /dev/null +++ b/src/commands/cluster-links.json @@ -0,0 +1,15 @@ +{ + "LINKS": { + "summary": "Returns a list of all TCP links to and from peer nodes in cluster", + "complexity": "O(N) where N is the total number of Cluster nodes", + "group": "cluster", + "since": "7.0.0", + "arity": 2, + "container": "CLUSTER", + "function": "clusterCommand", + "command_flags": [ + "RANDOM", + "STALE" + ] + } +} diff --git a/src/config.c b/src/config.c index 5146559e4..f171b0041 100644 --- a/src/config.c +++ b/src/config.c @@ -2690,6 +2690,7 @@ standardConfig configs[] = { /* Unsigned Long Long configs */ createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory), + createULongLongConfig("cluster-link-sendbuf-limit", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.cluster_link_sendbuf_limit_bytes, 0, MEMORY_CONFIG, NULL, NULL), /* Size_t configs */ createSizeTConfig("hash-max-listpack-entries", "hash-max-ziplist-entries", MODIFIABLE_CONFIG, 0, LONG_MAX, server.hash_max_listpack_entries, 512, INTEGER_CONFIG, NULL, NULL), diff --git a/src/object.c b/src/object.c index 7a5563ccb..d53f3e084 100644 --- a/src/object.c +++ b/src/object.c @@ -1196,6 +1196,9 @@ struct redisMemOverhead *getMemoryOverheadData(void) { server.stat_clients_type_memory[CLIENT_TYPE_NORMAL]; mem_total += mh->clients_normal; + mh->cluster_links = server.stat_cluster_links_memory; + mem_total += mh->cluster_links; + mem = 0; if (server.aof_state != AOF_OFF) { mem += sdsZmallocSize(server.aof_buf); diff --git a/src/server.c b/src/server.c index 0abf20dcd..e09974e41 100644 --- a/src/server.c +++ b/src/server.c @@ -2352,6 +2352,7 @@ void initServer(void) { server.stat_module_progress = 0; for (int j = 0; j < CLIENT_TYPE_COUNT; j++) server.stat_clients_type_memory[j] = 0; + server.stat_cluster_links_memory = 0; server.cron_malloc_stats.zmalloc_used = 0; server.cron_malloc_stats.process_rss = 0; server.cron_malloc_stats.allocator_allocated = 0; @@ -4559,6 +4560,7 @@ sds genRedisInfoString(const char *section) { "mem_total_replication_buffers:%zu\r\n" "mem_clients_slaves:%zu\r\n" "mem_clients_normal:%zu\r\n" + "mem_cluster_links:%zu\r\n" "mem_aof_buffer:%zu\r\n" "mem_allocator:%s\r\n" "active_defrag_running:%d\r\n" @@ -4611,6 +4613,7 @@ sds genRedisInfoString(const char *section) { server.repl_buffer_mem, mh->clients_slaves, mh->clients_normal, + mh->cluster_links, mh->aof_buffer, ZMALLOC_LIB, server.active_defrag_running, diff --git a/src/server.h b/src/server.h index a3600e13a..6ecad5456 100644 --- a/src/server.h +++ b/src/server.h @@ -1217,6 +1217,7 @@ struct redisMemOverhead { size_t repl_backlog; size_t clients_slaves; size_t clients_normal; + size_t cluster_links; size_t aof_buffer; size_t lua_caches; size_t functions_caches; @@ -1462,6 +1463,7 @@ struct redisServer { size_t stat_module_cow_bytes; /* Copy on write bytes during module fork. */ double stat_module_progress; /* Module save progress. */ redisAtomic size_t stat_clients_type_memory[CLIENT_TYPE_COUNT];/* Mem usage by type */ + size_t stat_cluster_links_memory;/* Mem usage by cluster links */ long long stat_unexpected_error_replies; /* Number of unexpected (aof-loading, replica to master, etc.) error replies */ long long stat_total_error_replies; /* Total number of issued error replies ( command + rejected errors ) */ long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. 
 */
@@ -1734,6 +1736,7 @@ struct redisServer {
     int cluster_allow_reads_when_down; /* Are reads allowed when the cluster
                                           is down? */
     int cluster_config_file_lock_fd;   /* cluster config fd, will be flock */
+    unsigned long long cluster_link_sendbuf_limit_bytes;   /* Memory usage limit on individual link send buffers */
     /* Scripting */
     client *script_caller;       /* The client running script right now, or NULL */
     mstime_t script_time_limit;  /* Script timeout in milliseconds */
diff --git a/tests/cluster/cluster.tcl b/tests/cluster/cluster.tcl
index e95789282..7b7ce5343 100644
--- a/tests/cluster/cluster.tcl
+++ b/tests/cluster/cluster.tcl
@@ -175,3 +175,72 @@ proc wait_for_cluster_propagation {} {
         fail "cluster config did not reach a consistent state"
     }
 }
+
+# Returns the parsed CLUSTER LINKS output of the instance identified
+# by the given `id` as a list of dictionaries, with each dictionary
+# corresponding to a link.
+proc get_cluster_links id {
+    set lines [R $id cluster links]
+    set links {}
+    foreach l $lines {
+        if {$l eq {}} continue
+        assert_equal [llength $l] 12
+        assert_equal [lindex $l 0] "direction"
+        set dir [lindex $l 1]
+        assert_equal [lindex $l 2] "node"
+        set node [lindex $l 3]
+        assert_equal [lindex $l 4] "create-time"
+        set create_time [lindex $l 5]
+        assert_equal [lindex $l 6] "events"
+        set events [lindex $l 7]
+        assert_equal [lindex $l 8] "send-buffer-allocated"
+        set send_buffer_allocated [lindex $l 9]
+        assert_equal [lindex $l 10] "send-buffer-used"
+        set send_buffer_used [lindex $l 11]
+        set link [dict create \
+            dir $dir \
+            node $node \
+            create_time $create_time \
+            events $events \
+            send_buffer_allocated $send_buffer_allocated \
+            send_buffer_used $send_buffer_used \
+        ]
+        lappend links $link
+    }
+    return $links
+}
+
+proc get_links_with_peer {this_instance_id peer_nodename} {
+    set links [get_cluster_links $this_instance_id]
+    set links_with_peer {}
+    foreach l $links {
+        if {[dict get $l node] eq $peer_nodename} {
+            lappend links_with_peer $l
+        }
+    }
+    return $links_with_peer
+}
+
+# Return the entry in the CLUSTER LINKS output of the instance identified by `this_instance_id`
+# that corresponds to the link established toward the peer identified by `peer_nodename`
+proc get_link_to_peer {this_instance_id peer_nodename} {
+    set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename]
+    foreach l $links_with_peer {
+        if {[dict get $l dir] eq "to"} {
+            return $l
+        }
+    }
+    return {}
+}
+
+# Return the entry in the CLUSTER LINKS output of the instance identified by `this_instance_id`
+# that corresponds to the link accepted from the peer identified by `peer_nodename`
+proc get_link_from_peer {this_instance_id peer_nodename} {
+    set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename]
+    foreach l $links_with_peer {
+        if {[dict get $l dir] eq "from"} {
+            return $l
+        }
+    }
+    return {}
+}
diff --git a/tests/cluster/tests/24-links.tcl b/tests/cluster/tests/24-links.tcl
new file mode 100644
index 000000000..6657a8ce4
--- /dev/null
+++ b/tests/cluster/tests/24-links.tcl
@@ -0,0 +1,99 @@
+source "../tests/includes/init-tests.tcl"
+
+test "Create a cluster with two single-node shards" {
+    create_cluster 2 0
+}
+
+test "Cluster should start ok" {
+    assert_cluster_state ok
+}
+
+test "Each node has two links with each peer" {
+    foreach_redis_id id {
+        # Get number of peers, excluding myself
+        set nodes [get_cluster_nodes $id]
+        set num_peers [expr [llength $nodes] - 1]
+
+        # Get number of links to peers
+        set links [get_cluster_links $id]
+        set num_links [llength $links]
+
+        # Two links per peer
+        assert {$num_peers*2 eq $num_links}
+
+        # For each peer there should be exactly one
+        # link "to" it and one link "from" it.
+        foreach n $nodes {
+            if {[has_flag $n myself]} continue
+            set peer [dict get $n id]
+            set to 0
+            set from 0
+            foreach l $links {
+                if {[dict get $l node] eq $peer} {
+                    if {[dict get $l dir] eq "to"} {
+                        incr to
+                    } elseif {[dict get $l dir] eq "from"} {
+                        incr from
+                    }
+                }
+            }
+            assert {$to eq 1}
+            assert {$from eq 1}
+        }
+    }
+}
+
+set primary1_id 0
+set primary2_id 1
+
+set primary1 [Rn $primary1_id]
+set primary2 [Rn $primary2_id]
+
+test "Disconnect link when send buffer limit reached" {
+    # On primary1, set the timeout to 1 hour so links won't get disconnected due to timeouts
+    set oldtimeout [lindex [$primary1 CONFIG get cluster-node-timeout] 1]
+    $primary1 CONFIG set cluster-node-timeout [expr 60*60*1000]
+
+    # Get primary1's links with primary2
+    set primary2_name [dict get [get_myself $primary2_id] id]
+    set orig_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
+    set orig_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]
+
+    # On primary1, set the cluster link send buffer limit to 32MB
+    set oldlimit [lindex [$primary1 CONFIG get cluster-link-sendbuf-limit] 1]
+    $primary1 CONFIG set cluster-link-sendbuf-limit [expr 32*1024*1024]
+    assert {[get_info_field [$primary1 cluster info] total_cluster_links_buffer_limit_exceeded] eq 0}
+
+    # To manufacture an ever-growing send buffer from primary1 to primary2,
+    # make primary2 unresponsive.
+    set primary2_pid [get_instance_attrib redis $primary2_id pid]
+    exec kill -SIGSTOP $primary2_pid
+
+    # On primary1, send a 10MB PubSub message. It will stay in the send buffer
+    # of the link from primary1 to primary2.
+    $primary1 publish channel [prepare_value [expr 10*1024*1024]]
+
+    # Check that the same link has not been disconnected, but its send buffer has grown
+    set same_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
+    assert {[dict get $same_link_p1_to_p2 create_time] eq [dict get $orig_link_p1_to_p2 create_time]}
+    assert {[dict get $same_link_p1_to_p2 send_buffer_allocated] > [dict get $orig_link_p1_to_p2 send_buffer_allocated]}
+
+    # On primary1, send another 30MB PubSub message.
+    $primary1 publish channel [prepare_value [expr 30*1024*1024]]
+
+    # The link has exceeded the buffer limit, so it has been dropped and recreated
+    set new_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
+    assert {[dict get $new_link_p1_to_p2 create_time] > [dict get $orig_link_p1_to_p2 create_time]}
+    assert {[get_info_field [$primary1 cluster info] total_cluster_links_buffer_limit_exceeded] eq 1}
+
+    # The link from primary2 should not be affected
+    set same_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]
+    assert {[dict get $same_link_p1_from_p2 create_time] eq [dict get $orig_link_p1_from_p2 create_time]}
+
+    # Revive primary2
+    exec kill -SIGCONT $primary2_pid
+
+    # Reset configs on primary1 so config changes don't leak out to other tests
+    $primary1 CONFIG set cluster-node-timeout $oldtimeout
+    $primary1 CONFIG set cluster-link-sendbuf-limit $oldlimit
+}
diff --git a/tests/support/util.tcl b/tests/support/util.tcl
index d97743665..08fea1faa 100644
--- a/tests/support/util.tcl
+++ b/tests/support/util.tcl
@@ -978,3 +978,11 @@ proc read_big_bulk {code {compare no} {prefix ""}} {
     r readraw 0
     return $resp_len
 }
+
+proc prepare_value {size} {
+    set _v "c"
+    for {set i 1} {$i < $size} {incr i} {
+        append _v 0
+    }
+    return $_v
+}
diff --git a/tests/unit/pendingquerybuf.tcl b/tests/unit/pendingquerybuf.tcl
index b1c2ee0d5..c1278c8fd 100644
--- a/tests/unit/pendingquerybuf.tcl
+++ b/tests/unit/pendingquerybuf.tcl
@@ -4,14 +4,6 @@ proc info_memory {r property} {
     }
 }
 
-proc prepare_value {size} {
-    set _v "c"
-    for {set i 1} {$i < $size} {incr i} {
-        append _v 0
-    }
-    return $_v
-}
-
 start_server {tags {"wait external:skip"}} {
     start_server {} {
         set slave [srv 0 client]
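For context, this is roughly what the test above provokes on primary1's side once the second PubSub message pushes the send buffer past the 32MB limit. The log format string is the one added to freeClusterLinkOnBufferLimitReached() in this patch, but the pid, timestamp, node ID, and byte count are invented for the example.

    52320:M 17 Dec 2021 10:22:07.548 # Freeing cluster link(to node 9dd5e05a5ba4cd4f0f81c7165e4c4b4eb866db6c, used memory: 43274248) due to exceeding send buffer memory limit.

    127.0.0.1:6379> CLUSTER INFO
    ...
    total_cluster_links_buffer_limit_exceeded:1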