Cluster: bulk-accept new nodes connections.
The same change was operated for normal client connections. This is important for Cluster as well, since when a node rejoins the cluster, when a partition heals or after a restart, it gets flooded with new connection attempts by all the other nodes trying to form a full mesh again.
This commit is contained in:
parent
3625b52791
commit
8a170c817d
@ -492,32 +492,38 @@ void freeClusterLink(clusterLink *link) {
|
|||||||
zfree(link);
|
zfree(link);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#define MAX_CLUSTER_ACCEPTS_PER_CALL 1000
|
||||||
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
void clusterAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||||
int cport, cfd;
|
int cport, cfd;
|
||||||
|
int max = MAX_CLUSTER_ACCEPTS_PER_CALL;
|
||||||
char cip[REDIS_IP_STR_LEN];
|
char cip[REDIS_IP_STR_LEN];
|
||||||
clusterLink *link;
|
clusterLink *link;
|
||||||
REDIS_NOTUSED(el);
|
REDIS_NOTUSED(el);
|
||||||
REDIS_NOTUSED(mask);
|
REDIS_NOTUSED(mask);
|
||||||
REDIS_NOTUSED(privdata);
|
REDIS_NOTUSED(privdata);
|
||||||
|
|
||||||
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
|
while(max--) {
|
||||||
if (cfd == ANET_ERR) {
|
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
|
||||||
redisLog(REDIS_VERBOSE,"Accepting cluster node: %s", server.neterr);
|
if (cfd == ANET_ERR) {
|
||||||
return;
|
if (errno != EWOULDBLOCK)
|
||||||
}
|
redisLog(REDIS_VERBOSE,
|
||||||
anetNonBlock(NULL,cfd);
|
"Accepting cluster node: %s", server.neterr);
|
||||||
anetEnableTcpNoDelay(NULL,cfd);
|
return;
|
||||||
|
}
|
||||||
|
anetNonBlock(NULL,cfd);
|
||||||
|
anetEnableTcpNoDelay(NULL,cfd);
|
||||||
|
|
||||||
/* Use non-blocking I/O for cluster messages. */
|
/* Use non-blocking I/O for cluster messages. */
|
||||||
redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
|
redisLog(REDIS_VERBOSE,"Accepted cluster node %s:%d", cip, cport);
|
||||||
/* Create a link object we use to handle the connection.
|
/* Create a link object we use to handle the connection.
|
||||||
* It gets passed to the readable handler when data is available.
|
* It gets passed to the readable handler when data is available.
|
||||||
* Initiallly the link->node pointer is set to NULL as we don't know
|
* Initiallly the link->node pointer is set to NULL as we don't know
|
||||||
* which node is, but the right node is references once we know the
|
* which node is, but the right node is references once we know the
|
||||||
* node identity. */
|
* node identity. */
|
||||||
link = createClusterLink(NULL);
|
link = createClusterLink(NULL);
|
||||||
link->fd = cfd;
|
link->fd = cfd;
|
||||||
aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
|
aeCreateFileEvent(server.el,cfd,AE_READABLE,clusterReadHandler,link);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* -----------------------------------------------------------------------------
|
/* -----------------------------------------------------------------------------
|
||||||
|
Loading…
x
Reference in New Issue
Block a user