Cluster: configEpoch added in cluster nodes description.

This commit is contained in:
antirez 2013-09-25 11:47:13 +02:00
parent da257afe57
commit 12483b0061
2 changed files with 31 additions and 2 deletions

View File

@ -59,6 +59,23 @@ int bitmapTestBit(unsigned char *bitmap, int pos);
* Initialization * Initialization
* -------------------------------------------------------------------------- */ * -------------------------------------------------------------------------- */
/* This function is called at startup in order to set the currentEpoch
* (which is not saved on permanent storage) to the greatest configEpoch found
* in the loaded nodes (configEpoch is stored on permanent storage as soon as
* it changes for some node). */
void clusterSetStartupEpoch() {
dictIterator *di;
dictEntry *de;
di = dictGetSafeIterator(server.cluster->nodes);
while((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);
if (node->configEpoch > server.cluster->currentEpoch)
server.cluster->currentEpoch = node->configEpoch;
}
dictReleaseIterator(di);
}
int clusterLoadConfig(char *filename) { int clusterLoadConfig(char *filename) {
FILE *fp = fopen(filename,"r"); FILE *fp = fopen(filename,"r");
char *line; char *line;
@ -143,8 +160,11 @@ int clusterLoadConfig(char *filename) {
if (atoi(argv[4])) n->ping_sent = time(NULL); if (atoi(argv[4])) n->ping_sent = time(NULL);
if (atoi(argv[5])) n->pong_received = time(NULL); if (atoi(argv[5])) n->pong_received = time(NULL);
/* Set configEpoch for this node. */
n->configEpoch = strtoull(argv[6],NULL,10);
/* Populate hash slots served by this instance. */ /* Populate hash slots served by this instance. */
for (j = 7; j < argc; j++) { for (j = 8; j < argc; j++) {
int start, stop; int start, stop;
if (argv[j][0] == '[') { if (argv[j][0] == '[') {
@ -189,6 +209,7 @@ int clusterLoadConfig(char *filename) {
redisAssert(server.cluster->myself != NULL); redisAssert(server.cluster->myself != NULL);
redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s", redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s",
server.cluster->myself->name); server.cluster->myself->name);
clusterSetStartupEpoch();
clusterUpdateState(); clusterUpdateState();
return REDIS_OK; return REDIS_OK;
@ -230,6 +251,7 @@ void clusterInit(void) {
server.cluster = zmalloc(sizeof(clusterState)); server.cluster = zmalloc(sizeof(clusterState));
server.cluster->myself = NULL; server.cluster->myself = NULL;
server.cluster->currentEpoch = 0;
server.cluster->state = REDIS_CLUSTER_FAIL; server.cluster->state = REDIS_CLUSTER_FAIL;
server.cluster->size = 1; server.cluster->size = 1;
server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL); server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL);
@ -360,6 +382,7 @@ clusterNode *createClusterNode(char *nodename, int flags) {
else else
getRandomHexChars(node->name, REDIS_CLUSTER_NAMELEN); getRandomHexChars(node->name, REDIS_CLUSTER_NAMELEN);
node->ctime = time(NULL); node->ctime = time(NULL);
node->configEpoch = 0;
node->flags = flags; node->flags = flags;
memset(node->slots,0,sizeof(node->slots)); memset(node->slots,0,sizeof(node->slots));
node->numslots = 0; node->numslots = 0;
@ -2071,9 +2094,10 @@ sds clusterGenNodesDescription(int filter) {
ci = sdscatprintf(ci,"- "); ci = sdscatprintf(ci,"- ");
/* Latency from the POV of this node, link status */ /* Latency from the POV of this node, link status */
ci = sdscatprintf(ci,"%ld %ld %s", ci = sdscatprintf(ci,"%ld %ld %llu %s",
(long) node->ping_sent, (long) node->ping_sent,
(long) node->pong_received, (long) node->pong_received,
(unsigned long long) node->configEpoch,
(node->link || node->flags & REDIS_NODE_MYSELF) ? (node->link || node->flags & REDIS_NODE_MYSELF) ?
"connected" : "disconnected"); "connected" : "disconnected");

View File

@ -617,6 +617,7 @@ struct clusterNode {
time_t ctime; /* Node object creation time. */ time_t ctime; /* Node object creation time. */
char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */
int flags; /* REDIS_NODE_... */ int flags; /* REDIS_NODE_... */
uint64_t configEpoch; /* Last configEpoch observed for this node */
unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */ unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */
int numslots; /* Number of slots handled by this node */ int numslots; /* Number of slots handled by this node */
int numslaves; /* Number of slave nodes, if this is a master */ int numslaves; /* Number of slave nodes, if this is a master */
@ -634,6 +635,7 @@ typedef struct clusterNode clusterNode;
typedef struct { typedef struct {
clusterNode *myself; /* This node */ clusterNode *myself; /* This node */
uint64_t currentEpoch;
int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */ int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */
int size; /* Num of master nodes with at least one slot */ int size; /* Num of master nodes with at least one slot */
dict *nodes; /* Hash table of name -> clusterNode structures */ dict *nodes; /* Hash table of name -> clusterNode structures */
@ -707,6 +709,9 @@ typedef struct {
uint64_t time; /* Time at which this request was sent (in milliseconds), uint64_t time; /* Time at which this request was sent (in milliseconds),
this field is copied in reply messages so that the this field is copied in reply messages so that the
original sender knows how old the reply is. */ original sender knows how old the reply is. */
uint64_t currentEpoch; /* The epoch accordingly to the sending node. */
uint64_t configEpoch; /* The config epoch if it's a master, or the last epoch
advertised by its master if it is a slave. */
char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node */ char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node */
unsigned char myslots[REDIS_CLUSTER_SLOTS/8]; unsigned char myslots[REDIS_CLUSTER_SLOTS/8];
char slaveof[REDIS_CLUSTER_NAMELEN]; char slaveof[REDIS_CLUSTER_NAMELEN];