diff --git a/src/rdma.c b/src/rdma.c index f95b5b122..b7b73fa18 100644 --- a/src/rdma.c +++ b/src/rdma.c @@ -74,6 +74,7 @@ typedef enum ValkeyRdmaOpcode { #define VALKEY_RDMA_MAX_RX_SIZE (16 * 1024 * 1024) #define VALKEY_RDMA_SYNCIO_RES 10 #define VALKEY_RDMA_INVALID_OPCODE 0xffff +#define VALKEY_RDMA_KEEPALIVE_MS 3000 typedef struct rdma_connection { connection c; @@ -94,6 +95,7 @@ typedef struct RdmaContext { connection *conn; char *ip; int port; + long long keepalive_te; /* RDMA has no transport layer keepalive */ struct ibv_pd *pd; struct rdma_event_channel *cm_channel; struct ibv_comp_channel *comp_channel; @@ -405,7 +407,7 @@ static int rdmaSendCommand(RdmaContext *ctx, struct rdma_cm_id *cm_id, ValkeyRdm } static int connRdmaRegisterRx(RdmaContext *ctx, struct rdma_cm_id *cm_id) { - ValkeyRdmaCmd cmd; + ValkeyRdmaCmd cmd = {0}; cmd.memory.opcode = htons(RegisterXferMemory); cmd.memory.addr = htonu64((uint64_t)ctx->rx.addr); @@ -419,7 +421,7 @@ static int connRdmaRegisterRx(RdmaContext *ctx, struct rdma_cm_id *cm_id) { } static int connRdmaGetFeature(RdmaContext *ctx, struct rdma_cm_id *cm_id, ValkeyRdmaCmd *cmd) { - ValkeyRdmaCmd _cmd; + ValkeyRdmaCmd _cmd = {0}; _cmd.feature.opcode = htons(GetServerFeature); _cmd.feature.select = cmd->feature.select; @@ -447,12 +449,13 @@ static int rdmaHandleEstablished(struct rdma_cm_event *ev) { return C_OK; } -static int rdmaHandleDisconnect(struct rdma_cm_event *ev) { +static int rdmaHandleDisconnect(aeEventLoop *el, struct rdma_cm_event *ev) { struct rdma_cm_id *cm_id = ev->id; RdmaContext *ctx = cm_id->context; connection *conn = ctx->conn; rdma_connection *rdma_conn = (rdma_connection *)conn; + aeDeleteTimeEvent(el, ctx->keepalive_te); conn->state = CONN_STATE_CLOSED; /* we can't close connection now, let's mark this connection as closed state */ @@ -669,7 +672,27 @@ static void connRdmaEventHandler(struct aeEventLoop *el, int fd, void *clientDat } } -static int rdmaHandleConnect(char *err, struct rdma_cm_event *ev, char *ip, size_t ip_len, int *port) { +static int rdmaKeepaliveTimeProc(struct aeEventLoop *el, long long id, void *clientData) { + struct rdma_cm_id *cm_id = clientData; + RdmaContext *ctx = cm_id->context; + connection *conn = ctx->conn; + ValkeyRdmaCmd cmd = {0}; + + UNUSED(el); + UNUSED(id); + if (conn->state != CONN_STATE_CONNECTED) { + return AE_NOMORE; + } + + cmd.keepalive.opcode = htons(Keepalive); + if (rdmaSendCommand(ctx, cm_id, &cmd) != C_OK) { + return AE_NOMORE; + } + + return VALKEY_RDMA_KEEPALIVE_MS; +} + +static int rdmaHandleConnect(aeEventLoop *el, char *err, struct rdma_cm_event *ev, char *ip, size_t ip_len, int *port) { int ret = C_OK; struct rdma_cm_id *cm_id = ev->id; struct sockaddr_storage caddr; @@ -694,6 +717,11 @@ static int rdmaHandleConnect(char *err, struct rdma_cm_event *ev, char *ip, size ctx = zcalloc(sizeof(RdmaContext)); ctx->ip = zstrdup(ip); ctx->port = *port; + ctx->keepalive_te = aeCreateTimeEvent(el, VALKEY_RDMA_KEEPALIVE_MS, rdmaKeepaliveTimeProc, cm_id, NULL); + if (ctx->keepalive_te == AE_ERR) { + return C_ERR; + } + cm_id->context = ctx; if (rdmaCreateResource(ctx, cm_id) == C_ERR) { goto reject; @@ -732,7 +760,8 @@ static rdma_listener *rdmaFdToListener(connListener *listener, int fd) { * 1, handle RDMA_CM_EVENT_CONNECT_REQUEST and return CM fd on success * 2, handle RDMA_CM_EVENT_ESTABLISHED and return C_OK on success */ -static int rdmaAccept(connListener *listener, char *err, int fd, char *ip, size_t ip_len, int *port, void **priv) { +static int +rdmaAccept(aeEventLoop *el, connListener *listener, char *err, int fd, char *ip, size_t ip_len, int *port, void **priv) { struct rdma_cm_event *ev; enum rdma_cm_event_type ev_type; int ret = C_OK; @@ -755,7 +784,7 @@ static int rdmaAccept(connListener *listener, char *err, int fd, char *ip, size_ ev_type = ev->event; switch (ev_type) { case RDMA_CM_EVENT_CONNECT_REQUEST: - ret = rdmaHandleConnect(err, ev, ip, ip_len, port); + ret = rdmaHandleConnect(el, err, ev, ip, ip_len, port); if (ret == C_OK) { RdmaContext *ctx = (RdmaContext *)ev->id->context; *priv = ev->id; @@ -773,7 +802,7 @@ static int rdmaAccept(connListener *listener, char *err, int fd, char *ip, size_ case RDMA_CM_EVENT_ADDR_CHANGE: case RDMA_CM_EVENT_DISCONNECTED: case RDMA_CM_EVENT_TIMEWAIT_EXIT: - rdmaHandleDisconnect(ev); + rdmaHandleDisconnect(el, ev); ret = C_OK; break; @@ -804,7 +833,7 @@ static void connRdmaAcceptHandler(aeEventLoop *el, int fd, void *privdata, int m UNUSED(mask); while (max--) { - cfd = rdmaAccept(listener, server.neterr, fd, cip, sizeof(cip), &cport, &connpriv); + cfd = rdmaAccept(el, listener, server.neterr, fd, cip, sizeof(cip), &cport, &connpriv); if (cfd == ANET_ERR) { if (errno != EWOULDBLOCK) serverLog(LL_WARNING, "RDMA Accepting client connection: %s", server.neterr); return; @@ -951,7 +980,7 @@ static void rdmaCMeventHandler(struct aeEventLoop *el, int fd, void *clientData, case RDMA_CM_EVENT_TIMEWAIT_EXIT: case RDMA_CM_EVENT_CONNECT_REQUEST: case RDMA_CM_EVENT_ADDR_CHANGE: - case RDMA_CM_EVENT_DISCONNECTED: rdmaHandleDisconnect(ev); break; + case RDMA_CM_EVENT_DISCONNECTED: rdmaHandleDisconnect(el, ev); break; case RDMA_CM_EVENT_MULTICAST_JOIN: case RDMA_CM_EVENT_MULTICAST_ERROR: