From 47c493e070c8ac59ccc34d694700bca8ec517fbc Mon Sep 17 00:00:00 2001 From: Brennan <31714723+BCathcart@users.noreply.github.com> Date: Tue, 1 Nov 2022 22:26:44 -0400 Subject: [PATCH] Re-design cluster link send buffer to improve memory management (#11343) Re-design cluster link send queue to improve memory management --- src/cluster.c | 327 +++++++++--------- src/cluster.h | 4 +- src/config.c | 2 +- src/server.h | 2 +- tests/cluster/cluster.tcl | 69 ---- tests/cluster/tests/24-links.tcl | 114 ------ .../{cluster_helper.tcl => cluster_util.tcl} | 41 ++- tests/test_helper.tcl | 11 +- tests/unit/cluster/links.tcl | 258 ++++++++++++++ 9 files changed, 467 insertions(+), 361 deletions(-) delete mode 100644 tests/cluster/tests/24-links.tcl rename tests/support/{cluster_helper.tcl => cluster_util.tcl} (76%) create mode 100644 tests/unit/cluster/links.tcl diff --git a/src/cluster.c b/src/cluster.c index 7389de710..69cea4a5f 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -83,6 +83,7 @@ void removeChannelsInSlot(unsigned int slot); unsigned int countKeysInSlot(unsigned int hashslot); unsigned int countChannelsInSlot(unsigned int hashslot); unsigned int delKeysInSlot(unsigned int hashslot); +static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen); /* Links to the next and previous entries for keys in the same slot are stored * in the dict entry metadata. See Slot to Key API below. */ @@ -127,6 +128,13 @@ static ConnectionType *connTypeOfCluster() { return connectionTypeTcp(); } +/* clusterLink send queue blocks */ +typedef struct { + size_t totlen; /* Total length of this block including the message */ + int refcount; /* Number of cluster link send msg queues containing the message */ + clusterMsg msg; +} clusterMsgSendBlock; + /* ----------------------------------------------------------------------------- * Initialization * -------------------------------------------------------------------------- */ @@ -802,13 +810,36 @@ void clusterReset(int hard) { /* ----------------------------------------------------------------------------- * CLUSTER communication link * -------------------------------------------------------------------------- */ +static clusterMsgSendBlock *createClusterMsgSendBlock(int type, uint32_t msglen) { + uint32_t blocklen = msglen + sizeof(clusterMsgSendBlock) - sizeof(clusterMsg); + clusterMsgSendBlock *msgblock = zcalloc(blocklen); + msgblock->refcount = 1; + msgblock->totlen = blocklen; + server.stat_cluster_links_memory += blocklen; + clusterBuildMessageHdr(&msgblock->msg,type,msglen); + return msgblock; +} + +static void clusterMsgSendBlockDecrRefCount(void *node) { + clusterMsgSendBlock *msgblock = (clusterMsgSendBlock*)node; + msgblock->refcount--; + serverAssert(msgblock->refcount >= 0); + if (msgblock->refcount == 0) { + server.stat_cluster_links_memory -= msgblock->totlen; + zfree(msgblock); + } +} clusterLink *createClusterLink(clusterNode *node) { clusterLink *link = zmalloc(sizeof(*link)); link->ctime = mstime(); - link->sndbuf = sdsempty(); + link->send_msg_queue = listCreate(); + listSetFreeMethod(link->send_msg_queue, clusterMsgSendBlockDecrRefCount); + link->head_msg_send_offset = 0; + link->send_msg_queue_mem = sizeof(list); link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN); link->rcvbuf_len = 0; + server.stat_cluster_links_memory += link->rcvbuf_alloc + link->send_msg_queue_mem; link->conn = NULL; link->node = node; /* Related node can only possibly be known at link creation time if this is an outbound link */ @@ -827,7 
+858,9 @@ void freeClusterLink(clusterLink *link) { connClose(link->conn); link->conn = NULL; } - sdsfree(link->sndbuf); + server.stat_cluster_links_memory -= sizeof(list) + listLength(link->send_msg_queue)*sizeof(listNode); + listRelease(link->send_msg_queue); + server.stat_cluster_links_memory -= link->rcvbuf_alloc; zfree(link->rcvbuf); if (link->node) { if (link->node->link == link) { @@ -2684,22 +2717,45 @@ void handleLinkIOError(clusterLink *link) { freeClusterLink(link); } -/* Send data. This is handled using a trivial send buffer that gets - * consumed by write(). We don't try to optimize this for speed too much - * as this is a very low traffic channel. */ +/* Send the messages queued for the link. */ void clusterWriteHandler(connection *conn) { clusterLink *link = connGetPrivateData(conn); ssize_t nwritten; + size_t totwritten = 0; - nwritten = connWrite(conn, link->sndbuf, sdslen(link->sndbuf)); - if (nwritten <= 0) { - serverLog(LL_DEBUG,"I/O error writing to node link: %s", - (nwritten == -1) ? connGetLastError(conn) : "short write"); - handleLinkIOError(link); - return; + while (totwritten < NET_MAX_WRITES_PER_EVENT && listLength(link->send_msg_queue) > 0) { + listNode *head = listFirst(link->send_msg_queue); + clusterMsgSendBlock *msgblock = (clusterMsgSendBlock*)head->value; + clusterMsg *msg = &msgblock->msg; + size_t msg_offset = link->head_msg_send_offset; + size_t msg_len = ntohl(msg->totlen); + + nwritten = connWrite(conn, (char*)msg + msg_offset, msg_len - msg_offset); + if (nwritten <= 0) { + serverLog(LL_DEBUG,"I/O error writing to node link: %s", + (nwritten == -1) ? connGetLastError(conn) : "short write"); + handleLinkIOError(link); + return; + } + if (msg_offset + nwritten < msg_len) { + /* If full message wasn't written, record the offset + * and continue sending from this point next time */ + link->head_msg_send_offset += nwritten; + return; + } + serverAssert((msg_offset + nwritten) == msg_len); + link->head_msg_send_offset = 0; + + /* Delete the node and update our memory tracking */ + uint32_t blocklen = msgblock->totlen; + listDelNode(link->send_msg_queue, head); + server.stat_cluster_links_memory -= sizeof(listNode); + link->send_msg_queue_mem -= sizeof(listNode) + blocklen; + + totwritten += nwritten; } - sdsrange(link->sndbuf,nwritten,-1); - if (sdslen(link->sndbuf) == 0) + + if (listLength(link->send_msg_queue) == 0) connSetWriteHandler(link->conn, NULL); } @@ -2798,9 +2854,11 @@ void clusterReadHandler(connection *conn) { size_t unused = link->rcvbuf_alloc - link->rcvbuf_len; if ((size_t)nread > unused) { size_t required = link->rcvbuf_len + nread; + size_t prev_rcvbuf_alloc = link->rcvbuf_alloc; /* If less than 1mb, grow to twice the needed size, if larger grow by 1mb. */ link->rcvbuf_alloc = required < RCVBUF_MAX_PREALLOC ? 
required * 2: required + RCVBUF_MAX_PREALLOC; link->rcvbuf = zrealloc(link->rcvbuf, link->rcvbuf_alloc); + server.stat_cluster_links_memory += link->rcvbuf_alloc - prev_rcvbuf_alloc; } memcpy(link->rcvbuf + link->rcvbuf_len, buf, nread); link->rcvbuf_len += nread; @@ -2812,8 +2870,10 @@ void clusterReadHandler(connection *conn) { if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) { if (clusterProcessPacket(link)) { if (link->rcvbuf_alloc > RCVBUF_INIT_LEN) { + size_t prev_rcvbuf_alloc = link->rcvbuf_alloc; zfree(link->rcvbuf); link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN); + server.stat_cluster_links_memory += link->rcvbuf_alloc - prev_rcvbuf_alloc; } link->rcvbuf_len = 0; } else { @@ -2823,20 +2883,24 @@ void clusterReadHandler(connection *conn) { } } -/* Put stuff into the send buffer. +/* Put the message block into the link's send queue. * * It is guaranteed that this function will never have as a side effect * the link to be invalidated, so it is safe to call this function * from event handlers that will do stuff with the same link later. */ -void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { - if (sdslen(link->sndbuf) == 0 && msglen != 0) +void clusterSendMessage(clusterLink *link, clusterMsgSendBlock *msgblock) { + if (listLength(link->send_msg_queue) == 0 && msgblock->msg.totlen != 0) connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1); - link->sndbuf = sdscatlen(link->sndbuf, msg, msglen); + listAddNodeTail(link->send_msg_queue, msgblock); + msgblock->refcount++; + + /* Update memory tracking */ + link->send_msg_queue_mem += sizeof(listNode) + msgblock->totlen; + server.stat_cluster_links_memory += sizeof(listNode); /* Populate sent messages stats. */ - clusterMsg *hdr = (clusterMsg*) msg; - uint16_t type = ntohs(hdr->type); + uint16_t type = ntohs(msgblock->msg.type); if (type < CLUSTERMSG_TYPE_COUNT) server.cluster->stats_bus_messages_sent[type]++; } @@ -2847,7 +2911,7 @@ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { * It is guaranteed that this function will never have as a side effect * some node->link to be invalidated, so it is safe to call this function * from event handlers that will do stuff with node links later. */ -void clusterBroadcastMessage(void *buf, size_t len) { +void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) { dictIterator *di; dictEntry *de; @@ -2858,15 +2922,14 @@ void clusterBroadcastMessage(void *buf, size_t len) { if (!node->link) continue; if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE)) continue; - clusterSendMessage(node->link,buf,len); + clusterSendMessage(node->link,msgblock); } dictReleaseIterator(di); } /* Build the message header. hdr must point to a buffer at least * sizeof(clusterMsg) in bytes. */ -void clusterBuildMessageHdr(clusterMsg *hdr, int type) { - int totlen = 0; +static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) { uint64_t offset; clusterNode *master; @@ -2877,7 +2940,6 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { master = (nodeIsSlave(myself) && myself->slaveof) ? myself->slaveof : myself; - memset(hdr,0,sizeof(*hdr)); hdr->ver = htons(CLUSTER_PROTO_VER); hdr->sig[0] = 'R'; hdr->sig[1] = 'C'; @@ -2923,18 +2985,7 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { if (nodeIsMaster(myself) && server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED; - /* Compute the message length for certain messages. For other messages - * this is up to the caller. 
*/ - if (type == CLUSTERMSG_TYPE_FAIL) { - totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - totlen += sizeof(clusterMsgDataFail); - } else if (type == CLUSTERMSG_TYPE_UPDATE) { - totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - totlen += sizeof(clusterMsgDataUpdate); - } - hdr->totlen = htonl(totlen); - /* For PING, PONG, MEET and other variable length messages fixing the - * totlen field is up to the caller. */ + hdr->totlen = htonl(msglen); } /* Set the i-th entry of the gossip section in the message pointed by 'hdr' @@ -2958,8 +3009,6 @@ void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) { void clusterSendPing(clusterLink *link, int type) { static unsigned long long cluster_pings_sent = 0; cluster_pings_sent++; - unsigned char *buf; - clusterMsg *hdr; int gossipcount = 0; /* Number of gossip sections added so far. */ int wanted; /* Number of gossip sections we want to append if possible. */ int estlen; /* Upper bound on estimated packet length */ @@ -3015,13 +3064,11 @@ void clusterSendPing(clusterLink *link, int type) { /* Note: clusterBuildMessageHdr() expects the buffer to be always at least * sizeof(clusterMsg) or more. */ if (estlen < (int)sizeof(clusterMsg)) estlen = sizeof(clusterMsg); - buf = zcalloc(estlen); - hdr = (clusterMsg*) buf; + clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(type, estlen); + clusterMsg *hdr = &msgblock->msg; - /* Populate the header. */ if (!link->inbound && type == CLUSTERMSG_TYPE_PING) link->node->ping_sent = mstime(); - clusterBuildMessageHdr(hdr,type); /* Populate the gossip fields */ int maxiterations = wanted*3; @@ -3113,8 +3160,9 @@ void clusterSendPing(clusterLink *link, int type) { hdr->count = htons(gossipcount); hdr->extensions = htons(extensions); hdr->totlen = htonl(totlen); - clusterSendMessage(link,buf,totlen); - zfree(buf); + + clusterSendMessage(link,msgblock); + clusterMsgSendBlockDecrRefCount(msgblock); } /* Send a PONG packet to every connected node that's not in handshake state @@ -3154,20 +3202,15 @@ void clusterBroadcastPong(int target) { dictReleaseIterator(di); } -/* Send a PUBLISH message. - * - * If link is NULL, then the message is broadcasted to the whole cluster. +/* Create a PUBLISH message block. * * Sanitizer suppression: In clusterMsgDataPublish, sizeof(bulk_data) is 8. * As all the struct is used as a buffer, when more than 8 bytes are copied into * the 'bulk_data', sanitizer generates an out-of-bounds error which is a false * positive in this context. 
*/ REDIS_NO_SANITIZE("bounds") -void clusterSendPublish(clusterLink *link, robj *channel, robj *message, uint16_t type) { - unsigned char *payload; - clusterMsg buf[1]; - clusterMsg *hdr = (clusterMsg*) buf; - uint32_t totlen; +clusterMsgSendBlock *clusterCreatePublishMsgBlock(robj *channel, robj *message, uint16_t type) { + uint32_t channel_len, message_len; channel = getDecodedObject(channel); @@ -3175,34 +3218,21 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message, uint16_ channel_len = sdslen(channel->ptr); message_len = sdslen(message->ptr); - clusterBuildMessageHdr(hdr,type); - totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len; + size_t msglen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + msglen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len; + clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(type, msglen); + clusterMsg *hdr = &msgblock->msg; hdr->data.publish.msg.channel_len = htonl(channel_len); hdr->data.publish.msg.message_len = htonl(message_len); - hdr->totlen = htonl(totlen); - - /* Try to use the local buffer if possible */ - if (totlen < sizeof(buf)) { - payload = (unsigned char*)buf; - } else { - payload = zmalloc(totlen); - memcpy(payload,hdr,sizeof(*hdr)); - hdr = (clusterMsg*) payload; - } memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr)); memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr), message->ptr,sdslen(message->ptr)); - if (link) - clusterSendMessage(link,payload,totlen); - else - clusterBroadcastMessage(payload,totlen); - decrRefCount(channel); decrRefCount(message); - if (payload != (unsigned char*)buf) zfree(payload); + + return msgblock; } /* Send a FAIL message to all the nodes we are able to contact. @@ -3211,27 +3241,34 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message, uint16_ * we switch the node state to CLUSTER_NODE_FAIL and ask all the other * nodes to do the same ASAP. */ void clusterSendFail(char *nodename) { - clusterMsg buf[1]; - clusterMsg *hdr = (clusterMsg*) buf; + uint32_t msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData) + + sizeof(clusterMsgDataFail); + clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_FAIL, msglen); - clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL); + clusterMsg *hdr = &msgblock->msg; memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN); - clusterBroadcastMessage(buf,ntohl(hdr->totlen)); + + clusterBroadcastMessage(msgblock); + clusterMsgSendBlockDecrRefCount(msgblock); } /* Send an UPDATE message to the specified link carrying the specified 'node' * slots configuration. The node name, slots bitmap, and configEpoch info * are included. 
*/ void clusterSendUpdate(clusterLink *link, clusterNode *node) { - clusterMsg buf[1]; - clusterMsg *hdr = (clusterMsg*) buf; - if (link == NULL) return; - clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE); + + uint32_t msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData) + + sizeof(clusterMsgDataUpdate); + clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_UPDATE, msglen); + + clusterMsg *hdr = &msgblock->msg; memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN); hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch); memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots)); - clusterSendMessage(link,(unsigned char*)buf,ntohl(hdr->totlen)); + + clusterSendMessage(link,msgblock); + clusterMsgSendBlockDecrRefCount(msgblock); } /* Send a MODULE message. @@ -3239,36 +3276,22 @@ void clusterSendUpdate(clusterLink *link, clusterNode *node) { * If link is NULL, then the message is broadcasted to the whole cluster. */ void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type, const char *payload, uint32_t len) { - unsigned char *heapbuf; - clusterMsg buf[1]; - clusterMsg *hdr = (clusterMsg*) buf; - uint32_t totlen; - - clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MODULE); - totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - totlen += sizeof(clusterMsgModule) - 3 + len; + uint32_t msglen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + msglen += sizeof(clusterMsgModule) - 3 + len; + clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_MODULE, msglen); + clusterMsg *hdr = &msgblock->msg; hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */ hdr->data.module.msg.type = type; hdr->data.module.msg.len = htonl(len); - hdr->totlen = htonl(totlen); - - /* Try to use the local buffer if possible */ - if (totlen < sizeof(buf)) { - heapbuf = (unsigned char*)buf; - } else { - heapbuf = zmalloc(totlen); - memcpy(heapbuf,hdr,sizeof(*hdr)); - hdr = (clusterMsg*) heapbuf; - } memcpy(hdr->data.module.msg.bulk_data,payload,len); if (link) - clusterSendMessage(link,heapbuf,totlen); + clusterSendMessage(link,msgblock); else - clusterBroadcastMessage(heapbuf,totlen); + clusterBroadcastMessage(msgblock); - if (heapbuf != (unsigned char*)buf) zfree(heapbuf); + clusterMsgSendBlockDecrRefCount(msgblock); } /* This function gets a cluster node ID string as target, the same way the nodes @@ -3301,11 +3324,16 @@ int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uin * Publish this message across the slot (primary/replica). 
* -------------------------------------------------------------------------- */ void clusterPropagatePublish(robj *channel, robj *message, int sharded) { + clusterMsgSendBlock *msgblock; + if (!sharded) { - clusterSendPublish(NULL, channel, message, CLUSTERMSG_TYPE_PUBLISH); + msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISH); + clusterBroadcastMessage(msgblock); + clusterMsgSendBlockDecrRefCount(msgblock); return; } + msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD); list *nodes_for_slot = clusterGetNodesServingMySlots(server.cluster->myself); if (listLength(nodes_for_slot) != 0) { listIter li; @@ -3314,11 +3342,12 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) { while((ln = listNext(&li))) { clusterNode *node = listNodeValue(ln); if (node != myself) { - clusterSendPublish(node->link, channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD); + clusterSendMessage(node->link,msgblock); } } } listRelease(nodes_for_slot); + clusterMsgSendBlockDecrRefCount(msgblock); } /* ----------------------------------------------------------------------------- @@ -3332,44 +3361,38 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) { * Note that we send the failover request to everybody, master and slave nodes, * but only the masters are supposed to reply to our query. */ void clusterRequestFailoverAuth(void) { - clusterMsg buf[1]; - clusterMsg *hdr = (clusterMsg*) buf; - uint32_t totlen; + uint32_t msglen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST, msglen); - clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST); + clusterMsg *hdr = &msgblock->msg; /* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit * in the header to communicate the nodes receiving the message that * they should authorized the failover even if the master is working. */ if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK; - totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - hdr->totlen = htonl(totlen); - clusterBroadcastMessage(buf,totlen); + clusterBroadcastMessage(msgblock); + clusterMsgSendBlockDecrRefCount(msgblock); } /* Send a FAILOVER_AUTH_ACK message to the specified node. */ void clusterSendFailoverAuth(clusterNode *node) { - clusterMsg buf[1]; - clusterMsg *hdr = (clusterMsg*) buf; - uint32_t totlen; - if (!node->link) return; - clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK); - totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - hdr->totlen = htonl(totlen); - clusterSendMessage(node->link,(unsigned char*)buf,totlen); + + uint32_t msglen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK, msglen); + + clusterSendMessage(node->link,msgblock); + clusterMsgSendBlockDecrRefCount(msgblock); } /* Send a MFSTART message to the specified node. 
*/ void clusterSendMFStart(clusterNode *node) { - clusterMsg buf[1]; - clusterMsg *hdr = (clusterMsg*) buf; - uint32_t totlen; - if (!node->link) return; - clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART); - totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - hdr->totlen = htonl(totlen); - clusterSendMessage(node->link,(unsigned char*)buf,totlen); + + uint32_t msglen = sizeof(clusterMsg)-sizeof(union clusterMsgData); + clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_MFSTART, msglen); + + clusterSendMessage(node->link,msgblock); + clusterMsgSendBlockDecrRefCount(msgblock); } /* Vote for the node asking for our vote if there are the conditions. */ @@ -4020,27 +4043,13 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_ return 0; } -static void resizeClusterLinkBuffer(clusterLink *link) { - /* If unused space is a lot bigger than the used portion of the buffer then free up unused space. - * We use a factor of 4 because of the greediness of sdsMakeRoomFor (used by sdscatlen). */ - if (link != NULL && sdsavail(link->sndbuf) / 4 > sdslen(link->sndbuf)) { - link->sndbuf = sdsRemoveFreeSpace(link->sndbuf); - } -} - -/* Resize the send buffer of a node if it is wasting - * enough space. */ -static void clusterNodeCronResizeBuffers(clusterNode *node) { - resizeClusterLinkBuffer(node->link); - resizeClusterLinkBuffer(node->inbound_link); -} - static void freeClusterLinkOnBufferLimitReached(clusterLink *link) { - if (link == NULL || server.cluster_link_sendbuf_limit_bytes == 0) { + if (link == NULL || server.cluster_link_msg_queue_limit_bytes == 0) { return; } - unsigned long long mem_link = sdsalloc(link->sndbuf); - if (mem_link > server.cluster_link_sendbuf_limit_bytes) { + + unsigned long long mem_link = link->send_msg_queue_mem; + if (mem_link > server.cluster_link_msg_queue_limit_bytes) { serverLog(LL_WARNING, "Freeing cluster link(%s node %.40s, used memory: %llu) due to " "exceeding send buffer memory limit.", link->inbound ? "from" : "to", link->node ? link->node->name : "", mem_link); @@ -4055,20 +4064,6 @@ static void clusterNodeCronFreeLinkOnBufferLimitReached(clusterNode *node) { freeClusterLinkOnBufferLimitReached(node->inbound_link); } -static size_t getClusterLinkMemUsage(clusterLink *link) { - if (link != NULL) { - return sizeof(clusterLink) + sdsalloc(link->sndbuf) + link->rcvbuf_alloc; - } else { - return 0; - } -} - -/* Update memory usage statistics of all current cluster links */ -static void clusterNodeCronUpdateClusterLinksMemUsage(clusterNode *node) { - server.stat_cluster_links_memory += getClusterLinkMemUsage(node->link); - server.stat_cluster_links_memory += getClusterLinkMemUsage(node->inbound_link); -} - /* This is executed 10 times every second */ void clusterCron(void) { dictIterator *di; @@ -4095,21 +4090,13 @@ void clusterCron(void) { /* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */ server.cluster->stats_pfail_nodes = 0; - /* Clear so clusterNodeCronUpdateClusterLinksMemUsage can count the current memory usage of all cluster links. */ - server.stat_cluster_links_memory = 0; /* Run through some of the operations we want to do on each cluster node. */ di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); - /* The sequence goes: - * 1. We try to shrink link buffers if possible. - * 2. We free the links whose buffers are still oversized after possible shrinking. - * 3. 
We update the latest memory usage of cluster links. - * 4. We immediately attempt reconnecting after freeing links. - */ - clusterNodeCronResizeBuffers(node); + /* We free the inbound or outbound link to the node if the link has an + * oversized message send queue and immediately try reconnecting. */ clusterNodeCronFreeLinkOnBufferLimitReached(node); - clusterNodeCronUpdateClusterLinksMemUsage(node); /* The protocol is that function(s) below return non-zero if the node was * terminated. */ @@ -4911,10 +4898,10 @@ void addReplyClusterLinkDescription(client *c, clusterLink *link) { addReplyBulkCString(c, events); addReplyBulkCString(c, "send-buffer-allocated"); - addReplyLongLong(c, sdsalloc(link->sndbuf)); + addReplyLongLong(c, link->send_msg_queue_mem); addReplyBulkCString(c, "send-buffer-used"); - addReplyLongLong(c, sdslen(link->sndbuf)); + addReplyLongLong(c, link->send_msg_queue_mem); } /* Add to the output buffer of the given client an array of cluster link descriptions, diff --git a/src/cluster.h b/src/cluster.h index 75516716f..f27072f20 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -35,7 +35,9 @@ struct clusterNode; typedef struct clusterLink { mstime_t ctime; /* Link creation time */ connection *conn; /* Connection to remote node */ - sds sndbuf; /* Packet send buffer */ + list *send_msg_queue; /* List of messages to be sent */ + size_t head_msg_send_offset; /* Number of bytes already sent of message at head of queue */ + unsigned long long send_msg_queue_mem; /* Memory in bytes used by message queue */ char *rcvbuf; /* Packet reception buffer */ size_t rcvbuf_len; /* Used size of rcvbuf */ size_t rcvbuf_alloc; /* Allocated size of rcvbuf */ diff --git a/src/config.c b/src/config.c index 4149e06c3..b11e4dda4 100644 --- a/src/config.c +++ b/src/config.c @@ -3125,7 +3125,7 @@ standardConfig static_configs[] = { /* Unsigned Long Long configs */ createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory), - createULongLongConfig("cluster-link-sendbuf-limit", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.cluster_link_sendbuf_limit_bytes, 0, MEMORY_CONFIG, NULL, NULL), + createULongLongConfig("cluster-link-sendbuf-limit", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.cluster_link_msg_queue_limit_bytes, 0, MEMORY_CONFIG, NULL, NULL), /* Size_t configs */ createSizeTConfig("hash-max-listpack-entries", "hash-max-ziplist-entries", MODIFIABLE_CONFIG, 0, LONG_MAX, server.hash_max_listpack_entries, 512, INTEGER_CONFIG, NULL, NULL), diff --git a/src/server.h b/src/server.h index 32b24afc4..4dfaaea7b 100644 --- a/src/server.h +++ b/src/server.h @@ -1899,7 +1899,7 @@ struct redisServer { int cluster_allow_reads_when_down; /* Are reads allowed when the cluster is down? 
*/ int cluster_config_file_lock_fd; /* cluster config fd, will be flock */ - unsigned long long cluster_link_sendbuf_limit_bytes; /* Memory usage limit on individual link send buffers*/ + unsigned long long cluster_link_msg_queue_limit_bytes; /* Memory usage limit on individual link msg queue */ int cluster_drop_packet_filter; /* Debug config that allows tactically * dropping packets of a specific type */ /* Scripting */ diff --git a/tests/cluster/cluster.tcl b/tests/cluster/cluster.tcl index 9c669e128..056bfc311 100644 --- a/tests/cluster/cluster.tcl +++ b/tests/cluster/cluster.tcl @@ -217,72 +217,3 @@ proc are_hostnames_propagated {match_string} { } return 1 } - -# Returns a parsed CLUSTER LINKS output of the instance identified -# by the given `id` as a list of dictionaries, with each dictionary -# corresponds to a link. -proc get_cluster_links id { - set lines [R $id cluster links] - set links {} - foreach l $lines { - if {$l eq {}} continue - assert_equal [llength $l] 12 - assert_equal [lindex $l 0] "direction" - set dir [lindex $l 1] - assert_equal [lindex $l 2] "node" - set node [lindex $l 3] - assert_equal [lindex $l 4] "create-time" - set create_time [lindex $l 5] - assert_equal [lindex $l 6] "events" - set events [lindex $l 7] - assert_equal [lindex $l 8] "send-buffer-allocated" - set send_buffer_allocated [lindex $l 9] - assert_equal [lindex $l 10] "send-buffer-used" - set send_buffer_used [lindex $l 11] - set link [dict create \ - dir $dir \ - node $node \ - create_time $create_time \ - events $events \ - send_buffer_allocated $send_buffer_allocated \ - send_buffer_used $send_buffer_used \ - ] - lappend links $link - } - return $links -} - -proc get_links_with_peer {this_instance_id peer_nodename} { - set links [get_cluster_links $this_instance_id] - set links_with_peer {} - foreach l $links { - if {[dict get $l node] eq $peer_nodename} { - lappend links_with_peer $l - } - } - return $links_with_peer -} - -# Return the entry in CLUSTER LINKS output by instance identified by `this_instance_id` that -# corresponds to the link established toward a peer identified by `peer_nodename` -proc get_link_to_peer {this_instance_id peer_nodename} { - set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename] - foreach l $links_with_peer { - if {[dict get $l dir] eq "to"} { - return $l - } - } - return {} -} - -# Return the entry in CLUSTER LINKS output by instance identified by `this_instance_id` that -# corresponds to the link accepted from a peer identified by `peer_nodename` -proc get_link_from_peer {this_instance_id peer_nodename} { - set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename] - foreach l $links_with_peer { - if {[dict get $l dir] eq "from"} { - return $l - } - } - return {} -} \ No newline at end of file diff --git a/tests/cluster/tests/24-links.tcl b/tests/cluster/tests/24-links.tcl deleted file mode 100644 index d0ddea200..000000000 --- a/tests/cluster/tests/24-links.tcl +++ /dev/null @@ -1,114 +0,0 @@ -source "../tests/includes/init-tests.tcl" - -test "Create a cluster with two single-node shards" { - create_cluster 2 0 -} - -test "Cluster should start ok" { - assert_cluster_state ok -} - -proc number_of_peers {id} { - expr [llength [get_cluster_nodes $id]] - 1 -} - -proc number_of_links {id} { - llength [get_cluster_links $id] -} - -test "Each node has two links with each peer" { - foreach_redis_id id { - # Assert that from point of view of each node, there are two links for - # each peer. 
It might take a while for cluster to stabilize so wait up - # to 5 seconds. - wait_for_condition 50 100 { - [number_of_peers $id]*2 == [number_of_links $id] - } else { - assert_equal [expr [number_of_peers $id]*2] [number_of_links $id] - } - - set nodes [get_cluster_nodes $id] - set links [get_cluster_links $id] - - # For each peer there should be exactly one - # link "to" it and one link "from" it. - foreach n $nodes { - if {[has_flag $n myself]} continue - set peer [dict get $n id] - set to 0 - set from 0 - foreach l $links { - if {[dict get $l node] eq $peer} { - if {[dict get $l dir] eq "to"} { - incr to - } elseif {[dict get $l dir] eq "from"} { - incr from - } - } - } - assert {$to eq 1} - assert {$from eq 1} - } - } -} - -set primary1_id 0 -set primary2_id 1 - -set primary1 [Rn $primary1_id] -set primary2 [Rn $primary2_id] - -test "Disconnect link when send buffer limit reached" { - # On primary1, set timeout to 1 hour so links won't get disconnected due to timeouts - set oldtimeout [lindex [$primary1 CONFIG get cluster-node-timeout] 1] - $primary1 CONFIG set cluster-node-timeout [expr 60*60*1000] - - # Get primary1's links with primary2 - set primary2_name [dict get [get_myself $primary2_id] id] - set orig_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name] - set orig_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name] - - # On primary1, set cluster link send buffer limit to 256KB, which is large enough to not be - # overflowed by regular gossip messages but also small enough that it doesn't take too much - # memory to overflow it. If it is set too high, Redis may get OOM killed by kernel before this - # limit is overflowed in some RAM-limited test environments. - set oldlimit [lindex [$primary1 CONFIG get cluster-link-sendbuf-limit] 1] - $primary1 CONFIG set cluster-link-sendbuf-limit [expr 256*1024] - assert {[get_info_field [$primary1 cluster info] total_cluster_links_buffer_limit_exceeded] eq 0} - - # To manufacture an ever-growing send buffer from primary1 to primary2, - # make primary2 unresponsive. - set primary2_pid [get_instance_attrib redis $primary2_id pid] - exec kill -SIGSTOP $primary2_pid - - # On primary1, send 128KB Pubsub messages in a loop until the send buffer of the link from - # primary1 to primary2 exceeds buffer limit therefore be dropped. - # For the send buffer to grow, we need to first exhaust TCP send buffer of primary1 and TCP - # receive buffer of primary2 first. The sizes of these two buffers vary by OS, but 100 128KB - # messages should be sufficient. - set i 0 - wait_for_condition 100 0 { - [catch {incr i} e] == 0 && - [catch {$primary1 publish channel [prepare_value [expr 128*1024]]} e] == 0 && - [catch {after 500} e] == 0 && - [get_info_field [$primary1 cluster info] total_cluster_links_buffer_limit_exceeded] >= 1 - } else { - fail "Cluster link not freed as expected" - } - puts -nonewline "$i 128KB messages needed to overflow 256KB buffer limit. 
" - - # A new link to primary2 should have been recreated - set new_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name] - assert {[dict get $new_link_p1_to_p2 create_time] > [dict get $orig_link_p1_to_p2 create_time]} - - # Link from primary2 should not be affected - set same_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name] - assert {[dict get $same_link_p1_from_p2 create_time] eq [dict get $orig_link_p1_from_p2 create_time]} - - # Revive primary2 - exec kill -SIGCONT $primary2_pid - - # Reset configs on primary1 so config changes don't leak out to other tests - $primary1 CONFIG set cluster-node-timeout $oldtimeout - $primary1 CONFIG set cluster-link-sendbuf-limit $oldlimit -} diff --git a/tests/support/cluster_helper.tcl b/tests/support/cluster_util.tcl similarity index 76% rename from tests/support/cluster_helper.tcl rename to tests/support/cluster_util.tcl index 644eefdae..8a58a4fa9 100644 --- a/tests/support/cluster_helper.tcl +++ b/tests/support/cluster_util.tcl @@ -1,5 +1,4 @@ -# Helper functions specifically for setting up and configuring redis -# clusters. +# Cluster helper functions # Check if cluster configuration is consistent. proc cluster_config_consistent {} { @@ -113,3 +112,41 @@ proc start_cluster {masters replicas options code {slot_allocator continuous_slo start_multiple_servers $node_count $options $code set ::singledb $old_singledb } + +# Test node for flag. +proc cluster_has_flag {node flag} { + expr {[lsearch -exact [dict get $node flags] $flag] != -1} +} + +# Returns the parsed "myself" node entry as a dictionary. +proc cluster_get_myself id { + set nodes [get_cluster_nodes $id] + foreach n $nodes { + if {[cluster_has_flag $n myself]} {return $n} + } + return {} +} + +# Returns a parsed CLUSTER NODES output as a list of dictionaries. +proc get_cluster_nodes id { + set lines [split [R $id cluster nodes] "\r\n"] + set nodes {} + foreach l $lines { + set l [string trim $l] + if {$l eq {}} continue + set args [split $l] + set node [dict create \ + id [lindex $args 0] \ + addr [lindex $args 1] \ + flags [split [lindex $args 2] ,] \ + slaveof [lindex $args 3] \ + ping_sent [lindex $args 4] \ + pong_recv [lindex $args 5] \ + config_epoch [lindex $args 6] \ + linkstate [lindex $args 7] \ + slots [lrange $args 8 end] \ + ] + lappend nodes $node + } + return $nodes +} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 86c34237c..33d2f0fd8 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -8,7 +8,7 @@ set tcl_precision 17 source tests/support/redis.tcl source tests/support/aofmanifest.tcl source tests/support/server.tcl -source tests/support/cluster_helper.tcl +source tests/support/cluster_util.tcl source tests/support/tmpfile.tcl source tests/support/test.tcl source tests/support/util.tcl @@ -203,11 +203,16 @@ proc r {args} { [srv $level "client"] {*}$args } +# Returns a Redis instance by index. +proc Rn {n} { + set level [expr -1*$n] + return [srv $level "client"] +} + # Provide easy access to a client for an inner server. Requires a positive # index, unlike r which uses an optional negative index. 
proc R {n args} { - set level [expr -1*$n] - [srv $level "client"] {*}$args + [Rn $n] {*}$args } proc reconnect {args} { diff --git a/tests/unit/cluster/links.tcl b/tests/unit/cluster/links.tcl new file mode 100644 index 000000000..c9d920961 --- /dev/null +++ b/tests/unit/cluster/links.tcl @@ -0,0 +1,258 @@ +# Returns a parsed CLUSTER LINKS output of the instance identified +# by the given `id` as a list of dictionaries, with each dictionary +# corresponding to a link. +proc get_cluster_links id { + set lines [R $id cluster links] + set links {} + foreach l $lines { + if {$l eq {}} continue + assert_equal [llength $l] 12 + assert_equal [lindex $l 0] "direction" + set dir [lindex $l 1] + assert_equal [lindex $l 2] "node" + set node [lindex $l 3] + assert_equal [lindex $l 4] "create-time" + set create_time [lindex $l 5] + assert_equal [lindex $l 6] "events" + set events [lindex $l 7] + assert_equal [lindex $l 8] "send-buffer-allocated" + set send_buffer_allocated [lindex $l 9] + assert_equal [lindex $l 10] "send-buffer-used" + set send_buffer_used [lindex $l 11] + set link [dict create \ + dir $dir \ + node $node \ + create_time $create_time \ + events $events \ + send_buffer_allocated $send_buffer_allocated \ + send_buffer_used $send_buffer_used \ + ] + lappend links $link + } + return $links +} + +proc get_links_with_peer {this_instance_id peer_nodename} { + set links [get_cluster_links $this_instance_id] + set links_with_peer {} + foreach l $links { + if {[dict get $l node] eq $peer_nodename} { + lappend links_with_peer $l + } + } + return $links_with_peer +} + +# Return the entry in the CLUSTER LINKS output of the instance identified by `this_instance_id` that +# corresponds to the link established toward a peer identified by `peer_nodename` +proc get_link_to_peer {this_instance_id peer_nodename} { + set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename] + foreach l $links_with_peer { + if {[dict get $l dir] eq "to"} { + return $l + } + } + return {} +} + +# Return the entry in the CLUSTER LINKS output of the instance identified by `this_instance_id` that +# corresponds to the link accepted from a peer identified by `peer_nodename` +proc get_link_from_peer {this_instance_id peer_nodename} { + set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename] + foreach l $links_with_peer { + if {[dict get $l dir] eq "from"} { + return $l + } + } + return {} +} + +# Reset cluster links to their original state +proc reset_links {id} { + set limit [lindex [R $id CONFIG get cluster-link-sendbuf-limit] 1] + + # Set a 1 byte limit and wait for cluster cron to run + # (executes every 100ms) and terminate links + R $id CONFIG SET cluster-link-sendbuf-limit 1 + after 150 + + # Reset limit + R $id CONFIG SET cluster-link-sendbuf-limit $limit + + # Wait until the cluster links come back up for each node + wait_for_condition 50 100 { + [number_of_links $id] == [expr [number_of_peers $id] * 2] + } else { + fail "Cluster links did not come back up" + } +} + +proc number_of_peers {id} { + expr [llength $::servers] - 1 +} + +proc number_of_links {id} { + llength [get_cluster_links $id] +} + +proc publish_messages {server num_msgs msg_size} { + for {set i 0} {$i < $num_msgs} {incr i} { + $server PUBLISH channel [string repeat "x" $msg_size] + } +} + +start_cluster 3 0 {tags {external:skip cluster}} { + test "Each node has two links with each peer" { + for {set id 0} {$id < [llength $::servers]} {incr id} { + # Assert that from the point of view of each node, there are two links for + # each peer. 
It might take a while for the cluster to stabilize so wait up + # to 5 seconds. + wait_for_condition 50 100 { + [number_of_peers $id]*2 == [number_of_links $id] + } else { + assert_equal [expr [number_of_peers $id]*2] [number_of_links $id] + } + + set nodes [get_cluster_nodes $id] + set links [get_cluster_links $id] + + # For each peer there should be exactly one + # link "to" it and one link "from" it. + foreach n $nodes { + if {[cluster_has_flag $n myself]} continue + set peer [dict get $n id] + set to 0 + set from 0 + foreach l $links { + if {[dict get $l node] eq $peer} { + if {[dict get $l dir] eq "to"} { + incr to + } elseif {[dict get $l dir] eq "from"} { + incr from + } + } + } + assert {$to eq 1} + assert {$from eq 1} + } + } + } + + set primary1_id 0 + set primary2_id 1 + + set primary1 [Rn $primary1_id] + set primary2 [Rn $primary2_id] + + test "Disconnect link when send buffer limit reached" { + # On primary1, set timeout to 1 hour so links won't get disconnected due to timeouts + set oldtimeout [lindex [$primary1 CONFIG get cluster-node-timeout] 1] + $primary1 CONFIG set cluster-node-timeout [expr 60*60*1000] + + # Get primary1's links with primary2 + set primary2_name [dict get [cluster_get_myself $primary2_id] id] + set orig_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name] + set orig_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name] + + # On primary1, set the cluster link send buffer limit to 256KB, which is large enough not to be + # overflowed by regular gossip messages but also small enough that it doesn't take too much + # memory to overflow it. If it is set too high, Redis may get OOM killed by the kernel before this + # limit is overflowed in some RAM-limited test environments. + set oldlimit [lindex [$primary1 CONFIG get cluster-link-sendbuf-limit] 1] + $primary1 CONFIG set cluster-link-sendbuf-limit [expr 256*1024] + assert {[CI $primary1_id total_cluster_links_buffer_limit_exceeded] eq 0} + + # To manufacture an ever-growing send buffer from primary1 to primary2, + # make primary2 unresponsive. + set primary2_pid [srv [expr -1*$primary2_id] pid] + exec kill -SIGSTOP $primary2_pid + + # On primary1, send 128KB Pubsub messages in a loop until the send buffer of the link from + # primary1 to primary2 exceeds the buffer limit and the link is therefore dropped. + # For the send buffer to grow, we first need to exhaust the TCP send buffer of primary1 and the + # TCP receive buffer of primary2. The sizes of these two buffers vary by OS, but 100 128KB + # messages should be sufficient. + set i 0 + wait_for_condition 100 0 { + [catch {incr i} e] == 0 && + [catch {$primary1 publish channel [prepare_value [expr 128*1024]]} e] == 0 && + [catch {after 500} e] == 0 && + [CI $primary1_id total_cluster_links_buffer_limit_exceeded] >= 1 + } else { + fail "Cluster link not freed as expected" + } + puts -nonewline "$i 128KB messages needed to overflow 256KB buffer limit. " + + # A new link to primary2 should have been recreated + set new_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name] + assert {[dict get $new_link_p1_to_p2 create_time] > [dict get $orig_link_p1_to_p2 create_time]} + + # Link from primary2 should not be affected + set same_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name] + assert {[dict get $same_link_p1_from_p2 create_time] eq [dict get $orig_link_p1_from_p2 create_time]} + + # Revive primary2 + exec kill -SIGCONT $primary2_pid + + # Reset configs on primary1 so config changes don't leak out to other tests + $primary1 CONFIG set cluster-node-timeout $oldtimeout + $primary1 CONFIG set cluster-link-sendbuf-limit $oldlimit + + reset_links $primary1_id + } + + test "Link memory increases with publishes" { + set server_id 0 + set server [Rn $server_id] + set msg_size 10000 + set num_msgs 10 + + # Remove any sendbuf limit + $primary1 CONFIG set cluster-link-sendbuf-limit 0 + + # Publish ~100KB to one of the servers + $server MULTI + $server INFO memory + publish_messages $server $num_msgs $msg_size + $server INFO memory + set res [$server EXEC] + + set link_mem_before_pubs [getInfoProperty $res mem_cluster_links] + + # Remove the first half of the response string which contains the + # first "INFO memory" results and search for the property again + set res [string range $res [expr [string length $res] / 2] end] + set link_mem_after_pubs [getInfoProperty $res mem_cluster_links] + + # We expect the memory to have increased by more than + # the cumulative size of the publish messages + set mem_diff_floor [expr $msg_size * $num_msgs] + set mem_diff [expr $link_mem_after_pubs - $link_mem_before_pubs] + assert {$mem_diff > $mem_diff_floor} + + # Reset links to ensure no leftover data for the next test + reset_links $server_id + } + + test "Link memory resets after publish messages flush" { + set server [Rn 0] + set msg_size 100000 + set num_msgs 10 + + set link_mem_before [status $server mem_cluster_links] + + # Publish ~1MB to one of the servers + $server MULTI + publish_messages $server $num_msgs $msg_size + $server EXEC + + # Wait until the cluster link memory has returned to below the pre-publish value. + # We can't guarantee it returns to the exact same value since gossip messages + # can cause the values to fluctuate. + wait_for_condition 1000 500 { + [status $server mem_cluster_links] <= $link_mem_before + } else { + fail "Cluster link memory did not settle back to expected range" + } + } +}