Signed-off-by: Harkrishn Patro <harkrisp@amazon.com>
This commit is contained in:
Harkrishn Patro 2025-01-31 22:55:42 +00:00
parent 6a2edda02f
commit cb970e6626
2 changed files with 19 additions and 13 deletions

View File

@ -4397,7 +4397,8 @@ static clusterMsgSendBlock *createModuleMsgBlock(int64_t module_id, uint8_t type
*
* If link is NULL, then the message is broadcasted to the whole cluster. */
void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type, const char *payload, uint32_t len) {
clusterMsgSendBlock *msgblock = NULL, *msgblock_light = NULL;
clusterMsgSendBlock *msgblock[CLUSTERMSG_HDR_NUM];
memset(msgblock, 0, sizeof(msgblock));
ClusterNodeIterator iter;
if (link) {
@ -4409,21 +4410,18 @@ void clusterSendModule(clusterLink *link, uint64_t module_id, uint8_t type, cons
clusterNode *node;
while ((node = clusterNodeIterNext(&iter)) != NULL) {
if (node->flags & (CLUSTER_NODE_MYSELF | CLUSTER_NODE_HANDSHAKE)) continue;
if (nodeSupportsLightMsgHdrForModule(node)) {
if (msgblock_light == NULL) {
msgblock_light = createModuleMsgBlock(module_id, type, payload, len, 1);
}
clusterSendMessage(node->link, msgblock_light);
} else {
if (msgblock == NULL) {
msgblock = createModuleMsgBlock(module_id, type, payload, len, 0);
}
clusterSendMessage(node->link, msgblock);
int is_light = nodeSupportsLightMsgHdrForModule(node) ? CLUSTERMSG_HDR_LIGHT : CLUSTERMSG_HDR_NORMAL;
if (msgblock[is_light] == NULL) {
msgblock[is_light] = createModuleMsgBlock(module_id, type, payload, len, is_light);
}
clusterSendMessage(node->link, msgblock[is_light]);
}
clusterNodeIterReset(&iter);
if (msgblock != NULL) clusterMsgSendBlockDecrRefCount(msgblock);
if (msgblock_light != NULL) clusterMsgSendBlockDecrRefCount(msgblock_light);
for (int hdr_type = CLUSTERMSG_HDR_NORMAL; hdr_type < CLUSTERMSG_HDR_NUM; hdr_type++) {
if (msgblock[hdr_type]) {
clusterMsgSendBlockDecrRefCount(msgblock[hdr_type]);
}
}
}
/* This function gets a cluster node ID string as target, the same way the nodes

View File

@ -106,6 +106,14 @@ typedef struct clusterNodeFailReport {
/* We check for the modifier bit to determine if the message is sent using light header.*/
#define IS_LIGHT_MESSAGE(type) ((type) & CLUSTERMSG_LIGHT)
/* Types of header supported over the cluster bus. */
typedef enum {
CLUSTERMSG_HDR_NORMAL = 0, /* This corresponds to `clusterMsg` struct. */
CLUSTERMSG_HDR_LIGHT, /* This corresponds to `clusterMsgLight` struct. */
CLUSTERMSG_HDR_NUM, /* Overall count of header type supported. */
} clusterMsgHdrType;
/* Initially we don't know our "name", but we'll find it once we connect
* to the first node, using the getsockname() function. Then we'll use this
* address for all the next messages. */