Free unused capacity in the cluster send buffer. (#9255)
* Free unused capacity in the cluster send buffer. * Refactor cluster cron to include a dedicated loop for node based cron jobs
This commit is contained in:
parent
a403816405
commit
8647addf93
102
src/cluster.c
102
src/cluster.c
@ -3525,6 +3525,59 @@ void clusterHandleManualFailover(void) {
|
||||
* CLUSTER cron job
|
||||
* -------------------------------------------------------------------------- */
|
||||
|
||||
/* Check if the node is disconnected and re-establish the connection.
|
||||
* Also update a few stats while we are here, that can be used to make
|
||||
* better decisions in other part of the code. */
|
||||
int clusterNodeCronHandleReconnect(clusterNode *node, mstime_t handshake_timeout, mstime_t now) {
|
||||
/* Not interested in reconnecting the link with myself or nodes
|
||||
* for which we have no address. */
|
||||
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) return 1;
|
||||
|
||||
if (node->flags & CLUSTER_NODE_PFAIL)
|
||||
server.cluster->stats_pfail_nodes++;
|
||||
|
||||
/* A Node in HANDSHAKE state has a limited lifespan equal to the
|
||||
* configured node timeout. */
|
||||
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
|
||||
clusterDelNode(node);
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (node->link == NULL) {
|
||||
clusterLink *link = createClusterLink(node);
|
||||
link->conn = server.tls_cluster ? connCreateTLS() : connCreateSocket();
|
||||
connSetPrivateData(link->conn, link);
|
||||
if (connConnect(link->conn, node->ip, node->cport, server.bind_source_addr,
|
||||
clusterLinkConnectHandler) == -1) {
|
||||
/* We got a synchronous error from connect before
|
||||
* clusterSendPing() had a chance to be called.
|
||||
* If node->ping_sent is zero, failure detection can't work,
|
||||
* so we claim we actually sent a ping now (that will
|
||||
* be really sent as soon as the link is obtained). */
|
||||
if (node->ping_sent == 0) node->ping_sent = mstime();
|
||||
serverLog(LL_DEBUG, "Unable to connect to "
|
||||
"Cluster Node [%s]:%d -> %s", node->ip,
|
||||
node->cport, server.neterr);
|
||||
|
||||
freeClusterLink(link);
|
||||
return 0;
|
||||
}
|
||||
node->link = link;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* Resize the send buffer of a node if it is wasting
|
||||
* enough space. */
|
||||
int clusterNodeCronResizeBuffers(clusterNode *node) {
|
||||
/* If unused space is a lot bigger than the used portion of the buffer then free up unused space.
|
||||
* We use a factor of 4 because of the greediness of sdsMakeRoomFor (used by sdscatlen). */
|
||||
if (node->link != NULL && sdsavail(node->link->sndbuf) / 4 > sdslen(node->link->sndbuf)) {
|
||||
node->link->sndbuf = sdsRemoveFreeSpace(node->link->sndbuf);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* This is executed 10 times every second */
|
||||
void clusterCron(void) {
|
||||
dictIterator *di;
|
||||
@ -3579,51 +3632,18 @@ void clusterCron(void) {
|
||||
/* Update myself flags. */
|
||||
clusterUpdateMyselfFlags();
|
||||
|
||||
/* Check if we have disconnected nodes and re-establish the connection.
|
||||
* Also update a few stats while we are here, that can be used to make
|
||||
* better decisions in other part of the code. */
|
||||
di = dictGetSafeIterator(server.cluster->nodes);
|
||||
/* Clear so clusterNodeCronHandleReconnect can count the number of nodes in PFAIL. */
|
||||
server.cluster->stats_pfail_nodes = 0;
|
||||
/* Run through some of the operations we want to do on each cluster node. */
|
||||
di = dictGetSafeIterator(server.cluster->nodes);
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
clusterNode *node = dictGetVal(de);
|
||||
|
||||
/* Not interested in reconnecting the link with myself or nodes
|
||||
* for which we have no address. */
|
||||
if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue;
|
||||
|
||||
if (node->flags & CLUSTER_NODE_PFAIL)
|
||||
server.cluster->stats_pfail_nodes++;
|
||||
|
||||
/* A Node in HANDSHAKE state has a limited lifespan equal to the
|
||||
* configured node timeout. */
|
||||
if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) {
|
||||
clusterDelNode(node);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (node->link == NULL) {
|
||||
clusterLink *link = createClusterLink(node);
|
||||
link->conn = server.tls_cluster ? connCreateTLS() : connCreateSocket();
|
||||
connSetPrivateData(link->conn, link);
|
||||
if (connConnect(link->conn, node->ip, node->cport, server.bind_source_addr,
|
||||
clusterLinkConnectHandler) == -1) {
|
||||
/* We got a synchronous error from connect before
|
||||
* clusterSendPing() had a chance to be called.
|
||||
* If node->ping_sent is zero, failure detection can't work,
|
||||
* so we claim we actually sent a ping now (that will
|
||||
* be really sent as soon as the link is obtained). */
|
||||
if (node->ping_sent == 0) node->ping_sent = mstime();
|
||||
serverLog(LL_DEBUG, "Unable to connect to "
|
||||
"Cluster Node [%s]:%d -> %s", node->ip,
|
||||
node->cport, server.neterr);
|
||||
|
||||
freeClusterLink(link);
|
||||
continue;
|
||||
}
|
||||
node->link = link;
|
||||
}
|
||||
/* The protocol is that they return non-zero if the node was
|
||||
* terminated. */
|
||||
if(clusterNodeCronHandleReconnect(node, handshake_timeout, now)) continue;
|
||||
if(clusterNodeCronResizeBuffers(node)) continue;
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
dictReleaseIterator(di);
|
||||
|
||||
/* Ping some random node 1 time every 10 iterations, so that we usually ping
|
||||
* one random node every second. */
|
||||
|
Loading…
x
Reference in New Issue
Block a user