zhenwei pi dd4bd5065b
Introduce Valkey Over RDMA transport (experimental) ()
Adds an option to build RDMA support as a module:

    make BUILD_RDMA=module

To start valkey-server with RDMA, use a command line like the following:

    ./src/valkey-server --loadmodule src/valkey-rdma.so \
        port=6379 bind=xx.xx.xx.xx

* Implement the server side of the connection module only; this means we
  can *NOT* compile RDMA support as built-in.

* Add necessary information in README.md

* Support 'CONFIG SET/GET', for example, 'CONFIG SET rdma.port 6380',
  then check this by 'rdma res show cm_id' and valkey-cli (with RDMA
  support, but not implemented in this patch).

* The full listeners show like:

      listener0:name=tcp,bind=*,bind=-::*,port=6379
      listener1:name=unix,bind=/var/run/valkey.sock
      listener2:name=rdma,bind=xx.xx.xx.xx,bind=yy.yy.yy.yy,port=6379
      listener3:name=tls,bind=*,bind=-::*,port=16379

Because of the lack of RDMA support in TCL, use a simple C program to
test Valkey Over RDMA (under tests/rdma/). This is a quite raw version
with basic library dependencies: libpthread, libibverbs, librdmacm. Run
using the script:

    ./runtest-rdma [ OPTIONS ]

To run RDMA in GitHub Actions, a kernel module, RXE, for emulated soft
RDMA needs to be installed. The kernel module source code is fetched
from a repo containing only the RXE kernel driver from the Linux kernel,
which is kept in a separate repo to avoid cloning the whole Linux kernel
repo.

----

Since 2021/06, I created a
[PR](https://github.com/redis/redis/pull/9161) for *Redis Over RDMA*
proposal. Then I did some work to [fully abstract connection and make
TLS dynamically loadable](https://github.com/redis/redis/pull/9320), a
new connection type could be built into Redis statically, or a separated
shared library(loaded by Redis on startup) since Redis 7.2.0.

Based on the new connection framework, I created a new
[PR](https://github.com/redis/redis/pull/11182); some
people (@xiezhq-hermann @zhangyiming1201 @JSpewock @uvletter @FujiZ)
noticed, played with, and tested this PR. However, because of the lack
of time and knowledge from the maintainers, this PR has been pending for
about 2 years.

Related doc: [Introduce *Valkey Over RDMA*
specification](https://github.com/valkey-io/valkey-doc/pull/123). (same
as Redis, and this should be same)

Changes in this PR:
- implement *Valkey Over RDMA*. (compact the Valkey style)

Finally, if this feature is considered to merge, I volunteer to maintain
it.

---------

Signed-off-by: zhenwei pi <pizhenwei@bytedance.com>
2024-07-15 14:04:22 +02:00

1060 lines
28 KiB
C

/* ==========================================================================
* rdma-test.c - a simple test client for Valkey Over RDMA (Linux only)
* --------------------------------------------------------------------------
* Copyright (C) 2021-2024 zhenwei pi <pizhenwei@bytedance.com>
*
* This work is licensed under BSD 3-Clause, License 1 of the COPYING file in
* the top-level directory.
* ==========================================================================
*/
#ifndef __linux__ /* currently RDMA is only supported on Linux */
#error "BUILD ERROR: RDMA is only supported on Linux"
#else /* __linux__ */
#define _GNU_SOURCE
#include <arpa/inet.h>
#include <assert.h>
#include <endian.h>
#include <errno.h>
#include <fcntl.h>
#include <getopt.h>
#include <limits.h>
#include <netdb.h>
#include <poll.h>
#include <pthread.h>
#include <rdma/rdma_cma.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <time.h>
#include <unistd.h>
/* Feature negotiation command: one of the three layouts overlaid in
 * valkeyRdmaCmd.
 * NOTE(review): no code visible in this file builds or parses this layout;
 * presumably it belongs to the GetServerFeature/SetClientFeature opcodes -
 * confirm against the protocol specification. */
typedef struct valkeyRdmaFeature {
/* command opcode, one of valkeyRdmaOpcode */
uint16_t opcode;
/* select features */
uint16_t select;
/* reserved bytes; together with the two fields above they place 'features'
 * at byte offset 24 */
uint8_t rsvd[20];
/* feature bits */
uint64_t features;
} valkeyRdmaFeature;
/* Keepalive command: carries nothing but an opcode. connRdmaHandleRecv
 * accepts it and only re-posts the receive slot. The rest of the file also
 * reads and writes 'keepalive.opcode' as the generic way to access the opcode
 * of any valkeyRdmaCmd, whatever its actual layout. */
typedef struct valkeyRdmaKeepalive {
/* command opcode, one of valkeyRdmaOpcode */
uint16_t opcode;
/* reserved bytes following the opcode */
uint8_t rsvd[30];
} valkeyRdmaKeepalive;
/* RegisterXferMemory command: announces a receive buffer to the peer.
 * connRdmaRegisterRx fills it with the local recv buffer (address, length,
 * rkey), converting every field to network byte order; connRdmaHandleRecv
 * decodes the peer's copy into the tx_* fields of RdmaContext. */
typedef struct valkeyRdmaMemory {
/* command opcode, one of valkeyRdmaOpcode */
uint16_t opcode;
/* reserved bytes following the opcode */
uint8_t rsvd[14];
/* address of a transfer buffer which is used to receive remote streaming data,
 * aka 'RX buffer address'. The remote side should use this as 'TX buffer address' */
uint64_t addr;
/* length of the 'RX buffer' */
uint32_t length;
/* the RDMA remote key of 'RX buffer' */
uint32_t key;
} valkeyRdmaMemory;
/* One control message. Every command posted for send or receive in this file
 * is exactly sizeof(valkeyRdmaCmd) bytes, and connRdmaHandleRecv rejects any
 * received message of a different size. The active layout is selected by the
 * leading opcode field shared by all members. */
typedef union valkeyRdmaCmd {
valkeyRdmaFeature feature;
valkeyRdmaKeepalive keepalive;
valkeyRdmaMemory memory;
} valkeyRdmaCmd;
/* Opcode values stored in the 'opcode' field of a valkeyRdmaCmd. The field is
 * written with htons() and read with ntohs() at the use sites in this file.
 * Only Keepalive and RegisterXferMemory are handled by connRdmaHandleRecv;
 * any other received opcode is treated as a fatal error there. */
typedef enum valkeyRdmaOpcode {
GetServerFeature = 0,
SetClientFeature = 1,
Keepalive = 2,
RegisterXferMemory = 3,
} valkeyRdmaOpcode;
/* Upper bound accepted for --thread/-t; also sizes the thread array in main() */
#define MAX_THREADS 32
/* Silence "unused parameter/variable" warnings */
#define UNUSED(x) (void)(x)
/* Smaller of two values. Every argument and the whole expansion are
 * parenthesized so the macro can be embedded in a larger expression.
 * Note: each argument may be evaluated twice, so avoid side effects. */
#define MIN(a, b) ((a) < (b) ? (a) : (b))
/* Number of command slots per direction (recv slots, then send slots) and
 * depth of the send/recv work queues */
#define VALKEY_RDMA_MAX_WQE 1024
/* Size in bytes of the local receive buffer announced to the peer */
#define VALKEY_RDMA_DEFAULT_RX_LEN (1024*1024)
/* Opcode value marking a send command slot as free */
#define VALKEY_RDMA_INVALID_OPCODE 0xffff
/* Per-connection state of the RDMA test client. */
typedef struct RdmaContext {
/* connection manager id and the event channel it was created on */
struct rdma_cm_id *cm_id;
struct rdma_event_channel *cm_channel;
/* completion channel (its fd is polled), the single CQ shared by send and
 * recv, and the protection domain; all set by valkeyRdmaConnect */
struct ibv_comp_channel *comp_channel;
struct ibv_cq *cq;
struct ibv_pd *pd;
/* set to true by valkeyRdmaEstablished */
bool connected;
/* TX: remote buffer announced by the peer through RegisterXferMemory
 * (address, length, rkey) and the number of bytes already written into it */
char *tx_addr;
uint32_t tx_length;
uint32_t tx_offset;
uint32_t tx_key;
/* local staging buffer for outgoing data, sized by rdmaAdjustSendbuf to
 * match tx_length, plus its memory region */
char *send_buf;
uint32_t send_length;
/* count of RDMA writes posted; connRdmaSend requests a signaled completion
 * every VALKEY_RDMA_MAX_WQE-th write */
uint32_t send_ops;
struct ibv_mr *send_mr;
/* RX: rx_offset counts bytes the peer has reported as written into recv_buf
 * (advanced by connRdmaHandleRecvImm); recv_offset counts bytes already
 * handed to the caller by valkeyRdmaRead */
uint32_t rx_offset;
char *recv_buf;
unsigned int recv_length;
unsigned int recv_offset;
struct ibv_mr *recv_mr;
/* CMD 0 ~ VALKEY_RDMA_MAX_WQE for recv buffer
 * VALKEY_RDMA_MAX_WQE ~ 2 * VALKEY_RDMA_MAX_WQE -1 for send buffer */
valkeyRdmaCmd *cmd_buf;
struct ibv_mr *cmd_mr;
} RdmaContext;
/* Put 'fd' into blocking mode when 'blocking' is non-zero, otherwise into
 * non-blocking mode, by clearing or setting O_NONBLOCK in its status flags.
 * Returns -1 if the flags cannot be read, otherwise the result of the
 * F_SETFL call. */
static int valkeySetFdBlocking(int fd, int blocking) {
    int current = fcntl(fd, F_GETFL);
    if (current == -1) return -1;

    int wanted = blocking ? (current & ~O_NONBLOCK) : (current | O_NONBLOCK);
    return fcntl(fd, F_SETFL, wanted);
}
/* Report an unrecoverable failure: print "<function>:<line> <msg>" to stderr,
 * then trip assert(0). When assertions are compiled out (NDEBUG) the assert is
 * a no-op and execution continues after the macro, which is why most call
 * sites still follow it with their own error return. */
#define rdmaFatal(msg) \
do { \
fprintf(stderr, "%s:%d %s\n", __func__, __LINE__, msg); \
assert(0); \
} while (0)
/* Wall-clock time from gettimeofday() expressed in milliseconds, or -1 when
 * gettimeofday() fails. */
static inline long valkeyNowMs(void) {
    struct timeval now;
    int rc = gettimeofday(&now, NULL);

    if (rc < 0) {
        return -1;
    }

    long whole_seconds_ms = now.tv_sec * 1000;
    long fraction_ms = now.tv_usec / 1000;
    return whole_seconds_ms + fraction_ms;
}
/* Post one receive work request on the QP of 'cm_id' that targets the command
 * slot 'cmd' (sizeof(valkeyRdmaCmd) bytes, registered through ctx->cmd_mr).
 * The slot address doubles as the work request id so the completion handler
 * can find the slot again. Returns 0 on success, -1 if posting fails. */
static int rdmaPostRecv(RdmaContext *ctx, struct rdma_cm_id *cm_id, valkeyRdmaCmd *cmd) {
    struct ibv_recv_wr *rejected;
    struct ibv_recv_wr wr;
    struct ibv_sge slot;

    slot.addr = (uint64_t)cmd;
    slot.length = sizeof(valkeyRdmaCmd);
    slot.lkey = ctx->cmd_mr->lkey;

    wr.wr_id = (uint64_t)cmd;
    wr.next = NULL;
    wr.sg_list = &slot;
    wr.num_sge = 1;

    return ibv_post_recv(cm_id->qp, &wr, &rejected) ? -1 : 0;
}
/* Deregister the memory region stored in '*slot', if any, and clear the slot. */
static void rdmaDropMr(struct ibv_mr **slot) {
    struct ibv_mr *mr = *slot;

    if (mr == NULL) return;
    ibv_dereg_mr(mr);
    *slot = NULL;
}

/* Release the recv, send and command buffers of 'ctx' together with their
 * memory regions. Each region is deregistered before its backing memory is
 * freed, and every pointer is reset to NULL, so calling this on a partially
 * initialized or already released context is harmless. */
static void rdmaDestroyIoBuf(RdmaContext *ctx) {
    rdmaDropMr(&ctx->recv_mr);
    free(ctx->recv_buf);
    ctx->recv_buf = NULL;

    rdmaDropMr(&ctx->send_mr);
    free(ctx->send_buf);
    ctx->send_buf = NULL;

    rdmaDropMr(&ctx->cmd_mr);
    free(ctx->cmd_buf);
    ctx->cmd_buf = NULL;
}
/* Allocate and register the buffers a connection needs before rdma_connect():
 *  - the command buffer: 2 * VALKEY_RDMA_MAX_WQE slots; the first half is
 *    posted as receive work requests, the second half is marked free for
 *    rdmaSendCommand;
 *  - the receive buffer of VALKEY_RDMA_DEFAULT_RX_LEN bytes, registered with
 *    remote read/write access so the peer can write into it.
 * Requires ctx->pd and the QP of 'cm_id' to exist already.
 * Returns 0 on success; on failure everything set up so far is released
 * through rdmaDestroyIoBuf and -1 is returned. */
static int rdmaSetupIoBuf(RdmaContext *ctx, struct rdma_cm_id *cm_id) {
    int access = IBV_ACCESS_LOCAL_WRITE;
    size_t length = sizeof(valkeyRdmaCmd) * VALKEY_RDMA_MAX_WQE * 2;
    valkeyRdmaCmd *cmd;
    int i;

    /* setup CMD buf & MR */
    ctx->cmd_buf = calloc(1, length);
    if (!ctx->cmd_buf) {
        rdmaFatal("RDMA: alloc cmd buf failed");
        goto destroy_iobuf;
    }

    ctx->cmd_mr = ibv_reg_mr(ctx->pd, ctx->cmd_buf, length, access);
    if (!ctx->cmd_mr) {
        rdmaFatal("RDMA: reg cmd mr failed");
        goto destroy_iobuf;
    }

    /* first half of the slots: hand them to the QP as receive buffers */
    for (i = 0; i < VALKEY_RDMA_MAX_WQE; i++) {
        cmd = ctx->cmd_buf + i;
        if (rdmaPostRecv(ctx, cm_id, cmd) == -1) {
            rdmaFatal("RDMA: post recv failed");
            goto destroy_iobuf;
        }
    }

    /* second half of the slots: mark them as free send slots */
    for (i = VALKEY_RDMA_MAX_WQE; i < VALKEY_RDMA_MAX_WQE * 2; i++) {
        cmd = ctx->cmd_buf + i;
        cmd->keepalive.opcode = VALKEY_RDMA_INVALID_OPCODE;
    }

    /* setup recv buf & MR */
    access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
    length = VALKEY_RDMA_DEFAULT_RX_LEN;
    ctx->recv_buf = calloc(1, length);
    if (!ctx->recv_buf) {
        rdmaFatal("RDMA: alloc recv buf failed");
        goto destroy_iobuf;
    }

    ctx->recv_length = length;
    ctx->recv_mr = ibv_reg_mr(ctx->pd, ctx->recv_buf, length, access);
    if (!ctx->recv_mr) {
        rdmaFatal("RDMA: reg recv mr failed");
        goto destroy_iobuf;
    }

    return 0;

destroy_iobuf:
    rdmaDestroyIoBuf(ctx);
    return -1;
}
/* Make the local send staging buffer exactly 'length' bytes long.
 * Nothing happens when the size already matches. Otherwise the previous
 * buffer and its memory region are released and a zeroed buffer of the new
 * size is allocated and registered.
 * Returns 0 on success. On failure returns -1 and leaves the context without
 * a send buffer (send_buf/send_mr NULL, send_length 0). */
static int rdmaAdjustSendbuf(RdmaContext *ctx, unsigned int length) {
    int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;

    if (length == ctx->send_length) {
        return 0;
    }

    /* release the old MR & buffer, and drop the stale pointers right away so
     * a failure below cannot leave them dangling */
    if (ctx->send_length) {
        ibv_dereg_mr(ctx->send_mr);
        ctx->send_mr = NULL;
        free(ctx->send_buf);
        ctx->send_buf = NULL;
        ctx->send_length = 0;
    }

    /* create a new buffer & MR */
    ctx->send_buf = calloc(length, 1);
    if (!ctx->send_buf) {
        rdmaFatal("RDMA: alloc send buf failed");
        return -1;
    }

    ctx->send_mr = ibv_reg_mr(ctx->pd, ctx->send_buf, length, access);
    if (!ctx->send_mr) {
        rdmaFatal("RDMA: reg send buf mr failed");
        free(ctx->send_buf);
        ctx->send_buf = NULL;
        return -1;
    }

    /* only advertise the new size once buffer and MR both exist */
    ctx->send_length = length;
    return 0;
}
/* Send one control command to the peer.
 * The command is copied into a free slot of the send half of ctx->cmd_buf
 * (a slot is free while its opcode is VALKEY_RDMA_INVALID_OPCODE) and posted
 * as a signaled SEND; connRdmaHandleSend frees the slot again when the
 * completion arrives.
 * Returns 0 on success, -1 when no slot is free or posting fails. */
static int rdmaSendCommand(RdmaContext *ctx, struct rdma_cm_id *cm_id, valkeyRdmaCmd *cmd) {
    struct ibv_send_wr send_wr = {0}, *bad_wr;
    struct ibv_sge sge;
    valkeyRdmaCmd *_cmd = NULL;
    int i;

    /* find an unused cmd buffer */
    for (i = VALKEY_RDMA_MAX_WQE; i < 2 * VALKEY_RDMA_MAX_WQE; i++) {
        if (ctx->cmd_buf[i].keepalive.opcode == VALKEY_RDMA_INVALID_OPCODE) {
            _cmd = ctx->cmd_buf + i;
            break;
        }
    }

    assert(_cmd != NULL);
    if (!_cmd) {
        /* assertions disabled: never overwrite a slot that is still in flight */
        return -1;
    }

    memcpy(_cmd, cmd, sizeof(valkeyRdmaCmd));
    sge.addr = (uint64_t)_cmd;
    sge.length = sizeof(valkeyRdmaCmd);
    sge.lkey = ctx->cmd_mr->lkey;

    send_wr.sg_list = &sge;
    send_wr.num_sge = 1;
    send_wr.wr_id = (uint64_t)_cmd;
    send_wr.opcode = IBV_WR_SEND;
    send_wr.send_flags = IBV_SEND_SIGNALED;
    send_wr.next = NULL;

    if (ibv_post_send(cm_id->qp, &send_wr, &bad_wr)) {
        /* no completion will ever arrive for this slot, so give it back now */
        _cmd->keepalive.opcode = VALKEY_RDMA_INVALID_OPCODE;
        return -1;
    }

    return 0;
}
/* Announce the local receive buffer to the peer with a RegisterXferMemory
 * command (address, length and rkey, all in network byte order) and restart
 * both receive cursors at the beginning of the buffer.
 * Returns the result of rdmaSendCommand. */
static int connRdmaRegisterRx(RdmaContext *ctx, struct rdma_cm_id *cm_id) {
    valkeyRdmaCmd announce = {0};
    valkeyRdmaMemory *mem = &announce.memory;

    /* the peer starts writing at offset 0 again */
    ctx->recv_offset = 0;
    ctx->rx_offset = 0;

    mem->opcode = htons(RegisterXferMemory);
    mem->addr = htobe64((uint64_t)ctx->recv_buf);
    mem->length = htonl(ctx->recv_length);
    mem->key = htonl(ctx->recv_mr->rkey);

    return rdmaSendCommand(ctx, cm_id, &announce);
}
/* Handle a completed receive of a control command.
 *  - RegisterXferMemory: remember the peer's buffer (address, length, rkey)
 *    as the new TX target, restart the TX offset and resize the local send
 *    staging buffer to match;
 *  - Keepalive: nothing to do.
 * The slot is re-posted for receiving afterwards.
 * Returns 0 on success; -1 on a size mismatch, an unknown opcode, a failure
 * to resize the send buffer, or a failure to re-post the slot. */
static int connRdmaHandleRecv(RdmaContext *ctx, struct rdma_cm_id *cm_id, valkeyRdmaCmd *cmd, uint32_t byte_len) {
    if (byte_len != sizeof(valkeyRdmaCmd)) {
        rdmaFatal("RDMA: FATAL error, recv corrupted cmd");
        return -1;
    }

    switch (ntohs(cmd->keepalive.opcode)) {
    case RegisterXferMemory:
        ctx->tx_addr = (char *)be64toh(cmd->memory.addr);
        ctx->tx_length = ntohl(cmd->memory.length);
        ctx->tx_key = ntohl(cmd->memory.key);
        ctx->tx_offset = 0;
        if (rdmaAdjustSendbuf(ctx, ctx->tx_length) == -1) {
            /* without a staging buffer nothing may be written: advertise an
             * empty TX window instead of one that has no backing memory */
            ctx->tx_length = 0;
            return -1;
        }
        break;

    case Keepalive:
        break;

    default:
        rdmaFatal("RDMA: FATAL error, unknown cmd");
        return -1;
    }

    return rdmaPostRecv(ctx, cm_id, cmd);
}
/* Handle the completion of an RDMA write with immediate data issued by the
 * peer: 'byte_len' more bytes are now present in the receive buffer, so the
 * received-bytes cursor moves forward (asserting that it stays inside the
 * buffer) and the consumed receive slot is posted again.
 * Returns the result of rdmaPostRecv. */
static int connRdmaHandleRecvImm(RdmaContext *ctx, struct rdma_cm_id *cm_id, valkeyRdmaCmd *cmd, uint32_t byte_len) {
    uint32_t filled = ctx->rx_offset;

    assert(byte_len + filled <= ctx->recv_length);
    ctx->rx_offset = filled + byte_len;

    return rdmaPostRecv(ctx, cm_id, cmd);
}
/* Handle the completion of a SEND: wipe the command slot and mark it free
 * again so rdmaSendCommand can reuse it. Always returns 0. */
static int connRdmaHandleSend(valkeyRdmaCmd *cmd) {
    valkeyRdmaCmd *slot = cmd;

    memset(slot, 0, sizeof(valkeyRdmaCmd));
    slot->keepalive.opcode = VALKEY_RDMA_INVALID_OPCODE;

    return 0;
}
/* Handle the completion of a locally issued RDMA write. Nothing needs to be
 * done for it; both arguments are ignored and 0 is returned. */
static int connRdmaHandleWrite(RdmaContext *ctx, uint32_t byte_len) {
    (void)(ctx);
    (void)(byte_len);

    return 0;
}
/* Drain the completion queue of 'ctx'.
 * First fetches (without blocking, the channel fd is non-blocking) a pending
 * completion event; when one is returned it is acknowledged and notification
 * is re-armed. Then every available work completion is polled and dispatched
 * by opcode to connRdmaHandleRecv / connRdmaHandleRecvImm /
 * connRdmaHandleWrite / connRdmaHandleSend.
 * Returns 0 once the CQ is empty, -1 on any failure (failed verb, completion
 * with error status, unexpected opcode, or a handler error). */
static int connRdmaHandleCq(RdmaContext *ctx) {
    struct rdma_cm_id *cm_id = ctx->cm_id;
    struct ibv_cq *ev_cq = NULL;
    void *ev_ctx = NULL;
    struct ibv_wc wc = {0};
    valkeyRdmaCmd *cmd;
    int ret;

    if (ibv_get_cq_event(ctx->comp_channel, &ev_cq, &ev_ctx) < 0) {
        if (errno != EAGAIN) {
            rdmaFatal("RDMA: get cq event failed");
            return -1;
        }
    } else {
        /* Acknowledge exactly the one event that was returned. Events are
         * counted per ibv_get_cq_event() call, not per work completion, so
         * acknowledging inside the poll loop would get the count wrong. */
        ibv_ack_cq_events(ev_cq, 1);
        if (ibv_req_notify_cq(ev_cq, 0)) {
            rdmaFatal("RDMA: notify cq failed");
            return -1;
        }
    }

    for (;;) {
        ret = ibv_poll_cq(ctx->cq, 1, &wc);
        if (ret < 0) {
            rdmaFatal("RDMA: poll cq failed");
            return -1;
        } else if (ret == 0) {
            return 0;
        }

        if (wc.status != IBV_WC_SUCCESS) {
            rdmaFatal("RDMA: send/recv failed");
            return -1;
        }

        switch (wc.opcode) {
        case IBV_WC_RECV:
            /* wr_id carries the address of the command slot */
            cmd = (valkeyRdmaCmd *)wc.wr_id;
            if (connRdmaHandleRecv(ctx, cm_id, cmd, wc.byte_len) == -1) {
                return -1;
            }
            break;

        case IBV_WC_RECV_RDMA_WITH_IMM:
            /* the immediate value holds the number of bytes the peer wrote */
            cmd = (valkeyRdmaCmd *)wc.wr_id;
            if (connRdmaHandleRecvImm(ctx, cm_id, cmd, ntohl(wc.imm_data)) == -1) {
                return -1;
            }
            break;

        case IBV_WC_RDMA_WRITE:
            if (connRdmaHandleWrite(ctx, wc.byte_len) == -1) {
                return -1;
            }
            break;

        case IBV_WC_SEND:
            cmd = (valkeyRdmaCmd *)wc.wr_id;
            if (connRdmaHandleSend(cmd) == -1) {
                return -1;
            }
            break;

        default:
            rdmaFatal("RDMA: unexpected opcode");
            return -1;
        }
    }
}
/* Read up to 'data_len' bytes of received stream data into 'buf'.
 * If unread data is already present it is returned at once. Otherwise the
 * completion queue is processed and, while no data shows up, the completion
 * channel is polled (up to 1000 ms per wait) until roughly one second has
 * passed since the call started.
 * When the receive buffer has been consumed to its end it is announced to the
 * peer again through connRdmaRegisterRx.
 * Returns the number of bytes copied, or -1 on error or timeout. */
static ssize_t valkeyRdmaRead(RdmaContext *ctx, char *buf, size_t data_len) {
    struct rdma_cm_id *cm_id = ctx->cm_id;
    struct pollfd waiter;
    long budget_ms = 1000;
    long began = valkeyNowMs();
    uint32_t take, avail;

    for (;;) {
        if (ctx->recv_offset < ctx->rx_offset) {
            avail = ctx->rx_offset - ctx->recv_offset;
            take = avail;
            if (!(avail < data_len)) {
                take = data_len;
            }

            memcpy(buf, ctx->recv_buf + ctx->recv_offset, take);
            ctx->recv_offset += take;

            /* whole buffer consumed: hand it back to the peer */
            if (ctx->recv_offset == ctx->recv_length) {
                connRdmaRegisterRx(ctx, cm_id);
            }

            return take;
        }

        /* try to poll a CQ firstly */
        if (connRdmaHandleCq(ctx) == -1) {
            return -1;
        }

        if (ctx->recv_offset < ctx->rx_offset) {
            continue;
        }

        waiter.fd = ctx->comp_channel->fd;
        waiter.events = POLLIN;
        waiter.revents = 0;
        if (poll(&waiter, 1, 1000) < 0) {
            return -1;
        }

        if (!((valkeyNowMs() - began) < budget_ms)) {
            break;
        }
    }

    rdmaFatal("RDMA: read timeout");
    return -1;
}
/* Read exactly 'data_len' bytes into 'buf' by calling valkeyRdmaRead until
 * the request is satisfied.
 * Returns 'data_len' on success, or -1 as soon as a read fails. The error
 * must be checked before accumulating: adding -1 to the unsigned byte counter
 * would wrap it around and make a failed read look like a completed one.
 * A zero-length request returns 0 without touching the connection. */
static ssize_t valkeyRdmaReadFull(RdmaContext *ctx, char *buf, size_t data_len) {
    size_t inbytes = 0;

    while (inbytes < data_len) {
        ssize_t n = valkeyRdmaRead(ctx, buf + inbytes, data_len - inbytes);
        if (n < 0) {
            return -1;
        }
        inbytes += (size_t)n;
    }

    return data_len;
}
/* Write 'data_len' bytes from 'data' into the peer's receive buffer at the
 * current TX offset.
 * The data is staged in the local send buffer at the same offset and posted
 * as an RDMA write with immediate; the immediate value carries the length in
 * network byte order. Only every VALKEY_RDMA_MAX_WQE-th write asks for a
 * signaled completion.
 * The caller must make sure the data fits into the space left in the TX
 * window; this is asserted.
 * Returns 'data_len' and advances the TX offset on success, (size_t)-1 when
 * posting fails. */
static size_t connRdmaSend(RdmaContext *ctx, struct rdma_cm_id *cm_id, const void *data, size_t data_len) {
    struct ibv_send_wr send_wr = {0}, *bad_wr;
    struct ibv_sge sge;
    uint32_t off = ctx->tx_offset;
    char *addr = ctx->send_buf + off;
    char *remote_addr = ctx->tx_addr + off;
    int ret;

    /* the write starts at 'off', so the remaining space is what matters,
     * not the total length of the window */
    assert(off <= ctx->tx_length && data_len <= ctx->tx_length - off);
    memcpy(addr, data, data_len);

    sge.addr = (uint64_t)addr;
    sge.lkey = ctx->send_mr->lkey;
    sge.length = data_len;

    send_wr.sg_list = &sge;
    send_wr.num_sge = 1;
    send_wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
    send_wr.send_flags = (++ctx->send_ops % VALKEY_RDMA_MAX_WQE) ? 0 : IBV_SEND_SIGNALED;
    send_wr.imm_data = htonl(data_len);
    send_wr.wr.rdma.remote_addr = (uint64_t)remote_addr;
    send_wr.wr.rdma.rkey = ctx->tx_key;
    send_wr.next = NULL;

    ret = ibv_post_send(cm_id->qp, &send_wr, &bad_wr);
    if (ret) {
        return (size_t)-1;
    }

    ctx->tx_offset += data_len;
    return data_len;
}
/* Send 'data_len' bytes from 'buf' to the peer.
 * Each round processes the completion queue first (this is how a freshly
 * announced TX window is picked up), then writes as much as fits into the
 * space left in the TX window. While data remains, the completion channel is
 * polled for 1 ms between rounds.
 * The one second time budget covers the whole call, including the time spent
 * waiting for the peer to announce a new TX window, so the call cannot spin
 * forever when the peer never does.
 * Returns 'data_len' on success, -1 on error or timeout. */
static ssize_t valkeyRdmaWrite(RdmaContext *ctx, char *buf, size_t data_len) {
    struct rdma_cm_id *cm_id = ctx->cm_id;
    struct pollfd pfd;
    long timed = 1000;
    long start = valkeyNowMs();
    size_t towrite, room, wrote = 0;
    size_t ret;

    for (;;) {
        if (connRdmaHandleCq(ctx) == -1) {
            return -1;
        }

        assert(ctx->tx_offset <= ctx->tx_length);
        room = ctx->tx_length - ctx->tx_offset;
        if (room > 0) {
            towrite = data_len - wrote;
            if (room < towrite) {
                towrite = room;
            }

            ret = connRdmaSend(ctx, cm_id, buf + wrote, towrite);
            if (ret == (size_t)-1) {
                return -1;
            }

            wrote += ret;
            if (wrote == data_len) {
                return data_len;
            }
        }
        /* room == 0: the TX window is used up, wait for a new one */

        if ((valkeyNowMs() - start) >= timed) {
            break;
        }

        pfd.fd = ctx->comp_channel->fd;
        pfd.events = POLLIN;
        pfd.revents = 0;
        if (poll(&pfd, 1, 1) < 0) {
            return -1;
        }
    }

    rdmaFatal("RDMA: write timeout");
    return -1;
}
/* Tear down everything owned by 'ctx' except the RdmaContext allocation
 * itself (see valkeyRdmaFree): process pending completions once, disconnect,
 * then release the CQ, the I/O buffers with their memory regions, the QP, the
 * completion channel, the PD, the CM id and finally the CM event channel.
 * The return values of the individual calls are not checked.
 * NOTE(review): the CQ is destroyed before the QP that uses it, and
 * ibv_destroy_cq is documented to fail while a QP is still associated with
 * the CQ - confirm whether the order should be QP first, then CQ. */
static void valkeyRdmaClose(RdmaContext *ctx) {
struct rdma_cm_id *cm_id = ctx->cm_id;
connRdmaHandleCq(ctx);
rdma_disconnect(cm_id);
ibv_destroy_cq(ctx->cq);
rdmaDestroyIoBuf(ctx);
ibv_destroy_qp(cm_id->qp);
ibv_destroy_comp_channel(ctx->comp_channel);
ibv_dealloc_pd(ctx->pd);
rdma_destroy_id(cm_id);
rdma_destroy_event_channel(ctx->cm_channel);
}
/* Release the memory of a context object. Passing NULL is allowed and does
 * nothing, because free() itself ignores NULL. */
static void valkeyRdmaFree(void *privctx) {
    free(privctx);
}
/* Create the verbs resources for 'cm_id' and start connecting.
 * In order: protection domain, completion channel (switched to non-blocking),
 * one CQ shared by send and recv with notification armed, a reliable
 * connected QP, the I/O buffers (rdmaSetupIoBuf), and finally rdma_connect().
 * On success the resources are stored in 'ctx' and 0 is returned.
 * On failure everything created here is destroyed again, the matching fields
 * of 'ctx' are reset so they do not point at destroyed objects, and -1 is
 * returned. ctx->cm_id is left untouched because the id is owned by the
 * caller. */
static int valkeyRdmaConnect(RdmaContext *ctx, struct rdma_cm_id *cm_id) {
    struct ibv_comp_channel *comp_channel = NULL;
    struct ibv_cq *cq = NULL;
    struct ibv_pd *pd = NULL;
    struct ibv_qp_init_attr init_attr = {0};
    struct rdma_conn_param conn_param = {0};

    pd = ibv_alloc_pd(cm_id->verbs);
    if (!pd) {
        rdmaFatal("RDMA: alloc pd failed");
        goto error;
    }

    comp_channel = ibv_create_comp_channel(cm_id->verbs);
    if (!comp_channel) {
        rdmaFatal("RDMA: create comp channel failed");
        goto error;
    }

    if (valkeySetFdBlocking(comp_channel->fd, 0) != 0) {
        rdmaFatal("RDMA: set recv comp channel fd non-block failed");
        goto error;
    }

    /* one CQ serves both directions, hence twice the queue depth */
    cq = ibv_create_cq(cm_id->verbs, VALKEY_RDMA_MAX_WQE * 2, ctx, comp_channel, 0);
    if (!cq) {
        rdmaFatal("RDMA: create send cq failed");
        goto error;
    }

    if (ibv_req_notify_cq(cq, 0)) {
        rdmaFatal("RDMA: notify send cq failed");
        goto error;
    }

    /* create qp with attr */
    init_attr.cap.max_send_wr = VALKEY_RDMA_MAX_WQE;
    init_attr.cap.max_recv_wr = VALKEY_RDMA_MAX_WQE;
    init_attr.cap.max_send_sge = 1;
    init_attr.cap.max_recv_sge = 1;
    init_attr.qp_type = IBV_QPT_RC;
    init_attr.send_cq = cq;
    init_attr.recv_cq = cq;
    if (rdma_create_qp(cm_id, pd, &init_attr)) {
        rdmaFatal("RDMA: create qp failed");
        goto error;
    }

    ctx->cm_id = cm_id;
    ctx->comp_channel = comp_channel;
    ctx->cq = cq;
    ctx->pd = pd;

    if (rdmaSetupIoBuf(ctx, cm_id) != 0)
        goto free_qp;

    /* rdma connect with param */
    conn_param.responder_resources = 1;
    conn_param.initiator_depth = 1;
    conn_param.retry_count = 7;
    conn_param.rnr_retry_count = 7;
    if (rdma_connect(cm_id, &conn_param)) {
        rdmaFatal("RDMA: connect failed");
        goto destroy_iobuf;
    }

    return 0;

destroy_iobuf:
    rdmaDestroyIoBuf(ctx);
free_qp:
    ibv_destroy_qp(cm_id->qp);
    /* the objects below are about to be destroyed: forget them in ctx */
    ctx->comp_channel = NULL;
    ctx->cq = NULL;
    ctx->pd = NULL;
error:
    if (cq)
        ibv_destroy_cq(cq);
    if (pd)
        ibv_dealloc_pd(pd);
    if (comp_channel)
        ibv_destroy_comp_channel(comp_channel);

    return -1;
}
/* Called for the ESTABLISHED connection event: flag the context as connected
 * and announce the local receive buffer to the server.
 * Returns the result of connRdmaRegisterRx. */
static int valkeyRdmaEstablished(RdmaContext *ctx, struct rdma_cm_id *cm_id) {
    int rc;

    ctx->connected = true;
    rc = connRdmaRegisterRx(ctx, cm_id);

    return rc;
}
/* Process every connection manager event currently queued on the event
 * channel of 'ctx' and drive the connection state machine:
 *   address resolved -> resolve the route ('timeout' clamped to 100 ms)
 *   route resolved   -> create resources and connect
 *   established      -> mark connected and register the RX buffer
 * Any other event is reported as a connection failure.
 * Returns the result of the last event handled; -1 when no event was
 * available at all. */
static int valkeyRdmaCM(RdmaContext *ctx, int timeout) {
    struct rdma_cm_event *ev;
    char reason[128];
    int rc = -1;

    for (;;) {
        if (rdma_get_cm_event(ctx->cm_channel, &ev) != 0) {
            break;
        }

        switch (ev->event) {
        case RDMA_CM_EVENT_ADDR_RESOLVED:
            /* at most 100ms to resolve route */
            if (!(timeout >= 0 && timeout <= 100)) {
                timeout = 100;
            }
            rc = rdma_resolve_route(ev->id, timeout);
            if (rc != 0) {
                rdmaFatal("RDMA: route resolve failed");
            }
            break;

        case RDMA_CM_EVENT_ROUTE_RESOLVED:
            rc = valkeyRdmaConnect(ctx, ev->id);
            break;

        case RDMA_CM_EVENT_ESTABLISHED:
            rc = valkeyRdmaEstablished(ctx, ev->id);
            break;

        case RDMA_CM_EVENT_TIMEWAIT_EXIT:
            rc = -1;
            rdmaFatal("RDMA: connect timeout");
            break;

        default:
            /* covers the address/route/connect errors, unreachable, rejected,
             * disconnected and address-change events as well as anything
             * unknown */
            snprintf(reason, sizeof(reason), "RDMA: connect failed - %s", rdma_event_str(ev->event));
            rdmaFatal(reason);
            rc = -1;
            break;
        }

        rdma_ack_cm_event(ev);
    }

    return rc;
}
/* Wait up to 'timeout' milliseconds for the connection to be established.
 * Each round polls the CM event channel for the time that is left and then
 * lets valkeyRdmaCM process the events.
 * Returns 0 once the context is connected; -1 on a poll error, an event
 * handling error, or when the time is used up. */
static int valkeyRdmaWaitConn(RdmaContext *ctx, long timeout) {
    long began = valkeyNowMs();
    long elapsed;

    for (elapsed = 0; elapsed < timeout; elapsed = valkeyNowMs() - began) {
        int remaining = (int)(timeout - elapsed);
        struct pollfd waiter = {
            .fd = ctx->cm_channel->fd,
            .events = POLLIN,
            .revents = 0,
        };

        if (poll(&waiter, 1, remaining) < 0) return -1;
        if (valkeyRdmaCM(ctx, remaining) == -1) return -1;
        if (ctx->connected) return 0;
    }

    return -1;
}
/* Resolve 'addr' (IPv4 first, then IPv6) and connect to it over RDMA.
 * Every resolved address is tried in turn: the address is handed to the RDMA
 * connection manager and the connection events are then processed until the
 * connection is established or the remaining part of 'timeout'
 * (milliseconds) is used up.
 * Returns a connected context that the caller owns, or NULL on failure. On
 * failure all resources created here are released; a pointer to freed memory
 * is never returned. */
static RdmaContext *valkeyContextConnectRdma(const char *addr, int port, int timeout) {
    int ret;
    char _port[6]; /* strlen("65535"); */
    struct addrinfo hints, *servinfo = NULL, *p;
    RdmaContext *ctx = NULL;
    struct sockaddr_storage saddr;
    long start = valkeyNowMs(), timed;

    snprintf(_port, sizeof(_port), "%d", port);
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;

    if ((ret = getaddrinfo(addr, _port, &hints, &servinfo)) != 0) {
        hints.ai_family = AF_INET6;
        if ((ret = getaddrinfo(addr, _port, &hints, &servinfo)) != 0) {
            rdmaFatal(gai_strerror(ret));
            return NULL;
        }
    }

    ctx = calloc(1, sizeof(RdmaContext));
    if (!ctx) {
        rdmaFatal("Out of memory");
        /* nothing RDMA related exists yet, only the address list */
        goto end;
    }

    ctx->cm_channel = rdma_create_event_channel();
    if (!ctx->cm_channel) {
        rdmaFatal("RDMA: create event channel failed");
        goto free_rdma;
    }

    if (rdma_create_id(ctx->cm_channel, &ctx->cm_id, (void *)ctx, RDMA_PS_TCP)) {
        rdmaFatal("RDMA: create id failed");
        goto free_rdma;
    }

    if ((valkeySetFdBlocking(ctx->cm_channel->fd, 0) != 0)) {
        rdmaFatal("RDMA: set cm channel fd non-block failed");
        goto free_rdma;
    }

    for (p = servinfo; p != NULL; p = p->ai_next) {
        if (p->ai_family == PF_INET) {
            memcpy(&saddr, p->ai_addr, sizeof(struct sockaddr_in));
            ((struct sockaddr_in *)&saddr)->sin_port = htons(port);
        } else if (p->ai_family == PF_INET6) {
            memcpy(&saddr, p->ai_addr, sizeof(struct sockaddr_in6));
            ((struct sockaddr_in6 *)&saddr)->sin6_port = htons(port);
        } else {
            rdmaFatal("RDMA: unsupported family");
            goto free_rdma;
        }

        /* resolve addr as most 100ms */
        if (rdma_resolve_addr(ctx->cm_id, NULL, (struct sockaddr *)&saddr, 100)) {
            continue;
        }

        timed = timeout - (valkeyNowMs() - start);
        if ((valkeyRdmaWaitConn(ctx, timed) == 0) && ctx->connected) {
            goto end;
        }
    }

    if ((!ctx->connected) && (p == NULL)) {
        rdmaFatal("RDMA: resolve failed");
    }

free_rdma:
    if (ctx->cm_id) {
        rdma_destroy_id(ctx->cm_id);
    }
    if (ctx->cm_channel) {
        rdma_destroy_event_channel(ctx->cm_channel);
    }
    free(ctx);
    /* the caller tests the result against NULL */
    ctx = NULL;

end:
    if (servinfo) {
        freeaddrinfo(servinfo);
    }

    return ctx;
}
/* Test parameters. port, host, minkeys and maxkeys can be overridden from the
 * command line in main(); keysize is fixed. */
/* server port to connect to */
static int port = 6379;
/* server address; mandatory, main() refuses to run without it */
static char *host = NULL;
/* lower bound used by test_routine when picking the number of keys */
static int minkeys = 128;
/* upper bound used by test_routine when picking the number of keys */
static int maxkeys = 8192;
/* Size of a value buffer. test_routine compares GET replies against the
 * hard-coded "$1024" length prefix, which matches the 1024 payload bytes
 * configured here. */
static int keysize = 1024 + 1; /* for '\0' terminator */
/* One generated key/value pair owned by a test thread */
struct test_kv_pair {
char key[32]; /* "THREAD01-000001" */
/* heap allocated, NUL terminated string of keysize - 1 upper case letters */
char *value;
};
/* Body of one test client; runs in the main thread or as a pthread start
 * routine ('arg' is unused). It connects to host:port over RDMA and runs
 * four rounds against the server:
 *   1. PING and check the PONG reply;
 *   2. SET a random number of keys (chosen in [minkeys, maxkeys)) with
 *      random upper case values;
 *   3. BGSAVE, issued by only one thread of the process;
 *   4. GET every key back and compare the value.
 * Failures are reported through rdmaFatal()/assert().
 * All buffers and the connection are released before returning.
 * Always returns NULL. */
static void *test_routine(void *arg) {
    pid_t tid = gettid();
    RdmaContext *ctx;
    struct test_kv_pair *kv_pairs = NULL, *kv_pair;
    char *inbuf = NULL, *outbuf = NULL;
    int bufsize = keysize + 128;
    int inbytes, outbytes;
    int keys = 0;
    int span;
    /* protocol strings used by the rounds below */
    char *pingcmd = "*1\r\n$4\r\nPING\r\n";
    char *pingresp = "+PONG\r\n";
    char *okresp = "+OK\r\n";
    char *bgsavecmd = "*1\r\n$6\r\nBGSAVE\r\n";
    char *bgsaveresp = "+Background saving started\r\n";
    char *getrespprex = "$1024\r\n";
    int getrespprexlen = strlen(getrespprex);
    static int bgsaved;

    UNUSED(arg);

    ctx = valkeyContextConnectRdma(host, port, 1000);
    if (!ctx) {
        rdmaFatal("RDMA connect failed");
        return NULL;
    }

    inbuf = malloc(bufsize);
    outbuf = malloc(bufsize);
    if (!inbuf || !outbuf) {
        rdmaFatal("Out of memory");
        goto out;
    }

    /* # round 1, test PING */
    if (valkeyRdmaWrite(ctx, pingcmd, strlen(pingcmd)) < 0) {
        rdmaFatal("RDMA write failed");
        goto out;
    }
    inbytes = valkeyRdmaReadFull(ctx, inbuf, strlen(pingresp));
    assert(inbytes == (int)strlen(pingresp));
    assert(!strncmp(pingresp, inbuf, inbytes));
    printf("Valkey Over RDMA test thread[%d] PING/PONG [OK]\n", tid);

    /* prepare random KV for SET/GET; an empty range (maxkeys == minkeys)
     * must not reach the modulo, which would divide by zero */
    span = maxkeys - minkeys;
    keys = minkeys + (span > 0 ? (int)(random() % span) : 0);
    kv_pairs = calloc(keys, sizeof(struct test_kv_pair));
    if (!kv_pairs) {
        rdmaFatal("Out of memory");
        goto out;
    }

    for (int i = 0; i < keys; i++) {
        kv_pair = &kv_pairs[i];
        snprintf(kv_pair->key, sizeof(kv_pair->key) - 1, "THREAD%02d-%06d", tid, i);
        kv_pair->value = calloc(keysize, 1);
        if (!kv_pair->value) {
            rdmaFatal("Out of memory");
            goto out;
        }
        for (int k = 0; k < keysize - 1; k++) {
            kv_pair->value[k] = 'A' + random() % 26; /* generate upper case string */
        }
    }
    printf("Valkey Over RDMA test thread[%d] prepare %d KVs [OK]\n", tid, keys);

    /* # round 2, test SET */
    for (int i = 0; i < keys; i++) {
        kv_pair = &kv_pairs[i];
        /* build SET command */
        outbytes = snprintf(outbuf, bufsize, "*3\r\n$3\r\nSET\r\n$%zu\r\n%s\r\n$%zu\r\n%s\r\n",
                            strlen(kv_pair->key), kv_pair->key,
                            strlen(kv_pair->value), kv_pair->value);
        if (valkeyRdmaWrite(ctx, outbuf, outbytes) < 0) {
            rdmaFatal("RDMA write failed");
            goto out;
        }
        inbytes = valkeyRdmaReadFull(ctx, inbuf, strlen(okresp));
        assert(inbytes == (int)strlen(okresp));
        assert(!strncmp("+OK\r\n", inbuf, inbytes));
    }
    printf("Valkey Over RDMA test thread[%d] SET %d KVs [OK]\n", tid, keys);

    /* # round 3, test BGSAVE, to avoid "-ERR Background save already", run BGSAVE only once */
    if (!__atomic_fetch_add(&bgsaved, 1, __ATOMIC_SEQ_CST)) {
        if (valkeyRdmaWrite(ctx, bgsavecmd, strlen(bgsavecmd)) < 0) {
            rdmaFatal("RDMA write failed");
            goto out;
        }
        inbytes = valkeyRdmaReadFull(ctx, inbuf, strlen(bgsaveresp));
        assert(inbytes == (int)strlen(bgsaveresp));
        assert(!strncmp(bgsaveresp, inbuf, inbytes));
        printf("Valkey Over RDMA test thread[%d] BGSAVE [OK]\n", tid);
    }

    /* # round 4, test GET. also verify all the value already set */
    for (int i = 0; i < keys; i++) {
        kv_pair = &kv_pairs[i];
        /* build GET command */
        outbytes = snprintf(outbuf, bufsize, "*2\r\n$3\r\nGET\r\n$%zu\r\n%s\r\n",
                            strlen(kv_pair->key), kv_pair->key);
        if (valkeyRdmaWrite(ctx, outbuf, outbytes) < 0) {
            rdmaFatal("RDMA write failed");
            goto out;
        }
        inbytes = valkeyRdmaReadFull(ctx, inbuf, getrespprexlen + strlen(kv_pair->value) + 2);
        assert(inbytes == (int)(getrespprexlen + strlen(kv_pair->value) + 2));
        assert(!strncmp(getrespprex, inbuf, getrespprexlen));
        assert(!strncmp(kv_pair->value, inbuf + getrespprexlen, strlen(kv_pair->value)));
    }
    printf("Valkey Over RDMA test thread[%d] GET %d KVs [OK]\n", tid, keys);

out:
    /* release the generated data, the I/O buffers and the connection */
    if (kv_pairs) {
        for (int i = 0; i < keys; i++) {
            free(kv_pairs[i].value);
        }
        free(kv_pairs);
    }
    free(outbuf);
    free(inbuf);
    valkeyRdmaClose(ctx);
    valkeyRdmaFree(ctx);

    return NULL;
}
/* Print the command line help for program name 'proc' to stdout. The option
 * letters listed here match the ones parsed in main(): -M selects maxkeys and
 * -m selects minkeys. */
void usage(char *proc) {
    printf("%s usage:\n", proc);
    printf("\t--help/-H\n");
    printf("\t--host/-h HOSTADDR\n");
    printf("\t--port/-p PORT\n");
    printf("\t--maxkeys/-M MAXKEYS\n");
    printf("\t--minkeys/-m MINKEYS\n");
    printf("\t--thread/-t THREADS\n");
}
/* Entry point of the test client.
 * Parses the options (--host is mandatory), seeds the random generator and
 * then runs test_routine either once in the main thread (no --thread given,
 * or 0) or in the requested number of threads, which are all joined before
 * exiting.
 * Returns 0 on success; exits with -1 on an unknown option. Invalid option
 * values are reported through rdmaFatal(). */
int main(int argc, char *argv[])
{
    int c, args;
    int nr_threads = 0;
    int started = 0;
    pthread_t threads[MAX_THREADS];
    static struct option long_opts[] = {
        { "help", no_argument, NULL, 'H' },
        { "host", required_argument, NULL, 'h' },
        { "port", required_argument, NULL, 'p' },
        { "maxkeys", required_argument, NULL, 'M' },
        { "minkeys", required_argument, NULL, 'm' },
        { "thread", required_argument, NULL, 't' },
        /* getopt_long() needs an all-zero entry to find the end of the array */
        { NULL, 0, NULL, 0 },
    };
    static char *short_opts = "Hh:p:t:M:m:";

    while (1) {
        c = getopt_long(argc, argv, short_opts, long_opts, &args);
        if (c == -1) {
            break;
        }

        switch (c) {
        case 'h':
            host = optarg;
            break;

        case 'p':
            port = atoi(optarg);
            if (port <= 0 || port > 65535) {
                rdmaFatal("invalid port");
            }
            break;

        case 't':
            nr_threads = atoi(optarg);
            if (nr_threads < 0 || nr_threads > MAX_THREADS) {
                rdmaFatal("--thread/-t is expected as [0, 32]");
            }
            break;

        case 'M':
            maxkeys = atoi(optarg);
            break;

        case 'm':
            minkeys = atoi(optarg);
            break;

        case 'H':
            usage(argv[0]);
            exit(0);

        default:
            usage(argv[0]);
            exit(-1); /* this is not considered as success, to avoid auto-test workaround */
        }
    }

    if (!host) {
        rdmaFatal("missing --host/-h");
    }

    /* the key count is used as an allocation size, it has to be positive */
    if (minkeys <= 0) {
        rdmaFatal("minkeys should be positive");
    }

    if (minkeys > maxkeys) {
        rdmaFatal("minkeys should less than maxkeys");
    }

    /* To make the test randomly */
    srandom(time(NULL) ^ getpid());

    /* main thread mode */
    if (!nr_threads) {
        printf("Test a single client in main thread ...\n");
        test_routine(NULL);
        return 0;
    }

    /* multi threads mode; the thread must be created outside of assert(),
     * otherwise nothing would run when assertions are compiled out */
    for (int i = 0; i < nr_threads; i++) {
        if (pthread_create(&threads[i], NULL, test_routine, NULL) != 0) {
            rdmaFatal("pthread_create failed");
            break;
        }
        started++;
    }

    /* only join the threads that actually exist */
    for (int i = 0; i < started; i++) {
        pthread_join(threads[i], NULL);
    }

    if (started != nr_threads) {
        return -1;
    }

    printf("Valkey Over RDMA test [OK]\n");

    return 0;
}
#endif /* __linux__ */