Adds Light-weight cluster bus header for pubsub message. (#654)

Adds light-weight cluster bus header for pubsub message. Closes #557.

This also supports sending to and receiving non-light messages from
older versions of the engine.

The light-weight cluster bus message supports multiple pubsub messages
(payloads) for one pubsub channel. Receiving messages with multiple
payloads is supported but we're not yet sending such multi-payload
messages to other nodes.

---------

Signed-off-by: Roshan Khatri <rvkhatri@amazon.com>
This commit is contained in:
Roshan Khatri 2024-07-26 10:49:18 -07:00 committed by GitHub
parent 48ca2c9176
commit e745e9c240
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 251 additions and 57 deletions

View File

@ -117,6 +117,7 @@ int auxTcpPortPresent(clusterNode *n);
int auxTlsPortSetter(clusterNode *n, void *value, size_t length);
sds auxTlsPortGetter(clusterNode *n, sds s);
int auxTlsPortPresent(clusterNode *n);
static void clusterBuildMessageHdrLight(clusterMsgLight *hdr, int type, size_t msglen);
static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen);
void freeClusterLink(clusterLink *link);
int verifyClusterNodeId(const char *name, int length);
@ -149,6 +150,10 @@ static inline int defaultClientPort(void) {
(server.cluster->slots[slot] == NULL || bitmapTestBit(server.cluster->owner_not_claiming_slot, slot))
#define RCVBUF_INIT_LEN 1024
#define RCVBUF_MIN_READ_LEN 14
static_assert(offsetof(clusterMsg, type) + sizeof(uint16_t) == RCVBUF_MIN_READ_LEN,
"Incorrect length to read to identify type");
#define RCVBUF_MAX_PREALLOC (1 << 20) /* 1MB */
/* Fixed timeout value for cluster operations (milliseconds) */
@ -187,6 +192,54 @@ dictType clusterSdsToListType = {
NULL /* allow to expand */
};
/* Iterator over cluster nodes that hides whether the nodes are being read
 * from a dict or from a list. It is set up by one of the
 * clusterNodeIterInit*() helpers and advanced with clusterNodeIterNext(). */
typedef struct {
/* Tells which member of the anonymous union below is in use. */
enum { ITER_DICT, ITER_LIST } type;
union {
/* Iteration state when type is ITER_DICT. */
dictIterator di;
/* Iteration state when type is ITER_LIST. */
listIter li;
};
} ClusterNodeIterator;
/* Set up 'iter' to walk the entries of server.cluster->nodes.
 *
 * The dict is walked with a safe iterator, so the caller should pair this
 * call with clusterNodeIterReset() once the iteration is over. */
static void clusterNodeIterInitAllNodes(ClusterNodeIterator *iter) {
    dictInitSafeIterator(&iter->di, server.cluster->nodes);
    iter->type = ITER_DICT;
}
/* Set up 'iter' to walk the list returned by clusterGetNodesInMyShard() for
 * this node. The list must exist; a NULL list trips an assertion. */
static void clusterNodeIterInitMyShard(ClusterNodeIterator *iter) {
    list *shard_nodes = clusterGetNodesInMyShard(server.cluster->myself);
    iter->type = ITER_LIST;
    serverAssert(shard_nodes != NULL);
    listRewind(shard_nodes, &iter->li);
}
/* Advance 'iter' and return the next cluster node, or NULL once the
 * underlying dict or list has no more elements.
 *
 * An iterator whose type is neither ITER_DICT nor ITER_LIST is a programming
 * error and triggers a panic. */
static clusterNode *clusterNodeIterNext(ClusterNodeIterator *iter) {
    if (iter->type == ITER_DICT) {
        /* Dict-backed iteration: the node is the value of the entry. */
        dictEntry *entry = dictNext(&iter->di);
        if (entry == NULL) return NULL;
        return dictGetVal(entry);
    }

    if (iter->type == ITER_LIST) {
        /* List-backed iteration: the node is the value of the list node. */
        listNode *item = listNext(&iter->li);
        if (item == NULL) return NULL;
        return listNodeValue(item);
    }

    /* Not expected to be reached; the return keeps compilers quiet. */
    serverPanic("bad type");
    return NULL;
}
/* Finish an iteration started with one of the clusterNodeIterInit*() helpers.
 * Only a dict-backed iterator is reset; for any other type this does
 * nothing. */
static void clusterNodeIterReset(ClusterNodeIterator *iter) {
    if (iter->type != ITER_DICT) return;
    dictResetIterator(&iter->di);
}
/* Aux fields were introduced in Redis OSS 7.2 to support the persistence
* of various important node properties, such as shard id, in nodes.conf.
* Aux fields take an explicit format of name=value pairs and have no
@ -371,7 +424,10 @@ int auxTlsPortPresent(clusterNode *n) {
typedef struct {
size_t totlen; /* Total length of this block including the message */
int refcount; /* Number of cluster link send msg queues containing the message */
clusterMsg msg;
union {
clusterMsg msg;
clusterMsgLight msg_light;
};
} clusterMsgSendBlock;
/* -----------------------------------------------------------------------------
@ -896,6 +952,7 @@ void clusterUpdateMyselfFlags(void) {
int nofailover = server.cluster_replica_no_failover ? CLUSTER_NODE_NOFAILOVER : 0;
myself->flags &= ~CLUSTER_NODE_NOFAILOVER;
myself->flags |= nofailover;
myself->flags |= CLUSTER_NODE_LIGHT_HDR_SUPPORTED;
if (myself->flags != oldflags) {
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE);
}
@ -1211,12 +1268,16 @@ void clusterReset(int hard) {
* CLUSTER communication link
* -------------------------------------------------------------------------- */
static clusterMsgSendBlock *createClusterMsgSendBlock(int type, uint32_t msglen) {
uint32_t blocklen = msglen + sizeof(clusterMsgSendBlock) - sizeof(clusterMsg);
uint32_t blocklen = msglen + offsetof(clusterMsgSendBlock, msg);
clusterMsgSendBlock *msgblock = zcalloc(blocklen);
msgblock->refcount = 1;
msgblock->totlen = blocklen;
server.stat_cluster_links_memory += blocklen;
clusterBuildMessageHdr(&msgblock->msg, type, msglen);
if (IS_LIGHT_MESSAGE(type)) {
clusterBuildMessageHdrLight(&msgblock->msg_light, type, msglen);
} else {
clusterBuildMessageHdr(&msgblock->msg, type, msglen);
}
return msgblock;
}
@ -2817,10 +2878,53 @@ static clusterNode *getNodeFromLinkAndMsg(clusterLink *link, clusterMsg *hdr) {
return sender;
}
/* Hand a publish payload received over the cluster bus to
 * pubsubPublishMessage().
 *
 * 'publish_data' carries the channel and message lengths in network byte
 * order, followed by the channel bytes and then the message bytes in
 * 'bulk_data'. 'type' is CLUSTERMSG_TYPE_PUBLISH or
 * CLUSTERMSG_TYPE_PUBLISHSHARD; any other value is ignored. */
static void clusterProcessPublishPacket(clusterMsgDataPublish *publish_data, uint16_t type) {
    /* Skip building the string objects when nobody is subscribed to the
     * kind of pubsub this packet belongs to. */
    int has_subscribers;
    if (type == CLUSTERMSG_TYPE_PUBLISH) {
        has_subscribers = serverPubsubSubscriptionCount() > 0;
    } else if (type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
        has_subscribers = serverPubsubShardSubscriptionCount() > 0;
    } else {
        has_subscribers = 0;
    }
    if (!has_subscribers) return;

    uint32_t channel_len = ntohl(publish_data->channel_len);
    uint32_t message_len = ntohl(publish_data->message_len);
    char *payload = (char *)publish_data->bulk_data;

    robj *channel = createStringObject(payload, channel_len);
    robj *message = createStringObject(payload + channel_len, message_len);
    pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD);
    decrRefCount(channel);
    decrRefCount(message);
}
/* Process a message that arrived with the light header. The receive buffer
 * of 'link' is read as a clusterMsgLight; only the two publish message types
 * are acted upon, anything else is silently ignored. */
static void clusterProcessLightPacket(clusterLink *link, uint16_t type) {
    if (type != CLUSTERMSG_TYPE_PUBLISH && type != CLUSTERMSG_TYPE_PUBLISHSHARD) return;

    clusterMsgLight *light = (clusterMsgLight *)link->rcvbuf;
    clusterProcessPublishPacket(&light->data.publish.msg, type);
}
/* Return 1 if messages of the given type may be sent with the light header,
 * 0 otherwise. Currently only the two publish types qualify. 'type' is
 * compared as-is against the message type constants. */
static inline int messageTypeSupportsLightHdr(uint16_t type) {
    return type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD;
}
int clusterIsValidPacket(clusterLink *link) {
clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
uint32_t totlen = ntohl(hdr->totlen);
uint16_t type = ntohs(hdr->type);
int is_light = IS_LIGHT_MESSAGE(ntohs(hdr->type));
uint16_t type = ntohs(hdr->type) & ~CLUSTERMSG_MODIFIER_MASK;
if (is_light && !messageTypeSupportsLightHdr(type)) {
serverLog(LL_NOTICE,
"Packet of type '%s' (%u) does not support light cluster header. Marking packet as invalid.",
clusterGetMessageTypeString(type), type);
return 0;
}
if (type < CLUSTERMSG_TYPE_COUNT) server.cluster->stats_bus_messages_received[type]++;
@ -2876,9 +2980,18 @@ int clusterIsValidPacket(clusterLink *link) {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataFail);
} else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
explen += sizeof(clusterMsgDataPublish) - 8 + ntohl(hdr->data.publish.msg.channel_len) +
ntohl(hdr->data.publish.msg.message_len);
clusterMsgDataPublish *publish_data;
if (is_light) {
clusterMsgLight *hdr_light = (clusterMsgLight *)link->rcvbuf;
publish_data = &hdr_light->data.publish.msg;
explen = sizeof(clusterMsgLight);
} else {
publish_data = &hdr->data.publish.msg;
explen = sizeof(clusterMsg);
}
explen -= sizeof(union clusterMsgData);
explen +=
sizeof(clusterMsgDataPublish) - 8 + ntohl(publish_data->channel_len) + ntohl(publish_data->message_len);
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST || type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK ||
type == CLUSTERMSG_TYPE_MFSTART) {
explen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
@ -2925,8 +3038,24 @@ int clusterProcessPacket(clusterLink *link) {
}
clusterMsg *hdr = (clusterMsg *)link->rcvbuf;
uint16_t type = ntohs(hdr->type);
mstime_t now = mstime();
int is_light = IS_LIGHT_MESSAGE(ntohs(hdr->type));
uint16_t type = ntohs(hdr->type) & ~CLUSTERMSG_MODIFIER_MASK;
if (is_light) {
if (!link->node || nodeInHandshake(link->node)) {
freeClusterLink(link);
serverLog(
LL_NOTICE,
"Closing link for node that sent a lightweight message of type %hu as its first message on the link",
type);
return 0;
}
clusterNode *sender = link->node;
sender->data_received = now;
clusterProcessLightPacket(link, type);
return 1;
}
uint16_t flags = ntohs(hdr->flags);
uint64_t sender_claimed_current_epoch = 0, sender_claimed_config_epoch = 0;
@ -2939,6 +3068,15 @@ int clusterProcessPacket(clusterLink *link) {
sender->flags |= CLUSTER_NODE_EXTENSIONS_SUPPORTED;
}
/* Checks if the node supports light message hdr */
if (sender) {
if (flags & CLUSTER_NODE_LIGHT_HDR_SUPPORTED) {
sender->flags |= CLUSTER_NODE_LIGHT_HDR_SUPPORTED;
} else {
sender->flags &= ~CLUSTER_NODE_LIGHT_HDR_SUPPORTED;
}
}
/* Update the last time we saw any data from this node. We
* use this in order to avoid detecting a timeout from a node that
* is just sending a lot of data in the cluster bus, for instance
@ -3285,22 +3423,7 @@ int clusterProcessPacket(clusterLink *link) {
}
} else if (type == CLUSTERMSG_TYPE_PUBLISH || type == CLUSTERMSG_TYPE_PUBLISHSHARD) {
if (!sender) return 1; /* We don't know that node. */
robj *channel, *message;
uint32_t channel_len, message_len;
/* Don't bother creating useless objects if there are no
* Pub/Sub subscribers. */
if ((type == CLUSTERMSG_TYPE_PUBLISH && serverPubsubSubscriptionCount() > 0) ||
(type == CLUSTERMSG_TYPE_PUBLISHSHARD && serverPubsubShardSubscriptionCount() > 0)) {
channel_len = ntohl(hdr->data.publish.msg.channel_len);
message_len = ntohl(hdr->data.publish.msg.message_len);
channel = createStringObject((char *)hdr->data.publish.msg.bulk_data, channel_len);
message = createStringObject((char *)hdr->data.publish.msg.bulk_data + channel_len, message_len);
pubsubPublishMessage(channel, message, type == CLUSTERMSG_TYPE_PUBLISHSHARD);
decrRefCount(channel);
decrRefCount(message);
}
clusterProcessPublishPacket(&hdr->data.publish.msg, type);
} else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) {
if (!sender) return 1; /* We don't know that node. */
clusterSendFailoverAuthIfNeeded(sender, hdr);
@ -3465,6 +3588,16 @@ void clusterLinkConnectHandler(connection *conn) {
serverLog(LL_DEBUG, "Connecting with Node %.40s at %s:%d", node->name, node->ip, node->cport);
}
/* Sanity check the start of a received message: the signature must be "RCmb"
 * and the announced total length must be at least the minimum for the kind
 * of header in use (light or regular), which is told apart by the light
 * modifier bit in the type field. Returns 1 if the header looks valid,
 * 0 otherwise. */
static inline int isClusterMsgSignatureAndLengthValid(clusterMsg *hdr) {
    if (memcmp(hdr->sig, "RCmb", 4) != 0) return 0;

    uint32_t minlen;
    if (IS_LIGHT_MESSAGE(ntohs(hdr->type))) {
        minlen = CLUSTERMSG_LIGHT_MIN_LEN;
    } else {
        minlen = CLUSTERMSG_MIN_LEN;
    }
    return ntohl(hdr->totlen) >= minlen;
}
/* Read data. Try to read the first field of the header first to check the
* full length of the packet. When a whole packet is in memory this function
* will call the function to process the packet. And so forth. */
@ -3477,17 +3610,17 @@ void clusterReadHandler(connection *conn) {
while (1) { /* Read as long as there is data to read. */
rcvbuflen = link->rcvbuf_len;
if (rcvbuflen < 8) {
/* First, obtain the first 8 bytes to get the full message
* length. */
readlen = 8 - rcvbuflen;
if (rcvbuflen < RCVBUF_MIN_READ_LEN) {
/* First, obtain the first RCVBUF_MIN_READ_LEN (14) bytes to get the
 * full message length and type. */
readlen = RCVBUF_MIN_READ_LEN - rcvbuflen;
} else {
/* Finally read the full message. */
hdr = (clusterMsg *)link->rcvbuf;
if (rcvbuflen == 8) {
if (rcvbuflen == RCVBUF_MIN_READ_LEN) {
/* Perform some sanity check on the message signature
* and length. */
if (memcmp(hdr->sig, "RCmb", 4) != 0 || ntohl(hdr->totlen) < CLUSTERMSG_MIN_LEN) {
if (!isClusterMsgSignatureAndLengthValid(hdr)) {
char ip[NET_IP_STR_LEN];
int port;
if (connAddrPeerName(conn, ip, sizeof(ip), &port) == -1) {
@ -3534,7 +3667,7 @@ void clusterReadHandler(connection *conn) {
}
/* Total length obtained? Process this packet. */
if (rcvbuflen >= 8 && rcvbuflen == ntohl(hdr->totlen)) {
if (rcvbuflen >= RCVBUF_MIN_READ_LEN && rcvbuflen == ntohl(hdr->totlen)) {
if (clusterProcessPacket(link)) {
if (link->rcvbuf_alloc > RCVBUF_INIT_LEN) {
size_t prev_rcvbuf_alloc = link->rcvbuf_alloc;
@ -3594,6 +3727,18 @@ void clusterBroadcastMessage(clusterMsgSendBlock *msgblock) {
dictReleaseIterator(di);
}
/* Fill in the light message header pointed to by 'hdr': the "RCmb"
 * signature, the protocol version, the message type and the total length,
 * with the multi-byte fields stored in network byte order. The two unused
 * fields are zeroed. 'msglen' is the total length of the message. */
static void clusterBuildMessageHdrLight(clusterMsgLight *hdr, int type, size_t msglen) {
    memcpy(hdr->sig, "RCmb", sizeof(hdr->sig));
    hdr->totlen = htonl(msglen);
    hdr->ver = htons(CLUSTER_PROTO_VER);
    hdr->type = htons(type);
    hdr->notused1 = 0;
    hdr->notused2 = 0;
}
/* Build the message header. hdr must point to a buffer at least
* sizeof(clusterMsg) in bytes. */
static void clusterBuildMessageHdr(clusterMsg *hdr, int type, size_t msglen) {
@ -3861,23 +4006,38 @@ void clusterBroadcastPong(int target) {
* the 'bulk_data', sanitizer generates an out-of-bounds error which is a false
* positive in this context. */
VALKEY_NO_SANITIZE("bounds")
clusterMsgSendBlock *clusterCreatePublishMsgBlock(robj *channel, robj *message, uint16_t type) {
clusterMsgSendBlock *clusterCreatePublishMsgBlock(robj *channel, robj *message, int is_light, int is_sharded) {
uint32_t channel_len, message_len;
uint16_t type = is_sharded ? CLUSTERMSG_TYPE_PUBLISHSHARD : CLUSTERMSG_TYPE_PUBLISH;
channel = getDecodedObject(channel);
message = getDecodedObject(message);
channel_len = sdslen(channel->ptr);
message_len = sdslen(message->ptr);
size_t msglen;
size_t msglen = sizeof(clusterMsg) - sizeof(union clusterMsgData);
if (is_light) {
/* We set the MSB for messages that need to be sent using the light header. */
type |= CLUSTERMSG_LIGHT;
msglen = sizeof(clusterMsgLight);
} else {
msglen = sizeof(clusterMsg);
}
msglen -= sizeof(union clusterMsgData);
msglen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
clusterMsgSendBlock *msgblock = createClusterMsgSendBlock(type, msglen);
clusterMsg *hdr = &msgblock->msg;
hdr->data.publish.msg.channel_len = htonl(channel_len);
hdr->data.publish.msg.message_len = htonl(message_len);
memcpy(hdr->data.publish.msg.bulk_data, channel->ptr, sdslen(channel->ptr));
memcpy(hdr->data.publish.msg.bulk_data + sdslen(channel->ptr), message->ptr, sdslen(message->ptr));
clusterMsgDataPublish *hdr_data_msg;
if (is_light) {
clusterMsgLight *hdr_light = &msgblock->msg_light;
hdr_data_msg = &hdr_light->data.publish.msg;
} else {
clusterMsg *hdr = &msgblock->msg;
hdr_data_msg = &hdr->data.publish.msg;
}
hdr_data_msg->channel_len = htonl(channel_len);
hdr_data_msg->message_len = htonl(message_len);
memcpy(hdr_data_msg->bulk_data, channel->ptr, sdslen(channel->ptr));
memcpy(hdr_data_msg->bulk_data + sdslen(channel->ptr), message->ptr, sdslen(message->ptr));
decrRefCount(channel);
decrRefCount(message);
@ -3979,27 +4139,32 @@ int clusterSendModuleMessageToTarget(const char *target,
* Publish this message across the slot (primary/replica).
* -------------------------------------------------------------------------- */
void clusterPropagatePublish(robj *channel, robj *message, int sharded) {
clusterMsgSendBlock *msgblock;
if (!sharded) {
msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISH);
clusterBroadcastMessage(msgblock);
clusterMsgSendBlockDecrRefCount(msgblock);
return;
clusterMsgSendBlock *msgblock, *msgblock_light;
msgblock_light = clusterCreatePublishMsgBlock(channel, message, 1, sharded);
/* We will only create msgblock with normal hdr if there are any nodes that do not support light hdr */
msgblock = NULL;
ClusterNodeIterator iter;
if (sharded) {
clusterNodeIterInitMyShard(&iter);
} else {
clusterNodeIterInitAllNodes(&iter);
}
listIter li;
listNode *ln;
list *nodes_for_slot = clusterGetNodesInMyShard(server.cluster->myself);
serverAssert(nodes_for_slot != NULL);
listRewind(nodes_for_slot, &li);
msgblock = clusterCreatePublishMsgBlock(channel, message, CLUSTERMSG_TYPE_PUBLISHSHARD);
while ((ln = listNext(&li))) {
clusterNode *node = listNodeValue(ln);
clusterNode *node;
while ((node = clusterNodeIterNext(&iter)) != NULL) {
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
clusterSendMessage(node->link, msgblock);
if (nodeSupportsLightMsgHdr(node)) {
clusterSendMessage(node->link, msgblock_light);
} else {
if (msgblock == NULL) {
msgblock = clusterCreatePublishMsgBlock(channel, message, 0, sharded);
}
clusterSendMessage(node->link, msgblock);
}
}
clusterMsgSendBlockDecrRefCount(msgblock);
clusterNodeIterReset(&iter);
if (msgblock != NULL) clusterMsgSendBlockDecrRefCount(msgblock);
clusterMsgSendBlockDecrRefCount(msgblock_light);
}
/* -----------------------------------------------------------------------------

View File

@ -50,8 +50,9 @@ typedef struct clusterLink {
#define CLUSTER_NODE_NOADDR (1 << 6) /* We don't know the address of this node */
#define CLUSTER_NODE_MEET (1 << 7) /* Send a MEET message to this node */
#define CLUSTER_NODE_MIGRATE_TO (1 << 8) /* Primary eligible for replica migration. */
#define CLUSTER_NODE_NOFAILOVER (1 << 9) /* replica will not try to failover. */
#define CLUSTER_NODE_NOFAILOVER (1 << 9) /* Replica will not try to failover. */
#define CLUSTER_NODE_EXTENSIONS_SUPPORTED (1 << 10) /* This node supports extensions. */
#define CLUSTER_NODE_LIGHT_HDR_SUPPORTED (1 << 11) /* This node supports light pubsub message header. */
#define CLUSTER_NODE_NULL_NAME \
"\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000" \
"\000\000\000\000\000\000\000\000\000\000\000\000"
@ -64,6 +65,7 @@ typedef struct clusterLink {
#define nodeFailed(n) ((n)->flags & CLUSTER_NODE_FAIL)
#define nodeCantFailover(n) ((n)->flags & CLUSTER_NODE_NOFAILOVER)
#define nodeSupportsExtensions(n) ((n)->flags & CLUSTER_NODE_EXTENSIONS_SUPPORTED)
#define nodeSupportsLightMsgHdr(n) ((n)->flags & CLUSTER_NODE_LIGHT_HDR_SUPPORTED)
/* This structure represents elements of node->fail_reports. */
typedef struct clusterNodeFailReport {
@ -92,6 +94,13 @@ typedef struct clusterNodeFailReport {
#define CLUSTERMSG_TYPE_PUBLISHSHARD 10 /* Pub/Sub Publish shard propagation */
#define CLUSTERMSG_TYPE_COUNT 11 /* Total number of message types. */
#define CLUSTERMSG_LIGHT 0x8000 /* Modifier bit for message types that support light header */
#define CLUSTERMSG_MODIFIER_MASK (CLUSTERMSG_LIGHT) /* Modifier mask for header types. (if we add more in the future) */
/* We check for the modifier bit to determine if the message is sent using light header.*/
#define IS_LIGHT_MESSAGE(type) ((type) & CLUSTERMSG_LIGHT)
/* Initially we don't know our "name", but we'll find it once we connect
* to the first node, using the getsockname() function. Then we'll use this
* address for all the next messages. */
@ -289,6 +298,26 @@ static_assert(offsetof(clusterMsg, data) == 2256, "unexpected field offset");
primary is up. */
#define CLUSTERMSG_FLAG0_EXT_DATA (1 << 2) /* Message contains extension data */
/* Light-weight cluster bus message: a 16-byte header followed by the message
 * data. Every header field sits at the same offset as its counterpart in
 * clusterMsg, so the leading bytes of a received message can be inspected
 * through either type. */
typedef struct {
char sig[4]; /* Signature "RCmb" (Cluster message bus). */
uint32_t totlen; /* Total length of this message */
uint16_t ver; /* Protocol version, currently set to CLUSTER_PROTO_VER. */
/* Unused; occupies the offset of the 'port' field of clusterMsg. */
uint16_t notused1;
uint16_t type; /* Message type */
/* Unused; occupies the offset of the 'count' field of clusterMsg. */
uint16_t notused2;
/* Message data, starting at offset 16. */
union clusterMsgData data;
} clusterMsgLight;
static_assert(offsetof(clusterMsgLight, sig) == offsetof(clusterMsg, sig), "unexpected field offset");
static_assert(offsetof(clusterMsgLight, totlen) == offsetof(clusterMsg, totlen), "unexpected field offset");
static_assert(offsetof(clusterMsgLight, ver) == offsetof(clusterMsg, ver), "unexpected field offset");
static_assert(offsetof(clusterMsgLight, notused1) == offsetof(clusterMsg, port), "unexpected field offset");
static_assert(offsetof(clusterMsgLight, type) == offsetof(clusterMsg, type), "unexpected field offset");
static_assert(offsetof(clusterMsgLight, notused2) == offsetof(clusterMsg, count), "unexpected field offset");
static_assert(offsetof(clusterMsgLight, data) == 16, "unexpected field offset");
#define CLUSTERMSG_LIGHT_MIN_LEN (sizeof(clusterMsgLight) - sizeof(union clusterMsgData))
struct _clusterNode {
mstime_t ctime; /* Node object creation time. */
char name[CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */