# Reviewer summary (annotation only; the patch text below this header is
# unchanged).
#
# Purpose: let each event loop open and accept on its own TCP listening
# sockets (optionally with SO_REUSEPORT) instead of spreading one shared set
# of listeners across the loops with `j % server.cel`.
#
# What the hunks do:
#  - anet.c / anet.h: add static anetSetReusePort(); add the
#    ANET_CONNECT_REUSEPORT flag (value 4), honoured in
#    anetTcpGenericConnect(); add an fReusePort argument to _anetTcpServer(),
#    anetTcpServer() and anetTcp6Server().
#  - server.h: move ipfd[] / ipfd_count out of struct redisServer into a new
#    struct bindinfo, stored as rgbindinfo[MAX_EVENT_LOOPS]; listenToPort()
#    gains an fReusePort argument.
#  - server.c: remove initServerAcceptHandlers() and the listening-socket
#    setup from initServer(); add initNetworkingThread() and initNetworking(),
#    called from main() right after initServer(). initNetworking() listens on
#    server.cel loops when fReusePort is set, otherwise on one loop, and
#    registers the Unix socket on IDX_EVENT_LOOP_MAIN.
#    closeListeningSockets() now walks rgbindinfo[] for every event loop.
#  - cluster.c: the cluster bus listener passes fReusePort = 0.
#  - networking.cpp: readQueryFromClient() asserts that c->iel equals
#    ielFromEventLoop(el).
#
# NOTE(review):
#  - `server.cel = 4` and `initNetworking(1)` are hard-coded in main() and
#    marked "//testing"; the subject line says the config is NYI.
#  - server.cel is assigned after initServer() returns, and the
#    serverAssert() range check on server.cel still sits later in main(),
#    after initNetworking() has already used it as a loop bound. Confirm
#    server.rgel[] is populated for every index used.
#  - SO_REUSEPORT is referenced without an #ifdef guard; presumably this
#    fails to build on platforms that lack it -- confirm target platforms.
#  - The "Configured to not listen anywhere" and "Ready to accept
#    connections" checks look only at rgbindinfo[IDX_EVENT_LOOP_MAIN].
#  - No hunk visible here passes ANET_CONNECT_REUSEPORT to
#    anetTcpGenericConnect().
#  - The patch text below appears to have lost its original line breaks; it
#    will presumably need to be regenerated before it can be applied.
From 65e48b868e16856f90297465810ea71d14a8422f Mon Sep 17 00:00:00 2001 From: John Sully Date: Sat, 16 Feb 2019 14:26:19 -0500 Subject: [PATCH] Load balance connections across threads (config NYI and hardcoded) --- src/anet.c | 27 ++++++++++--- src/anet.h | 4 +- src/cluster.c | 2 +- src/networking.cpp | 1 + src/server.c | 95 ++++++++++++++++++++++++++-------------------- src/server.h | 10 +++-- 6 files changed, 85 insertions(+), 54 deletions(-) diff --git a/src/anet.c b/src/anet.c index 2981fca13..91ab94efd 100644 --- a/src/anet.c +++ b/src/anet.c @@ -246,6 +246,16 @@ static int anetSetReuseAddr(char *err, int fd) { return ANET_OK; } +static int anetSetReusePort(char *err, int fd) { + int yes = 1; + /* Let us load balance listen()s from multiple threads */ + if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &yes, sizeof(yes)) == -1) { + anetSetError(err, "setsockopt SO_REUSEPORT: %s", strerror(errno)); + return ANET_ERR; + } + return ANET_OK; +} + static int anetCreateSocket(char *err, int domain) { int s; if ((s = socket(domain, SOCK_STREAM, 0)) == -1) { @@ -265,6 +275,7 @@ static int anetCreateSocket(char *err, int domain) { #define ANET_CONNECT_NONE 0 #define ANET_CONNECT_NONBLOCK 1 #define ANET_CONNECT_BE_BINDING 2 /* Best effort binding. */ +#define ANET_CONNECT_REUSEPORT 4 static int anetTcpGenericConnect(char *err, char *addr, int port, char *source_addr, int flags) { @@ -287,7 +298,10 @@ static int anetTcpGenericConnect(char *err, char *addr, int port, * the next entry in servinfo. 
*/ if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1) continue; - if (anetSetReuseAddr(err,s) == ANET_ERR) goto error; + if (anetSetReuseAddr(err,s) == ANET_ERR) + goto error; + if (flags & ANET_CONNECT_REUSEPORT && anetSetReusePort(err, s) != ANET_OK) + goto error; if (flags & ANET_CONNECT_NONBLOCK && anetNonBlock(err,s) != ANET_OK) goto error; if (source_addr) { @@ -462,7 +476,7 @@ static int anetV6Only(char *err, int s) { return ANET_OK; } -static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog) +static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog, int fReusePort) { int s = -1, rv; char _port[6]; /* strlen("65535") */ @@ -484,6 +498,7 @@ static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backl if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error; if (anetSetReuseAddr(err,s) == ANET_ERR) goto error; + if (fReusePort && anetSetReusePort(err,s) == ANET_ERR) goto error; if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR; goto end; } @@ -500,14 +515,14 @@ end: return s; } -int anetTcpServer(char *err, int port, char *bindaddr, int backlog) +int anetTcpServer(char *err, int port, char *bindaddr, int backlog, int fReusePort) { - return _anetTcpServer(err, port, bindaddr, AF_INET, backlog); + return _anetTcpServer(err, port, bindaddr, AF_INET, backlog, fReusePort); } -int anetTcp6Server(char *err, int port, char *bindaddr, int backlog) +int anetTcp6Server(char *err, int port, char *bindaddr, int backlog, int fReusePort) { - return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog); + return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog, fReusePort); } int anetUnixServer(char *err, char *path, mode_t perm, int backlog) diff --git a/src/anet.h b/src/anet.h index 4e272284a..44c57b4cd 100644 --- a/src/anet.h +++ b/src/anet.h @@ -62,8 +62,8 @@ int anetUnixNonBlockConnect(char *err, char *path); int anetRead(int fd, char 
*buf, int count); int anetResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len); int anetResolveIP(char *err, char *host, char *ipbuf, size_t ipbuf_len); -int anetTcpServer(char *err, int port, char *bindaddr, int backlog); -int anetTcp6Server(char *err, int port, char *bindaddr, int backlog); +int anetTcpServer(char *err, int port, char *bindaddr, int backlog, int fReusePort); +int anetTcp6Server(char *err, int port, char *bindaddr, int backlog, int fReusePort); int anetUnixServer(char *err, char *path, mode_t perm, int backlog); int anetTcpAccept(char *err, int serversock, char *ip, size_t ip_len, int *port); int anetUnixAccept(char *err, int serversock); diff --git a/src/cluster.c b/src/cluster.c index eb2749b1b..6cadecc39 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -486,7 +486,7 @@ void clusterInit(void) { } if (listenToPort(server.port+CLUSTER_PORT_INCR, - server.cfd,&server.cfd_count) == C_ERR) + server.cfd,&server.cfd_count, 0 /*fReusePort*/) == C_ERR) { exit(1); } else { diff --git a/src/networking.cpp b/src/networking.cpp index bb3788501..7188631be 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -1644,6 +1644,7 @@ void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) { UNUSED(el); UNUSED(mask); serverAssert(mask & AE_READ_THREADSAFE); + serverAssert(c->iel == ielFromEventLoop(el)); readlen = PROTO_IOBUF_LEN; /* If this is a multi bulk request, and we are processing a bulk reply diff --git a/src/server.c b/src/server.c index bc14279f4..c4de1e7ec 100644 --- a/src/server.c +++ b/src/server.c @@ -2257,7 +2257,10 @@ void initServerConfig(void) { server.bindaddr_count = 0; server.unixsocket = NULL; server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM; - server.ipfd_count = 0; + for (int iel = 0; iel < MAX_EVENT_LOOPS; ++iel) + { + server.rgbindinfo[iel].ipfd_count = 0; + } server.sofd = -1; server.protected_mode = CONFIG_DEFAULT_PROTECTED_MODE; server.dbnum = CONFIG_DEFAULT_DBNUM; @@ -2634,7 +2637,7 @@ void 
checkTcpBacklogSettings(void) { * impossible to bind, or no bind addresses were specified in the server * configuration but the function is not able to bind * for at least * one of the IPv4 or IPv6 protocols. */ -int listenToPort(int port, int *fds, int *count) { +int listenToPort(int port, int *fds, int *count, int fReusePort) { int j; /* Force binding of 0.0.0.0 if no bind address is specified, always @@ -2646,7 +2649,7 @@ int listenToPort(int port, int *fds, int *count) { /* Bind * for both IPv6 and IPv4, we enter here only if * server.bindaddr_count == 0. */ fds[*count] = anetTcp6Server(server.neterr,port,NULL, - server.tcp_backlog); + server.tcp_backlog, fReusePort); if (fds[*count] != ANET_ERR) { anetNonBlock(NULL,fds[*count]); (*count)++; @@ -2658,7 +2661,7 @@ int listenToPort(int port, int *fds, int *count) { if (*count == 1 || unsupported) { /* Bind the IPv4 address as well. */ fds[*count] = anetTcpServer(server.neterr,port,NULL, - server.tcp_backlog); + server.tcp_backlog, fReusePort); if (fds[*count] != ANET_ERR) { anetNonBlock(NULL,fds[*count]); (*count)++; @@ -2674,11 +2677,11 @@ int listenToPort(int port, int *fds, int *count) { } else if (strchr(server.bindaddr[j],':')) { /* Bind IPv6 address. */ fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j], - server.tcp_backlog); + server.tcp_backlog, fReusePort); } else { /* Bind IPv4 address. */ fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j], - server.tcp_backlog); + server.tcp_backlog, fReusePort); } if (fds[*count] == ANET_ERR) { serverLog(LL_WARNING, @@ -2734,19 +2737,48 @@ void resetServerStats(void) { server.aof_delayed_fsync = 0; } -void initServerAcceptHandlers(void) +static void initNetworkingThread(int iel, int fReusePort) { - /* Create an event handler for accepting new connections in TCP and Unix - * domain sockets. 
*/ - for (int j = 0; j < server.ipfd_count; j++) { - int iel = j % server.cel; - if (aeCreateFileEvent(server.rgel[iel], server.ipfd[j], AE_READABLE|AE_READ_THREADSAFE, + /* Open the TCP listening socket for the user commands. */ + if (server.port != 0 && + listenToPort(server.port,server.rgbindinfo[iel].ipfd,&server.rgbindinfo[iel].ipfd_count, fReusePort) == C_ERR) + exit(1); + + /* Create an event handler for accepting new connections in TCP */ + for (int j = 0; j < server.rgbindinfo[iel].ipfd_count; j++) { + if (aeCreateFileEvent(server.rgel[iel], server.rgbindinfo[iel].ipfd[j], AE_READABLE|AE_READ_THREADSAFE, acceptTcpHandler,NULL) == AE_ERR) { serverPanic( "Unrecoverable error creating server.ipfd file event."); } } +} + +static void initNetworking(int fReusePort) +{ + int celListen = (fReusePort) ? server.cel : 1; + for (int iel = 0; iel < celListen; ++iel) + initNetworkingThread(iel, fReusePort); + + /* Open the listening Unix domain socket. */ + if (server.unixsocket != NULL) { + unlink(server.unixsocket); /* don't care if this fails */ + server.sofd = anetUnixServer(server.neterr,server.unixsocket, + server.unixsocketperm, server.tcp_backlog); + if (server.sofd == ANET_ERR) { + serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr); + exit(1); + } + anetNonBlock(NULL,server.sofd); + } + + /* Abort if there are no listening sockets at all. */ + if (server.rgbindinfo[IDX_EVENT_LOOP_MAIN].ipfd_count == 0 && server.sofd < 0) { + serverLog(LL_WARNING, "Configured to not listen anywhere, exiting."); + exit(1); + } + if (server.sofd > 0 && aeCreateFileEvent(server.rgel[IDX_EVENT_LOOP_MAIN],server.sofd,AE_READABLE|AE_READ_THREADSAFE, acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); } @@ -2797,29 +2829,6 @@ void initServer(void) { } server.db = zmalloc(sizeof(redisDb)*server.dbnum, MALLOC_LOCAL); - /* Open the TCP listening socket for the user commands. 
*/ - if (server.port != 0 && - listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR) - exit(1); - - /* Open the listening Unix domain socket. */ - if (server.unixsocket != NULL) { - unlink(server.unixsocket); /* don't care if this fails */ - server.sofd = anetUnixServer(server.neterr,server.unixsocket, - server.unixsocketperm, server.tcp_backlog); - if (server.sofd == ANET_ERR) { - serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr); - exit(1); - } - anetNonBlock(NULL,server.sofd); - } - - /* Abort if there are no listening sockets at all. */ - if (server.ipfd_count == 0 && server.sofd < 0) { - serverLog(LL_WARNING, "Configured to not listen anywhere, exiting."); - exit(1); - } - /* Create the Redis databases, and initialize other internal state. */ for (int j = 0; j < server.dbnum; j++) { server.db[j].pdict = dictCreate(&dbDictType,NULL); @@ -3538,7 +3547,11 @@ int processCommand(client *c) { void closeListeningSockets(int unlink_unix_socket) { int j; - for (j = 0; j < server.ipfd_count; j++) close(server.ipfd[j]); + for (int iel = 0; iel < server.cel; ++iel) + { + for (j = 0; j < server.rgbindinfo[iel].ipfd_count; j++) + close(server.rgbindinfo[iel].ipfd[j]); + } if (server.sofd != -1) close(server.sofd); if (server.cluster_enabled) for (j = 0; j < server.cfd_count; j++) close(server.cfd[j]); @@ -4961,6 +4974,9 @@ int main(int argc, char **argv) { if (background) daemonize(); initServer(); + server.cel = 4; //testing + initNetworking(1 /* fReusePort */); + if (background || server.pidfile) createPidFile(); redisSetProcTitle(argv[0]); redisAsciiArt(); @@ -4987,7 +5003,7 @@ int main(int argc, char **argv) { exit(1); } } - if (server.ipfd_count > 0) + if (server.rgbindinfo[IDX_EVENT_LOOP_MAIN].ipfd_count > 0) serverLog(LL_NOTICE,"Ready to accept connections"); if (server.sofd > 0) serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); @@ -5000,11 +5016,6 @@ int main(int argc, char **argv) { 
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory); } - - - server.cel = 4; //testing - initServerAcceptHandlers(); - serverAssert(server.cel > 0 && server.cel <= MAX_EVENT_LOOPS); pthread_t rgthread[MAX_EVENT_LOOPS]; for (int iel = 0; iel < server.cel; ++iel) diff --git a/src/server.h b/src/server.h index dc595b05a..626746323 100644 --- a/src/server.h +++ b/src/server.h @@ -1026,6 +1026,11 @@ struct clusterState; #define MAX_EVENT_LOOPS 16 #define IDX_EVENT_LOOP_MAIN 0 +struct bindinfo { + int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */ + int ipfd_count; /* Used slots in ipfd[] */ +}; + struct redisServer { /* General */ pid_t pid; /* Main process pid. */ @@ -1066,8 +1071,7 @@ struct redisServer { int bindaddr_count; /* Number of addresses in server.bindaddr[] */ char *unixsocket; /* UNIX socket path */ mode_t unixsocketperm; /* UNIX socket permission */ - int ipfd[CONFIG_BINDADDR_MAX]; /* TCP socket file descriptors */ - int ipfd_count; /* Used slots in ipfd[] */ + struct bindinfo rgbindinfo[MAX_EVENT_LOOPS]; int sofd; /* Unix socket file descriptor */ int cfd[CONFIG_BINDADDR_MAX];/* Cluster bus listening socket */ int cfd_count; /* Used slots in cfd[] */ @@ -1592,7 +1596,7 @@ int getClientTypeByName(const char *name); const char *getClientTypeName(int cclass); void flushSlavesOutputBuffers(void); void disconnectSlaves(void); -int listenToPort(int port, int *fds, int *count); +int listenToPort(int port, int *fds, int *count, int fReusePort); void pauseClients(mstime_t duration); int clientsArePaused(void); int processEventsWhileBlocked(int iel);