RDMA builtin support (#1209)

There are several patches in this PR:

* Abstract the set/rewrite handlers of the `bind` config option: `bind` is
a special config, and `socket` and `tls` share the same one. RDMA needs an
option of the same style but with separate storage, so the handlers are
abstracted to work for both `socket` and `RDMA` (and possibly QUIC in the
future).
* Introduce closeListener for connection types: closing a socket listener
only takes a simple syscall, but RDMA needs more complex logic, so a
connection type specific close listener method is introduced.
* RDMA: use valkey.conf style options instead of module parameters: use
`--rdma-bind` and `--rdma-port` instead of module parameters. The
module style configs `rdma.bind` and `rdma.port` are removed.
* RDMA: support builtin: support `make BUILD_RDMA=yes`. The module style is
still kept for now.
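
As a rough illustration of the closeListener item above, the following sketch shows a per-connection-type close hook with a small dispatch wrapper. It is not the code from this PR: `conn_type`, `listener`, `conn_close_listener` and the `closed_by_*` counters are simplified, hypothetical stand-ins, and the real hooks also unregister event-loop handlers and release file descriptors or RDMA resources.

```c
#define MAX_FDS 4 /* hypothetical limit, for the demo only */

struct listener;

/* Hypothetical, trimmed-down connection type: only the close hook. */
typedef struct conn_type {
    const char *name;
    void (*close_listener)(struct listener *l);
} conn_type;

typedef struct listener {
    int fd[MAX_FDS];
    int count;
    const conn_type *ct;
    int closed_by_socket; /* demo bookkeeping, not in the real struct */
    int closed_by_rdma;   /* demo bookkeeping, not in the real struct */
} listener;

/* Socket flavour: the real code deletes the file event and calls close(). */
static void socket_close_listener(listener *l) {
    for (int i = 0; i < l->count; i++) {
        if (l->fd[i] == -1) continue;
        l->fd[i] = -1; /* the demo only marks the slot as closed */
        l->closed_by_socket++;
    }
    l->count = 0;
}

/* RDMA flavour: the real code also destroys the cm_id and event channel. */
static void rdma_close_listener(listener *l) {
    for (int i = 0; i < l->count; i++) {
        if (l->fd[i] == -1) continue;
        l->fd[i] = -1;
        l->closed_by_rdma++;
    }
    l->count = 0;
}

static const conn_type CT_SOCKET_DEMO = {"tcp", socket_close_listener};
static const conn_type CT_RDMA_DEMO = {"rdma", rdma_close_listener};

/* Dispatch wrapper: skip listeners that have nothing open. */
static void conn_close_listener(listener *l) {
    if (l->count) l->ct->close_listener(l);
}
```

Callers only ever use the wrapper, so generic code such as a "change listener" routine does not need to know which transport it is closing.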

Signed-off-by: zhenwei pi <pizhenwei@bytedance.com>
zhenwei pi 2024-11-29 18:13:34 +08:00 committed by GitHub
parent fd58f8d058
commit 4695d118dd
17 changed files with 317 additions and 245 deletions

@ -37,8 +37,13 @@ To build TLS as Valkey module:
Note that sentinel mode does not support TLS module.
To build with experimental RDMA support you'll need RDMA development libraries
(e.g. librdmacm-dev and libibverbs-dev on Debian/Ubuntu). For now, Valkey only
supports RDMA as connection module mode. Run:
(e.g. librdmacm-dev and libibverbs-dev on Debian/Ubuntu).
To build RDMA support as Valkey built-in:
% make BUILD_RDMA=yes
To build RDMA as Valkey module:
% make BUILD_RDMA=module
@ -203,20 +208,27 @@ Note that Valkey Over RDMA is an experimental feature.
It may be changed or removed in any minor or major version.
Currently, it is only supported on Linux.
To manually run a Valkey server with RDMA mode:
* RDMA built-in mode:
```
./src/valkey-server --protected-mode no \
--rdma-bind 192.168.122.100 --rdma-port 6379
```
% ./src/valkey-server --protected-mode no \
--loadmodule src/valkey-rdma.so bind=192.168.122.100 port=6379
* RDMA module mode:
```
./src/valkey-server --protected-mode no \
--loadmodule src/valkey-rdma.so --rdma-bind 192.168.122.100 --rdma-port 6379
```
It's possible to change bind address/port of RDMA by runtime command:
192.168.122.100:6379> CONFIG SET rdma.port 6380
192.168.122.100:6379> CONFIG SET rdma-port 6380
It's also possible to have both RDMA and TCP available, and there is no
conflict of TCP(6379) and RDMA(6379), Ex:
% ./src/valkey-server --protected-mode no \
--loadmodule src/valkey-rdma.so bind=192.168.122.100 port=6379 \
--loadmodule src/valkey-rdma.so --rdma-bind 192.168.122.100 --rdma-port 6379 \
--port 6379
Note that the network card (192.168.122.100 of this example) should support

@ -88,6 +88,7 @@ set(VALKEY_SERVER_SRCS
${CMAKE_SOURCE_DIR}/src/tracking.c
${CMAKE_SOURCE_DIR}/src/socket.c
${CMAKE_SOURCE_DIR}/src/tls.c
${CMAKE_SOURCE_DIR}/src/rdma.c
${CMAKE_SOURCE_DIR}/src/sha256.c
${CMAKE_SOURCE_DIR}/src/timeout.c
${CMAKE_SOURCE_DIR}/src/setcpuaffinity.c

@ -208,25 +208,30 @@ if (BUILD_RDMA)
# RDMA support (Linux only)
if (LINUX AND NOT APPLE)
valkey_parse_build_option(${BUILD_RDMA} USE_RDMA)
find_package(PkgConfig REQUIRED)
# Locate librdmacm & libibverbs, fail if we can't find them
valkey_pkg_config(librdmacm RDMACM_LIBS)
valkey_pkg_config(libibverbs IBVERBS_LIBS)
message(STATUS "${RDMACM_LIBS};${IBVERBS_LIBS}")
list(APPEND RDMA_LIBS "${RDMACM_LIBS};${IBVERBS_LIBS}")
if (USE_RDMA EQUAL 2) # Module
message(STATUS "Building RDMA as module")
add_valkey_server_compiler_options("-DUSE_RDMA=2")
# Locate librdmacm & libibverbs, fail if we can't find them
valkey_pkg_config(librdmacm RDMACM_LIBS)
valkey_pkg_config(libibverbs IBVERBS_LIBS)
list(APPEND RDMA_LIBS "${RDMACM_LIBS};${IBVERBS_LIBS}")
set(BUILD_RDMA_MODULE 1)
elseif (USE_RDMA EQUAL 1)
# RDMA can only be built as a module. So disable it
message(WARNING "BUILD_RDMA can be one of: [NO | 0 | MODULE], but '${BUILD_RDMA}' was provided")
message(STATUS "RDMA build is disabled")
set(USE_RDMA 0)
set(BUILD_RDMA_MODULE 2)
elseif (USE_RDMA EQUAL 1) # Builtin
message(STATUS "Building RDMA as builtin")
add_valkey_server_compiler_options("-DUSE_RDMA=1")
add_valkey_server_compiler_options("-DBUILD_RDMA_MODULE=0")
list(APPEND SERVER_LIBS "${RDMA_LIBS}")
endif ()
else ()
message(WARNING "RDMA is only supported on Linux platforms")
endif ()
else ()
# By default, RDMA is disabled
message(STATUS "RDMA is disabled")
set(USE_RDMA 0)
endif ()
set(BUILDING_ARM64 0)

@ -55,7 +55,7 @@ if (BUILD_RDMA_MODULE)
set(MODULE_NAME "valkey-rdma")
message(STATUS "Building RDMA module")
add_library(${MODULE_NAME} SHARED "${VALKEY_RDMA_MODULE_SRCS}")
target_compile_options(${MODULE_NAME} PRIVATE -DBUILD_RDMA_MODULE -DUSE_RDMA=1)
target_compile_options(${MODULE_NAME} PRIVATE -DBUILD_RDMA_MODULE=2 -DUSE_RDMA=1)
target_link_libraries(${MODULE_NAME} "${RDMA_LIBS}")
# remove the "lib" prefix from the module
set_target_properties(${MODULE_NAME} PROPERTIES PREFIX "")

@ -325,26 +325,26 @@ ifeq ($(BUILD_TLS),module)
TLS_MODULE_CFLAGS+=-DUSE_OPENSSL=$(BUILD_MODULE) $(OPENSSL_CFLAGS) -DBUILD_TLS_MODULE=$(BUILD_MODULE)
endif
BUILD_RDMA:=no
RDMA_MODULE=
RDMA_MODULE_NAME:=valkey-rdma$(PROG_SUFFIX).so
RDMA_MODULE_CFLAGS:=$(FINAL_CFLAGS)
ifeq ($(BUILD_RDMA),module)
FINAL_CFLAGS+=-DUSE_RDMA=$(BUILD_MODULE)
RDMA_PKGCONFIG := $(shell $(PKG_CONFIG) --exists librdmacm libibverbs && echo $$?)
RDMA_LIBS=
RDMA_PKGCONFIG := $(shell $(PKG_CONFIG) --exists librdmacm libibverbs && echo $$?)
ifeq ($(RDMA_PKGCONFIG),0)
RDMA_LIBS=$(shell $(PKG_CONFIG) --libs librdmacm libibverbs)
else
RDMA_LIBS=-lrdmacm -libverbs
endif
RDMA_MODULE=$(RDMA_MODULE_NAME)
RDMA_MODULE_CFLAGS+=-DUSE_RDMA=$(BUILD_YES) -DBUILD_RDMA_MODULE $(RDMA_LIBS)
else
ifeq ($(BUILD_RDMA),no)
# disable RDMA, do nothing
else
$(error "RDMA is only supported as module (BUILD_RDMA=module), or disabled (BUILD_RDMA=no)")
ifeq ($(BUILD_RDMA),yes)
FINAL_CFLAGS+=-DUSE_RDMA=$(BUILD_YES) -DBUILD_RDMA_MODULE=$(BUILD_NO)
FINAL_LIBS += $(RDMA_LIBS)
endif
RDMA_MODULE=
RDMA_MODULE_NAME:=valkey-rdma$(PROG_SUFFIX).so
RDMA_MODULE_CFLAGS:=$(FINAL_CFLAGS)
ifeq ($(BUILD_RDMA),module)
FINAL_CFLAGS+=-DUSE_RDMA=$(BUILD_MODULE)
RDMA_MODULE=$(RDMA_MODULE_NAME)
RDMA_MODULE_CFLAGS+=-DUSE_RDMA=$(BUILD_MODULE) -DBUILD_RDMA_MODULE=$(BUILD_MODULE) $(RDMA_LIBS)
endif
ifndef V
@ -411,7 +411,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o slowlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)

@ -1536,10 +1536,27 @@ void rewriteConfigOOMScoreAdjValuesOption(standardConfig *config, const char *na
}
/* Rewrite the bind option. */
void rewriteConfigBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state) {
static void rewriteConfigBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state, char **bindaddr, int bindaddr_count) {
UNUSED(config);
int force = 1;
sds line, addresses;
/* Rewrite as bind <addr1> <addr2> ... <addrN> */
if (bindaddr_count > 0)
addresses = sdsjoin(bindaddr, bindaddr_count, " ");
else
addresses = sdsnew("\"\"");
line = sdsnew(name);
line = sdscatlen(line, " ", 1);
line = sdscatsds(line, addresses);
sdsfree(addresses);
rewriteConfigRewriteLine(state, name, line, force);
}
/* Rewrite the bind option. */
static void rewriteConfigSocketBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state) {
UNUSED(config);
int is_default = 0;
/* Compare server.bindaddr with CONFIG_DEFAULT_BINDADDR */
@ -1559,17 +1576,7 @@ void rewriteConfigBindOption(standardConfig *config, const char *name, struct re
return;
}
/* Rewrite as bind <addr1> <addr2> ... <addrN> */
if (server.bindaddr_count > 0)
addresses = sdsjoin(server.bindaddr, server.bindaddr_count, " ");
else
addresses = sdsnew("\"\"");
line = sdsnew(name);
line = sdscatlen(line, " ", 1);
line = sdscatsds(line, addresses);
sdsfree(addresses);
rewriteConfigRewriteLine(state, name, line, force);
rewriteConfigBindOption(config, name, state, server.bindaddr, server.bindaddr_count);
}
/* Rewrite the loadmodule option. */
@ -2637,7 +2644,7 @@ static int applyBind(const char **err) {
tcp_listener->ct = connectionByType(CONN_TYPE_SOCKET);
if (changeListener(tcp_listener) == C_ERR) {
*err = "Failed to bind to specified addresses.";
if (tls_listener) closeListener(tls_listener); /* failed with TLS together */
if (tls_listener) connCloseListener(tls_listener); /* failed with TLS together */
return 0;
}
@ -2649,7 +2656,7 @@ static int applyBind(const char **err) {
tls_listener->ct = connectionByType(CONN_TYPE_TLS);
if (changeListener(tls_listener) == C_ERR) {
*err = "Failed to bind to specified addresses.";
closeListener(tcp_listener); /* failed with TCP together */
connCloseListener(tcp_listener); /* failed with TCP together */
return 0;
}
}
@ -2922,8 +2929,9 @@ static sds getConfigNotifyKeyspaceEventsOption(standardConfig *config) {
return keyspaceEventsFlagsToString(server.notify_keyspace_events);
}
static int setConfigBindOption(standardConfig *config, sds *argv, int argc, const char **err) {
static int setConfigBindOption(standardConfig *config, sds *argv, int argc, const char **err, char **bindaddr, int *bindaddr_count) {
UNUSED(config);
int orig_bindaddr_count = *bindaddr_count;
int j;
if (argc > CONFIG_BINDADDR_MAX) {
@ -2935,11 +2943,73 @@ static int setConfigBindOption(standardConfig *config, sds *argv, int argc, cons
if (argc == 1 && sdslen(argv[0]) == 0) argc = 0;
/* Free old bind addresses */
for (j = 0; j < server.bindaddr_count; j++) {
zfree(server.bindaddr[j]);
for (j = 0; j < orig_bindaddr_count; j++) zfree(bindaddr[j]);
for (j = 0; j < argc; j++) bindaddr[j] = zstrdup(argv[j]);
*bindaddr_count = argc;
return 1;
}
static int setConfigSocketBindOption(standardConfig *config, sds *argv, int argc, const char **err) {
UNUSED(config);
return setConfigBindOption(config, argv, argc, err, server.bindaddr, &server.bindaddr_count);
}
static int setConfigRdmaBindOption(standardConfig *config, sds *argv, int argc, const char **err) {
UNUSED(config);
return setConfigBindOption(config, argv, argc, err, server.rdma_ctx_config.bindaddr, &server.rdma_ctx_config.bindaddr_count);
}
static sds getConfigRdmaBindOption(standardConfig *config) {
UNUSED(config);
return sdsjoin(server.rdma_ctx_config.bindaddr, server.rdma_ctx_config.bindaddr_count, " ");
}
static void rewriteConfigRdmaBindOption(standardConfig *config, const char *name, struct rewriteConfigState *state) {
UNUSED(config);
if (server.rdma_ctx_config.bindaddr_count) {
rewriteConfigBindOption(config, name, state, server.rdma_ctx_config.bindaddr,
server.rdma_ctx_config.bindaddr_count);
}
}
static int applyRdmaBind(const char **err) {
connListener *rdma_listener = listenerByType(CONN_TYPE_RDMA);
if (!rdma_listener) {
*err = "No RDMA building support.";
return 0;
}
rdma_listener->bindaddr = server.rdma_ctx_config.bindaddr;
rdma_listener->bindaddr_count = server.rdma_ctx_config.bindaddr_count;
rdma_listener->port = server.rdma_ctx_config.port;
rdma_listener->ct = connectionByType(CONN_TYPE_RDMA);
if (changeListener(rdma_listener) == C_ERR) {
*err = "Failed to bind to specified addresses for RDMA.";
return 0;
}
return 1;
}
static int updateRdmaPort(const char **err) {
connListener *listener = listenerByType(CONN_TYPE_RDMA);
if (listener == NULL) {
*err = "No RDMA building support.";
return 0;
}
listener->bindaddr = server.rdma_ctx_config.bindaddr;
listener->bindaddr_count = server.rdma_ctx_config.bindaddr_count;
listener->port = server.rdma_ctx_config.port;
listener->ct = connectionByType(CONN_TYPE_RDMA);
if (changeListener(listener) == C_ERR) {
*err = "Unable to listen on this port for RDMA. Check server logs.";
return 0;
}
for (j = 0; j < argc; j++) server.bindaddr[j] = zstrdup(argv[j]);
server.bindaddr_count = argc;
return 1;
}
@ -3237,6 +3307,9 @@ standardConfig static_configs[] = {
createIntConfig("watchdog-period", NULL, MODIFIABLE_CONFIG | HIDDEN_CONFIG, 0, INT_MAX, server.watchdog_period, 0, INTEGER_CONFIG, NULL, updateWatchdogPeriod),
createIntConfig("shutdown-timeout", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.shutdown_timeout, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("repl-diskless-sync-max-replicas", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.repl_diskless_sync_max_replicas, 0, INTEGER_CONFIG, NULL, NULL),
createIntConfig("rdma-port", NULL, MODIFIABLE_CONFIG, 0, 65535, server.rdma_ctx_config.port, 0, INTEGER_CONFIG, NULL, updateRdmaPort),
createIntConfig("rdma-rx-size", NULL, IMMUTABLE_CONFIG, 64 * 1024, 16 * 1024 * 1024, server.rdma_ctx_config.rx_size, 1024 * 1024, INTEGER_CONFIG, NULL, NULL),
createIntConfig("rdma-completion-vector", NULL, IMMUTABLE_CONFIG, -1, 1024, server.rdma_ctx_config.completion_vector, -1, INTEGER_CONFIG, NULL, NULL),
/* Unsigned int configs */
createUIntConfig("maxclients", NULL, MODIFIABLE_CONFIG, 1, UINT_MAX, server.maxclients, 10000, INTEGER_CONFIG, NULL, updateMaxclients),
@ -3316,7 +3389,8 @@ standardConfig static_configs[] = {
createSpecialConfig("client-output-buffer-limit", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigClientOutputBufferLimitOption, getConfigClientOutputBufferLimitOption, rewriteConfigClientOutputBufferLimitOption, NULL),
createSpecialConfig("oom-score-adj-values", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigOOMScoreAdjValuesOption, getConfigOOMScoreAdjValuesOption, rewriteConfigOOMScoreAdjValuesOption, updateOOMScoreAdj),
createSpecialConfig("notify-keyspace-events", NULL, MODIFIABLE_CONFIG, setConfigNotifyKeyspaceEventsOption, getConfigNotifyKeyspaceEventsOption, rewriteConfigNotifyKeyspaceEventsOption, NULL),
createSpecialConfig("bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigBindOption, getConfigBindOption, rewriteConfigBindOption, applyBind),
createSpecialConfig("bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigSocketBindOption, getConfigBindOption, rewriteConfigSocketBindOption, applyBind),
createSpecialConfig("rdma-bind", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigRdmaBindOption, getConfigRdmaBindOption, rewriteConfigRdmaBindOption, applyRdmaBind),
createSpecialConfig("replicaof", "slaveof", IMMUTABLE_CONFIG | MULTI_ARG_CONFIG, setConfigReplicaOfOption, getConfigReplicaOfOption, rewriteConfigReplicaOfOption, NULL),
createSpecialConfig("latency-tracking-info-percentiles", NULL, MODIFIABLE_CONFIG | MULTI_ARG_CONFIG, setConfigLatencyTrackingInfoPercentilesOutputOption, getConfigLatencyTrackingInfoPercentilesOutputOption, rewriteConfigLatencyTrackingInfoPercentilesOutputOption, NULL),
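
The bind refactoring in the hunks above can be pictured with the following minimal sketch: one shared helper receives the target address array and counter, and thin per-transport wrappers pass in their own storage. This is an illustration only, not the project's code: `bind_cfg`, `set_bind`, `dup_str`, `BINDADDR_MAX` and the error text are hypothetical stand-ins, and plain `malloc`/`free` replace the server's own allocator.

```c
#include <stdlib.h>
#include <string.h>

#define BINDADDR_MAX 16 /* hypothetical stand-in for CONFIG_BINDADDR_MAX */

/* Hypothetical storage for one transport's bind addresses. */
struct bind_cfg {
    char *bindaddr[BINDADDR_MAX];
    int bindaddr_count;
};

static struct bind_cfg socket_cfg; /* stands in for the TCP/TLS settings */
static struct bind_cfg rdma_cfg;   /* stands in for the RDMA settings */

static char *dup_str(const char *s) {
    size_t n = strlen(s) + 1;
    char *p = malloc(n);
    if (p) memcpy(p, s, n);
    return p;
}

/* Shared helper: returns 1 on success, 0 on error (with *err set). */
static int set_bind(const char **argv, int argc, const char **err,
                    char **bindaddr, int *bindaddr_count) {
    if (argc > BINDADDR_MAX) {
        *err = "too many bind addresses"; /* illustrative message */
        return 0;
    }
    /* A single empty string means "no bind addresses". */
    if (argc == 1 && argv[0][0] == '\0') argc = 0;

    /* Free the old addresses, then copy in the new ones. */
    for (int j = 0; j < *bindaddr_count; j++) free(bindaddr[j]);
    for (int j = 0; j < argc; j++) bindaddr[j] = dup_str(argv[j]);
    *bindaddr_count = argc;
    return 1;
}

/* Thin wrappers: same logic, different storage. */
static int set_socket_bind(const char **argv, int argc, const char **err) {
    return set_bind(argv, argc, err, socket_cfg.bindaddr, &socket_cfg.bindaddr_count);
}

static int set_rdma_bind(const char **argv, int argc, const char **err) {
    return set_bind(argv, argc, err, rdma_cfg.bindaddr, &rdma_cfg.bindaddr_count);
}
```

Passing the array and a pointer to its counter keeps the helper free of any reference to a specific transport, which is what would let a further connection type reuse it.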

@ -66,6 +66,9 @@ int connTypeInitialize(void) {
/* may fail if without BUILD_TLS=yes */
RedisRegisterConnectionTypeTLS();
/* may fail if without BUILD_RDMA=yes */
RegisterConnectionTypeRdma();
return C_OK;
}

@ -60,6 +60,7 @@ typedef enum {
#define CONN_TYPE_SOCKET "tcp"
#define CONN_TYPE_UNIX "unix"
#define CONN_TYPE_TLS "tls"
#define CONN_TYPE_RDMA "rdma"
#define CONN_TYPE_MAX 8 /* 8 is enough to be extendable */
typedef void (*ConnectionCallbackFunc)(struct connection *conn);
@ -79,6 +80,7 @@ typedef struct ConnectionType {
int (*addr)(connection *conn, char *ip, size_t ip_len, int *port, int remote);
int (*is_local)(connection *conn);
int (*listen)(connListener *listener);
void (*closeListener)(connListener *listener);
/* create/shutdown/close connection */
connection *(*conn_create)(void);
@ -442,6 +444,13 @@ static inline int connListen(connListener *listener) {
return listener->ct->listen(listener);
}
/* Close a listened listener */
static inline void connCloseListener(connListener *listener) {
if (listener->count) {
listener->ct->closeListener(listener);
}
}
/* Get accept_handler of a connection type */
static inline aeFileProc *connAcceptHandler(ConnectionType *ct) {
if (ct) return ct->accept_handler;
@ -454,6 +463,7 @@ sds getListensInfoString(sds info);
int RedisRegisterConnectionTypeSocket(void);
int RedisRegisterConnectionTypeUnix(void);
int RedisRegisterConnectionTypeTLS(void);
int RegisterConnectionTypeRdma(void);
/* Return 1 if connection is using TLS protocol, 0 if otherwise. */
static inline int connIsTLS(connection *conn) {

@ -10,9 +10,10 @@
#define VALKEYMODULE_CORE_MODULE
#include "server.h"
#if defined USE_RDMA && defined __linux__ /* currently RDMA is only supported on Linux */
#include "connection.h"
#if defined __linux__ /* currently RDMA is only supported on Linux */
#if (USE_RDMA == 1 /* BUILD_YES */) || ((USE_RDMA == 2 /* BUILD_MODULE */) && (BUILD_RDMA_MODULE == 2))
#include "connhelpers.h"
#include <assert.h>
@ -128,12 +129,10 @@ typedef struct rdma_listener {
static list *pending_list;
static rdma_listener *rdma_listeners;
static serverRdmaContextConfig *rdma_config;
static ConnectionType CT_RDMA;
static int valkey_rdma_rx_size = VALKEY_RDMA_DEFAULT_RX_SIZE;
static int valkey_rdma_comp_vector = -1; /* -1 means a random one */
static void serverRdmaError(char *err, const char *fmt, ...) {
va_list ap;
@ -272,7 +271,7 @@ static int rdmaSetupIoBuf(RdmaContext *ctx, struct rdma_cm_id *cm_id) {
/* setup recv buf & MR */
access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_WRITE;
length = valkey_rdma_rx_size;
length = rdma_config->rx_size;
ctx->rx.addr = page_aligned_zalloc(length);
ctx->rx.length = length;
ctx->rx.mr = ibv_reg_mr(ctx->pd, ctx->rx.addr, length, access);
@ -295,6 +294,7 @@ static int rdmaCreateResource(RdmaContext *ctx, struct rdma_cm_id *cm_id) {
struct ibv_comp_channel *comp_channel = NULL;
struct ibv_cq *cq = NULL;
struct ibv_pd *pd = NULL;
int comp_vector = rdma_config->completion_vector;
if (ibv_query_device(cm_id->verbs, &device_attr)) {
serverLog(LL_WARNING, "RDMA: ibv ibv query device failed");
@ -317,8 +317,13 @@ static int rdmaCreateResource(RdmaContext *ctx, struct rdma_cm_id *cm_id) {
ctx->comp_channel = comp_channel;
/* negative number means a random one */
if (comp_vector < 0) {
comp_vector = abs((int)random());
}
cq = ibv_create_cq(cm_id->verbs, VALKEY_RDMA_MAX_WQE * 2, NULL, comp_channel,
valkey_rdma_comp_vector % cm_id->verbs->num_comp_vectors);
comp_vector % cm_id->verbs->num_comp_vectors);
if (!cq) {
serverLog(LL_WARNING, "RDMA: ibv create cq failed");
return C_ERR;
@ -1610,9 +1615,28 @@ int connRdmaListen(connListener *listener) {
rdma_listener++;
}
rdma_config = listener->priv;
return C_OK;
}
static void connRdmaCloseListener(connListener *listener) {
/* Close old servers */
for (int i = 0; i < listener->count; i++) {
if (listener->fd[i] == -1) continue;
aeDeleteFileEvent(server.el, listener->fd[i], AE_READABLE);
listener->fd[i] = -1;
struct rdma_listener *rdma_listener = &rdma_listeners[i];
rdma_destroy_id(rdma_listener->cm_id);
rdma_destroy_event_channel(rdma_listener->cm_channel);
}
listener->count = 0;
zfree(rdma_listeners);
rdma_listeners = NULL;
rdma_config = NULL;
}
static int connRdmaAddr(connection *conn, char *ip, size_t ip_len, int *port, int remote) {
rdma_connection *rdma_conn = (rdma_connection *)conn;
struct rdma_cm_id *cm_id = rdma_conn->cm_id;
@ -1740,6 +1764,7 @@ static ConnectionType CT_RDMA = {
//.cluster_accept_handler = NULL,
.is_local = connRdmaIsLocal,
.listen = connRdmaListen,
.closeListener = connRdmaCloseListener,
.addr = connRdmaAddr,
/* create/close connection */
@ -1769,17 +1794,6 @@ static ConnectionType CT_RDMA = {
.process_pending_data = rdmaProcessPendingData,
};
static struct connListener *rdmaListener(void) {
static struct connListener *listener = NULL;
if (listener) return listener;
listener = listenerByType(CONN_TYPE_RDMA);
serverAssert(listener != NULL);
return listener;
}
ConnectionType *connectionTypeRdma(void) {
static ConnectionType *ct_rdma = NULL;
@ -1791,133 +1805,28 @@ ConnectionType *connectionTypeRdma(void) {
return ct_rdma;
}
/* rdma listener has different create/close logic from TCP, we can't re-use 'int changeListener(connListener *listener)'
* directly */
static int rdmaChangeListener(void) {
struct connListener *listener = rdmaListener();
/* Close old servers */
for (int i = 0; i < listener->count; i++) {
if (listener->fd[i] == -1) continue;
aeDeleteFileEvent(server.el, listener->fd[i], AE_READABLE);
listener->fd[i] = -1;
struct rdma_listener *rdma_listener = &rdma_listeners[i];
rdma_destroy_id(rdma_listener->cm_id);
rdma_destroy_event_channel(rdma_listener->cm_channel);
}
listener->count = 0;
zfree(rdma_listeners);
rdma_listeners = NULL;
closeListener(listener);
/* Just close the server if port disabled */
if (listener->port == 0) {
if (server.set_proc_title) serverSetProcTitle(NULL);
return VALKEYMODULE_OK;
}
/* Re-create listener */
if (connListen(listener) != C_OK) {
return VALKEYMODULE_ERR;
}
/* Create event handlers */
if (createSocketAcceptHandler(listener, listener->ct->accept_handler) != C_OK) {
serverPanic("Unrecoverable error creating %s accept handler.", listener->ct->get_type(NULL));
}
if (server.set_proc_title) serverSetProcTitle(NULL);
return VALKEYMODULE_OK;
int RegisterConnectionTypeRdma(void) {
return connTypeRegister(&CT_RDMA);
}
#ifdef BUILD_RDMA_MODULE
#else
int RegisterConnectionTypeRdma(void) {
serverLog(LL_VERBOSE, "Connection type %s not builtin", CONN_TYPE_RDMA);
return C_ERR;
}
#endif
#if BUILD_RDMA_MODULE == 2 /* BUILD_MODULE */
#include "release.h"
static long long rdmaGetPort(const char *name, void *privdata) {
UNUSED(name);
UNUSED(privdata);
struct connListener *listener = rdmaListener();
return listener->port;
}
static int rdmaSetPort(const char *name, long long val, void *privdata, ValkeyModuleString **err) {
UNUSED(name);
UNUSED(privdata);
UNUSED(err);
struct connListener *listener = rdmaListener();
listener->port = val;
return VALKEYMODULE_OK;
}
static ValkeyModuleString *rdma_bind;
static void rdmaBuildBind(void *ctx) {
struct connListener *listener = rdmaListener();
if (rdma_bind) ValkeyModule_FreeString(NULL, rdma_bind);
sds rdma_bind_str = sdsjoin(listener->bindaddr, listener->bindaddr_count, " ");
rdma_bind = ValkeyModule_CreateString(ctx, rdma_bind_str, sdslen(rdma_bind_str));
}
static ValkeyModuleString *rdmaGetBind(const char *name, void *privdata) {
UNUSED(name);
UNUSED(privdata);
return rdma_bind;
}
static int rdmaSetBind(const char *name, ValkeyModuleString *val, void *privdata, ValkeyModuleString **err) {
UNUSED(name);
UNUSED(err);
struct connListener *listener = rdmaListener();
const char *bind = ValkeyModule_StringPtrLen(val, NULL);
int nexts;
sds *exts = sdssplitlen(bind, strlen(bind), " ", 1, &nexts);
if (nexts > CONFIG_BINDADDR_MAX) {
serverLog(LL_WARNING, "RDMA: Unsupported bind ( > %d)", CONFIG_BINDADDR_MAX);
return VALKEYMODULE_ERR;
}
/* Free old bind addresses */
for (int j = 0; j < listener->bindaddr_count; j++) {
zfree(listener->bindaddr[j]);
}
for (int j = 0; j < nexts; j++) listener->bindaddr[j] = zstrdup(exts[j]);
listener->bindaddr_count = nexts;
sdsfreesplitres(exts, nexts);
rdmaBuildBind(privdata);
return VALKEYMODULE_OK;
}
static int rdmaApplyListener(ValkeyModuleCtx *ctx, void *privdata, ValkeyModuleString **err) {
UNUSED(ctx);
UNUSED(privdata);
UNUSED(err);
return rdmaChangeListener();
}
static void rdmaListenerAddConfig(void *ctx) {
serverAssert(ValkeyModule_RegisterNumericConfig(ctx, "port", 0, VALKEYMODULE_CONFIG_DEFAULT, 0, 65535, rdmaGetPort,
rdmaSetPort, rdmaApplyListener, NULL) == VALKEYMODULE_OK);
serverAssert(ValkeyModule_RegisterStringConfig(ctx, "bind", "", VALKEYMODULE_CONFIG_DEFAULT, rdmaGetBind,
rdmaSetBind, rdmaApplyListener, ctx) == VALKEYMODULE_OK);
serverAssert(ValkeyModule_LoadConfigs(ctx) == VALKEYMODULE_OK);
}
int ValkeyModule_OnLoad(void *ctx, ValkeyModuleString **argv, int argc) {
UNUSED(argv);
UNUSED(argc);
/* Connection modules MUST be part of the same build as valkey. */
if (strcmp(REDIS_BUILD_ID_RAW, serverBuildIdRaw())) {
serverLog(LL_NOTICE, "Connection type %s was not built together with the valkey-server used.", CONN_TYPE_RDMA);
@ -1936,40 +1845,6 @@ int ValkeyModule_OnLoad(void *ctx, ValkeyModuleString **argv, int argc) {
if (connTypeRegister(&CT_RDMA) != C_OK) return VALKEYMODULE_ERR;
rdmaListenerAddConfig(ctx);
struct connListener *listener = rdmaListener();
listener->ct = connectionTypeRdma();
listener->bindaddr = zcalloc_num(CONFIG_BINDADDR_MAX, sizeof(listener->bindaddr[0]));
for (int i = 0; i < argc; i++) {
robj *str = (robj *)argv[i];
int nexts;
sds *exts = sdssplitlen(str->ptr, strlen(str->ptr), "=", 1, &nexts);
if (nexts != 2) {
serverLog(LL_WARNING, "RDMA: Unsupported argument \"%s\"", (char *)str->ptr);
return VALKEYMODULE_ERR;
}
if (!strcasecmp(exts[0], "bind")) {
listener->bindaddr[listener->bindaddr_count++] = zstrdup(exts[1]);
} else if (!strcasecmp(exts[0], "port")) {
listener->port = atoi(exts[1]);
} else if (!strcasecmp(exts[0], "rx-size")) {
valkey_rdma_rx_size = atoi(exts[1]);
} else if (!strcasecmp(exts[0], "comp-vector")) {
valkey_rdma_comp_vector = atoi(exts[1]);
} else {
serverLog(LL_WARNING, "RDMA: Unsupported argument \"%s\"", (char *)str->ptr);
return VALKEYMODULE_ERR;
}
sdsfreesplitres(exts, nexts);
}
rdmaBuildBind(ctx);
if (valkey_rdma_comp_vector == -1) valkey_rdma_comp_vector = abs((int)random());
return VALKEYMODULE_OK;
}
@ -1981,4 +1856,11 @@ int ValkeyModule_OnUnload(void *arg) {
#endif /* BUILD_RDMA_MODULE */
#endif /* USE_RDMA && __linux__ */
#else /* __linux__ */
int RegisterConnectionTypeRdma(void) {
serverLog(LL_VERBOSE, "Connection type %s is supported on Linux only", CONN_TYPE_RDMA);
return C_ERR;
}
#endif /* __linux__ */
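
The completion vector handling in the hunks above (a negative configured value means "pick one at random", and the result is reduced modulo the number of vectors the device offers) can be sketched as a small pure function. This is only an illustration: `pick_comp_vector` is a hypothetical name, and the random value is passed in as a parameter so the behaviour is reproducible, whereas the diff calls `random()` directly.

```c
#include <stdlib.h>

/* configured:       value of the completion vector setting, -1 for "random"
 * random_value:     caller-supplied random number (injected for the demo)
 * num_comp_vectors: number of completion vectors the device exposes (> 0)
 * Returns an index in [0, num_comp_vectors). */
static int pick_comp_vector(int configured, int random_value, int num_comp_vectors) {
    int v = configured;
    if (v < 0) v = abs(random_value); /* negative means a random one */
    return v % num_comp_vectors;
}
```

For example, with 8 vectors a configured value of 10 maps to index 2, while a configured value of -1 with an injected random value of 13 maps to index 5.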

@ -2482,19 +2482,6 @@ void checkTcpBacklogSettings(void) {
#endif
}
void closeListener(connListener *sfd) {
int j;
for (j = 0; j < sfd->count; j++) {
if (sfd->fd[j] == -1) continue;
aeDeleteFileEvent(server.el, sfd->fd[j], AE_READABLE);
close(sfd->fd[j]);
}
sfd->count = 0;
}
/* Create an event handler for accepting new connections in TCP or TLS domain sockets.
* This works atomically for all socket fds */
int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler) {
@ -2558,7 +2545,7 @@ int listenToPort(connListener *sfd) {
continue;
/* Rollback successful listens before exiting */
closeListener(sfd);
connCloseListener(sfd);
return C_ERR;
}
if (server.socket_mark_id > 0) anetSetSockMarkId(NULL, sfd->fd[sfd->count], server.socket_mark_id);
@ -2899,6 +2886,17 @@ void initListeners(void) {
listener->priv = &server.unix_ctx_config; /* Unix socket specified */
}
if (server.rdma_ctx_config.port != 0) {
conn_index = connectionIndexByType(CONN_TYPE_RDMA);
if (conn_index < 0) serverPanic("Failed finding connection listener of %s", CONN_TYPE_RDMA);
listener = &server.listeners[conn_index];
listener->bindaddr = server.rdma_ctx_config.bindaddr;
listener->bindaddr_count = server.rdma_ctx_config.bindaddr_count;
listener->port = server.rdma_ctx_config.port;
listener->ct = connectionByType(CONN_TYPE_RDMA);
listener->priv = &server.rdma_ctx_config;
}
/* create all the configured listener, and add handler to start to accept */
int listen_fds = 0;
for (int j = 0; j < CONN_TYPE_MAX; j++) {
@ -6297,7 +6295,7 @@ connListener *listenerByType(const char *typename) {
/* Close original listener, re-create a new listener from the updated bind address & port */
int changeListener(connListener *listener) {
/* Close old servers */
closeListener(listener);
connCloseListener(listener);
/* Just close the server if port disabled */
if (listener->port == 0) {

@ -1614,6 +1614,17 @@ typedef struct serverUnixContextConfig {
unsigned int perm; /* UNIX socket permission (see mode_t) */
} serverUnixContextConfig;
/*-----------------------------------------------------------------------------
* RDMA Context Configuration
*----------------------------------------------------------------------------*/
typedef struct serverRdmaContextConfig {
char *bindaddr[CONFIG_BINDADDR_MAX];
int bindaddr_count;
int port;
int rx_size;
int completion_vector;
} serverRdmaContextConfig;
/*-----------------------------------------------------------------------------
* AOF manifest definition
*----------------------------------------------------------------------------*/
@ -2229,6 +2240,7 @@ struct valkeyServer {
int tls_auth_clients;
serverTLSContextConfig tls_ctx_config;
serverUnixContextConfig unix_ctx_config;
serverRdmaContextConfig rdma_ctx_config;
/* cpu affinity */
char *server_cpulist; /* cpu affinity list of server main/io thread. */
char *bio_cpulist; /* cpu affinity list of bio thread. */
@ -3293,7 +3305,6 @@ void setupSignalHandlers(void);
int createSocketAcceptHandler(connListener *sfd, aeFileProc *accept_handler);
connListener *listenerByType(const char *typename);
int changeListener(connListener *listener);
void closeListener(connListener *listener);
struct serverCommand *lookupSubcommand(struct serverCommand *container, sds sub_name);
struct serverCommand *lookupCommand(robj **argv, int argc);
struct serverCommand *lookupCommandBySdsLogic(dict *commands, sds s);

@ -339,6 +339,19 @@ static int connSocketListen(connListener *listener) {
return listenToPort(listener);
}
static void connSocketCloseListener(connListener *listener) {
int j;
for (j = 0; j < listener->count; j++) {
if (listener->fd[j] == -1) continue;
aeDeleteFileEvent(server.el, listener->fd[j], AE_READABLE);
close(listener->fd[j]);
}
listener->count = 0;
}
static int connSocketBlockingConnect(connection *conn, const char *addr, int port, long long timeout) {
int fd = anetTcpNonBlockConnect(NULL, addr, port);
if (fd == -1) {
@ -395,6 +408,7 @@ static ConnectionType CT_Socket = {
.addr = connSocketAddr,
.is_local = connSocketIsLocal,
.listen = connSocketListen,
.closeListener = connSocketCloseListener,
/* create/shutdown/close connection */
.conn_create = connCreateSocket,


@ -805,6 +805,10 @@ static int connTLSListen(connListener *listener) {
return listenToPort(listener);
}
static void connTLSCloseListener(connListener *listener) {
connectionTypeTcp()->closeListener(listener);
}
static void connTLSShutdown(connection *conn_) {
tls_connection *conn = (tls_connection *)conn_;
@ -1147,6 +1151,7 @@ static ConnectionType CT_TLS = {
.addr = connTLSAddr,
.is_local = connTLSIsLocal,
.listen = connTLSListen,
.closeListener = connTLSCloseListener,
/* create/shutdown/close connection */
.conn_create = connCreateTLS,


@ -74,6 +74,10 @@ static int connUnixListen(connListener *listener) {
return C_OK;
}
static void connUnixCloseListener(connListener *listener) {
connectionTypeTcp()->closeListener(listener);
}
static connection *connCreateUnix(void) {
connection *conn = zcalloc(sizeof(connection));
conn->type = &CT_Unix;
@ -174,6 +178,7 @@ static ConnectionType CT_Unix = {
.addr = connUnixAddr,
.is_local = connUnixIsLocal,
.listen = connUnixListen,
.closeListener = connUnixCloseListener,
/* create/shutdown/close connection */
.conn_create = connCreateUnix,
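
The hunks above give each connection type its own `closeListener` callback, with TLS and Unix sockets delegating to the TCP implementation. A minimal, self-contained sketch of that dispatch pattern follows. Every `sketch_*` name is hypothetical and merely stands in for `ConnectionType` and `connListener`; the generic `sketch_close_listener` entry point is an assumption, not a function shown in this diff, and the `closed` counter exists only so the behavior can be observed.

```c
#include <stddef.h>

#define SKETCH_MAX_FDS 16 /* hypothetical limit, stands in for CONFIG_BINDADDR_MAX */

struct sketch_listener;

/* Hypothetical, trimmed-down stand-in for ConnectionType. */
typedef struct sketch_conn_type {
    const char *name;
    void (*closeListener)(struct sketch_listener *listener);
} sketch_conn_type;

/* Hypothetical, trimmed-down stand-in for connListener. */
typedef struct sketch_listener {
    int fd[SKETCH_MAX_FDS];
    int count;
    int closed; /* descriptors released so far, for illustration only */
    const sketch_conn_type *ct;
} sketch_listener;

/* TCP-style close: walk every slot, skip unused ones, release the rest. */
static void sketch_tcp_close(sketch_listener *l) {
    for (int j = 0; j < l->count; j++) {
        if (l->fd[j] == -1) continue; /* slot was never opened */
        /* Real code would unregister the file event and close() the fd here. */
        l->fd[j] = -1;
        l->closed++;
    }
    l->count = 0;
}

static const sketch_conn_type sketch_tcp = {"tcp", sketch_tcp_close};

/* TLS-style close: no extra state to tear down, so reuse the TCP path. */
static void sketch_tls_close(sketch_listener *l) {
    sketch_tcp.closeListener(l);
}

static const sketch_conn_type sketch_tls = {"tls", sketch_tls_close};

/* Generic entry point (assumed): dispatch through the listener's type. */
static void sketch_close_listener(sketch_listener *l) {
    if (l->ct != NULL && l->ct->closeListener != NULL) l->ct->closeListener(l);
}
```

With this shape, a transport that needs more elaborate teardown (as the PR description says RDMA does) can supply its own callback without touching the callers.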


@ -63,7 +63,7 @@ def test_rdma(ipaddr):
rdmapath = valkeydir + "/src/valkey-rdma.so"
svrcmd = [svrpath, "--port", "0", "--loglevel", "verbose", "--protected-mode", "yes",
"--appendonly", "no", "--daemonize", "no", "--dir", valkeydir + "/tests/rdma/tmp",
"--loadmodule", rdmapath, "port=6379", "bind=" + ipaddr]
"--loadmodule", rdmapath, "--rdma-port", "6379", "--rdma-bind", ipaddr]
svr = subprocess.Popen(svrcmd, shell=False, stdout=subprocess.PIPE)
try:


@ -558,6 +558,10 @@ start_server {tags {"introspection"}} {
req-res-logfile
client-default-resp
dual-channel-replication-enabled
rdma-completion-vector
rdma-rx-size
rdma-bind
rdma-port
}
if {!$::tls} {


@ -300,6 +300,54 @@ tcp-keepalive 300
#
# tls-session-cache-timeout 60
################################### RDMA ######################################
# Valkey Over RDMA is experimental; it may be changed or removed in any minor or major version.
# By default, RDMA is disabled. To enable it, the "rdma-port" configuration
# directive can be used to define RDMA-listening ports.
#
# rdma-port 6379
# rdma-bind 192.168.1.100
# The RDMA receive transfer buffer is 1M by default. It can be set between 64K and 16M.
# A value aligned to the page size is preferred.
#
# rdma-rx-size 1048576
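
The range and alignment guidance for `rdma-rx-size` can be sketched as two small checks. This is an illustration only: the `sketch_*` and `SKETCH_*` names are hypothetical, the bounds are taken from the comment above, and the page size is passed in by the caller rather than queried from the system. The diff does not show how the server itself validates this value.

```c
#include <stdbool.h>

#define SKETCH_RX_SIZE_MIN (64LL * 1024)        /* 64K lower bound from the comment */
#define SKETCH_RX_SIZE_MAX (16LL * 1024 * 1024) /* 16M upper bound from the comment */

/* True when size lies in the documented [64K, 16M] range (inclusive). */
static bool sketch_rx_size_in_range(long long size) {
    return size >= SKETCH_RX_SIZE_MIN && size <= SKETCH_RX_SIZE_MAX;
}

/* True when size is a whole multiple of page_size. Alignment is described
 * as preferred, not required, so this is advisory rather than a hard check. */
static bool sketch_rx_size_page_aligned(long long size, long long page_size) {
    return page_size > 0 && size % page_size == 0;
}
```

For example, the default of 1048576 bytes is in range and is a multiple of a 4096-byte page.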
# The RDMA completion queue will use the completion vector to signal completion events
# via hardware interrupts. A large number of hardware interrupts can affect CPU performance.
# It is possible to tune the performance using rdma-completion-vector.
#
# Example 1. a) Pin hardware interrupt vectors [0, 3] to CPU [0, 3].
# b) Set CPU affinity for valkey to CPU [4, X].
# c) Any valkey server uses a random RDMA completion vector [-1].
# The valkey servers do not affect each other and are isolated from kernel interrupts.
#
# SYS SYS SYS SYS VALKEY VALKEY VALKEY
# | | | | | | |
# CPU0 CPU1 CPU2 CPU3 CPU4 CPU5 ... CPUX
# | | | |
# INTR0 INTR1 INTR2 INTR3
#
# Example 2. a) 1:1 pin hardware interrupt vectors [0, X] to CPU [0, X].
# b) Set CPU affinity for valkey [M] to CPU [M].
# c) Valkey server [M] uses RDMA completion vector [M].
# A single CPU [M] handles hardware interrupts, the RDMA completion vector [M],
# and the valkey server [M] within its context only.
# This avoids overhead and function calls across multiple CPUs, fully isolating
# each valkey server from one another.
#
# VALKEY VALKEY VALKEY VALKEY VALKEY VALKEY VALKEY
# | | | | | | |
# CPU0 CPU1 CPU2 CPU3 CPU4 CPU5 ... CPUX
# | | | | | | |
# INTR0 INTR1 INTR2 INTR3 INTR4 INTR5 INTRX
#
# Use 0 or a positive number to specify the RDMA completion vector, or specify -1 to let
# the server pick a random vector for each new connection. The default vector is -1.
#
# rdma-completion-vector 0
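
The selection rule described for `rdma-completion-vector` can be sketched as follows. This is a minimal sketch under stated assumptions, not the server's implementation: `sketch_pick_comp_vector` is a hypothetical helper, `num_vectors` stands for the number of completion vectors the device exposes, and rejecting an out-of-range value with -1 is a choice made for this example.

```c
#include <stdlib.h>

/* Returns a completion vector index in [0, num_vectors), or -1 when the
 * request cannot be satisfied.
 *   configured == -1  pick a random vector (done once per new connection)
 *   configured >= 0   use that vector if the device has it */
static int sketch_pick_comp_vector(int configured, int num_vectors) {
    if (num_vectors <= 0) return -1;                     /* no vectors available */
    if (configured == -1) return rand() % num_vectors;   /* random spread */
    if (configured < 0 || configured >= num_vectors) return -1; /* assumed policy */
    return configured;
}
```

In Example 1 every server would call this with -1, while in Example 2 server [M] would pass M so that its completions land on the CPU it is pinned to.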
################################# GENERAL #####################################
# By default the server does not run as a daemon. Use 'yes' if you need it.