Cluster: UPDATE msg data structure and sending function.

This commit is contained in:
antirez 2013-11-08 16:26:50 +01:00
parent 6c6572be95
commit dc43f66eac
2 changed files with 36 additions and 3 deletions

View File

@ -59,6 +59,7 @@ void clusterSetMaster(clusterNode *n);
void clusterHandleSlaveFailover(void); void clusterHandleSlaveFailover(void);
int bitmapTestBit(unsigned char *bitmap, int pos); int bitmapTestBit(unsigned char *bitmap, int pos);
void clusterDoBeforeSleep(int flags); void clusterDoBeforeSleep(int flags);
void clusterSendUpdate(clusterLink *link, clusterNode *node);
/* ----------------------------------------------------------------------------- /* -----------------------------------------------------------------------------
* Initialization * Initialization
@ -1181,8 +1182,11 @@ int clusterProcessPacket(clusterLink *link) {
if (server.cluster->slots[j]->configEpoch > if (server.cluster->slots[j]->configEpoch >
senderConfigEpoch) senderConfigEpoch)
{ {
printf("MASTER or SLAVE have old config\n"); redisLog(REDIS_WARNING,
break; "Node %.40s has old slots configuration, sending "
"an UPDATE message about %.40s\n",
sender->name, server.cluster->slots[j]->name);
clusterSendUpdate(sender->link,server.cluster->slots[j]);
} }
} }
} }
@ -1413,6 +1417,9 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) {
if (type == CLUSTERMSG_TYPE_FAIL) { if (type == CLUSTERMSG_TYPE_FAIL) {
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataFail); totlen += sizeof(clusterMsgDataFail);
} else if (type == CLUSTERMSG_TYPE_UPDATE) {
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataUpdate);
} }
hdr->totlen = htonl(totlen); hdr->totlen = htonl(totlen);
/* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */ /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */
@ -1562,6 +1569,20 @@ void clusterSendFail(char *nodename) {
clusterBroadcastMessage(buf,ntohl(hdr->totlen)); clusterBroadcastMessage(buf,ntohl(hdr->totlen));
} }
/* Send an UPDATE message to the specified link carrying the specified 'node'
* slots configuration. The node name, slots bitmap, and configEpoch info
* are included. */
void clusterSendUpdate(clusterLink *link, clusterNode *node) {
unsigned char buf[4096];
clusterMsg *hdr = (clusterMsg*) buf;
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_UPDATE);
memcpy(hdr->data.update.nodecfg.nodename,node->name,REDIS_CLUSTER_NAMELEN);
hdr->data.update.nodecfg.configEpoch = htonu64(node->configEpoch);
memcpy(hdr->data.update.nodecfg.slots,node->slots,sizeof(node->slots));
clusterSendMessage(link,buf,ntohl(hdr->totlen));
}
/* ----------------------------------------------------------------------------- /* -----------------------------------------------------------------------------
* CLUSTER Pub/Sub support * CLUSTER Pub/Sub support
* *

View File

@ -112,7 +112,8 @@ typedef struct clusterState {
#define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */ #define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */
#define CLUSTERMSG_TYPE_PUBLISH 4 /* Pub/Sub Publish propagation */ #define CLUSTERMSG_TYPE_PUBLISH 4 /* Pub/Sub Publish propagation */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* May I failover? */ #define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* May I failover? */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6 /* Yes, you can failover. */ #define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6 /* Yes, you have my vote */
#define CLUSTERMSG_TYPE_UPDATE 7 /* Another node slots configuration */
/* Initially we don't know our "name", but we'll find it once we connect /* Initially we don't know our "name", but we'll find it once we connect
* to the first node, using the getsockname() function. Then we'll use this * to the first node, using the getsockname() function. Then we'll use this
@ -137,6 +138,12 @@ typedef struct {
unsigned char bulk_data[8]; /* defined as 8 just for alignment concerns. */ unsigned char bulk_data[8]; /* defined as 8 just for alignment concerns. */
} clusterMsgDataPublish; } clusterMsgDataPublish;
typedef struct {
uint64_t configEpoch; /* Config epoch of the specified instance. */
char nodename[REDIS_CLUSTER_NAMELEN]; /* Name of the slots owner. */
unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* Slots bitmap. */
} clusterMsgDataUpdate;
union clusterMsgData { union clusterMsgData {
/* PING, MEET and PONG */ /* PING, MEET and PONG */
struct { struct {
@ -153,6 +160,11 @@ union clusterMsgData {
struct { struct {
clusterMsgDataPublish msg; clusterMsgDataPublish msg;
} publish; } publish;
/* UPDATE */
struct {
clusterMsgDataUpdate nodecfg;
} update;
}; };
typedef struct { typedef struct {