Use SOCK_NONBLOCK to reduce system calls for outgoing connections (#293)
What this PR mainly does: 1. Refactor the `anetCreateSocket()` to make it more generic for more socket arguments, and use `SOCK_NONBLOCK` if available, which will reduce two system calls (`F_GETFL` and `F_SETFL`) of enabling the non-blocking mode on each newly created socket. 2. Clean up the unused `anetUnixGenericConnect()` that calls `anetCreateSocket()`. `SOCK_NONBLOCK` for system call `socket()` is supported on most UNIX-like platforms (`linux`, `dragonfly`, `freebsd`, `netbsd`, `openbsd`, `solaris`, etc.). This improvement will significantly reduce the system calls considering how massively `anetTcpGenericConnect()` will be called when needed. As for the cleanup, `anetUnixGenericConnect` was introduced in c61e692 and the only reference back then was from the `createClient()` in `redis-benchmark.c` which had been removed in ec8f066 and made it the dead code. Most of that dead code was also cleaned up in f657315e, and it seems that the `anetUnixGenericConnect` got left out. Therefore, I also cleaned it up, ***but I'm not so certain about doing this cleanup in this PR. Maybe you would prefer to do it in a separate PR?*** References: - [socket(2) on Linux](https://man7.org/linux/man-pages/man2/socket.2.html) - [socket(2) on FreeBSD](https://man.freebsd.org/cgi/man.cgi?socket(2)) - [socket(2) on DragonFly](https://man.dragonflybsd.org/?command=socket§ion=2) - [socket(2) on NetBSD](https://man.netbsd.org/socket.2) - [socket(2) on OpenBSD](https://man.openbsd.org/socket.2) - [socket(3c) on Solaris](https://docs.oracle.com/cd/E88353_01/html/E37843/socket-3c.html) - [socket(3socket) on illumos](https://illumos.org/man/3SOCKET/socket) --------- Signed-off-by: Andy Pan <i@andypan.me>
This commit is contained in:
parent
b0d5a0f58d
commit
948cd8f2c2
88
src/anet.c
88
src/anet.c
@ -380,19 +380,51 @@ static int anetSetReuseAddr(char *err, int fd) {
|
||||
return ANET_OK;
|
||||
}
|
||||
|
||||
static int anetCreateSocket(char *err, int domain) {
|
||||
/* In general, SOCK_CLOEXEC won't have noticeable effect
|
||||
* except for cases which really need this flag.
|
||||
* Otherwise, it is just a flag that is nice to have.
|
||||
* Its absence shouldn't affect a common socket's functionality.
|
||||
*/
|
||||
#define ANET_SOCKET_CLOEXEC 1
|
||||
#define ANET_SOCKET_NONBLOCK 2
|
||||
#define ANET_SOCKET_REUSEADDR 4
|
||||
static int anetCreateSocket(char *err, int domain, int type, int protocol, int flags) {
|
||||
int s;
|
||||
if ((s = socket(domain, SOCK_STREAM, 0)) == -1) {
|
||||
|
||||
#ifdef SOCK_CLOEXEC
|
||||
if (flags & ANET_SOCKET_CLOEXEC) {
|
||||
type |= SOCK_CLOEXEC;
|
||||
flags &= ~ANET_SOCKET_CLOEXEC;
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef SOCK_NONBLOCK
|
||||
if (flags & ANET_SOCKET_NONBLOCK) {
|
||||
type |= SOCK_NONBLOCK;
|
||||
flags &= ~ANET_SOCKET_NONBLOCK;
|
||||
}
|
||||
#endif
|
||||
|
||||
if ((s = socket(domain, type, protocol)) == -1) {
|
||||
anetSetError(err, "creating socket: %s", strerror(errno));
|
||||
return ANET_ERR;
|
||||
}
|
||||
|
||||
/* Make sure connection-intensive things like the benchmark tool
|
||||
* will be able to close/open sockets a zillion of times */
|
||||
if (anetSetReuseAddr(err,s) == ANET_ERR) {
|
||||
if (flags & ANET_SOCKET_CLOEXEC && anetCloexec(s) == ANET_ERR) {
|
||||
close(s);
|
||||
return ANET_ERR;
|
||||
}
|
||||
|
||||
if (flags & ANET_SOCKET_NONBLOCK && anetNonBlock(err, s) == ANET_ERR) {
|
||||
close(s);
|
||||
return ANET_ERR;
|
||||
}
|
||||
|
||||
if (flags & ANET_SOCKET_REUSEADDR && anetSetReuseAddr(err,s) == ANET_ERR) {
|
||||
close(s);
|
||||
return ANET_ERR;
|
||||
}
|
||||
|
||||
return s;
|
||||
}
|
||||
|
||||
@ -418,12 +450,15 @@ static int anetTcpGenericConnect(char *err, const char *addr, int port,
|
||||
for (p = servinfo; p != NULL; p = p->ai_next) {
|
||||
/* Try to create the socket and to connect it.
|
||||
* If we fail in the socket() call, or on connect(), we retry with
|
||||
* the next entry in servinfo. */
|
||||
if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
|
||||
* the next entry in servinfo.
|
||||
*
|
||||
* Make sure connection-intensive things like the benchmark tool
|
||||
* will be able to close/open sockets a zillion of times.
|
||||
*/
|
||||
int sockflags = ANET_SOCKET_CLOEXEC | ANET_SOCKET_REUSEADDR;
|
||||
if (flags & ANET_CONNECT_NONBLOCK) sockflags |= ANET_SOCKET_NONBLOCK;
|
||||
if ((s = anetCreateSocket(err,p->ai_family,p->ai_socktype,p->ai_protocol,sockflags)) == ANET_ERR)
|
||||
continue;
|
||||
if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
|
||||
if (flags & ANET_CONNECT_NONBLOCK && anetNonBlock(err,s) != ANET_OK)
|
||||
goto error;
|
||||
if (source_addr) {
|
||||
int bound = 0;
|
||||
/* Using getaddrinfo saves us from self-determining IPv4 vs IPv6 */
|
||||
@ -491,34 +526,6 @@ int anetTcpNonBlockBestEffortBindConnect(char *err, const char *addr, int port,
|
||||
ANET_CONNECT_NONBLOCK|ANET_CONNECT_BE_BINDING);
|
||||
}
|
||||
|
||||
int anetUnixGenericConnect(char *err, const char *path, int flags)
|
||||
{
|
||||
int s;
|
||||
struct sockaddr_un sa;
|
||||
|
||||
if ((s = anetCreateSocket(err,AF_LOCAL)) == ANET_ERR)
|
||||
return ANET_ERR;
|
||||
|
||||
sa.sun_family = AF_LOCAL;
|
||||
valkey_strlcpy(sa.sun_path,path,sizeof(sa.sun_path));
|
||||
if (flags & ANET_CONNECT_NONBLOCK) {
|
||||
if (anetNonBlock(err,s) != ANET_OK) {
|
||||
close(s);
|
||||
return ANET_ERR;
|
||||
}
|
||||
}
|
||||
if (connect(s,(struct sockaddr*)&sa,sizeof(sa)) == -1) {
|
||||
if (errno == EINPROGRESS &&
|
||||
flags & ANET_CONNECT_NONBLOCK)
|
||||
return s;
|
||||
|
||||
anetSetError(err, "connect: %s", strerror(errno));
|
||||
close(s);
|
||||
return ANET_ERR;
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog, mode_t perm) {
|
||||
if (bind(s,sa,len) == -1) {
|
||||
anetSetError(err, "bind: %s", strerror(errno));
|
||||
@ -607,7 +614,10 @@ int anetUnixServer(char *err, char *path, mode_t perm, int backlog)
|
||||
anetSetError(err,"unix socket path too long (%zu), must be under %zu", strlen(path), sizeof(sa.sun_path));
|
||||
return ANET_ERR;
|
||||
}
|
||||
if ((s = anetCreateSocket(err,AF_LOCAL)) == ANET_ERR)
|
||||
|
||||
int type = SOCK_STREAM;
|
||||
int flags = ANET_SOCKET_CLOEXEC | ANET_SOCKET_NONBLOCK | ANET_SOCKET_REUSEADDR;
|
||||
if ((s = anetCreateSocket(err,AF_LOCAL,type,0,flags)) == ANET_ERR)
|
||||
return ANET_ERR;
|
||||
|
||||
memset(&sa,0,sizeof(sa));
|
||||
|
@ -66,8 +66,6 @@ static int connUnixListen(connListener *listener) {
|
||||
serverLog(LL_WARNING, "Failed opening Unix socket: %s", server.neterr);
|
||||
exit(1);
|
||||
}
|
||||
anetNonBlock(NULL, fd);
|
||||
anetCloexec(fd);
|
||||
listener->fd[listener->count++] = fd;
|
||||
}
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user