
1. Remove redundant connIncrRefs/connDecrRefs

In socket.c, the reference counter is incremented before calling callHandler, but the same reference counter is also incremented inside callHandler before calling the actual callback:

    static inline int callHandler(connection *conn, ConnectionCallbackFunc handler) {
        connIncrRefs(conn);
        if (handler) handler(conn);
        connDecrRefs(conn);
        ...
    }

This commit removes the redundant incr/decr calls in socket.c.

2. Correct return value of connRead for TLS when peer closed

According to the comments in connection.h, connRead returns 0 when the peer has closed the connection. This patch corrects the return value for TLS connections. (Without this patch, it returns -1, which means error.)

There is an observable difference in what is logged at the verbose level: "Client closed connection" vs "Reading from client: (null)".

---------

Signed-off-by: Viktor Söderqvist <viktor.soderqvist@est.tech>
464 lines
16 KiB
C
464 lines
16 KiB
C
/*
|
|
* Copyright (c) 2019, Redis Ltd.
|
|
* All rights reserved.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions are met:
|
|
*
|
|
* * Redistributions of source code must retain the above copyright notice,
|
|
* this list of conditions and the following disclaimer.
|
|
* * Redistributions in binary form must reproduce the above copyright
|
|
* notice, this list of conditions and the following disclaimer in the
|
|
* documentation and/or other materials provided with the distribution.
|
|
* * Neither the name of Redis nor the names of its contributors may be used
|
|
* to endorse or promote products derived from this software without
|
|
* specific prior written permission.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
|
|
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
* POSSIBILITY OF SUCH DAMAGE.
|
|
*/
|
|
|
|
#include "server.h"
|
|
#include "connhelpers.h"
|
|
|
|
/* The connections module provides a lean abstraction of network connections
|
|
* to avoid direct socket and async event management across the server code base.
|
|
*
|
|
* It does NOT provide advanced connection features commonly found in similar
|
|
* libraries such as complete in/out buffer management, throttling, etc. These
|
|
* functions remain in networking.c.
|
|
*
|
|
* The primary goal is to allow transparent handling of TCP and TLS based
|
|
* connections. To do so, connections have the following properties:
|
|
*
|
|
* 1. A connection may live before its corresponding socket exists. This
|
|
* allows various context and configuration setting to be handled before
|
|
* establishing the actual connection.
|
|
* 2. The caller may register/unregister logical read/write handlers to be
|
|
* called when the connection has data to read from/can accept writes.
|
|
* These logical handlers may or may not correspond to actual AE events,
|
|
* depending on the implementation (for TCP they are; for TLS they aren't).
|
|
*/
|
|
|
|
/* Forward declaration of the socket connection type. connCreateSocket() takes
 * its address before the initializer, which appears later in this file. */
static ConnectionType CT_Socket;
|
|
|
|
/* When a connection is created we must know its type already, but the
|
|
* underlying socket may or may not exist:
|
|
*
|
|
* - For accepted connections, it exists as we do not model the listen/accept
|
|
* part; So caller calls connCreateSocket() followed by connAccept().
|
|
* - For outgoing connections, the socket is created by the connection module
|
|
* itself; So caller calls connCreateSocket() followed by connConnect(),
|
|
* which registers a connect callback that fires on connected/error state
|
|
* (and after any transport level handshake was done).
|
|
*
|
|
* NOTE: An earlier version relied on connections being part of other structs
|
|
* and not independently allocated. This could lead to further optimizations
|
|
* like using container_of(), etc. However it was discontinued in favor of
|
|
* this approach for these reasons:
|
|
*
|
|
* 1. In some cases conns are created/handled outside the context of the
|
|
* containing struct, in which case it gets a bit awkward to copy them.
|
|
* 2. Future implementations may wish to allocate arbitrary data for the
|
|
* connection.
|
|
* 3. The container_of() approach is anyway risky because connections may
|
|
* be embedded in different structs, not just client.
|
|
*/
|
|
|
|
static connection *connCreateSocket(void) {
|
|
connection *conn = zcalloc(sizeof(connection));
|
|
conn->type = &CT_Socket;
|
|
conn->fd = -1;
|
|
conn->iovcnt = IOV_MAX;
|
|
|
|
return conn;
|
|
}
|
|
|
|
/* Create a new socket-type connection that is already associated with
|
|
* an accepted connection.
|
|
*
|
|
* The socket is not ready for I/O until connAccept() was called and
|
|
* invoked the connection-level accept handler.
|
|
*
|
|
* Callers should use connGetState() and verify the created connection
|
|
* is not in an error state (which is not possible for a socket connection,
|
|
* but could but possible with other protocols).
|
|
*/
|
|
static connection *connCreateAcceptedSocket(int fd, void *priv) {
|
|
UNUSED(priv);
|
|
connection *conn = connCreateSocket();
|
|
conn->fd = fd;
|
|
conn->state = CONN_STATE_ACCEPTING;
|
|
return conn;
|
|
}
|
|
|
|
static int connSocketConnect(connection *conn,
|
|
const char *addr,
|
|
int port,
|
|
const char *src_addr,
|
|
ConnectionCallbackFunc connect_handler) {
|
|
int fd = anetTcpNonBlockBestEffortBindConnect(NULL, addr, port, src_addr);
|
|
if (fd == -1) {
|
|
conn->state = CONN_STATE_ERROR;
|
|
conn->last_errno = errno;
|
|
return C_ERR;
|
|
}
|
|
|
|
conn->fd = fd;
|
|
conn->state = CONN_STATE_CONNECTING;
|
|
|
|
conn->conn_handler = connect_handler;
|
|
aeCreateFileEvent(server.el, conn->fd, AE_WRITABLE, conn->type->ae_handler, conn);
|
|
|
|
return C_OK;
|
|
}
|
|
|
|
/* ------ Pure socket connections ------- */
|
|
|
|
/* A very incomplete list of implementation-specific calls. Much of the above shall
|
|
* move here as we implement additional connection types.
|
|
*/
|
|
|
|
/* Shut down both directions of the socket, if the connection has one.
 * The descriptor itself is not closed here. */
static void connSocketShutdown(connection *conn) {
    if (conn->fd != -1) {
        shutdown(conn->fd, SHUT_RDWR);
    }
}
|
|
|
|
/* Close the connection and free resources. */
|
|
static void connSocketClose(connection *conn) {
|
|
if (conn->fd != -1) {
|
|
aeDeleteFileEvent(server.el, conn->fd, AE_READABLE | AE_WRITABLE);
|
|
close(conn->fd);
|
|
conn->fd = -1;
|
|
}
|
|
|
|
/* If called from within a handler, schedule the close but
|
|
* keep the connection until the handler returns.
|
|
*/
|
|
if (connHasRefs(conn)) {
|
|
conn->flags |= CONN_FLAG_CLOSE_SCHEDULED;
|
|
return;
|
|
}
|
|
|
|
zfree(conn);
|
|
}
|
|
|
|
/* Write up to `data_len` bytes from `data` to the socket.
 *
 * Returns whatever write() returned (converted to int). On a failure other
 * than EAGAIN the errno is saved in conn->last_errno; additionally, unless
 * the failure was EINTR, a connection that is currently CONNECTED is moved
 * to CONN_STATE_ERROR. */
static int connSocketWrite(connection *conn, const void *data, size_t data_len) {
    int written = write(conn->fd, data, data_len);

    if (written >= 0 || errno == EAGAIN) return written;

    conn->last_errno = errno;

    /* Only a connected connection is flipped to the error state; other
     * states are left alone so handler callbacks are not disturbed. */
    if (conn->state == CONN_STATE_CONNECTED && errno != EINTR) {
        conn->state = CONN_STATE_ERROR;
    }
    return written;
}
|
|
|
|
/* Gather-write `iovcnt` buffers described by `iov` to the socket.
 *
 * Returns whatever writev() returned (converted to int). Error bookkeeping
 * matches connSocketWrite(): EAGAIN is ignored, any other errno is recorded
 * in conn->last_errno, and a CONNECTED connection becomes CONN_STATE_ERROR
 * unless the errno was EINTR. */
static int connSocketWritev(connection *conn, const struct iovec *iov, int iovcnt) {
    int sent = writev(conn->fd, iov, iovcnt);

    if (sent < 0) {
        int err = errno;

        if (err != EAGAIN) {
            conn->last_errno = err;

            /* Leave the state of a not-yet-connected connection untouched,
             * so as not to interfere with handler callbacks. */
            if (err != EINTR && conn->state == CONN_STATE_CONNECTED) {
                conn->state = CONN_STATE_ERROR;
            }
        }
    }
    return sent;
}
|
|
|
|
/* Read up to `buf_len` bytes from the socket into `buf`.
 *
 * Returns whatever read() returned (converted to int). A return of 0 marks
 * the connection CONN_STATE_CLOSED. On a failure other than EAGAIN the errno
 * is saved in conn->last_errno; unless it was EINTR, a CONNECTED connection
 * is moved to CONN_STATE_ERROR. */
static int connSocketRead(connection *conn, void *buf, size_t buf_len) {
    int nread = read(conn->fd, buf, buf_len);

    if (nread == 0) {
        conn->state = CONN_STATE_CLOSED;
        return nread;
    }
    if (nread > 0 || errno == EAGAIN) return nread;

    conn->last_errno = errno;

    /* Only a connected connection is flipped to the error state; other
     * states are left alone so handler callbacks are not disturbed. */
    if (conn->state == CONN_STATE_CONNECTED && errno != EINTR) {
        conn->state = CONN_STATE_ERROR;
    }
    return nread;
}
|
|
|
|
/* Complete the accept of a connection created by connCreateAcceptedSocket().
 *
 * Fails with C_ERR unless the connection is in CONN_STATE_ACCEPTING.
 * Otherwise the state becomes CONN_STATE_CONNECTED and `accept_handler` is
 * invoked through callHandler(); a zero result from callHandler() is
 * reported as C_ERR, anything else as C_OK. */
static int connSocketAccept(connection *conn, ConnectionCallbackFunc accept_handler) {
    if (conn->state != CONN_STATE_ACCEPTING) return C_ERR;

    conn->state = CONN_STATE_CONNECTED;
    return callHandler(conn, accept_handler) ? C_OK : C_ERR;
}
|
|
|
|
/* Register a write handler, to be called when the connection is writable.
|
|
* If NULL, the existing handler is removed.
|
|
*
|
|
* The barrier flag indicates a write barrier is requested, resulting with
|
|
* CONN_FLAG_WRITE_BARRIER set. This will ensure that the write handler is
|
|
* always called before and not after the read handler in a single event
|
|
* loop.
|
|
*/
|
|
static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
|
|
if (func == conn->write_handler) return C_OK;
|
|
|
|
conn->write_handler = func;
|
|
if (barrier)
|
|
conn->flags |= CONN_FLAG_WRITE_BARRIER;
|
|
else
|
|
conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
|
|
if (!conn->write_handler)
|
|
aeDeleteFileEvent(server.el, conn->fd, AE_WRITABLE);
|
|
else if (aeCreateFileEvent(server.el, conn->fd, AE_WRITABLE, conn->type->ae_handler, conn) == AE_ERR)
|
|
return C_ERR;
|
|
return C_OK;
|
|
}
|
|
|
|
/* Register a read handler, to be called when the connection is readable.
|
|
* If NULL, the existing handler is removed.
|
|
*/
|
|
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
|
|
if (func == conn->read_handler) return C_OK;
|
|
|
|
conn->read_handler = func;
|
|
if (!conn->read_handler)
|
|
aeDeleteFileEvent(server.el, conn->fd, AE_READABLE);
|
|
else if (aeCreateFileEvent(server.el, conn->fd, AE_READABLE, conn->type->ae_handler, conn) == AE_ERR)
|
|
return C_ERR;
|
|
return C_OK;
|
|
}
|
|
|
|
/* Return the strerror() text for the errno last recorded on the connection
 * (conn->last_errno). */
static const char *connSocketGetLastError(connection *conn) {
    int saved_errno = conn->last_errno;

    return strerror(saved_errno);
}
|
|
|
|
static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) {
|
|
UNUSED(el);
|
|
UNUSED(fd);
|
|
connection *conn = clientData;
|
|
|
|
if (conn->state == CONN_STATE_CONNECTING && (mask & AE_WRITABLE) && conn->conn_handler) {
|
|
int conn_error = anetGetError(conn->fd);
|
|
if (conn_error) {
|
|
conn->last_errno = conn_error;
|
|
conn->state = CONN_STATE_ERROR;
|
|
} else {
|
|
conn->state = CONN_STATE_CONNECTED;
|
|
}
|
|
|
|
if (!conn->write_handler) aeDeleteFileEvent(server.el, conn->fd, AE_WRITABLE);
|
|
|
|
if (!callHandler(conn, conn->conn_handler)) return;
|
|
conn->conn_handler = NULL;
|
|
}
|
|
|
|
/* Normally we execute the readable event first, and the writable
|
|
* event later. This is useful as sometimes we may be able
|
|
* to serve the reply of a query immediately after processing the
|
|
* query.
|
|
*
|
|
* However if WRITE_BARRIER is set in the mask, our application is
|
|
* asking us to do the reverse: never fire the writable event
|
|
* after the readable. In such a case, we invert the calls.
|
|
* This is useful when, for instance, we want to do things
|
|
* in the beforeSleep() hook, like fsync'ing a file to disk,
|
|
* before replying to a client. */
|
|
int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;
|
|
|
|
int call_write = (mask & AE_WRITABLE) && conn->write_handler;
|
|
int call_read = (mask & AE_READABLE) && conn->read_handler;
|
|
|
|
/* Handle normal I/O flows */
|
|
if (!invert && call_read) {
|
|
if (!callHandler(conn, conn->read_handler)) return;
|
|
}
|
|
/* Fire the writable event. */
|
|
if (call_write) {
|
|
if (!callHandler(conn, conn->write_handler)) return;
|
|
}
|
|
/* If we have to invert the call, fire the readable event now
|
|
* after the writable one. */
|
|
if (invert && call_read) {
|
|
if (!callHandler(conn, conn->read_handler)) return;
|
|
}
|
|
}
|
|
|
|
/* Event-loop callback for a listening socket `fd`.
 *
 * Accepts at most server.max_new_conns_per_cycle connections per call. Each
 * accepted descriptor is wrapped with connCreateAcceptedSocket() and passed
 * to acceptCommonHandler() together with zeroed client flags and the peer
 * IP string. The loop ends early on the first accept failure, which is
 * logged unless errno is EWOULDBLOCK. `el`, `privdata` and `mask` are not
 * used. */
static void connSocketAcceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char cip[NET_IP_STR_LEN];
    struct ClientFlags flags = {0};
    int cport;
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);

    for (int budget = server.max_new_conns_per_cycle; budget != 0; budget--) {
        int cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);

        if (cfd != ANET_ERR) {
            serverLog(LL_VERBOSE, "Accepted %s:%d", cip, cport);
            acceptCommonHandler(connCreateAcceptedSocket(cfd, NULL), flags, cip);
            continue;
        }

        if (errno != EWOULDBLOCK) {
            serverLog(LL_WARNING, "Accepting client connection: %s", server.neterr);
        }
        return;
    }
}
|
|
|
|
/* Fill `ip` (capacity `ip_len`) and `*port` with an address of the socket
 * by delegating to anetFdToString(); `remote` is forwarded as-is
 * (presumably it selects the peer address rather than the local one --
 * confirm against anet.c).
 *
 * Returns C_OK on success. On failure errno is stored in conn->last_errno
 * and C_ERR is returned. */
static int connSocketAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) {
    if (anetFdToString(conn->fd, ip, ip_len, port, remote) != 0) {
        conn->last_errno = errno;
        return C_ERR;
    }
    return C_OK;
}
|
|
|
|
/* Tell whether the address obtained via connSocketAddr(..., remote = 1) is
 * a loopback address.
 *
 * Returns 1 when the IP string starts with "127." or equals "::1", 0 when it
 * does not, and -1 when the address could not be obtained. */
static int connSocketIsLocal(connection *conn) {
    char peer_ip[NET_IP_STR_LEN + 1] = {0};

    if (connSocketAddr(conn, peer_ip, sizeof(peer_ip) - 1, NULL, 1) == C_ERR) return -1;

    if (strncmp(peer_ip, "127.", 4) == 0) return 1;
    return strcmp(peer_ip, "::1") == 0;
}
|
|
|
|
/* Start listening for the given listener; simply forwards to listenToPort()
 * and returns its result. */
static int connSocketListen(connListener *listener) {
    int rc = listenToPort(listener);

    return rc;
}
|
|
|
|
/* Connect to addr:port and wait, for at most `timeout` (passed unchanged to
 * aeWait(); presumably milliseconds -- confirm against ae.c), until the
 * socket is reported writable.
 *
 * On success the descriptor is stored in conn->fd, the state becomes
 * CONN_STATE_CONNECTED and C_OK is returned.
 *
 * On failure C_ERR is returned with conn->state == CONN_STATE_ERROR and
 * conn->last_errno set to errno (socket creation failed) or ETIMEDOUT (the
 * socket did not become writable in time). conn->fd is not modified on
 * failure.
 *
 * NOTE(review): writability is taken as proof that the connect succeeded;
 * the pending socket error is not inspected here, unlike in
 * connSocketEventHandler(). Confirm whether that is intended. */
static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) {
    int fd = anetTcpNonBlockConnect(NULL, addr, port);
    if (fd == -1) {
        conn->state = CONN_STATE_ERROR;
        conn->last_errno = errno;
        return C_ERR;
    }

    if ((aeWait(fd, AE_WRITABLE, timeout) & AE_WRITABLE) == 0) {
        /* The descriptor has not been handed over to `conn` yet, so nobody
         * else would ever close it: release it here to avoid leaking it. */
        close(fd);
        conn->state = CONN_STATE_ERROR;
        conn->last_errno = ETIMEDOUT;
        return C_ERR;
    }

    conn->fd = fd;
    conn->state = CONN_STATE_CONNECTED;
    return C_OK;
}
|
|
|
|
/* Connection-based versions of syncio.c functions.
|
|
* NOTE: This should ideally be refactored out in favor of pure async work.
|
|
*/
|
|
|
|
/* Connection wrapper around syncWrite(): forwards `ptr`, `size` and
 * `timeout` together with the connection's fd and returns its result. */
static ssize_t connSocketSyncWrite(connection *conn, char *ptr, ssize_t size, long long timeout) {
    int fd = conn->fd;

    return syncWrite(fd, ptr, size, timeout);
}
|
|
|
|
/* Connection wrapper around syncRead(): forwards `ptr`, `size` and
 * `timeout` together with the connection's fd and returns its result. */
static ssize_t connSocketSyncRead(connection *conn, char *ptr, ssize_t size, long long timeout) {
    int fd = conn->fd;

    return syncRead(fd, ptr, size, timeout);
}
|
|
|
|
/* Connection wrapper around syncReadLine(): forwards `ptr`, `size` and
 * `timeout` together with the connection's fd and returns its result. */
static ssize_t connSocketSyncReadLine(connection *conn, char *ptr, ssize_t size, long long timeout) {
    int fd = conn->fd;

    return syncReadLine(fd, ptr, size, timeout);
}
|
|
|
|
/* Return the type name of socket connections, CONN_TYPE_SOCKET. The
 * connection argument is not used. */
static const char *connSocketGetType(connection *conn) {
    UNUSED(conn);
    return CONN_TYPE_SOCKET;
}
|
|
|
|
static ConnectionType CT_Socket = {
|
|
/* connection type */
|
|
.get_type = connSocketGetType,
|
|
|
|
/* connection type initialize & finalize & configure */
|
|
.init = NULL,
|
|
.cleanup = NULL,
|
|
.configure = NULL,
|
|
|
|
/* ae & accept & listen & error & address handler */
|
|
.ae_handler = connSocketEventHandler,
|
|
.accept_handler = connSocketAcceptHandler,
|
|
.addr = connSocketAddr,
|
|
.is_local = connSocketIsLocal,
|
|
.listen = connSocketListen,
|
|
|
|
/* create/shutdown/close connection */
|
|
.conn_create = connCreateSocket,
|
|
.conn_create_accepted = connCreateAcceptedSocket,
|
|
.shutdown = connSocketShutdown,
|
|
.close = connSocketClose,
|
|
|
|
/* connect & accept */
|
|
.connect = connSocketConnect,
|
|
.blocking_connect = connSocketBlockingConnect,
|
|
.accept = connSocketAccept,
|
|
|
|
/* IO */
|
|
.write = connSocketWrite,
|
|
.writev = connSocketWritev,
|
|
.read = connSocketRead,
|
|
.set_write_handler = connSocketSetWriteHandler,
|
|
.set_read_handler = connSocketSetReadHandler,
|
|
.get_last_error = connSocketGetLastError,
|
|
.sync_write = connSocketSyncWrite,
|
|
.sync_read = connSocketSyncRead,
|
|
.sync_readline = connSocketSyncReadLine,
|
|
|
|
/* pending data */
|
|
.has_pending_data = NULL,
|
|
.process_pending_data = NULL,
|
|
.postpone_update_state = NULL,
|
|
.update_state = NULL,
|
|
};
|
|
|
|
/* Forward to anetBlock() for the connection's socket. Returns C_ERR when
 * the connection has no socket (fd == -1), otherwise anetBlock()'s result. */
int connBlock(connection *conn) {
    return (conn->fd == -1) ? C_ERR : anetBlock(NULL, conn->fd);
}
|
|
|
|
/* Forward to anetNonBlock() for the connection's socket. Returns C_ERR when
 * the connection has no socket (fd == -1), otherwise anetNonBlock()'s
 * result. */
int connNonBlock(connection *conn) {
    return (conn->fd == -1) ? C_ERR : anetNonBlock(NULL, conn->fd);
}
|
|
|
|
/* Forward to anetEnableTcpNoDelay() for the connection's socket. Returns
 * C_ERR when the connection has no socket (fd == -1), otherwise the
 * helper's result. */
int connEnableTcpNoDelay(connection *conn) {
    return (conn->fd == -1) ? C_ERR : anetEnableTcpNoDelay(NULL, conn->fd);
}
|
|
|
|
/* Forward to anetDisableTcpNoDelay() for the connection's socket. Returns
 * C_ERR when the connection has no socket (fd == -1), otherwise the
 * helper's result. */
int connDisableTcpNoDelay(connection *conn) {
    return (conn->fd == -1) ? C_ERR : anetDisableTcpNoDelay(NULL, conn->fd);
}
|
|
|
|
/* Forward to anetKeepAlive() with `interval` for the connection's socket.
 * Returns C_ERR when the connection has no socket (fd == -1), otherwise the
 * helper's result. */
int connKeepAlive(connection *conn, int interval) {
    return (conn->fd == -1) ? C_ERR : anetKeepAlive(NULL, conn->fd, interval);
}
|
|
|
|
/* Forward to anetSendTimeout() with `ms` for the connection's socket
 * (presumably a send timeout in milliseconds -- confirm against anet.c).
 *
 * Returns C_ERR when the connection has no socket (fd == -1), matching the
 * other socket-option helpers in this file, otherwise anetSendTimeout()'s
 * result. */
int connSendTimeout(connection *conn, long long ms) {
    /* Do not hand an invalid descriptor to the socket layer. */
    if (conn->fd == -1) return C_ERR;
    return anetSendTimeout(NULL, conn->fd, ms);
}
|
|
|
|
/* Forward to anetRecvTimeout() with `ms` for the connection's socket
 * (presumably a receive timeout in milliseconds -- confirm against anet.c).
 *
 * Returns C_ERR when the connection has no socket (fd == -1), matching the
 * other socket-option helpers in this file, otherwise anetRecvTimeout()'s
 * result. */
int connRecvTimeout(connection *conn, long long ms) {
    /* Do not hand an invalid descriptor to the socket layer. */
    if (conn->fd == -1) return C_ERR;
    return anetRecvTimeout(NULL, conn->fd, ms);
}
|
|
|
|
int RedisRegisterConnectionTypeSocket(void) {
|
|
return connTypeRegister(&CT_Socket);
|
|
}
|