Merge df720b2a29b33cce74032daaee934839b3d43537 into 26c6f1af9b29d525831c7fa9840ab3e47ed7b700

This commit is contained in:
Harkrishn Patro 2025-02-01 09:27:54 -08:00 committed by GitHub
commit 94c63892cb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 247 additions and 47 deletions

View File

@ -195,11 +195,13 @@ dictType clusterSdsToListType = {
typedef struct {
enum {
ITER_DICT,
ITER_LIST
ITER_LIST,
ITER_NODE,
} type;
union {
dictIterator di;
listIter li;
clusterNode *node;
};
} ClusterNodeIterator;
@ -215,6 +217,11 @@ static void clusterNodeIterInitMyShard(ClusterNodeIterator *iter) {
listRewind(nodes, &iter->li);
}
static void clusterNodeIterNode(ClusterNodeIterator *iter, clusterNode *node) {
iter->type = ITER_NODE;
iter->node = node;
}
static clusterNode *clusterNodeIterNext(ClusterNodeIterator *iter) {
switch (iter->type) {
case ITER_DICT: {
@ -229,6 +236,15 @@ static clusterNode *clusterNodeIterNext(ClusterNodeIterator *iter) {
/* Return the value associated with the node, or NULL if no more nodes */
return ln ? listNodeValue(ln) : NULL;
}
case ITER_NODE: {
if (iter->node) {
clusterNode *node = iter->node;
iter->node = NULL;
return node;
}
return NULL;
}
}
serverPanic("Unknown iterator type %d", iter->type);
}
@ -236,6 +252,8 @@ static clusterNode *clusterNodeIterNext(ClusterNodeIterator *iter) {
static void clusterNodeIterReset(ClusterNodeIterator *iter) {
if (iter->type == ITER_DICT) {
dictResetIterator(&iter->di);
} else if (iter->type == ITER_NODE) {
iter->node = NULL;
}
}
@ -988,7 +1006,7 @@ void clusterUpdateMyselfFlags(void) {
int nofailover = server.cluster_replica_no_failover ? CLUSTER_NODE_NOFAILOVER : 0;
myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
myself->flags |= nofailover;
myself->flags |= CLUSTER_NODE_LIGHT_HDR_SUPPORTED;
myself->flags |= CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED | CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED;
if (myself->flags != oldflags) {
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE);
}
@ -3019,11 +3037,26 @@ static void clusterProcessPublishPacket(clusterMsgDataPublish *publish_data, uin
}
}
static void clusterProcessLightPacket(clusterLink *link, uint16_t type) {
clusterMsgLight *hdr = (clusterMsgLight *)link->rcvbuf;
static void clusterProcessModulePacket(clusterMsgModule *module_data, clusterNode *sender) {
if (!sender) return; /* Protect the module from unknown nodes. */
/* We need to route this message back to the right module subscribed
* for the right message type. */
uint64_t module_id = module_data->module_id; /* Endian-safe ID */
uint32_t len = ntohl(module_data->len);
uint8_t type = module_data->type;
unsigned char *payload = module_data->bulk_data;
moduleCallClusterReceivers(sender->name, module_id, type, payload, len);
}
static void clusterProcessLightPacket(clusterNode *sender, clusterLink *link, uint16_t type) {
clusterMsgLight *hdr = (clusterMsgLight *)link->rcvbuf;
serverLog(LL_DEBUG, "Processing light packet of type: %s", clusterGetMessageTypeString(type));
if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
clusterProcessPublishPacket(&hdr->data.publish.msg, type);
} else if (type == CLUSTERMSG_TYPE_MODULE) {
clusterProcessModulePacket(&hdr->data.module.msg, sender);
} else {
serverAssert(0);
}
}
@ -3031,6 +3064,7 @@ static inline int messageTypeSupportsLightHdr(uint16_t type) {
switch (type) {
case CLUSTERMSG_TYPE_PUBLISH: return 1;
case CLUSTERMSG_TYPE_PUBLISHSHARD: return 1;
case CLUSTERMSG_TYPE_MODULE: return 1;
}
return 0;
}
@ -3123,8 +3157,14 @@ int clusterIsValidPacket(clusterLink *link) {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataUpdate);
} else if (type == CLUSTERMSG_TYPE_MODULE) {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgModule) - 3 + ntohl(hdr->data.module.msg.len);
if (is_light) {
clusterMsgLight *hdr_light = (clusterMsgLight *)link->rcvbuf;
explen = sizeof(clusterMsgLight) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgModule) - 3 + ntohl(hdr_light->data.module.msg.len);
} else {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgModule) - 3 + ntohl(hdr->data.module.msg.len);
}
} else {
/* We don't know this type of packet, so we assume it's well formed. */
explen = totlen;
@ -3178,7 +3218,7 @@ int clusterProcessPacket(clusterLink *link) {
}
clusterNode *sender = link->node;
sender->data_received = now;
clusterProcessLightPacket(link, type);
clusterProcessLightPacket(sender, link, type);
return 1;
}
@ -3195,10 +3235,16 @@ int clusterProcessPacket(clusterLink *link) {
/* Checks if the node supports light message hdr */
if (sender) {
if (flags & CLUSTER_NODE_LIGHT_HDR_SUPPORTED) {
sender->flags |= CLUSTER_NODE_LIGHT_HDR_SUPPORTED;
if (flags & CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED) {
sender->flags |= CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED;
} else {
sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_SUPPORTED;
sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED;
}
if (flags & CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED) {
sender->flags |= CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED;
} else {
sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED;
}
}
@ -3701,14 +3747,7 @@ int clusterProcessPacket(clusterLink *link) {
* config accordingly. */
clusterUpdateSlotsConfigWith(n, reportedConfigEpoch, hdr->data.update.nodecfg.slots);
} else if (type == CLUSTERMSG_TYPE_MODULE) {
if (!sender) return 1; /* Protect the module from unknown nodes. */
/* We need to route this message back to the right module subscribed
* for the right message type. */
uint64_t module_id = hdr->data.module.msg.module_id; /* Endian-safe ID */
uint32_t len = ntohl(hdr->data.module.msg.len);
uint8_t type = hdr->data.module.msg.type;
unsigned char *payload = hdr->data.module.msg.bulk_data;
moduleCallClusterReceivers(sender->name, module_id, type, payload, len);
clusterProcessModulePacket(&hdr->data.module.msg, sender);
} else {
serverLog(LL_WARNING, "Received unknown packet type: %d", type);
}
@ -4322,26 +4361,69 @@ void clusterSendUpdate(clusterLink *link, clusterNode *node) {
clusterMsgSendBlockDecrRefCount(msgblock);
}
/* Create a MODULE message block.
*
* If is_light is 1, then build a message block with `clusterMsgLight` struct else `clusterMsg`. */
static clusterMsgSendBlock *createModuleMsgBlock(int64_t module_id, uint8_t type, const char *payload, uint32_t len, int is_light) {
uint32_t msglen;
int msgtype;
clusterMsgSendBlock *msgblock;
clusterMsgModule *module_data;
if (is_light) {
msglen = sizeof(clusterMsgLight) - sizeof(union clusterMsgData);
msgtype = CLUSTERMSG_TYPE_MODULE | CLUSTERMSG_LIGHT;
} else {
msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
msgtype = CLUSTERMSG_TYPE_MODULE;
}
msglen += sizeof(clusterMsgModule) - 3 + len;
msgblock = createClusterMsgSendBlock(msgtype, msglen);
if (is_light) {
clusterMsgLight *hdr = getLightMessageFromSendBlock(msgblock);
module_data = &hdr->data.module.msg;
} else {
clusterMsg *hdr = getMessageFromSendBlock(msgblock);
module_data = &hdr->data.module.msg;
}
module_data->module_id = module_id; /* Already endian adjusted */
module_data->type = type;
module_data->len = htonl(len);
memcpy(module_data->bulk_data, payload, len);
return msgblock;
}
/* Send a MODULE message.
*
* If link is NULL, then the message is broadcasted to the whole cluster. */
void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type, const char *payload, uint32_t len) {
uint32_t msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
msglen += sizeof(clusterMsgModule) - 3 + len;
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(CLUSTERMSG_TYPE_MODULE, msglen);
clusterMsgSendBlock *msgblock[CLUSTERMSG_HDR_NUM] = {0};
ClusterNodeIterator iter;
clusterMsg *hdr = getMessageFromSendBlock(msgblock);
hdr->data.module.msg.module_id = module_id; /* Already endian adjusted. */
hdr->data.module.msg.type = type;
hdr->data.module.msg.len = htonl(len);
memcpy(hdr->data.module.msg.bulk_data, payload, len);
if (link)
clusterSendMessage(link, msgblock);
else
clusterBroadcastMessage(msgblock);
clusterMsgSendBlockDecrRefCount(msgblock);
if (link) {
clusterNodeIterNode(&iter, link->node);
} else {
/* Broadcast to all the nodes. */
clusterNodeIterInitAllNodes(&iter);
}
clusterNode *node;
while ((node = clusterNodeIterNext(&iter)) != NULL) {
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
int is_light = nodeSupportsLightMsgHdrForModule(node) ? CLUSTERMSG_HDR_LIGHT : CLUSTERMSG_HDR_NORMAL;
if (msgblock[is_light] == NULL) {
msgblock[is_light] = createModuleMsgBlock(module_id, type, payload, len, is_light);
}
clusterSendMessage(node->link, msgblock[is_light]);
}
clusterNodeIterReset(&iter);
for (int hdr_type = CLUSTERMSG_HDR_NORMAL; hdr_type < CLUSTERMSG_HDR_NUM; hdr_type++) {
if (msgblock[hdr_type]) {
clusterMsgSendBlockDecrRefCount(msgblock[hdr_type]);
}
}
}
/* This function gets a cluster node ID string as target, the same way the nodes
@ -4391,7 +4473,7 @@ void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
clusterNode *node;
while ((node = clusterNodeIterNext(&iter)) != NULL) {
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (nodeSupportsLightMsgHdr(node)) {
if (nodeSupportsLightMsgHdrForPubSub(node)) {
clusterSendMessage(node->link, msgblock_light);
} else {
if (msgblock == NULL) {

View File

@ -42,18 +42,19 @@ typedef struct clusterLink {
} clusterLink;
/* Cluster node flags and macros. */
#define CLUSTER_NODE_PRIMARY (1 << 0) /* The node is a primary */
#define CLUSTER_NODE_REPLICA (1 << 1) /* The node is a replica */
#define CLUSTER_NODE_PFAIL (1 << 2) /* Failure? Need acknowledge */
#define CLUSTER_NODE_FAIL (1 << 3) /* The node is believed to be malfunctioning */
#define CLUSTER_NODE_MYSELF (1 << 4) /* This node is myself */
#define CLUSTER_NODE_HANDSHAKE (1 << 5) /* We have still to exchange the first ping */
#define CLUSTER_NODE_NOADDR (1 << 6) /* We don't know the address of this node */
#define CLUSTER_NODE_MEET (1 << 7) /* Send a MEET message to this node */
#define CLUSTER_NODE_MIGRATE_TO (1 << 8) /* Primary eligible for replica migration. */
#define CLUSTER_NODE_NOFAILOVER (1 << 9) /* Replica will not try to failover. */
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED (1 << 10) /* This node supports extensions. */
#define CLUSTER_NODE_LIGHT_HDR_SUPPORTED (1 << 11) /* This node supports light pubsub message header. */
#define CLUSTER_NODE_PRIMARY (1 << 0) /* The node is a primary */
#define CLUSTER_NODE_REPLICA (1 << 1) /* The node is a replica */
#define CLUSTER_NODE_PFAIL (1 << 2) /* Failure? Need acknowledge */
#define CLUSTER_NODE_FAIL (1 << 3) /* The node is believed to be malfunctioning */
#define CLUSTER_NODE_MYSELF (1 << 4) /* This node is myself */
#define CLUSTER_NODE_HANDSHAKE (1 << 5) /* We have still to exchange the first ping */
#define CLUSTER_NODE_NOADDR (1 << 6) /* We don't know the address of this node */
#define CLUSTER_NODE_MEET (1 << 7) /* Send a MEET message to this node */
#define CLUSTER_NODE_MIGRATE_TO (1 << 8) /* Primary eligible for replica migration. */
#define CLUSTER_NODE_NOFAILOVER (1 << 9) /* Replica will not try to failover. */
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED (1 << 10) /* This node supports extensions. */
#define CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED (1 << 11) /* This node supports light message header for publish type. */
#define CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED (1 << 12) /* This node supports light message header for module type. */
#define CLUSTER_NODE_NULL_NAME \
"\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \
"\000\000\000\000\000\000\000\000\000\000\000\000"
@ -67,7 +68,8 @@ typedef struct clusterLink {
#define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL)
#define nodeCantFailover(n) ((n)->flags & CLUSTER_NODE_NOFAILOVER)
#define nodeSupportsExtensions(n) ((n)->flags & CLUSTER_NODE_EXTENSIONS_SUPPORTED)
#define nodeSupportsLightMsgHdr(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_SUPPORTED)
#define nodeSupportsLightMsgHdrForPubSub(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_PUBLISH_SUPPORTED)
#define nodeSupportsLightMsgHdrForModule(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_MODULE_SUPPORTED)
#define nodeInNormalState(n) (!((n)->flags & (CLUSTER_NODE_HANDSHAKE | CLUSTER_NODE_MEET | CLUSTER_NODE_PFAIL | CLUSTER_NODE_FAIL)))
/* This structure represent elements of node->fail_reports. */
@ -104,6 +106,13 @@ typedef struct clusterNodeFailReport {
/* We check for the modifier bit to determine if the message is sent using light header.*/
#define IS_LIGHT_MESSAGE(type) ((type) & CLUSTERMSG_LIGHT)
/* Types of header supported over the cluster bus. */
typedef enum {
CLUSTERMSG_HDR_NORMAL = 0, /* This corresponds to `clusterMsg` struct. */
CLUSTERMSG_HDR_LIGHT, /* This corresponds to `clusterMsgLight` struct. */
CLUSTERMSG_HDR_NUM, /* Overall count of header type supported. */
} clusterMsgHdrType;
/* Initially we don't know our "name", but we'll find it once we connect
* to the first node, using the getsockname() function. Then we'll use this
* address for all the next messages. */

View File

@ -8983,6 +8983,9 @@ void VM_RegisterClusterMessageReceiver(ValkeyModuleCtx *ctx,
* at the specified target, which is a VALKEYMODULE_NODE_ID_LEN bytes node ID, as
* returned by the receiver callback or by the nodes iteration functions.
*
* In Valkey 8.1 and later, the cluster protocol overhead for this message is
* ~30B, to compare with earlier versions where it's ~2KB.
*
* The function returns VALKEYMODULE_OK if the message was successfully sent,
* otherwise if the node is not connected or such node ID does not map to any
* known cluster node, VALKEYMODULE_ERR is returned. */

View File

@ -0,0 +1,8 @@
# Minimal configuration for testing.
always-show-logo yes
daemonize no
pidfile /var/run/valkey.pid
loglevel verbose
enable-debug-command yes
cluster-enabled yes
enable-module-command yes

View File

@ -34,6 +34,18 @@ int test_cluster_shards(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg
return VALKEYMODULE_OK;
}
#define MSGTYPE_DING 1
#define MSGTYPE_DONG 2
/* test.pingall */
int PingallCommand(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);
ValkeyModule_SendClusterMessage(ctx, NULL, MSGTYPE_DING, "Hey", 3);
return ValkeyModule_ReplyWithSimpleString(ctx, "OK");
}
int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int argc) {
VALKEYMODULE_NOT_USED(argv);
VALKEYMODULE_NOT_USED(argc);
@ -41,6 +53,10 @@ int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx, ValkeyModuleString **argv, int arg
if (ValkeyModule_Init(ctx, "cluster", 1, VALKEYMODULE_APIVER_1)== VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;
if (ValkeyModule_CreateCommand(ctx, "test.pingall", PingallCommand, "readonly", 0, 0, 0) ==
VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;
if (ValkeyModule_CreateCommand(ctx, "test.cluster_slots", test_cluster_slots, "", 0, 0, 0) == VALKEYMODULE_ERR)
return VALKEYMODULE_ERR;

View File

@ -5,6 +5,25 @@ source tests/support/cli.tcl
# cluster creation is complicated with TLS, and the current tests don't really need that coverage
tags {tls:skip external:skip cluster modules} {
set testmodule [file normalize tests/modules/cluster.so]
set modules [list loadmodule $testmodule]
start_cluster 3 0 [list config_lines $modules] {
set node1 [srv 0 client]
set node2 [srv -1 client]
set node3 [srv -2 client]
test "Cluster module send message API - VM_SendClusterMessage" {
assert_equal OK [$node1 test.pingall]
assert_equal 2 [CI 0 cluster_stats_messages_module_sent]
wait_for_condition 50 100 {
[CI 1 cluster_stats_messages_module_received] eq 1 &&
[CI 2 cluster_stats_messages_module_received] eq 1
} else {
fail "node 2 or node 3 didn't receive cluster module message"
}
}
}
set testmodule_nokey [file normalize tests/modules/blockonbackground.so]
set testmodule_blockedclient [file normalize tests/modules/blockedclient.so]
set testmodule [file normalize tests/modules/blockonkeys.so]

View File

@ -0,0 +1,63 @@
# Test cross version compatibility of cluster.
#
# Use minimal.conf to make sure we don't use any configs not supported on the old version.
# make sure the test infra won't use SELECT
set old_singledb $::singledb
set ::singledb 1
tags {external:skip needs:other-server cluster modules} {
# To run this test use the `--other-server-path` parameter and pass in a compatible server path supporting
# SendClusterMessage module API.
#
# ./runtest-moduleapi --single unit/moduleapi/cross-version-cluster --other-server-path tests/tmp/valkey-8-0/valkey-server
#
# Test cross version compatibility of cluster module send message API.
start_server {config "minimal-cluster.conf" start-other-server 1} {
set testmodule [file normalize tests/modules/cluster.so]
r MODULE LOAD $testmodule
set other_node_name [r CLUSTER MYID]
start_server {config "minimal-cluster.conf"} {
r MODULE LOAD $testmodule
test "set up cluster" {
r CLUSTER MEET [srv -1 host] [srv -1 port]
# Link establishment requires few PING-PONG between two nodes
wait_for_condition 50 100 {
[string match {*handshake*} [r CLUSTER NODES]] eq 0 &&
[string match {*handshake*} [r -1 CLUSTER NODES]] eq 0
} else {
puts [r CLUSTER NODES]
puts [r -1 CLUSTER NODES]
fail "Cluster meet stuck in handshake state"
}
}
test "Send cluster message via module from latest to other server" {
assert_equal OK [r test.pingall]
assert_match "*cluster_stats_messages_module_sent:1*" [r CLUSTER INFO]
wait_for_condition 50 100 {
[string match {*cluster_stats_messages_module_received:1*} [r -1 CLUSTER INFO]]
} else {
fail "Node didn't receive the message"
}
}
test "Send cluster message via module from other server to latest" {
r CONFIG resetstat
r -1 CONFIG resetstat
assert_equal OK [r -1 test.pingall]
assert_match "*cluster_stats_messages_module_sent:1*" [r -1 CLUSTER INFO]
wait_for_condition 50 100 {
[string match {*cluster_stats_messages_module_received:1*} [r CLUSTER INFO]]
} else {
fail "Node didn't receive the message"
}
}
}
}
}
set ::singledb $old_singledb