Load balance connections across threads (config NYI and hardcoded)
This commit is contained in:
parent
2c220889c6
commit
a47b0f4d3b
27
src/anet.c
27
src/anet.c
@ -246,6 +246,16 @@ static int anetSetReuseAddr(char *err, int fd) {
|
|||||||
return ANET_OK;
|
return ANET_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int anetSetReusePort(char *err, int fd) {
|
||||||
|
int yes = 1;
|
||||||
|
/* Let us load balance listen()s from multiple threads */
|
||||||
|
if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)) == -1) {
|
||||||
|
anetSetError(err, "setsockopt SO_REUSEPORT: %s", strerror(errno));
|
||||||
|
return ANET_ERR;
|
||||||
|
}
|
||||||
|
return ANET_OK;
|
||||||
|
}
|
||||||
|
|
||||||
static int anetCreateSocket(char *err, int domain) {
|
static int anetCreateSocket(char *err, int domain) {
|
||||||
int s;
|
int s;
|
||||||
if ((s = socket(domain, SOCK_STREAM, 0)) == -1) {
|
if ((s = socket(domain, SOCK_STREAM, 0)) == -1) {
|
||||||
@ -265,6 +275,7 @@ static int anetCreateSocket(char *err, int domain) {
|
|||||||
#define ANET_CONNECT_NONE 0
|
#define ANET_CONNECT_NONE 0
|
||||||
#define ANET_CONNECT_NONBLOCK 1
|
#define ANET_CONNECT_NONBLOCK 1
|
||||||
#define ANET_CONNECT_BE_BINDING 2 /* Best effort binding. */
|
#define ANET_CONNECT_BE_BINDING 2 /* Best effort binding. */
|
||||||
|
#define ANET_CONNECT_REUSEPORT 4
|
||||||
static int anetTcpGenericConnect(char *err, char *addr, int port,
|
static int anetTcpGenericConnect(char *err, char *addr, int port,
|
||||||
char *source_addr, int flags)
|
char *source_addr, int flags)
|
||||||
{
|
{
|
||||||
@ -287,7 +298,10 @@ static int anetTcpGenericConnect(char *err, char *addr, int port,
|
|||||||
* the next entry in servinfo. */
|
* the next entry in servinfo. */
|
||||||
if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
|
if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
|
||||||
continue;
|
continue;
|
||||||
if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
|
if (anetSetReuseAddr(err,s) == ANET_ERR)
|
||||||
|
goto error;
|
||||||
|
if (flags & ANET_CONNECT_REUSEPORT && anetSetReusePort(err, s) != ANET_OK)
|
||||||
|
goto error;
|
||||||
if (flags & ANET_CONNECT_NONBLOCK && anetNonBlock(err,s) != ANET_OK)
|
if (flags & ANET_CONNECT_NONBLOCK && anetNonBlock(err,s) != ANET_OK)
|
||||||
goto error;
|
goto error;
|
||||||
if (source_addr) {
|
if (source_addr) {
|
||||||
@ -462,7 +476,7 @@ static int anetV6Only(char *err, int s) {
|
|||||||
return ANET_OK;
|
return ANET_OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
|
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog, int fReusePort)
|
||||||
{
|
{
|
||||||
int s = -1, rv;
|
int s = -1, rv;
|
||||||
char _port[6]; /* strlen("65535") */
|
char _port[6]; /* strlen("65535") */
|
||||||
@ -484,6 +498,7 @@ static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backl
|
|||||||
|
|
||||||
if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
|
if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
|
||||||
if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
|
if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
|
||||||
|
if (fReusePort && anetSetReusePort(err,s) == ANET_ERR) goto error;
|
||||||
if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR;
|
if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
@ -500,14 +515,14 @@ end:
|
|||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
|
||||||
int anetTcpServer(char *err, int port, char *bindaddr, int backlog)
|
int anetTcpServer(char *err, int port, char *bindaddr, int backlog, int fReusePort)
|
||||||
{
|
{
|
||||||
return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);
|
return _anetTcpServer(err, port, bindaddr, AF_INET, backlog, fReusePort);
|
||||||
}
|
}
|
||||||
|
|
||||||
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog)
|
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog, int fReusePort)
|
||||||
{
|
{
|
||||||
return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog);
|
return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog, fReusePort);
|
||||||
}
|
}
|
||||||
|
|
||||||
int anetUnixServer(char *err, char *path, mode_t perm, int backlog)
|
int anetUnixServer(char *err, char *path, mode_t perm, int backlog)
|
||||||
|
@ -62,8 +62,8 @@ int anetUnixNonBlockConnect(char *err, char *path);
|
|||||||
int anetRead(int fd, char *buf, int count);
|
int anetRead(int fd, char *buf, int count);
|
||||||
int anetResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len);
|
int anetResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len);
|
||||||
int anetResolveIP(char *err, char *host, char *ipbuf, size_t ipbuf_len);
|
int anetResolveIP(char *err, char *host, char *ipbuf, size_t ipbuf_len);
|
||||||
int anetTcpServer(char *err, int port, char *bindaddr, int backlog);
|
int anetTcpServer(char *err, int port, char *bindaddr, int backlog, int fReusePort);
|
||||||
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog);
|
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog, int fReusePort);
|
||||||
int anetUnixServer(char *err, char *path, mode_t perm, int backlog);
|
int anetUnixServer(char *err, char *path, mode_t perm, int backlog);
|
||||||
int anetTcpAccept(char *err, int serversock, char *ip, size_t ip_len, int *port);
|
int anetTcpAccept(char *err, int serversock, char *ip, size_t ip_len, int *port);
|
||||||
int anetUnixAccept(char *err, int serversock);
|
int anetUnixAccept(char *err, int serversock);
|
||||||
|
@ -486,7 +486,7 @@ void clusterInit(void) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (listenToPort(server.port+CLUSTER_PORT_INCR,
|
if (listenToPort(server.port+CLUSTER_PORT_INCR,
|
||||||
server.cfd,&server.cfd_count) == C_ERR)
|
server.cfd,&server.cfd_count, 0 /*fReusePort*/) == C_ERR)
|
||||||
{
|
{
|
||||||
exit(1);
|
exit(1);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1644,6 +1644,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) {
|
|||||||
UNUSED(el);
|
UNUSED(el);
|
||||||
UNUSED(mask);
|
UNUSED(mask);
|
||||||
serverAssert(mask & AE_READ_THREADSAFE);
|
serverAssert(mask & AE_READ_THREADSAFE);
|
||||||
|
serverAssert(c->iel == ielFromEventLoop(el));
|
||||||
|
|
||||||
readlen = PROTO_IOBUF_LEN;
|
readlen = PROTO_IOBUF_LEN;
|
||||||
/* If this is a multi bulk request, and we are processing a bulk reply
|
/* If this is a multi bulk request, and we are processing a bulk reply
|
||||||
|
95
src/server.c
95
src/server.c
@ -2257,7 +2257,10 @@ void initServerConfig(void) {
|
|||||||
server.bindaddr_count = 0;
|
server.bindaddr_count = 0;
|
||||||
server.unixsocket = NULL;
|
server.unixsocket = NULL;
|
||||||
server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
|
server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
|
||||||
server.ipfd_count = 0;
|
for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel)
|
||||||
|
{
|
||||||
|
server.rgbindinfo[iel].ipfd_count = 0;
|
||||||
|
}
|
||||||
server.sofd = -1;
|
server.sofd = -1;
|
||||||
server.protected_mode = CONFIG_DEFAULT_PROTECTED_MODE;
|
server.protected_mode = CONFIG_DEFAULT_PROTECTED_MODE;
|
||||||
server.dbnum = CONFIG_DEFAULT_DBNUM;
|
server.dbnum = CONFIG_DEFAULT_DBNUM;
|
||||||
@ -2634,7 +2637,7 @@ void checkTcpBacklogSettings(void) {
|
|||||||
* impossible to bind, or no bind addresses were specified in the server
|
* impossible to bind, or no bind addresses were specified in the server
|
||||||
* configuration but the function is not able to bind * for at least
|
* configuration but the function is not able to bind * for at least
|
||||||
* one of the IPv4 or IPv6 protocols. */
|
* one of the IPv4 or IPv6 protocols. */
|
||||||
int listenToPort(int port, int *fds, int *count) {
|
int listenToPort(int port, int *fds, int *count, int fReusePort) {
|
||||||
int j;
|
int j;
|
||||||
|
|
||||||
/* Force binding of 0.0.0.0 if no bind address is specified, always
|
/* Force binding of 0.0.0.0 if no bind address is specified, always
|
||||||
@ -2646,7 +2649,7 @@ int listenToPort(int port, int *fds, int *count) {
|
|||||||
/* Bind * for both IPv6 and IPv4, we enter here only if
|
/* Bind * for both IPv6 and IPv4, we enter here only if
|
||||||
* server.bindaddr_count == 0. */
|
* server.bindaddr_count == 0. */
|
||||||
fds[*count] = anetTcp6Server(server.neterr,port,NULL,
|
fds[*count] = anetTcp6Server(server.neterr,port,NULL,
|
||||||
server.tcp_backlog);
|
server.tcp_backlog, fReusePort);
|
||||||
if (fds[*count] != ANET_ERR) {
|
if (fds[*count] != ANET_ERR) {
|
||||||
anetNonBlock(NULL,fds[*count]);
|
anetNonBlock(NULL,fds[*count]);
|
||||||
(*count)++;
|
(*count)++;
|
||||||
@ -2658,7 +2661,7 @@ int listenToPort(int port, int *fds, int *count) {
|
|||||||
if (*count == 1 || unsupported) {
|
if (*count == 1 || unsupported) {
|
||||||
/* Bind the IPv4 address as well. */
|
/* Bind the IPv4 address as well. */
|
||||||
fds[*count] = anetTcpServer(server.neterr,port,NULL,
|
fds[*count] = anetTcpServer(server.neterr,port,NULL,
|
||||||
server.tcp_backlog);
|
server.tcp_backlog, fReusePort);
|
||||||
if (fds[*count] != ANET_ERR) {
|
if (fds[*count] != ANET_ERR) {
|
||||||
anetNonBlock(NULL,fds[*count]);
|
anetNonBlock(NULL,fds[*count]);
|
||||||
(*count)++;
|
(*count)++;
|
||||||
@ -2674,11 +2677,11 @@ int listenToPort(int port, int *fds, int *count) {
|
|||||||
} else if (strchr(server.bindaddr[j],':')) {
|
} else if (strchr(server.bindaddr[j],':')) {
|
||||||
/* Bind IPv6 address. */
|
/* Bind IPv6 address. */
|
||||||
fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
|
fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
|
||||||
server.tcp_backlog);
|
server.tcp_backlog, fReusePort);
|
||||||
} else {
|
} else {
|
||||||
/* Bind IPv4 address. */
|
/* Bind IPv4 address. */
|
||||||
fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
|
fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
|
||||||
server.tcp_backlog);
|
server.tcp_backlog, fReusePort);
|
||||||
}
|
}
|
||||||
if (fds[*count] == ANET_ERR) {
|
if (fds[*count] == ANET_ERR) {
|
||||||
serverLog(LL_WARNING,
|
serverLog(LL_WARNING,
|
||||||
@ -2734,19 +2737,48 @@ void resetServerStats(void) {
|
|||||||
server.aof_delayed_fsync = 0;
|
server.aof_delayed_fsync = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void initServerAcceptHandlers(void)
|
static void initNetworkingThread(int iel, int fReusePort)
|
||||||
{
|
{
|
||||||
/* Create an event handler for accepting new connections in TCP and Unix
|
/* Open the TCP listening socket for the user commands. */
|
||||||
* domain sockets. */
|
if (server.port != 0 &&
|
||||||
for (int j = 0; j < server.ipfd_count; j++) {
|
listenToPort(server.port,server.rgbindinfo[iel].ipfd,&server.rgbindinfo[iel].ipfd_count, fReusePort) == C_ERR)
|
||||||
int iel = j % server.cel;
|
exit(1);
|
||||||
if (aeCreateFileEvent(server.rgel[iel], server.ipfd[j], AE_READABLE|AE_READ_THREADSAFE,
|
|
||||||
|
/* Create an event handler for accepting new connections in TCP */
|
||||||
|
for (int j = 0; j < server.rgbindinfo[iel].ipfd_count; j++) {
|
||||||
|
if (aeCreateFileEvent(server.rgel[iel], server.rgbindinfo[iel].ipfd[j], AE_READABLE|AE_READ_THREADSAFE,
|
||||||
acceptTcpHandler,NULL) == AE_ERR)
|
acceptTcpHandler,NULL) == AE_ERR)
|
||||||
{
|
{
|
||||||
serverPanic(
|
serverPanic(
|
||||||
"Unrecoverable error creating server.ipfd file event.");
|
"Unrecoverable error creating server.ipfd file event.");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void initNetworking(int fReusePort)
|
||||||
|
{
|
||||||
|
int celListen = (fReusePort) ? server.cel : 1;
|
||||||
|
for (int iel = 0; iel < celListen; ++iel)
|
||||||
|
initNetworkingThread(iel, fReusePort);
|
||||||
|
|
||||||
|
/* Open the listening Unix domain socket. */
|
||||||
|
if (server.unixsocket != NULL) {
|
||||||
|
unlink(server.unixsocket); /* don't care if this fails */
|
||||||
|
server.sofd = anetUnixServer(server.neterr,server.unixsocket,
|
||||||
|
server.unixsocketperm, server.tcp_backlog);
|
||||||
|
if (server.sofd == ANET_ERR) {
|
||||||
|
serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
anetNonBlock(NULL,server.sofd);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Abort if there are no listening sockets at all. */
|
||||||
|
if (server.rgbindinfo[IDX_EVENT_LOOP_MAIN].ipfd_count == 0 && server.sofd < 0) {
|
||||||
|
serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
if (server.sofd > 0 && aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.sofd,AE_READABLE|AE_READ_THREADSAFE,
|
if (server.sofd > 0 && aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.sofd,AE_READABLE|AE_READ_THREADSAFE,
|
||||||
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
|
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
|
||||||
}
|
}
|
||||||
@ -2797,29 +2829,6 @@ void initServer(void) {
|
|||||||
}
|
}
|
||||||
server.db = zmalloc(sizeof(redisDb)*server.dbnum, MALLOC_LOCAL);
|
server.db = zmalloc(sizeof(redisDb)*server.dbnum, MALLOC_LOCAL);
|
||||||
|
|
||||||
/* Open the TCP listening socket for the user commands. */
|
|
||||||
if (server.port != 0 &&
|
|
||||||
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
|
|
||||||
exit(1);
|
|
||||||
|
|
||||||
/* Open the listening Unix domain socket. */
|
|
||||||
if (server.unixsocket != NULL) {
|
|
||||||
unlink(server.unixsocket); /* don't care if this fails */
|
|
||||||
server.sofd = anetUnixServer(server.neterr,server.unixsocket,
|
|
||||||
server.unixsocketperm, server.tcp_backlog);
|
|
||||||
if (server.sofd == ANET_ERR) {
|
|
||||||
serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
anetNonBlock(NULL,server.sofd);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Abort if there are no listening sockets at all. */
|
|
||||||
if (server.ipfd_count == 0 && server.sofd < 0) {
|
|
||||||
serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
|
|
||||||
exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Create the Redis databases, and initialize other internal state. */
|
/* Create the Redis databases, and initialize other internal state. */
|
||||||
for (int j = 0; j < server.dbnum; j++) {
|
for (int j = 0; j < server.dbnum; j++) {
|
||||||
server.db[j].pdict = dictCreate(&dbDictType,NULL);
|
server.db[j].pdict = dictCreate(&dbDictType,NULL);
|
||||||
@ -3538,7 +3547,11 @@ int processCommand(client *c) {
|
|||||||
void closeListeningSockets(int unlink_unix_socket) {
|
void closeListeningSockets(int unlink_unix_socket) {
|
||||||
int j;
|
int j;
|
||||||
|
|
||||||
for (j = 0; j < server.ipfd_count; j++) close(server.ipfd[j]);
|
for (int iel = 0; iel < server.cel; ++iel)
|
||||||
|
{
|
||||||
|
for (j = 0; j < server.rgbindinfo[iel].ipfd_count; j++)
|
||||||
|
close(server.rgbindinfo[iel].ipfd[j]);
|
||||||
|
}
|
||||||
if (server.sofd != -1) close(server.sofd);
|
if (server.sofd != -1) close(server.sofd);
|
||||||
if (server.cluster_enabled)
|
if (server.cluster_enabled)
|
||||||
for (j = 0; j < server.cfd_count; j++) close(server.cfd[j]);
|
for (j = 0; j < server.cfd_count; j++) close(server.cfd[j]);
|
||||||
@ -4961,6 +4974,9 @@ int main(int argc, char **argv) {
|
|||||||
if (background) daemonize();
|
if (background) daemonize();
|
||||||
|
|
||||||
initServer();
|
initServer();
|
||||||
|
server.cel = 4; //testing
|
||||||
|
initNetworking(1 /* fReusePort */);
|
||||||
|
|
||||||
if (background || server.pidfile) createPidFile();
|
if (background || server.pidfile) createPidFile();
|
||||||
redisSetProcTitle(argv[0]);
|
redisSetProcTitle(argv[0]);
|
||||||
redisAsciiArt();
|
redisAsciiArt();
|
||||||
@ -4987,7 +5003,7 @@ int main(int argc, char **argv) {
|
|||||||
exit(1);
|
exit(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (server.ipfd_count > 0)
|
if (server.rgbindinfo[IDX_EVENT_LOOP_MAIN].ipfd_count > 0)
|
||||||
serverLog(LL_NOTICE,"Ready to accept connections");
|
serverLog(LL_NOTICE,"Ready to accept connections");
|
||||||
if (server.sofd > 0)
|
if (server.sofd > 0)
|
||||||
serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
|
serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
|
||||||
@ -5000,11 +5016,6 @@ int main(int argc, char **argv) {
|
|||||||
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
|
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
server.cel = 4; //testing
|
|
||||||
initServerAcceptHandlers();
|
|
||||||
|
|
||||||
serverAssert(server.cel > 0 && server.cel <= MAX_EVENT_LOOPS);
|
serverAssert(server.cel > 0 && server.cel <= MAX_EVENT_LOOPS);
|
||||||
pthread_t rgthread[MAX_EVENT_LOOPS];
|
pthread_t rgthread[MAX_EVENT_LOOPS];
|
||||||
for (int iel = 0; iel < server.cel; ++iel)
|
for (int iel = 0; iel < server.cel; ++iel)
|
||||||
|
10
src/server.h
10
src/server.h
@ -1026,6 +1026,11 @@ struct clusterState;
|
|||||||
#define MAX_EVENT_LOOPS 16
|
#define MAX_EVENT_LOOPS 16
|
||||||
#define IDX_EVENT_LOOP_MAIN 0
|
#define IDX_EVENT_LOOP_MAIN 0
|
||||||
|
|
||||||
|
struct bindinfo {
|
||||||
|
int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
|
||||||
|
int ipfd_count; /* Used slots in ipfd[] */
|
||||||
|
};
|
||||||
|
|
||||||
struct redisServer {
|
struct redisServer {
|
||||||
/* General */
|
/* General */
|
||||||
pid_t pid; /* Main process pid. */
|
pid_t pid; /* Main process pid. */
|
||||||
@ -1066,8 +1071,7 @@ struct redisServer {
|
|||||||
int bindaddr_count; /* Number of addresses in server.bindaddr[] */
|
int bindaddr_count; /* Number of addresses in server.bindaddr[] */
|
||||||
char *unixsocket; /* UNIX socket path */
|
char *unixsocket; /* UNIX socket path */
|
||||||
mode_t unixsocketperm; /* UNIX socket permission */
|
mode_t unixsocketperm; /* UNIX socket permission */
|
||||||
int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */
|
struct bindinfo rgbindinfo[MAX_EVENT_LOOPS];
|
||||||
int ipfd_count; /* Used slots in ipfd[] */
|
|
||||||
int sofd; /* Unix socket file descriptor */
|
int sofd; /* Unix socket file descriptor */
|
||||||
int cfd[CONFIG_BINDADDR_MAX];/* Cluster bus listening socket */
|
int cfd[CONFIG_BINDADDR_MAX];/* Cluster bus listening socket */
|
||||||
int cfd_count; /* Used slots in cfd[] */
|
int cfd_count; /* Used slots in cfd[] */
|
||||||
@ -1592,7 +1596,7 @@ int getClientTypeByName(const char *name);
|
|||||||
const char *getClientTypeName(int cclass);
|
const char *getClientTypeName(int cclass);
|
||||||
void flushSlavesOutputBuffers(void);
|
void flushSlavesOutputBuffers(void);
|
||||||
void disconnectSlaves(void);
|
void disconnectSlaves(void);
|
||||||
int listenToPort(int port, int *fds, int *count);
|
int listenToPort(int port, int *fds, int *count, int fReusePort);
|
||||||
void pauseClients(mstime_t duration);
|
void pauseClients(mstime_t duration);
|
||||||
int clientsArePaused(void);
|
int clientsArePaused(void);
|
||||||
int processEventsWhileBlocked(int iel);
|
int processEventsWhileBlocked(int iel);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user