diff --git a/src/cluster.c b/src/cluster.c index cae63e924..b23160b90 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -427,6 +427,7 @@ void clusterInit(void) { server.cluster->stats_bus_messages_sent[i] = 0; server.cluster->stats_bus_messages_received[i] = 0; } + server.cluster->stats_pfail_nodes = 0; memset(server.cluster->slots,0, sizeof(server.cluster->slots)); clusterCloseAllSlots(); @@ -2254,6 +2255,33 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */ } +/* Return non-zero if node 'n' is already among the first 'count' gossip + * entries of the message pointed to by 'hdr' (entries are matched by node + * name), zero otherwise. Helper for clusterSendPing(). */ +int clusterNodeIsInGossipSection(clusterMsg *hdr, int count, clusterNode *n) { + int j; + for (j = 0; j < count; j++) { + if (memcmp(hdr->data.ping.gossip[j].nodename,n->name, + CLUSTER_NAMELEN) == 0) break; + } + return j != count; +} + +/* Fill the i-th gossip entry of the message pointed to by 'hdr' with the info + * of node 'n'; ping/pong times are stored divided by 1000. */ +void clusterSetGossipEntry(clusterMsg *hdr, int i, clusterNode *n) { + clusterMsgDataGossip *gossip; + gossip = &(hdr->data.ping.gossip[i]); + memcpy(gossip->nodename,n->name,CLUSTER_NAMELEN); + gossip->ping_sent = htonl(n->ping_sent/1000); + gossip->pong_received = htonl(n->pong_received/1000); + memcpy(gossip->ip,n->ip,sizeof(n->ip)); + gossip->port = htons(n->port); + gossip->cport = htons(n->cport); + gossip->flags = htons(n->flags); + gossip->notused1 = 0; +} + /* Send a PING or PONG packet to the specified node, making sure to add enough * gossip informations. 
*/ void clusterSendPing(clusterLink *link, int type) { @@ -2298,11 +2326,15 @@ void clusterSendPing(clusterLink *link, int type) { if (wanted < 3) wanted = 3; if (wanted > freshnodes) wanted = freshnodes; + /* Include all the nodes in PFAIL state, so that failure reports + * propagate faster and nodes can go from PFAIL to FAIL state sooner. */ + int pfail_wanted = server.cluster->stats_pfail_nodes; + /* Compute the maxium totlen to allocate our buffer. We'll fix the totlen * later according to the number of gossip sections we really were able * to put inside the packet. */ totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); - totlen += (sizeof(clusterMsgDataGossip)*wanted); + totlen += (sizeof(clusterMsgDataGossip)*(wanted+pfail_wanted)); /* Note: clusterBuildMessageHdr() expects the buffer to be always at least * sizeof(clusterMsg) or more. */ if (totlen < (int)sizeof(clusterMsg)) totlen = sizeof(clusterMsg); @@ -2319,17 +2351,13 @@ void clusterSendPing(clusterLink *link, int type) { while(freshnodes > 0 && gossipcount < wanted && maxiterations--) { dictEntry *de = dictGetRandomKey(server.cluster->nodes); clusterNode *this = dictGetVal(de); - clusterMsgDataGossip *gossip; - int j; /* Don't include this node: the whole packet header is about us * already, so we just gossip about other nodes. */ if (this == myself) continue; - /* Give a bias to FAIL/PFAIL nodes. */ - if (maxiterations > wanted*2 && - !(this->flags & (CLUSTER_NODE_PFAIL|CLUSTER_NODE_FAIL))) - continue; + /* PFAIL nodes will be added later. */ + if (this->flags & CLUSTER_NODE_PFAIL) continue; /* In the gossip section don't include: * 1) Nodes in HANDSHAKE state. @@ -2343,27 +2371,37 @@ void clusterSendPing(clusterLink *link, int type) { continue; } - /* Check if we already added this node */ - for (j = 0; j < gossipcount; j++) { - if (memcmp(hdr->data.ping.gossip[j].nodename,this->name, - CLUSTER_NAMELEN) == 0) break; - } - if (j != gossipcount) continue; + /* Do not add a node we already have. 
*/ + if (clusterNodeIsInGossipSection(hdr,gossipcount,this)) continue; /* Add it */ + clusterSetGossipEntry(hdr,gossipcount,this); freshnodes--; - gossip = &(hdr->data.ping.gossip[gossipcount]); - memcpy(gossip->nodename,this->name,CLUSTER_NAMELEN); - gossip->ping_sent = htonl(this->ping_sent/1000); - gossip->pong_received = htonl(this->pong_received/1000); - memcpy(gossip->ip,this->ip,sizeof(this->ip)); - gossip->port = htons(this->port); - gossip->cport = htons(this->cport); - gossip->flags = htons(this->flags); - gossip->notused1 = 0; gossipcount++; } + /* If there are PFAIL nodes, add them at the end. */ + if (pfail_wanted) { + dictIterator *di; + dictEntry *de; + + di = dictGetSafeIterator(server.cluster->nodes); + while((de = dictNext(di)) != NULL && pfail_wanted > 0) { + clusterNode *node = dictGetVal(de); + if (node->flags & CLUSTER_NODE_HANDSHAKE) continue; + if (node->flags & CLUSTER_NODE_NOADDR) continue; + if (!(node->flags & CLUSTER_NODE_PFAIL)) continue; + clusterSetGossipEntry(hdr,gossipcount,node); + freshnodes--; + gossipcount++; + /* We count down the slots we allocated, since the + * PFAIL stats may not match perfectly with the current number + * of PFAIL nodes. */ + pfail_wanted--; + } + dictReleaseIterator(di); + } + /* Ready to send... fix the totlen fiend and queue the message in the * output buffer. */ totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); @@ -3189,13 +3227,21 @@ void clusterCron(void) { handshake_timeout = server.cluster_node_timeout; if (handshake_timeout < 1000) handshake_timeout = 1000; - /* Check if we have disconnected nodes and re-establish the connection. */ + /* Check if we have disconnected nodes and re-establish the connection. + * Also update a few stats while we are here, that can be used to make + * better decisions in other parts of the code. 
*/ di = dictGetSafeIterator(server.cluster->nodes); + server.cluster->stats_pfail_nodes = 0; while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); + /* Not interested in reconnecting the link with myself or nodes + * for which we have no address (they are not counted as PFAIL either). */ if (node->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_NOADDR)) continue; + if (node->flags & CLUSTER_NODE_PFAIL) + server.cluster->stats_pfail_nodes++; + /* A Node in HANDSHAKE state has a limited lifespan equal to the * configured node timeout. */ if (nodeInHandshake(node) && now - node->ctime > handshake_timeout) { diff --git a/src/cluster.h b/src/cluster.h index e7c088569..5e228c0f9 100644 --- a/src/cluster.h +++ b/src/cluster.h @@ -165,6 +165,8 @@ typedef struct clusterState { /* Messages received and sent by type. */ long long stats_bus_messages_sent[CLUSTERMSG_TYPE_COUNT]; long long stats_bus_messages_received[CLUSTERMSG_TYPE_COUNT]; + long long stats_pfail_nodes; /* Number of nodes in PFAIL status, + excluding nodes without address. */ } clusterState; /* Redis cluster messages header */