Re-design cluster link send buffer to improve memory management (#11343)

Re-design cluster link send queue to improve memory management
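In short: the per-link sds send buffer (sndbuf) is replaced by a queue of reference-counted clusterMsgSendBlock allocations (send_msg_queue). A message broadcast to N peers is now allocated once and shared by all N link queues instead of being copied into N private buffers, and memory is tracked per link (send_msg_queue_mem) and globally (stat_cluster_links_memory) as blocks are queued and consumed.

A minimal sketch of the resulting lifecycle, using only functions introduced in this diff (link_a and link_b are hypothetical clusterLink pointers):

    clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(type, msglen);
    /* ... fill in the type-specific fields of msgblock->msg ... */
    clusterSendMessage(link_a, msgblock);       /* queue takes a reference */
    clusterSendMessage(link_b, msgblock);       /* same block, no copy */
    clusterMsgSendBlockDecrRefCount(msgblock);  /* drop the creator's reference;
                                                   the block is freed once both
                                                   queues have consumed it */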
This commit is contained in:
Brennan 2022-11-01 22:26:44 -04:00 committed by GitHub
parent 4a8a625051
commit 47c493e070
9 changed files with 467 additions and 361 deletions

View File

@@ -83,6 +83,7 @@ void removeChannelsInSlot(unsigned int slot);
unsigned int countKeysInSlot(unsigned int hashslot);
unsigned int countChannelsInSlot(unsigned int hashslot);
unsigned int delKeysInSlot(unsigned int hashslot);
static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen);
/* Links to the next and previous entries for keys in the same slot are stored
* in the dict entry metadata. See Slot to Key API below. */
@@ -127,6 +128,13 @@ static ConnectionType *connTypeOfCluster() {
return connectionTypeTcp();
}
/* clusterLink send queue blocks */
typedef struct {
size_t totlen; /* Total length of this block including the message */
int refcount; /* Number of cluster link send msg queues containing the message */
clusterMsg msg;
} clusterMsgSendBlock;
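The block embeds the wire message as its final member, so a single allocation carries both the bookkeeping fields and a payload that may be larger than sizeof(clusterMsg); createClusterMsgSendBlock() below sizes the allocation accordingly. A self-contained sketch of that layout trick (illustrative types, not Redis's):

    #include <stdlib.h>

    typedef struct { char payload[2048]; } wire_msg; /* stand-in for clusterMsg */

    typedef struct {
        size_t totlen;  /* whole block: bookkeeping + message */
        int refcount;
        wire_msg msg;   /* must be last: the real message may be longer */
    } send_block;

    static send_block *create_block(size_t msglen) {
        /* swap the nominal message size for the actual one */
        size_t blocklen = msglen + sizeof(send_block) - sizeof(wire_msg);
        send_block *b = calloc(1, blocklen);
        if (b) { b->totlen = blocklen; b->refcount = 1; }
        return b;
    }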
/* -----------------------------------------------------------------------------
* Initialization
* -------------------------------------------------------------------------- */
@@ -802,13 +810,36 @@ void clusterReset(int hard) {
/* -----------------------------------------------------------------------------
* CLUSTER communication link
* -------------------------------------------------------------------------- */
static clusterMsgSendBlock *createClusterMsgSendBlock(int type, uint32_t msglen) {
uint32_t blocklen = msglen + sizeof(clusterMsgSendBlock) - sizeof(clusterMsg);
clusterMsgSendBlock *msgblock = zcalloc(blocklen);
msgblock->refcount = 1;
msgblock->totlen = blocklen;
server.stat_cluster_links_memory += blocklen;
clusterBuildMessageHdr(&msgblock->msg,type,msglen);
return msgblock;
}
static void clusterMsgSendBlockDecrRefCount(void *node) {
clusterMsgSendBlock *msgblock = (clusterMsgSendBlock*)node;
msgblock->refcount--;
serverAssert(msgblock->refcount >= 0);
if (msgblock->refcount == 0) {
server.stat_cluster_links_memory -= msgblock->totlen;
zfree(msgblock);
}
}
clusterLink *createClusterLink(clusterNode *node) {
clusterLink *link = zmalloc(sizeof(*link));
link->ctime = mstime();
link->sndbuf = sdsempty();
link->send_msg_queue = listCreate();
listSetFreeMethod(link->send_msg_queue, clusterMsgSendBlockDecrRefCount);
link->head_msg_send_offset = 0;
link->send_msg_queue_mem = sizeof(list);
link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
link->rcvbuf_len = 0;
server.stat_cluster_links_memory += link->rcvbuf_alloc + link->send_msg_queue_mem;
link->conn = NULL;
link->node = node;
/* The related node can only be known at link creation time if this is an outbound link */
@@ -827,7 +858,9 @@ void freeClusterLink(clusterLink *link) {
connClose(link->conn);
link->conn = NULL;
}
sdsfree(link->sndbuf);
server.stat_cluster_links_memory -= sizeof(list) + listLength(link->send_msg_queue)*sizeof(listNode);
listRelease(link->send_msg_queue);
server.stat_cluster_links_memory -= link->rcvbuf_alloc;
zfree(link->rcvbuf);
if (link->node) {
if (link->node->link == link) {
@@ -2684,22 +2717,45 @@ void handleLinkIOError(clusterLink *link) {
freeClusterLink(link);
}
/* Send data. This is handled using a trivial send buffer that gets
* consumed by write(). We don't try to optimize this for speed too much
* as this is a very low traffic channel. */
/* Send the messages queued for the link. */
void clusterWriteHandler(connection *conn) {
clusterLink *link = connGetPrivateData(conn);
ssize_t nwritten;
size_t totwritten = 0;
nwritten = connWrite(conn, link->sndbuf, sdslen(link->sndbuf));
if (nwritten <= 0) {
serverLog(LL_DEBUG,"I/O error writing to node link: %s",
(nwritten == -1) ? connGetLastError(conn) : "short write");
handleLinkIOError(link);
return;
while (totwritten < NET_MAX_WRITES_PER_EVENT && listLength(link->send_msg_queue) > 0) {
listNode *head = listFirst(link->send_msg_queue);
clusterMsgSendBlock *msgblock = (clusterMsgSendBlock*)head->value;
clusterMsg *msg = &msgblock->msg;
size_t msg_offset = link->head_msg_send_offset;
size_t msg_len = ntohl(msg->totlen);
nwritten = connWrite(conn, (char*)msg + msg_offset, msg_len - msg_offset);
if (nwritten <= 0) {
serverLog(LL_DEBUG,"I/O error writing to node link: %s",
(nwritten == -1) ? connGetLastError(conn) : "short write");
handleLinkIOError(link);
return;
}
if (msg_offset + nwritten < msg_len) {
/* If the full message wasn't written, record the offset
* and resume sending from this point on the next write event */
link->head_msg_send_offset += nwritten;
return;
}
serverAssert((msg_offset + nwritten) == msg_len);
link->head_msg_send_offset = 0;
/* Delete the node and update our memory tracking */
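/* Note: only the listNode is subtracted from the global stat below; the
* block's totlen is subtracted inside clusterMsgSendBlockDecrRefCount(),
* which listDelNode() invokes via the queue's free method once the last
* reference is dropped. */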
uint32_t blocklen = msgblock->totlen;
listDelNode(link->send_msg_queue, head);
server.stat_cluster_links_memory -= sizeof(listNode);
link->send_msg_queue_mem -= sizeof(listNode) + blocklen;
totwritten += nwritten;
}
sdsrange(link->sndbuf,nwritten,-1);
if (sdslen(link->sndbuf) == 0)
if (listLength(link->send_msg_queue) == 0)
connSetWriteHandler(link->conn, NULL);
}
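The head_msg_send_offset bookkeeping above lets a short write resume mid-message on the next writable event. A tiny standalone sketch of the resume logic (fake_write stands in for connWrite; the sizes are made up):

    #include <assert.h>
    #include <stddef.h>

    static size_t fake_write(size_t want, size_t room) {
        return want < room ? want : room;  /* short write when room < want */
    }

    int main(void) {
        size_t msg_len = 1000, offset = 0;
        offset += fake_write(msg_len - offset, 600);  /* socket takes 600 bytes */
        assert(offset == 600);      /* partial: block stays at the queue head */
        offset += fake_write(msg_len - offset, 4096); /* next event: rest fits */
        assert(offset == msg_len);  /* fully sent: dequeue and reset offset */
        return 0;
    }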
@@ -2798,9 +2854,11 @@ void clusterReadHandler(connection *conn) {
size_t unused = link->rcvbuf_alloc - link->rcvbuf_len;
if ((size_t)nread > unused) {
size_t required = link->rcvbuf_len + nread;
size_t prev_rcvbuf_alloc = link->rcvbuf_alloc;
/* If less than 1MB, grow to twice the needed size; if larger, grow by 1MB. */
link->rcvbuf_alloc = required < RCVBUF_MAX_PREALLOC ? required * 2: required + RCVBUF_MAX_PREALLOC;
link->rcvbuf = zrealloc(link->rcvbuf, link->rcvbuf_alloc);
server.stat_cluster_links_memory += link->rcvbuf_alloc - prev_rcvbuf_alloc;
}
memcpy(link->rcvbuf + link->rcvbuf_len, buf, nread);
link->rcvbuf_len += nread;
@@ -2812,8 +2870,10 @@ void clusterReadHandler(connection *conn) {
if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
if (clusterProcessPacket(link)) {
if (link->rcvbuf_alloc > RCVBUF_INIT_LEN) {
size_t prev_rcvbuf_alloc = link->rcvbuf_alloc;
zfree(link->rcvbuf);
link->rcvbuf = zmalloc(link->rcvbuf_alloc = RCVBUF_INIT_LEN);
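/* rcvbuf_alloc just shrank back to RCVBUF_INIT_LEN, so this delta is
* negative and reduces the tracked memory. */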
server.stat_cluster_links_memory += link->rcvbuf_alloc - prev_rcvbuf_alloc;
}
link->rcvbuf_len = 0;
} else {
@@ -2823,20 +2883,24 @@ void clusterReadHandler(connection *conn) {
}
}
/* Put stuff into the send buffer.
/* Put the message block into the link's send queue.
*
It is guaranteed that this function will never invalidate the link as a
side effect, so it is safe to call it from event handlers that will use
the same link later. */
void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
if (sdslen(link->sndbuf) == 0 && msglen != 0)
void clusterSendMessage(clusterLink *link, clusterMsgSendBlock *msgblock) {
if (listLength(link->send_msg_queue) == 0 && msgblock->msg.totlen != 0)
connSetWriteHandlerWithBarrier(link->conn, clusterWriteHandler, 1);
link->sndbuf = sdscatlen(link->sndbuf, msg, msglen);
listAddNodeTail(link->send_msg_queue, msgblock);
msgblock->refcount++;
/* Update memory tracking */
link->send_msg_queue_mem += sizeof(listNode) + msgblock->totlen;
server.stat_cluster_links_memory += sizeof(listNode);
/* Populate sent messages stats. */
clusterMsg *hdr = (clusterMsg*) msg;
uint16_t type = ntohs(hdr->type);
uint16_t type = ntohs(msgblock->msg.type);
if (type < CLUSTERMSG_TYPE_COUNT)
server.cluster->stats_bus_messages_sent[type]++;
}
@@ -2847,7 +2911,7 @@ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) {
It is guaranteed that this function will never invalidate any node->link
as a side effect, so it is safe to call it from event handlers that will
use node links later. */
void clusterBroadcastMessage(void *buf, size_t len) {
void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) {
dictIterator *di;
dictEntry *de;
@@ -2858,15 +2922,14 @@ void clusterBroadcastMessage(void *buf, size_t len) {
if (!node->link) continue;
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
continue;
clusterSendMessage(node->link,buf,len);
clusterSendMessage(node->link,msgblock);
}
dictReleaseIterator(di);
}
/* Build the message header. hdr must point to a buffer of at least
* sizeof(clusterMsg) bytes. */
void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
int totlen = 0;
static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) {
uint64_t offset;
clusterNode *master;
@@ -2877,7 +2940,6 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
master = (nodeIsSlave(myself) && myself->slaveof) ?
myself->slaveof : myself;
memset(hdr,0,sizeof(*hdr));
hdr->ver = htons(CLUSTER_PROTO_VER);
hdr->sig[0] = 'R';
hdr->sig[1] = 'C';
@@ -2923,18 +2985,7 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
if (nodeIsMaster(myself) && server.cluster->mf_end)
hdr->mflags[0] |= CLUSTERMSG_FLAG0_PAUSED;
/* Compute the message length for certain messages. For other messages
* this is up to the caller. */
if (type == CLUSTERMSG_TYPE_FAIL) {
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataFail);
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataUpdate);
}
hdr->totlen = htonl(totlen);
/* For PING, PONG, MEET and other variable length messages fixing the
* totlen field is up to the caller. */
hdr->totlen = htonl(msglen);
}
/* Set the i-th entry of the gossip section in the message pointed by 'hdr'
@@ -2958,8 +3009,6 @@ void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) {
void clusterSendPing(clusterLink *link, int type) {
static unsigned long long cluster_pings_sent = 0;
cluster_pings_sent++;
unsigned char *buf;
clusterMsg *hdr;
int gossipcount = 0; /* Number of gossip sections added so far. */
int wanted; /* Number of gossip sections we want to append if possible. */
int estlen; /* Upper bound on estimated packet length */
@@ -3015,13 +3064,11 @@ void clusterSendPing(clusterLink *link, int type) {
/* Note: clusterBuildMessageHdr() expects the buffer to be always at least
* sizeof(clusterMsg) or more. */
if (estlen < (int)sizeof(clusterMsg)) estlen = sizeof(clusterMsg);
buf = zcalloc(estlen);
hdr = (clusterMsg*) buf;
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(type, estlen);
clusterMsg *hdr = &msgblock->msg;
/* Populate the header. */
if (!link->inbound && type == CLUSTERMSG_TYPE_PING)
link->node->ping_sent = mstime();
clusterBuildMessageHdr(hdr,type);
/* Populate the gossip fields */
int maxiterations = wanted*3;
@@ -3113,8 +3160,9 @@ void clusterSendPing(clusterLink *link, int type) {
hdr->count = htons(gossipcount);
hdr->extensions = htons(extensions);
hdr->totlen = htonl(totlen);
clusterSendMessage(link,buf,totlen);
zfree(buf);
clusterSendMessage(link,msgblock);
clusterMsgSendBlockDecrRefCount(msgblock);
}
/* Send a PONG packet to every connected node that's not in handshake state
@@ -3154,20 +3202,15 @@ void clusterBroadcastPong(int target) {
dictReleaseIterator(di);
}
/* Send a PUBLISH message.
*
* If link is NULL, then the message is broadcasted to the whole cluster.
/* Create a PUBLISH message block.
*
* Sanitizer suppression: in clusterMsgDataPublish, sizeof(bulk_data) is 8.
* Since the whole struct is used as a buffer, copying more than 8 bytes into
* 'bulk_data' makes the sanitizer report an out-of-bounds error, which is a
* false positive in this context. */
REDIS_NO_SANITIZE("bounds")
void clusterSendPublish(clusterLink *link, robj *channel, robj *message, uint16_t type) {
unsigned char *payload;
clusterMsg buf[1];
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;
clusterMsgSendBlock *clusterCreatePublishMsgBlock(robj *channel, robj *message, uint16_t type) {
uint32_t channel_len, message_len;
channel = getDecodedObject(channel);
@@ -3175,34 +3218,21 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message, uint16_
channel_len = sdslen(channel->ptr);
message_len = sdslen(message->ptr);
clusterBuildMessageHdr(hdr,type);
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
size_t msglen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
msglen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(type, msglen);
clusterMsg *hdr = &msgblock->msg;
hdr->data.publish.msg.channel_len = htonl(channel_len);
hdr->data.publish.msg.message_len = htonl(message_len);
hdr->totlen = htonl(totlen);
/* Try to use the local buffer if possible */
if (totlen < sizeof(buf)) {
payload = (unsigned char*)buf;
} else {
payload = zmalloc(totlen);
memcpy(payload,hdr,sizeof(*hdr));
hdr = (clusterMsg*) payload;
}
memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
message->ptr,sdslen(message->ptr));
if (link)
clusterSendMessage(link,payload,totlen);
else
clusterBroadcastMessage(payload,totlen);
decrRefCount(channel);
decrRefCount(message);
if (payload != (unsigned char*)buf) zfree(payload);
return msgblock;
}
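Every producer now follows the same ownership convention: create the block (refcount starts at 1), hand it to one or more links, then drop the creator's reference. For example:

    clusterMsgSendBlock *msgblock =
        clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISH);
    clusterBroadcastMessage(msgblock);          /* each link's queue takes a ref */
    clusterMsgSendBlockDecrRefCount(msgblock);  /* creator is done with it */

This is exactly the pattern clusterPropagatePublish() uses further down in this diff.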
/* Send a FAIL message to all the nodes we are able to contact.
@@ -3211,27 +3241,34 @@ void clusterSendPublish(clusterLink *link, robj *channel, robj *message, uint16_
* we switch the node state to CLUSTER_NODE_FAIL and ask all the other
* nodes to do the same ASAP. */
void clusterSendFail(char *nodename) {
clusterMsg buf[1];
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData)
+ sizeof(clusterMsgDataFail);
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_FAIL, msglen);
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAIL);
clusterMsg *hdr = &msgblock->msg;
memcpy(hdr->data.fail.about.nodename,nodename,CLUSTER_NAMELEN);
clusterBroadcastMessage(buf,ntohl(hdr->totlen));
clusterBroadcastMessage(msgblock);
clusterMsgSendBlockDecrRefCount(msgblock);
}
/* Send an UPDATE message to the specified link carrying the specified 'node'
* slots configuration. The node name, slots bitmap, and configEpoch info
* are included. */
void clusterSendUpdate(clusterLink *link, clusterNode *node) {
clusterMsg buf[1];
clusterMsg *hdr = (clusterMsg*) buf;
if (link == NULL) return;
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);
uint32_t msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData)
+ sizeof(clusterMsgDataUpdate);
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_UPDATE, msglen);
clusterMsg *hdr = &msgblock->msg;
memcpy(hdr->data.update.nodecfg.nodename,node->name,CLUSTER_NAMELEN);
hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
clusterSendMessage(link,(unsigned char*)buf,ntohl(hdr->totlen));
clusterSendMessage(link,msgblock);
clusterMsgSendBlockDecrRefCount(msgblock);
}
/* Send a MODULE message.
@@ -3239,36 +3276,22 @@ void clusterSendUpdate(clusterLink *link, clusterNode *node) {
* If link is NULL, then the message is broadcasted to the whole cluster. */
void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type,
const char *payload, uint32_t len) {
unsigned char *heapbuf;
clusterMsg buf[1];
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MODULE);
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgModule) - 3 + len;
uint32_t msglen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
msglen += sizeof(clusterMsgModule) - 3 + len;
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_MODULE, msglen);
clusterMsg *hdr = &msgblock->msg;
hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
hdr->data.module.msg.type = type;
hdr->data.module.msg.len = htonl(len);
hdr->totlen = htonl(totlen);
/* Try to use the local buffer if possible */
if (totlen < sizeof(buf)) {
heapbuf = (unsigned char*)buf;
} else {
heapbuf = zmalloc(totlen);
memcpy(heapbuf,hdr,sizeof(*hdr));
hdr = (clusterMsg*) heapbuf;
}
memcpy(hdr->data.module.msg.bulk_data,payload,len);
if (link)
clusterSendMessage(link,heapbuf,totlen);
clusterSendMessage(link,msgblock);
else
clusterBroadcastMessage(heapbuf,totlen);
clusterBroadcastMessage(msgblock);
if (heapbuf != (unsigned char*)buf) zfree(heapbuf);
clusterMsgSendBlockDecrRefCount(msgblock);
}
/* This function gets a cluster node ID string as target, the same way the nodes
@@ -3301,11 +3324,16 @@ int clusterSendModuleMessageToTarget(const char *target, uint64_t module_id, uin
* Publish this message across the slot (primary/replica).
* -------------------------------------------------------------------------- */
void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
clusterMsgSendBlock *msgblock;
if (!sharded) {
clusterSendPublish(NULL, channel, message, CLUSTERMSG_TYPE_PUBLISH);
msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISH);
clusterBroadcastMessage(msgblock);
clusterMsgSendBlockDecrRefCount(msgblock);
return;
}
msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD);
list *nodes_for_slot = clusterGetNodesServingMySlots(server.cluster->myself);
if (listLength(nodes_for_slot) != 0) {
listIter li;
@@ -3314,11 +3342,12 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
while((ln = listNext(&li))) {
clusterNode *node = listNodeValue(ln);
if (node != myself) {
clusterSendPublish(node->link, channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD);
clusterSendMessage(node->link,msgblock);
}
}
}
listRelease(nodes_for_slot);
clusterMsgSendBlockDecrRefCount(msgblock);
}
/* -----------------------------------------------------------------------------
@@ -3332,44 +3361,38 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
* Note that we send the failover request to everybody, master and slave nodes,
* but only the masters are supposed to reply to our query. */
void clusterRequestFailoverAuth(void) {
clusterMsg buf[1];
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;
uint32_t msglen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST, msglen);
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST);
clusterMsg *hdr = &msgblock->msg;
/* If this is a manual failover, set the CLUSTERMSG_FLAG0_FORCEACK bit
* in the header to tell the nodes receiving the message that they should
* authorize the failover even if the master is working. */
if (server.cluster->mf_end) hdr->mflags[0] |= CLUSTERMSG_FLAG0_FORCEACK;
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
hdr->totlen = htonl(totlen);
clusterBroadcastMessage(buf,totlen);
clusterBroadcastMessage(msgblock);
clusterMsgSendBlockDecrRefCount(msgblock);
}
/* Send a FAILOVER_AUTH_ACK message to the specified node. */
void clusterSendFailoverAuth(clusterNode *node) {
clusterMsg buf[1];
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;
if (!node->link) return;
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK);
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
hdr->totlen = htonl(totlen);
clusterSendMessage(node->link,(unsigned char*)buf,totlen);
uint32_t msglen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK, msglen);
clusterSendMessage(node->link,msgblock);
clusterMsgSendBlockDecrRefCount(msgblock);
}
/* Send a MFSTART message to the specified node. */
void clusterSendMFStart(clusterNode *node) {
clusterMsg buf[1];
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;
if (!node->link) return;
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_MFSTART);
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
hdr->totlen = htonl(totlen);
clusterSendMessage(node->link,(unsigned char*)buf,totlen);
uint32_t msglen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_MFSTART, msglen);
clusterSendMessage(node->link,msgblock);
clusterMsgSendBlockDecrRefCount(msgblock);
}
/* Vote for the node asking for our vote if there are the conditions. */
@@ -4020,27 +4043,13 @@ static int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_
return 0;
}
static void resizeClusterLinkBuffer(clusterLink *link) {
/* If unused space is a lot bigger than the used portion of the buffer then free up unused space.
* We use a factor of 4 because of the greediness of sdsMakeRoomFor (used by sdscatlen). */
if (link != NULL && sdsavail(link->sndbuf) / 4 > sdslen(link->sndbuf)) {
link->sndbuf = sdsRemoveFreeSpace(link->sndbuf);
}
}
/* Resize the send buffer of a node if it is wasting
* enough space. */
static void clusterNodeCronResizeBuffers(clusterNode *node) {
resizeClusterLinkBuffer(node->link);
resizeClusterLinkBuffer(node->inbound_link);
}
static void freeClusterLinkOnBufferLimitReached(clusterLink *link) {
if (link == NULL || server.cluster_link_sendbuf_limit_bytes == 0) {
if (link == NULL || server.cluster_link_msg_queue_limit_bytes == 0) {
return;
}
unsigned long long mem_link = sdsalloc(link->sndbuf);
if (mem_link > server.cluster_link_sendbuf_limit_bytes) {
unsigned long long mem_link = link->send_msg_queue_mem;
if (mem_link > server.cluster_link_msg_queue_limit_bytes) {
serverLog(LL_WARNING, "Freeing cluster link(%s node %.40s, used memory: %llu) due to "
"exceeding send buffer memory limit.", link->inbound ? "from" : "to",
link->node ? link->node->name : "", mem_link);
@@ -4055,20 +4064,6 @@ static void clusterNodeCronFreeLinkOnBufferLimitReached(clusterNode *node) {
freeClusterLinkOnBufferLimitReached(node->inbound_link);
}
static size_t getClusterLinkMemUsage(clusterLink *link) {
if (link != NULL) {
return sizeof(clusterLink) + sdsalloc(link->sndbuf) + link->rcvbuf_alloc;
} else {
return 0;
}
}
/* Update memory usage statistics of all current cluster links */
static void clusterNodeCronUpdateClusterLinksMemUsage(clusterNode *node) {
server.stat_cluster_links_memory += getClusterLinkMemUsage(node->link);
server.stat_cluster_links_memory += getClusterLinkMemUsage(node->inbound_link);
}
/* This is executed 10 times every second */
void clusterCron(void) {
dictIterator *di;
@@ -4095,21 +4090,13 @@ void clusterCron(void) {
/* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */
server.cluster->stats_pfail_nodes = 0;
/* Clear so clusterNodeCronUpdateClusterLinksMemUsage can count the current memory usage of all cluster links. */
server.stat_cluster_links_memory = 0;
/* Run through some of the operations we want to do on each cluster node. */
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
/* The sequence goes:
* 1. We try to shrink link buffers if possible.
* 2. We free the links whose buffers are still oversized after possible shrinking.
* 3. We update the latest memory usage of cluster links.
* 4. We immediately attempt reconnecting after freeing links.
*/
clusterNodeCronResizeBuffers(node);
/* We free the inbound or outbound link to the node if the link has an
* oversized message send queue and immediately try reconnecting. */
clusterNodeCronFreeLinkOnBufferLimitReached(node);
clusterNodeCronUpdateClusterLinksMemUsage(node);
/* The protocol is that function(s) below return non-zero if the node was
* terminated.
*/
@@ -4911,10 +4898,10 @@ void addReplyClusterLinkDescription(client *c, clusterLink *link) {
addReplyBulkCString(c, events);
addReplyBulkCString(c, "send-buffer-allocated");
addReplyLongLong(c, sdsalloc(link->sndbuf));
addReplyLongLong(c, link->send_msg_queue_mem);
addReplyBulkCString(c, "send-buffer-used");
addReplyLongLong(c, sdslen(link->sndbuf));
addReplyLongLong(c, link->send_msg_queue_mem);
}
/* Add to the output buffer of the given client an array of cluster link descriptions,

View File

@@ -35,7 +35,9 @@ struct clusterNode;
typedef struct clusterLink {
mstime_t ctime; /* Link creation time */
connection *conn; /* Connection to remote node */
sds sndbuf; /* Packet send buffer */
list *send_msg_queue; /* List of messages to be sent */
size_t head_msg_send_offset; /* Number of bytes already sent of message at head of queue */
unsigned long long send_msg_queue_mem; /* Memory in bytes used by message queue */
char *rcvbuf; /* Packet reception buffer */
size_t rcvbuf_len; /* Used size of rcvbuf */
size_t rcvbuf_alloc; /* Allocated size of rcvbuf */
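Under the accounting introduced in this diff, the tracked queue memory for a link works out to the empty list, one list node per queued message, and each whole refcounted block. A sketch of the invariant (assuming Redis's adlist types):

    #include <stddef.h>
    #include "adlist.h" /* Redis's list/listNode */

    /* Expected send_msg_queue_mem for n_blocks queued messages whose
     * totlen fields sum to blocks_total. */
    static size_t expected_queue_mem(size_t n_blocks, size_t blocks_total) {
        return sizeof(list) + n_blocks * sizeof(listNode) + blocks_total;
    }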

View File

@@ -3125,7 +3125,7 @@ standardConfig static_configs[] = {
/* Unsigned Long Long configs */
createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory),
createULongLongConfig("cluster-link-sendbuf-limit", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.cluster_link_sendbuf_limit_bytes, 0, MEMORY_CONFIG, NULL, NULL),
createULongLongConfig("cluster-link-sendbuf-limit", NULL, MODIFIABLE_CONFIG, 0, ULLONG_MAX, server.cluster_link_msg_queue_limit_bytes, 0, MEMORY_CONFIG, NULL, NULL),
/* Size_t configs */
createSizeTConfig("hash-max-listpack-entries", "hash-max-ziplist-entries", MODIFIABLE_CONFIG, 0, LONG_MAX, server.hash_max_listpack_entries, 512, INTEGER_CONFIG, NULL, NULL),

View File

@@ -1899,7 +1899,7 @@ struct redisServer {
int cluster_allow_reads_when_down; /* Are reads allowed when the cluster
is down? */
int cluster_config_file_lock_fd; /* cluster config fd, will be flock */
unsigned long long cluster_link_sendbuf_limit_bytes; /* Memory usage limit on individual link send buffers*/
unsigned long long cluster_link_msg_queue_limit_bytes; /* Memory usage limit on individual link msg queue */
int cluster_drop_packet_filter; /* Debug config that allows tactically
* dropping packets of a specific type */
/* Scripting */

View File

@@ -217,72 +217,3 @@ proc are_hostnames_propagated {match_string} {
}
return 1
}
# Returns the parsed CLUSTER LINKS output of the instance identified
# by the given `id` as a list of dictionaries, with each dictionary
# corresponding to a link.
proc get_cluster_links id {
set lines [R $id cluster links]
set links {}
foreach l $lines {
if {$l eq {}} continue
assert_equal [llength $l] 12
assert_equal [lindex $l 0] "direction"
set dir [lindex $l 1]
assert_equal [lindex $l 2] "node"
set node [lindex $l 3]
assert_equal [lindex $l 4] "create-time"
set create_time [lindex $l 5]
assert_equal [lindex $l 6] "events"
set events [lindex $l 7]
assert_equal [lindex $l 8] "send-buffer-allocated"
set send_buffer_allocated [lindex $l 9]
assert_equal [lindex $l 10] "send-buffer-used"
set send_buffer_used [lindex $l 11]
set link [dict create \
dir $dir \
node $node \
create_time $create_time \
events $events \
send_buffer_allocated $send_buffer_allocated \
send_buffer_used $send_buffer_used \
]
lappend links $link
}
return $links
}
proc get_links_with_peer {this_instance_id peer_nodename} {
set links [get_cluster_links $this_instance_id]
set links_with_peer {}
foreach l $links {
if {[dict get $l node] eq $peer_nodename} {
lappend links_with_peer $l
}
}
return $links_with_peer
}
# Return the entry in the CLUSTER LINKS output of the instance identified by `this_instance_id`
# that corresponds to the link established toward the peer identified by `peer_nodename`
proc get_link_to_peer {this_instance_id peer_nodename} {
set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename]
foreach l $links_with_peer {
if {[dict get $l dir] eq "to"} {
return $l
}
}
return {}
}
# Return the entry in the CLUSTER LINKS output of the instance identified by `this_instance_id`
# that corresponds to the link accepted from the peer identified by `peer_nodename`
proc get_link_from_peer {this_instance_id peer_nodename} {
set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename]
foreach l $links_with_peer {
if {[dict get $l dir] eq "from"} {
return $l
}
}
return {}
}

View File

@@ -1,114 +0,0 @@
source "../tests/includes/init-tests.tcl"
test "Create a cluster with two single-node shards" {
create_cluster 2 0
}
test "Cluster should start ok" {
assert_cluster_state ok
}
proc number_of_peers {id} {
expr [llength [get_cluster_nodes $id]] - 1
}
proc number_of_links {id} {
llength [get_cluster_links $id]
}
test "Each node has two links with each peer" {
foreach_redis_id id {
# Assert that from the point of view of each node, there are two links
# for each peer. It might take a while for the cluster to stabilize, so
# wait up to 5 seconds.
wait_for_condition 50 100 {
[number_of_peers $id]*2 == [number_of_links $id]
} else {
assert_equal [expr [number_of_peers $id]*2] [number_of_links $id]
}
set nodes [get_cluster_nodes $id]
set links [get_cluster_links $id]
# For each peer there should be exactly one
# link "to" it and one link "from" it.
foreach n $nodes {
if {[has_flag $n myself]} continue
set peer [dict get $n id]
set to 0
set from 0
foreach l $links {
if {[dict get $l node] eq $peer} {
if {[dict get $l dir] eq "to"} {
incr to
} elseif {[dict get $l dir] eq "from"} {
incr from
}
}
}
assert {$to eq 1}
assert {$from eq 1}
}
}
}
set primary1_id 0
set primary2_id 1
set primary1 [Rn $primary1_id]
set primary2 [Rn $primary2_id]
test "Disconnect link when send buffer limit reached" {
# On primary1, set timeout to 1 hour so links won't get disconnected due to timeouts
set oldtimeout [lindex [$primary1 CONFIG get cluster-node-timeout] 1]
$primary1 CONFIG set cluster-node-timeout [expr 60*60*1000]
# Get primary1's links with primary2
set primary2_name [dict get [get_myself $primary2_id] id]
set orig_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
set orig_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]
# On primary1, set cluster link send buffer limit to 256KB, which is large enough to not be
# overflowed by regular gossip messages but also small enough that it doesn't take too much
# memory to overflow it. If it is set too high, Redis may get OOM killed by kernel before this
# limit is overflowed in some RAM-limited test environments.
set oldlimit [lindex [$primary1 CONFIG get cluster-link-sendbuf-limit] 1]
$primary1 CONFIG set cluster-link-sendbuf-limit [expr 256*1024]
assert {[get_info_field [$primary1 cluster info] total_cluster_links_buffer_limit_exceeded] eq 0}
# To manufacture an ever-growing send buffer from primary1 to primary2,
# make primary2 unresponsive.
set primary2_pid [get_instance_attrib redis $primary2_id pid]
exec kill -SIGSTOP $primary2_pid
# On primary1, send 128KB Pubsub messages in a loop until the send buffer of the link from
# primary1 to primary2 exceeds the buffer limit and the link is therefore dropped.
# For the send buffer to grow, we must first exhaust the TCP send buffer of primary1 and
# the TCP receive buffer of primary2. The sizes of these two buffers vary by OS, but 100
# 128KB messages should be sufficient.
set i 0
wait_for_condition 100 0 {
[catch {incr i} e] == 0 &&
[catch {$primary1 publish channel [prepare_value [expr 128*1024]]} e] == 0 &&
[catch {after 500} e] == 0 &&
[get_info_field [$primary1 cluster info] total_cluster_links_buffer_limit_exceeded] >= 1
} else {
fail "Cluster link not freed as expected"
}
puts -nonewline "$i 128KB messages needed to overflow 256KB buffer limit. "
# A new link to primary2 should have been recreated
set new_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
assert {[dict get $new_link_p1_to_p2 create_time] > [dict get $orig_link_p1_to_p2 create_time]}
# Link from primary2 should not be affected
set same_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]
assert {[dict get $same_link_p1_from_p2 create_time] eq [dict get $orig_link_p1_from_p2 create_time]}
# Revive primary2
exec kill -SIGCONT $primary2_pid
# Reset configs on primary1 so config changes don't leak out to other tests
$primary1 CONFIG set cluster-node-timeout $oldtimeout
$primary1 CONFIG set cluster-link-sendbuf-limit $oldlimit
}

View File

@@ -1,5 +1,4 @@
# Helper functions specifically for setting up and configuring redis
# clusters.
# Cluster helper functions
# Check if cluster configuration is consistent.
proc cluster_config_consistent {} {
@@ -113,3 +112,41 @@ proc start_cluster {masters replicas options code {slot_allocator continuous_slo
start_multiple_servers $node_count $options $code
set ::singledb $old_singledb
}
# Check whether a node has the given flag.
proc cluster_has_flag {node flag} {
expr {[lsearch -exact [dict get $node flags] $flag] != -1}
}
# Returns the parsed "myself" node entry as a dictionary.
proc cluster_get_myself id {
set nodes [get_cluster_nodes $id]
foreach n $nodes {
if {[cluster_has_flag $n myself]} {return $n}
}
return {}
}
# Returns a parsed CLUSTER NODES output as a list of dictionaries.
proc get_cluster_nodes id {
set lines [split [R $id cluster nodes] "\r\n"]
set nodes {}
foreach l $lines {
set l [string trim $l]
if {$l eq {}} continue
set args [split $l]
set node [dict create \
id [lindex $args 0] \
addr [lindex $args 1] \
flags [split [lindex $args 2] ,] \
slaveof [lindex $args 3] \
ping_sent [lindex $args 4] \
pong_recv [lindex $args 5] \
config_epoch [lindex $args 6] \
linkstate [lindex $args 7] \
slots [lrange $args 8 end] \
]
lappend nodes $node
}
return $nodes
}

View File

@@ -8,7 +8,7 @@ set tcl_precision 17
source tests/support/redis.tcl
source tests/support/aofmanifest.tcl
source tests/support/server.tcl
source tests/support/cluster_helper.tcl
source tests/support/cluster_util.tcl
source tests/support/tmpfile.tcl
source tests/support/test.tcl
source tests/support/util.tcl
@@ -203,11 +203,16 @@ proc r {args} {
[srv $level "client"] {*}$args
}
# Returns a Redis instance by index.
proc Rn {n} {
set level [expr -1*$n]
return [srv $level "client"]
}
# Provide easy access to a client for an inner server. Requires a positive
# index, unlike r which uses an optional negative index.
proc R {n args} {
set level [expr -1*$n]
[srv $level "client"] {*}$args
[Rn $n] {*}$args
}
proc reconnect {args} {

View File

@@ -0,0 +1,258 @@
# Returns the parsed CLUSTER LINKS output of the instance identified
# by the given `id` as a list of dictionaries, with each dictionary
# corresponding to a link.
proc get_cluster_links id {
set lines [R $id cluster links]
set links {}
foreach l $lines {
if {$l eq {}} continue
assert_equal [llength $l] 12
assert_equal [lindex $l 0] "direction"
set dir [lindex $l 1]
assert_equal [lindex $l 2] "node"
set node [lindex $l 3]
assert_equal [lindex $l 4] "create-time"
set create_time [lindex $l 5]
assert_equal [lindex $l 6] "events"
set events [lindex $l 7]
assert_equal [lindex $l 8] "send-buffer-allocated"
set send_buffer_allocated [lindex $l 9]
assert_equal [lindex $l 10] "send-buffer-used"
set send_buffer_used [lindex $l 11]
set link [dict create \
dir $dir \
node $node \
create_time $create_time \
events $events \
send_buffer_allocated $send_buffer_allocated \
send_buffer_used $send_buffer_used \
]
lappend links $link
}
return $links
}
proc get_links_with_peer {this_instance_id peer_nodename} {
set links [get_cluster_links $this_instance_id]
set links_with_peer {}
foreach l $links {
if {[dict get $l node] eq $peer_nodename} {
lappend links_with_peer $l
}
}
return $links_with_peer
}
# Return the entry in the CLUSTER LINKS output of the instance identified by `this_instance_id`
# that corresponds to the link established toward the peer identified by `peer_nodename`
proc get_link_to_peer {this_instance_id peer_nodename} {
set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename]
foreach l $links_with_peer {
if {[dict get $l dir] eq "to"} {
return $l
}
}
return {}
}
# Return the entry in the CLUSTER LINKS output of the instance identified by `this_instance_id`
# that corresponds to the link accepted from the peer identified by `peer_nodename`
proc get_link_from_peer {this_instance_id peer_nodename} {
set links_with_peer [get_links_with_peer $this_instance_id $peer_nodename]
foreach l $links_with_peer {
if {[dict get $l dir] eq "from"} {
return $l
}
}
return {}
}
# Reset cluster links to their original state
proc reset_links {id} {
set limit [lindex [R $id CONFIG get cluster-link-sendbuf-limit] 1]
# Set a 1 byte limit and wait for the cluster cron (which runs
# every 100ms) to terminate the links
R $id CONFIG SET cluster-link-sendbuf-limit 1
after 150
# Reset limit
R $id CONFIG SET cluster-link-sendbuf-limit $limit
# Wait until the cluster links come back up for each node
wait_for_condition 50 100 {
[number_of_links $id] == [expr [number_of_peers $id] * 2]
} else {
fail "Cluster links did not come back up"
}
}
proc number_of_peers {id} {
expr [llength $::servers] - 1
}
proc number_of_links {id} {
llength [get_cluster_links $id]
}
proc publish_messages {server num_msgs msg_size} {
for {set i 0} {$i < $num_msgs} {incr i} {
$server PUBLISH channel [string repeat "x" $msg_size]
}
}
start_cluster 3 0 {tags {external:skip cluster}} {
test "Each node has two links with each peer" {
for {set id 0} {$id < [llength $::servers]} {incr id} {
# Assert that from the point of view of each node, there are two links
# for each peer. It might take a while for the cluster to stabilize, so
# wait up to 5 seconds.
wait_for_condition 50 100 {
[number_of_peers $id]*2 == [number_of_links $id]
} else {
assert_equal [expr [number_of_peers $id]*2] [number_of_links $id]
}
set nodes [get_cluster_nodes $id]
set links [get_cluster_links $id]
# For each peer there should be exactly one
# link "to" it and one link "from" it.
foreach n $nodes {
if {[cluster_has_flag $n myself]} continue
set peer [dict get $n id]
set to 0
set from 0
foreach l $links {
if {[dict get $l node] eq $peer} {
if {[dict get $l dir] eq "to"} {
incr to
} elseif {[dict get $l dir] eq "from"} {
incr from
}
}
}
assert {$to eq 1}
assert {$from eq 1}
}
}
}
set primary1_id 0
set primary2_id 1
set primary1 [Rn $primary1_id]
set primary2 [Rn $primary2_id]
test "Disconnect link when send buffer limit reached" {
# On primary1, set timeout to 1 hour so links won't get disconnected due to timeouts
set oldtimeout [lindex [$primary1 CONFIG get cluster-node-timeout] 1]
$primary1 CONFIG set cluster-node-timeout [expr 60*60*1000]
# Get primary1's links with primary2
set primary2_name [dict get [cluster_get_myself $primary2_id] id]
set orig_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
set orig_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]
# On primary1, set cluster link send buffer limit to 256KB, which is large enough to not be
# overflowed by regular gossip messages but also small enough that it doesn't take too much
# memory to overflow it. If it is set too high, Redis may get OOM killed by kernel before this
# limit is overflowed in some RAM-limited test environments.
set oldlimit [lindex [$primary1 CONFIG get cluster-link-sendbuf-limit] 1]
$primary1 CONFIG set cluster-link-sendbuf-limit [expr 256*1024]
assert {[CI $primary1_id total_cluster_links_buffer_limit_exceeded] eq 0}
# To manufacture an ever-growing send buffer from primary1 to primary2,
# make primary2 unresponsive.
set primary2_pid [srv [expr -1*$primary2_id] pid]
exec kill -SIGSTOP $primary2_pid
# On primary1, send 128KB Pubsub messages in a loop until the send buffer of the link from
# primary1 to primary2 exceeds the buffer limit and the link is therefore dropped.
# For the send buffer to grow, we must first exhaust the TCP send buffer of primary1 and
# the TCP receive buffer of primary2. The sizes of these two buffers vary by OS, but 100
# 128KB messages should be sufficient.
set i 0
wait_for_condition 100 0 {
[catch {incr i} e] == 0 &&
[catch {$primary1 publish channel [prepare_value [expr 128*1024]]} e] == 0 &&
[catch {after 500} e] == 0 &&
[CI $primary1_id total_cluster_links_buffer_limit_exceeded] >= 1
} else {
fail "Cluster link not freed as expected"
}
puts -nonewline "$i 128KB messages needed to overflow 256KB buffer limit. "
# A new link to primary2 should have been recreated
set new_link_p1_to_p2 [get_link_to_peer $primary1_id $primary2_name]
assert {[dict get $new_link_p1_to_p2 create_time] > [dict get $orig_link_p1_to_p2 create_time]}
# Link from primary2 should not be affected
set same_link_p1_from_p2 [get_link_from_peer $primary1_id $primary2_name]
assert {[dict get $same_link_p1_from_p2 create_time] eq [dict get $orig_link_p1_from_p2 create_time]}
# Revive primary2
exec kill -SIGCONT $primary2_pid
# Reset configs on primary1 so config changes don't leak out to other tests
$primary1 CONFIG set cluster-node-timeout $oldtimeout
$primary1 CONFIG set cluster-link-sendbuf-limit $oldlimit
reset_links $primary1_id
}
test "Link memory increases with publishes" {
set server_id 0
set server [Rn $server_id]
set msg_size 10000
set num_msgs 10
# Remove any sendbuf limit
$primary1 CONFIG set cluster-link-sendbuf-limit 0
# Publish ~100KB to one of the servers
$server MULTI
$server INFO memory
publish_messages $server $num_msgs $msg_size
$server INFO memory
set res [$server EXEC]
set link_mem_before_pubs [getInfoProperty $res mem_cluster_links]
# Remove the first half of the response string which contains the
# first "INFO memory" results and search for the property again
set res [string range $res [expr [string length $res] / 2] end]
set link_mem_after_pubs [getInfoProperty $res mem_cluster_links]
# We expect the memory to have increased by more than
# the cumulative size of the publish messages
set mem_diff_floor [expr $msg_size * $num_msgs]
set mem_diff [expr $link_mem_after_pubs - $link_mem_before_pubs]
assert {$mem_diff > $mem_diff_floor}
# Reset links to ensure no leftover data for the next test
reset_links $server_id
}
test "Link memory resets after publish messages flush" {
set server [Rn 0]
set msg_size 100000
set num_msgs 10
set link_mem_before [status $server mem_cluster_links]
# Publish ~1MB to one of the servers
$server MULTI
publish_messages $server $num_msgs $msg_size
$server EXEC
# Wait until the cluster link memory has returned to below the pre-publish value.
# We can't guarantee it returns to the exact same value since gossip messages
# can cause the values to fluctuate.
wait_for_condition 1000 500 {
[status $server mem_cluster_links] <= $link_mem_before
} else {
fail "Cluster link memory did not settle back to expected range"
}
}
}