RDMA: Support user keepalive command (#916)
If the client side crashes by any issue or exits normally, the kernel will try to disconnect RDMA QPs. Then the kernel of server side receives CM packets, valkey-server handles CM disconnected event and close connection. However, there is a lack of keepalive mechanism from RDMA transport layer. Once the kernel of client side crashes, the server side will not be notified. To avoid this issue, valkey server sents Keepaliv command periodically to detect any dead QPs. An example of mlx-cx5: ``` # RDMA: CQ handle error status: transport retry counter exceeded[0xc], opcode : 0x0 # RDMA: CQ handle error status: transport retry counter exceeded[0xc], opcode : 0x0 # RDMA: CQ handle error status: Work Request Flushed Error[0x5], opcode : 0x0 # RDMA: CQ handle error status: Work Request Flushed Error[0x5], opcode : 0x0 # RDMA: CQ handle error status: Work Request Flushed Error[0x5], opcode : 0x0 # RDMA: CQ handle error status: Work Request Flushed Error[0x5], opcode : 0x0 ``` Signed-off-by: zhenwei pi <pizhenwei@bytedance.com>
This commit is contained in:
parent
e1b3629186
commit
2673320b66
47
src/rdma.c
47
src/rdma.c
@ -74,6 +74,7 @@ typedef enum ValkeyRdmaOpcode {
|
||||
#define VALKEY_RDMA_MAX_RX_SIZE (16 * 1024 * 1024)
|
||||
#define VALKEY_RDMA_SYNCIO_RES 10
|
||||
#define VALKEY_RDMA_INVALID_OPCODE 0xffff
|
||||
#define VALKEY_RDMA_KEEPALIVE_MS 3000
|
||||
|
||||
typedef struct rdma_connection {
|
||||
connection c;
|
||||
@ -94,6 +95,7 @@ typedef struct RdmaContext {
|
||||
connection *conn;
|
||||
char *ip;
|
||||
int port;
|
||||
long long keepalive_te; /* RDMA has no transport layer keepalive */
|
||||
struct ibv_pd *pd;
|
||||
struct rdma_event_channel *cm_channel;
|
||||
struct ibv_comp_channel *comp_channel;
|
||||
@ -405,7 +407,7 @@ static int rdmaSendCommand(RdmaContext *ctx, struct rdma_cm_id *cm_id, ValkeyRdm
|
||||
}
|
||||
|
||||
static int connRdmaRegisterRx(RdmaContext *ctx, struct rdma_cm_id *cm_id) {
|
||||
ValkeyRdmaCmd cmd;
|
||||
ValkeyRdmaCmd cmd = {0};
|
||||
|
||||
cmd.memory.opcode = htons(RegisterXferMemory);
|
||||
cmd.memory.addr = htonu64((uint64_t)ctx->rx.addr);
|
||||
@ -419,7 +421,7 @@ static int connRdmaRegisterRx(RdmaContext *ctx, struct rdma_cm_id *cm_id) {
|
||||
}
|
||||
|
||||
static int connRdmaGetFeature(RdmaContext *ctx, struct rdma_cm_id *cm_id, ValkeyRdmaCmd *cmd) {
|
||||
ValkeyRdmaCmd _cmd;
|
||||
ValkeyRdmaCmd _cmd = {0};
|
||||
|
||||
_cmd.feature.opcode = htons(GetServerFeature);
|
||||
_cmd.feature.select = cmd->feature.select;
|
||||
@ -447,12 +449,13 @@ static int rdmaHandleEstablished(struct rdma_cm_event *ev) {
|
||||
return C_OK;
|
||||
}
|
||||
|
||||
static int rdmaHandleDisconnect(struct rdma_cm_event *ev) {
|
||||
static int rdmaHandleDisconnect(aeEventLoop *el, struct rdma_cm_event *ev) {
|
||||
struct rdma_cm_id *cm_id = ev->id;
|
||||
RdmaContext *ctx = cm_id->context;
|
||||
connection *conn = ctx->conn;
|
||||
rdma_connection *rdma_conn = (rdma_connection *)conn;
|
||||
|
||||
aeDeleteTimeEvent(el, ctx->keepalive_te);
|
||||
conn->state = CONN_STATE_CLOSED;
|
||||
|
||||
/* we can't close connection now, let's mark this connection as closed state */
|
||||
@ -669,7 +672,27 @@ static void connRdmaEventHandler(struct aeEventLoop *el, int fd, void *clientDat
|
||||
}
|
||||
}
|
||||
|
||||
static int rdmaHandleConnect(char *err, struct rdma_cm_event *ev, char *ip, size_t ip_len, int *port) {
|
||||
static int rdmaKeepaliveTimeProc(struct aeEventLoop *el, long long id, void *clientData) {
|
||||
struct rdma_cm_id *cm_id = clientData;
|
||||
RdmaContext *ctx = cm_id->context;
|
||||
connection *conn = ctx->conn;
|
||||
ValkeyRdmaCmd cmd = {0};
|
||||
|
||||
UNUSED(el);
|
||||
UNUSED(id);
|
||||
if (conn->state != CONN_STATE_CONNECTED) {
|
||||
return AE_NOMORE;
|
||||
}
|
||||
|
||||
cmd.keepalive.opcode = htons(Keepalive);
|
||||
if (rdmaSendCommand(ctx, cm_id, &cmd) != C_OK) {
|
||||
return AE_NOMORE;
|
||||
}
|
||||
|
||||
return VALKEY_RDMA_KEEPALIVE_MS;
|
||||
}
|
||||
|
||||
static int rdmaHandleConnect(aeEventLoop *el, char *err, struct rdma_cm_event *ev, char *ip, size_t ip_len, int *port) {
|
||||
int ret = C_OK;
|
||||
struct rdma_cm_id *cm_id = ev->id;
|
||||
struct sockaddr_storage caddr;
|
||||
@ -694,6 +717,11 @@ static int rdmaHandleConnect(char *err, struct rdma_cm_event *ev, char *ip, size
|
||||
ctx = zcalloc(sizeof(RdmaContext));
|
||||
ctx->ip = zstrdup(ip);
|
||||
ctx->port = *port;
|
||||
ctx->keepalive_te = aeCreateTimeEvent(el, VALKEY_RDMA_KEEPALIVE_MS, rdmaKeepaliveTimeProc, cm_id, NULL);
|
||||
if (ctx->keepalive_te == AE_ERR) {
|
||||
return C_ERR;
|
||||
}
|
||||
|
||||
cm_id->context = ctx;
|
||||
if (rdmaCreateResource(ctx, cm_id) == C_ERR) {
|
||||
goto reject;
|
||||
@ -732,7 +760,8 @@ static rdma_listener *rdmaFdToListener(connListener *listener, int fd) {
|
||||
* 1, handle RDMA_CM_EVENT_CONNECT_REQUEST and return CM fd on success
|
||||
* 2, handle RDMA_CM_EVENT_ESTABLISHED and return C_OK on success
|
||||
*/
|
||||
static int rdmaAccept(connListener *listener, char *err, int fd, char *ip, size_t ip_len, int *port, void **priv) {
|
||||
static int
|
||||
rdmaAccept(aeEventLoop *el, connListener *listener, char *err, int fd, char *ip, size_t ip_len, int *port, void **priv) {
|
||||
struct rdma_cm_event *ev;
|
||||
enum rdma_cm_event_type ev_type;
|
||||
int ret = C_OK;
|
||||
@ -755,7 +784,7 @@ static int rdmaAccept(connListener *listener, char *err, int fd, char *ip, size_
|
||||
ev_type = ev->event;
|
||||
switch (ev_type) {
|
||||
case RDMA_CM_EVENT_CONNECT_REQUEST:
|
||||
ret = rdmaHandleConnect(err, ev, ip, ip_len, port);
|
||||
ret = rdmaHandleConnect(el, err, ev, ip, ip_len, port);
|
||||
if (ret == C_OK) {
|
||||
RdmaContext *ctx = (RdmaContext *)ev->id->context;
|
||||
*priv = ev->id;
|
||||
@ -773,7 +802,7 @@ static int rdmaAccept(connListener *listener, char *err, int fd, char *ip, size_
|
||||
case RDMA_CM_EVENT_ADDR_CHANGE:
|
||||
case RDMA_CM_EVENT_DISCONNECTED:
|
||||
case RDMA_CM_EVENT_TIMEWAIT_EXIT:
|
||||
rdmaHandleDisconnect(ev);
|
||||
rdmaHandleDisconnect(el, ev);
|
||||
ret = C_OK;
|
||||
break;
|
||||
|
||||
@ -804,7 +833,7 @@ static void connRdmaAcceptHandler(aeEventLoop *el, int fd, void *privdata, int m
|
||||
UNUSED(mask);
|
||||
|
||||
while (max--) {
|
||||
cfd = rdmaAccept(listener, server.neterr, fd, cip, sizeof(cip), &cport, &connpriv);
|
||||
cfd = rdmaAccept(el, listener, server.neterr, fd, cip, sizeof(cip), &cport, &connpriv);
|
||||
if (cfd == ANET_ERR) {
|
||||
if (errno != EWOULDBLOCK) serverLog(LL_WARNING, "RDMA Accepting client connection: %s", server.neterr);
|
||||
return;
|
||||
@ -951,7 +980,7 @@ static void rdmaCMeventHandler(struct aeEventLoop *el, int fd, void *clientData,
|
||||
case RDMA_CM_EVENT_TIMEWAIT_EXIT:
|
||||
case RDMA_CM_EVENT_CONNECT_REQUEST:
|
||||
case RDMA_CM_EVENT_ADDR_CHANGE:
|
||||
case RDMA_CM_EVENT_DISCONNECTED: rdmaHandleDisconnect(ev); break;
|
||||
case RDMA_CM_EVENT_DISCONNECTED: rdmaHandleDisconnect(el, ev); break;
|
||||
|
||||
case RDMA_CM_EVENT_MULTICAST_JOIN:
|
||||
case RDMA_CM_EVENT_MULTICAST_ERROR:
|
||||
|
Loading…
x
Reference in New Issue
Block a user