From 98d1253053f580f53c077597d328dec81e9c6998 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Sep 2013 09:26:36 +0200 Subject: [PATCH 01/33] htonu64() and ntohu64 added to endianconv.h. --- src/endianconv.h | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/endianconv.h b/src/endianconv.h index 7afe61c62..7c16e175f 100644 --- a/src/endianconv.h +++ b/src/endianconv.h @@ -61,4 +61,14 @@ uint64_t intrev64(uint64_t v); #define intrev64ifbe(v) intrev64(v) #endif +/* The functions htonu64() and ntohu64() convert the specified value to + * network byte ordering and back. In big endian systems they are no-ops. */ +#if (BYTE_ORDER == BIG_ENDIAN) +#define htonu64(v) (v) +#define ntohu64(v) (v) +#else +#define htonu64(v) intrev64(v) +#define ntohu64(v) intrev64(v) +#endif + #endif From cdf4eede587ab7fada61c8c3abe695fc26b69200 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Sep 2013 11:47:13 +0200 Subject: [PATCH 02/33] Cluster: configEpoch added in cluster nodes description. --- src/cluster.c | 28 ++++++++++++++++++++++++++-- src/redis.h | 5 +++++ 2 files changed, 31 insertions(+), 2 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index e562c00c1..43a8133f9 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -59,6 +59,23 @@ int bitmapTestBit(unsigned char *bitmap, int pos); * Initialization * -------------------------------------------------------------------------- */ +/* This function is called at startup in order to set the currentEpoch + * (which is not saved on permanent storage) to the greatest configEpoch found + * in the loaded nodes (configEpoch is stored on permanent storage as soon as + * it changes for some node). 
*/ +void clusterSetStartupEpoch() { + dictIterator *di; + dictEntry *de; + + di = dictGetSafeIterator(server.cluster->nodes); + while((de = dictNext(di)) != NULL) { + clusterNode *node = dictGetVal(de); + if (node->configEpoch > server.cluster->currentEpoch) + server.cluster->currentEpoch = node->configEpoch; + } + dictReleaseIterator(di); +} + int clusterLoadConfig(char *filename) { FILE *fp = fopen(filename,"r"); char *line; @@ -143,8 +160,11 @@ int clusterLoadConfig(char *filename) { if (atoi(argv[4])) n->ping_sent = time(NULL); if (atoi(argv[5])) n->pong_received = time(NULL); + /* Set configEpoch for this node. */ + n->configEpoch = strtoull(argv[6],NULL,10); + /* Populate hash slots served by this instance. */ - for (j = 7; j < argc; j++) { + for (j = 8; j < argc; j++) { int start, stop; if (argv[j][0] == '[') { @@ -189,6 +209,7 @@ int clusterLoadConfig(char *filename) { redisAssert(server.cluster->myself != NULL); redisLog(REDIS_NOTICE,"Node configuration loaded, I'm %.40s", server.cluster->myself->name); + clusterSetStartupEpoch(); clusterUpdateState(); return REDIS_OK; @@ -230,6 +251,7 @@ void clusterInit(void) { server.cluster = zmalloc(sizeof(clusterState)); server.cluster->myself = NULL; + server.cluster->currentEpoch = 0; server.cluster->state = REDIS_CLUSTER_FAIL; server.cluster->size = 1; server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL); @@ -360,6 +382,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { else getRandomHexChars(node->name, REDIS_CLUSTER_NAMELEN); node->ctime = time(NULL); + node->configEpoch = 0; node->flags = flags; memset(node->slots,0,sizeof(node->slots)); node->numslots = 0; @@ -2071,9 +2094,10 @@ sds clusterGenNodesDescription(int filter) { ci = sdscatprintf(ci,"- "); /* Latency from the POV of this node, link status */ - ci = sdscatprintf(ci,"%ld %ld %s", + ci = sdscatprintf(ci,"%ld %ld %llu %s", (long) node->ping_sent, (long) node->pong_received, + (unsigned long long) node->configEpoch, (node->link || 
node->flags & REDIS_NODE_MYSELF) ? "connected" : "disconnected"); diff --git a/src/redis.h b/src/redis.h index 6e19ea3bd..cd2495fd8 100644 --- a/src/redis.h +++ b/src/redis.h @@ -617,6 +617,7 @@ struct clusterNode { time_t ctime; /* Node object creation time. */ char name[REDIS_CLUSTER_NAMELEN]; /* Node name, hex string, sha1-size */ int flags; /* REDIS_NODE_... */ + uint64_t configEpoch; /* Last configEpoch observed for this node */ unsigned char slots[REDIS_CLUSTER_SLOTS/8]; /* slots handled by this node */ int numslots; /* Number of slots handled by this node */ int numslaves; /* Number of slave nodes, if this is a master */ @@ -634,6 +635,7 @@ typedef struct clusterNode clusterNode; typedef struct { clusterNode *myself; /* This node */ + uint64_t currentEpoch; int state; /* REDIS_CLUSTER_OK, REDIS_CLUSTER_FAIL, ... */ int size; /* Num of master nodes with at least one slot */ dict *nodes; /* Hash table of name -> clusterNode structures */ @@ -707,6 +709,9 @@ typedef struct { uint64_t time; /* Time at which this request was sent (in milliseconds), this field is copied in reply messages so that the original sender knows how old the reply is. */ + uint64_t currentEpoch; /* The epoch accordingly to the sending node. */ + uint64_t configEpoch; /* The config epoch if it's a master, or the last epoch + advertised by its master if it is a slave. */ char sender[REDIS_CLUSTER_NAMELEN]; /* Name of the sender node */ unsigned char myslots[REDIS_CLUSTER_SLOTS/8]; char slaveof[REDIS_CLUSTER_NAMELEN]; From 1adf457b5bfe59c780b363e7290e590b2943ab39 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Sep 2013 11:53:35 +0200 Subject: [PATCH 03/33] Cluster: broadcast currentEpoch and configEpoch in packets header. 
--- src/cluster.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 43a8133f9..f1cdb915e 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1321,12 +1321,20 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { hdr->flags = htons(server.cluster->myself->flags); hdr->state = server.cluster->state; + /* Set the currentEpoch and configEpochs. Note that configEpoch is + * set to the master configEpoch if this node is a slave. */ + hdr->currentEpoch = htonu64(server.cluster->currentEpoch); + if (server.cluster->myself->flags & REDIS_NODE_SLAVE) + hdr->configEpoch = htonu64(server.cluster->myself->slaveof->configEpoch); + else + hdr->configEpoch = htonu64(server.cluster->myself->configEpoch); + if (type == CLUSTERMSG_TYPE_FAIL) { totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); totlen += sizeof(clusterMsgDataFail); } hdr->totlen = htonl(totlen); - /* For PING, PONG, and MEET, fixing the totlen field is up to the caller */ + /* For PING, PONG, and MEET, fixing the totlen field is up to the caller. */ } /* Send a PING or PONG packet to the specified node, making sure to add enough From 6dbd939a2451e7e3176227bd3318a8d62a552a65 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Sep 2013 12:36:29 +0200 Subject: [PATCH 04/33] Cluster: update our currentEpoch when a greater one is seen. 
--- src/cluster.c | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index f1cdb915e..c5837647f 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -873,6 +873,7 @@ int clusterProcessPacket(clusterLink *link) { uint32_t totlen = ntohl(hdr->totlen); uint16_t type = ntohs(hdr->type); uint16_t flags = ntohs(hdr->flags); + uint64_t senderCurrentEpoch, senderConfigEpoch; clusterNode *sender; redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes", @@ -909,9 +910,17 @@ int clusterProcessPacket(clusterLink *link) { if (totlen != explen) return 1; } - /* Process packets by type. */ + /* Check if the sender is known. + * If it is, update our currentEpoch to its epoch if greater than our. */ sender = clusterLookupNode(hdr->sender); + if (sender && !(sender->flags & REDIS_NODE_HANDSHAKE)) { + senderCurrentEpoch = ntohu64(hdr->currentEpoch); + senderConfigEpoch = ntohu64(hdr->configEpoch); + if (senderCurrentEpoch > server.cluster->currentEpoch) + server.cluster->currentEpoch = senderCurrentEpoch; + } + /* Process packets by type. */ if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) { int update_config = 0; redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node); From 24b28941941fb364fb41e12c0a61c1423d6b9003 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Sep 2013 12:38:36 +0200 Subject: [PATCH 05/33] Cluster: add currentEpoch to CLUSTER INFO. 
--- src/cluster.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index c5837647f..ede00f793 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -2393,13 +2393,15 @@ void clusterCommand(redisClient *c) { "cluster_slots_fail:%d\r\n" "cluster_known_nodes:%lu\r\n" "cluster_size:%d\r\n" + "cluster_current_epoch:%llu\r\n" , statestr[server.cluster->state], slots_assigned, slots_ok, slots_pfail, slots_fail, dictSize(server.cluster->nodes), - server.cluster->size + server.cluster->size, + (unsigned long long) server.cluster->currentEpoch ); addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n", (unsigned long)sdslen(info))); From 2cac667a8b68238354404f80e5d4a1c2d1b93e3b Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Sep 2013 12:44:47 +0200 Subject: [PATCH 06/33] Cluster: fix redis-trib for added configEpoch field in CLUSTER NODES. --- src/redis-trib.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/redis-trib.rb b/src/redis-trib.rb index 96fdddc1c..1dc18e957 100755 --- a/src/redis-trib.rb +++ b/src/redis-trib.rb @@ -118,7 +118,7 @@ class ClusterNode nodes.each{|n| # name addr flags role ping_sent ping_recv link_status slots split = n.split - name,addr,flags,role,ping_sent,ping_recv,link_status = split[0..6] + name,addr,flags,role,ping_sent,ping_recv,config_epoch,link_status = split[0..6] slots = split[7..-1] info = { :name => name, From d392f33abb632abf59db43d6cb4639ec22f02766 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 25 Sep 2013 12:51:01 +0200 Subject: [PATCH 07/33] Cluster: fix redis-trib node config fingerprinting for new nodes format. 
--- src/redis-trib.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/redis-trib.rb b/src/redis-trib.rb index 1dc18e957..4b7acea42 100755 --- a/src/redis-trib.rb +++ b/src/redis-trib.rb @@ -119,7 +119,7 @@ class ClusterNode # name addr flags role ping_sent ping_recv link_status slots split = n.split name,addr,flags,role,ping_sent,ping_recv,config_epoch,link_status = split[0..6] - slots = split[7..-1] + slots = split[8..-1] info = { :name => name, :addr => addr, @@ -230,7 +230,7 @@ class ClusterNode config = [] @r.cluster("nodes").each_line{|l| s = l.split - slots = s[7..-1].select {|x| x[0..0] != "["} + slots = s[8..-1].select {|x| x[0..0] != "["} next if slots.length == 0 config << s[0]+":"+(slots.sort.join(",")) } From f94165009152a93e95d6bae078850852326e901c Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Sep 2013 11:13:17 +0200 Subject: [PATCH 08/33] Cluster: slave node now uses the new protocol to get elected. --- src/cluster.c | 52 ++++++++++++++++++++++++++++++++++++++------------ src/redis.h | 11 ++++++++--- src/sentinel.c | 2 -- 3 files changed, 48 insertions(+), 17 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index ede00f793..9ce2905e8 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -257,6 +257,7 @@ void clusterInit(void) { server.cluster->nodes = dictCreate(&clusterNodesDictType,NULL); server.cluster->failover_auth_time = 0; server.cluster->failover_auth_count = 0; + server.cluster->failover_auth_epoch = 0; memset(server.cluster->migrating_slots_to,0, sizeof(server.cluster->migrating_slots_to)); memset(server.cluster->importing_slots_from,0, @@ -1581,16 +1582,22 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { * * The gaol of this function is: * 1) To check if we are able to perform a failover, is our data updated? - * 2) Ask reachable masters the authorization to perform the failover. + * 2) Try to get elected by masters. 
* 3) Check if there is the majority of masters agreeing we should failover. * 4) Perform the failover informing all the other nodes. */ void clusterHandleSlaveFailover(void) { time_t data_age = server.unixtime - server.repl_down_since; - time_t auth_age = server.unixtime - server.cluster->failover_auth_time; + mstime_t auth_age = mstime() - server.cluster->failover_auth_time; int needed_quorum = (server.cluster->size / 2) + 1; int j; + /* Remove the node timeout from the data age as it is fine that we are + * disconnected from our master at least for the time it was down to be + * flagged as FAIL, that's the baseline. */ + if (data_age > server.cluster_node_timeout) + data_age -= server.cluster_node_timeout; + /* Check if our data is recent enough. For now we just use a fixed * constant of ten times the node timeout since the cluster should * react much faster to a master down. */ @@ -1598,19 +1605,37 @@ void clusterHandleSlaveFailover(void) { server.cluster_node_timeout * REDIS_CLUSTER_SLAVE_VALIDITY_MULT) return; - /* TODO: check if we are the first slave as well? Or just rely on the - * master authorization? */ - - /* Ask masters if we are authorized to perform the failover. If there - * is a pending auth request that's too old, reset it. */ + /* Compute the time at which we can start an election. */ if (server.cluster->failover_auth_time == 0 || auth_age > - server.cluster_node_timeout * REDIS_CLUSTER_FAILOVER_AUTH_RETRY_MULT) + server.cluster_node_timeout * 1000 * REDIS_CLUSTER_FAILOVER_AUTH_RETRY_MULT) { - redisLog(REDIS_WARNING,"Asking masters if I can failover..."); - server.cluster->failover_auth_time = time(NULL); + server.cluster->failover_auth_time = mstime() + + 500 + /* Fixed delay of 500 milliseconds, let FAIL msg propagate. */ + data_age * 100 + /* Add 100 milliseconds for every second of age. */ + random() % 500; /* Random delay between 0 and 500 milliseconds. 
*/ server.cluster->failover_auth_count = 0; + server.cluster->failover_auth_sent = 0; + redisLog(REDIS_WARNING,"Start of election delayed for %lld milliseconds.", + server.cluster->failover_auth_time - mstime()); + return; + } + + /* Return ASAP if we can't still start the election. */ + if (mstime() < server.cluster->failover_auth_time) return; + + /* Return ASAP if the election is too old to be valid. */ + if (mstime() - server.cluster->failover_auth_time > server.cluster_node_timeout) + return; + + /* Ask for votes if needed. */ + if (server.cluster->failover_auth_sent == 0) { + server.cluster->currentEpoch++; + server.cluster->failover_auth_epoch = server.cluster->currentEpoch; + redisLog(REDIS_WARNING,"Starting a failover election for epoch %llu.", + server.cluster->currentEpoch); clusterRequestFailoverAuth(); + server.cluster->failover_auth_sent = 1; return; /* Wait for replies. */ } @@ -1619,7 +1644,7 @@ void clusterHandleSlaveFailover(void) { clusterNode *oldmaster = server.cluster->myself->slaveof; redisLog(REDIS_WARNING, - "Masters quorum reached: failing over my (failing) master."); + "Failover election won: failing over my (failing) master."); /* We have the quorum, perform all the steps to correctly promote * this slave to a master. * @@ -1644,7 +1669,10 @@ void clusterHandleSlaveFailover(void) { * accordingly and detect that we switched to master role. */ clusterBroadcastPong(); - /* 4) Update state and save config. */ + /* 4) Update my configEpoch to the epoch of the election. */ + server.cluster->myself->configEpoch = server.cluster->failover_auth_epoch; + + /* 5) Update state and save config. */ clusterUpdateState(); clusterSaveConfigOrDie(); } diff --git a/src/redis.h b/src/redis.h index cd2495fd8..2b7ca7a04 100644 --- a/src/redis.h +++ b/src/redis.h @@ -368,6 +368,8 @@ * Data types *----------------------------------------------------------------------------*/ +typedef long long mstime_t; /* millisecond time type. 
*/ + /* A redis object, that is a type able to hold a string / list / set */ /* The actual Redis Object */ @@ -581,7 +583,7 @@ typedef struct redisOpArray { #define REDIS_CLUSTER_FAIL_UNDO_TIME_MULT 2 /* Undo fail if master is back. */ #define REDIS_CLUSTER_FAIL_UNDO_TIME_ADD 10 /* Some additional time. */ #define REDIS_CLUSTER_SLAVE_VALIDITY_MULT 10 /* Slave data validity. */ -#define REDIS_CLUSTER_FAILOVER_AUTH_RETRY_MULT 1 /* Auth request retry time. */ +#define REDIS_CLUSTER_FAILOVER_AUTH_RETRY_MULT 4 /* Auth request retry time. */ #define REDIS_CLUSTER_FAILOVER_DELAY 5 /* Seconds */ struct clusterNode; @@ -643,8 +645,11 @@ typedef struct { clusterNode *importing_slots_from[REDIS_CLUSTER_SLOTS]; clusterNode *slots[REDIS_CLUSTER_SLOTS]; zskiplist *slots_to_keys; - int failover_auth_time; /* Time at which we sent the AUTH request. */ - int failover_auth_count; /* Number of authorizations received. */ + /* The following fields are used to take the slave state on elections. */ + mstime_t failover_auth_time;/* Time at which we'll try to get elected in ms. */ + int failover_auth_count; /* Number of votes received so far. */ + int failover_auth_sent; /* True if we already asked for votes. */ + uint64_t failover_auth_epoch; /* Epoch of the current election. */ } clusterState; /* Redis cluster messages header */ diff --git a/src/sentinel.c b/src/sentinel.c index b257ad685..4bea156d6 100644 --- a/src/sentinel.c +++ b/src/sentinel.c @@ -43,8 +43,6 @@ extern char **environ; /* ======================== Sentinel global state =========================== */ -typedef long long mstime_t; /* millisecond time type. */ - /* Address object, used to describe an ip:port pair. */ typedef struct sentinelAddr { char *ip; From 3bd69bcdf11a555c0282d86b2de13d1ac3c12d0b Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Sep 2013 13:00:41 +0200 Subject: [PATCH 09/33] Cluster: master node now uses new protocol to vote. 
--- src/cluster.c | 69 +++++++++++++++++++++++++-------------------------- src/redis.h | 3 +++ 2 files changed, 37 insertions(+), 35 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 9ce2905e8..24d3efe33 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -258,6 +258,7 @@ void clusterInit(void) { server.cluster->failover_auth_time = 0; server.cluster->failover_auth_count = 0; server.cluster->failover_auth_epoch = 0; + server.cluster->last_vote_epoch = 0; memset(server.cluster->migrating_slots_to,0, sizeof(server.cluster->migrating_slots_to)); memset(server.cluster->importing_slots_from,0, @@ -396,6 +397,7 @@ clusterNode *createClusterNode(char *nodename, int flags) { memset(node->ip,0,sizeof(node->ip)); node->port = 0; node->fail_reports = listCreate(); + node->voted_time = 0; listSetFreeMethod(node->fail_reports,zfree); return node; } @@ -1178,15 +1180,18 @@ int clusterProcessPacket(clusterLink *link) { } } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST) { if (!sender) return 1; /* We don't know that node. */ - /* If we are not a master, ignore that message at all. */ - if (!(server.cluster->myself->flags & REDIS_NODE_MASTER)) return 0; clusterSendFailoverAuthIfNeeded(sender,hdr); } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) { if (!sender) return 1; /* We don't know that node. */ - /* If this is a master, increment the number of acknowledges - * we received so far. */ - if (sender->flags & REDIS_NODE_MASTER) + /* We consider this vote only if the sender if a master serving + * a non zero number of slots, with the currentEpoch that is equal + * to our currentEpoch. 
*/ + if (sender->flags & REDIS_NODE_MASTER && + sender->numslots > 0 && + senderCurrentEpoch == server.cluster->currentEpoch) + { server.cluster->failover_auth_count++; + } } else { redisLog(REDIS_WARNING,"Received unknown packet type: %d", type); } @@ -1538,43 +1543,38 @@ void clusterSendFailoverAuth(clusterNode *node, uint64_t reqtime) { clusterSendMessage(node->link,buf,totlen); } -/* If we believe 'node' is the "first slave" of it's master, reply with - * a FAILOVER_AUTH_GRANTED packet. - * The 'request' field points to the authorization request packet header, we - * need it in order to copy back the 'time' field in our reply. - * - * To be a first slave the sender must: - * 1) Be a slave. - * 2) Its master should be in FAIL state. - * 3) Ordering all the slaves IDs for its master by run-id, it should be the - * first (the smallest) among the ones not in FAIL / PFAIL state. - */ +/* Vote for the node asking for our vote if there are the conditions. */ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { - char first[REDIS_CLUSTER_NAMELEN]; clusterNode *master = node->slaveof; - int j; + uint64_t requestEpoch = ntohu64(request->currentEpoch); - /* Node is a slave? Its master is down? */ + /* IF we are not a master serving at least 1 slot, we don't have the + * right to vote, as the cluster size in Redis Cluster is the number + * of masters serving at least one slot, and quorum is the cluster size + 1 */ + if (!(server.cluster->myself->flags & REDIS_NODE_MASTER)) return; + if (server.cluster->myself->numslots == 0) return; + + /* Request epoch must be >= our currentEpoch. */ + if (requestEpoch < server.cluster->currentEpoch) return; + + /* I already voted for this epoch? Return ASAP. */ + if (server.cluster->last_vote_epoch == server.cluster->currentEpoch) return; + + /* Node must be a slave and its master down. 
*/ if (!(node->flags & REDIS_NODE_SLAVE) || master == NULL || !(master->flags & REDIS_NODE_FAIL)) return; - /* Iterate all the master slaves to check what's the first one. */ - memset(first,0xff,sizeof(first)); - for (j = 0; j < master->numslaves; j++) { - clusterNode *slave = master->slaves[j]; + /* We did not voted for a slave about this master for two + * times the node timeout. This is not strictly needed for correctness + * of the algorithm but makes the base case more linear. */ + if (server.unixtime - node->slaveof->voted_time < + server.cluster_node_timeout * 2) return; - if (slave->flags & (REDIS_NODE_FAIL|REDIS_NODE_PFAIL)) continue; - if (memcmp(slave->name,first,sizeof(first)) < 0) { - memcpy(first,slave->name,sizeof(first)); - } - } - - /* Is 'node' the first slave? */ - if (memcmp(node->name,first,sizeof(first)) != 0) return; - - /* We can send the packet. */ + /* We can vote for this slave. */ clusterSendFailoverAuth(node,request->time); + server.cluster->last_vote_epoch = server.cluster->currentEpoch; + node->slaveof->voted_time = server.unixtime; } /* This function is called if we are a slave node and our master serving @@ -1583,8 +1583,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { * The gaol of this function is: * 1) To check if we are able to perform a failover, is our data updated? * 2) Try to get elected by masters. - * 3) Check if there is the majority of masters agreeing we should failover. - * 4) Perform the failover informing all the other nodes. + * 3) Perform the failover informing all the other nodes. 
*/ void clusterHandleSlaveFailover(void) { time_t data_age = server.unixtime - server.repl_down_since; diff --git a/src/redis.h b/src/redis.h index 2b7ca7a04..66c751a12 100644 --- a/src/redis.h +++ b/src/redis.h @@ -628,6 +628,7 @@ struct clusterNode { time_t ping_sent; /* Unix time we sent latest ping */ time_t pong_received; /* Unix time we received the pong */ time_t fail_time; /* Unix time when FAIL flag was set */ + time_t voted_time; /* Last time we voted for a slave of this master */ char ip[REDIS_IP_STR_LEN]; /* Latest known IP address of this node */ int port; /* Latest known port of this node */ clusterLink *link; /* TCP/IP link with this node */ @@ -650,6 +651,8 @@ typedef struct { int failover_auth_count; /* Number of votes received so far. */ int failover_auth_sent; /* True if we already asked for votes. */ uint64_t failover_auth_epoch; /* Epoch of the current election. */ + /* The followign fields are uesd by masters to take state on elections. */ + uint64_t last_vote_epoch; /* Epoch of the last vote granted. */ } clusterState; /* Redis cluster messages header */ From 7dfa4c59810014ddef09ff502151dc00234c99b1 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Sep 2013 13:28:19 +0200 Subject: [PATCH 10/33] Cluster: removed an old source of delay to start the slave failover. --- src/cluster.c | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 24d3efe33..d85086069 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1624,7 +1624,7 @@ void clusterHandleSlaveFailover(void) { if (mstime() < server.cluster->failover_auth_time) return; /* Return ASAP if the election is too old to be valid. */ - if (mstime() - server.cluster->failover_auth_time > server.cluster_node_timeout) + if (mstime() - server.cluster->failover_auth_time > server.cluster_node_timeout * 1000) return; /* Ask for votes if needed. 
*/ @@ -1835,16 +1835,10 @@ void clusterCron(void) { } /* If we are a slave and our master is down, but is serving slots, - * call the function that handles the failover. - * This function is called with a small delay in order to let the - * FAIL message to propagate after failure detection, this is not - * strictly required but makes 99.99% of failovers mechanically - * simpler. */ + * call the function that handles the failover. */ if (server.cluster->myself->flags & REDIS_NODE_SLAVE && server.cluster->myself->slaveof && server.cluster->myself->slaveof->flags & REDIS_NODE_FAIL && - (server.unixtime - server.cluster->myself->slaveof->fail_time) > - REDIS_CLUSTER_FAILOVER_DELAY && server.cluster->myself->slaveof->numslots != 0) { clusterHandleSlaveFailover(); From c8d6bc94e4cd074af0679f17ac7cd0ff821eb449 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 26 Sep 2013 16:54:43 +0200 Subject: [PATCH 11/33] Cluster: react faster when a slave wins an election. --- src/cluster.c | 55 +++++++++++++++++++++++++++++++++++++-------------- src/redis.c | 3 +++ src/redis.h | 4 +++- 3 files changed, 46 insertions(+), 16 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index d85086069..dd496be4c 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -53,6 +53,7 @@ int clusterDelSlot(int slot); int clusterDelNodeSlots(clusterNode *node); int clusterNodeSetSlotBit(clusterNode *n, int slot); void clusterSetMaster(clusterNode *n); +void clusterHandleSlaveFailover(void); int bitmapTestBit(unsigned char *bitmap, int pos); /* ----------------------------------------------------------------------------- @@ -1191,6 +1192,9 @@ int clusterProcessPacket(clusterLink *link) { senderCurrentEpoch == server.cluster->currentEpoch) { server.cluster->failover_auth_count++; + /* Maybe we reached a quorum here, set a flag to make sure + * we check ASAP. 
*/ + server.cluster->handle_slave_failover_asap++; } } else { redisLog(REDIS_WARNING,"Received unknown packet type: %d", type); @@ -1291,7 +1295,11 @@ void clusterReadHandler(aeEventLoop *el, int fd, void *privdata, int mask) { } } -/* Put stuff into the send buffer. */ +/* Put stuff into the send buffer. + * + * It is guaranteed that this function will never have as a side effect + * the link to be invalidated, so it is safe to call this function + * from event handlers that will do stuff with the same link later. */ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { if (sdslen(link->sndbuf) == 0 && msglen != 0) aeCreateFileEvent(server.el,link->fd,AE_WRITABLE, @@ -1301,7 +1309,11 @@ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { } /* Send a message to all the nodes that are part of the cluster having - * a connected link. */ + * a connected link. + * + * It is guaranteed that this function will never have as a side effect + * some node->link to be invalidated, so it is safe to call this function + * from event handlers that will do stuff with node links later. */ void clusterBroadcastMessage(void *buf, size_t len) { dictIterator *di; dictEntry *de; @@ -1416,10 +1428,11 @@ void clusterSendPing(clusterLink *link, int type) { clusterSendMessage(link,buf,totlen); } -/* Send a PONG packet to every connected node that's not in handshake state. +/* Send a PONG packet to every connected node that's not in handshake state + * and for which we have a valid link. * - * In Redis Cluster pings are not just used for failure detection, but also - * to carry important configuration informations. So broadcasting a pong is + * In Redis Cluster pongs are not used just for failure detection, but also + * to carry important configuration information. So broadcasting a pong is * useful when something changes in the configuration and we want to make * the cluster aware ASAP (for instance after a slave promotion). 
*/ void clusterBroadcastPong(void) { @@ -1430,6 +1443,7 @@ void clusterBroadcastPong(void) { while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); + if (!node->link) continue; if (node->flags & (REDIS_NODE_MYSELF|REDIS_NODE_HANDSHAKE)) continue; clusterSendPing(node->link,CLUSTERMSG_TYPE_PONG); } @@ -1591,6 +1605,15 @@ void clusterHandleSlaveFailover(void) { int needed_quorum = (server.cluster->size / 2) + 1; int j; + /* Pre conditions to run the function: + * 1) We are a slave. + * 2) Our master is flagged as FAIL. + * 3) It is serving slots. */ + if (!(server.cluster->myself->flags & REDIS_NODE_SLAVE) || + server.cluster->myself->slaveof == NULL || + !(server.cluster->myself->slaveof->flags & REDIS_NODE_FAIL) || + server.cluster->myself->slaveof->numslots == 0) return; + /* Remove the node timeout from the data age as it is fine that we are * disconnected from our master at least for the time it was down to be * flagged as FAIL, that's the baseline. */ @@ -1834,19 +1857,21 @@ void clusterCron(void) { server.cluster->myself->slaveof->port); } - /* If we are a slave and our master is down, but is serving slots, - * call the function that handles the failover. */ - if (server.cluster->myself->flags & REDIS_NODE_SLAVE && - server.cluster->myself->slaveof && - server.cluster->myself->slaveof->flags & REDIS_NODE_FAIL && - server.cluster->myself->slaveof->numslots != 0) - { - clusterHandleSlaveFailover(); - } - + clusterHandleSlaveFailover(); if (update_state) clusterUpdateState(); } +/* This function is called before the event handler returns to sleep for + * events. It is useful to perform operations that must be done ASAP in + * reaction to events fired but that are not safe to perform inside event + * handlers. 
*/ +void clusterBeforeSleep(void) { + if (server.cluster->handle_slave_failover_asap) { + clusterHandleSlaveFailover(); + server.cluster->handle_slave_failover_asap = 0; + } +} + /* ----------------------------------------------------------------------------- * Slots management * -------------------------------------------------------------------------- */ diff --git a/src/redis.c b/src/redis.c index 2792f1393..bd547cd37 100644 --- a/src/redis.c +++ b/src/redis.c @@ -1203,6 +1203,9 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Write the AOF buffer on disk */ flushAppendOnlyFile(0); + + /* Call the Redis Cluster before sleep function. */ + if (server.cluster_enabled) clusterBeforeSleep(); } /* =========================== Server initialization ======================== */ diff --git a/src/redis.h b/src/redis.h index 66c751a12..5883bd382 100644 --- a/src/redis.h +++ b/src/redis.h @@ -647,12 +647,13 @@ typedef struct { clusterNode *slots[REDIS_CLUSTER_SLOTS]; zskiplist *slots_to_keys; /* The following fields are used to take the slave state on elections. */ - mstime_t failover_auth_time;/* Time at which we'll try to get elected in ms. */ + mstime_t failover_auth_time;/* Time at which we'll try to get elected in ms*/ int failover_auth_count; /* Number of votes received so far. */ int failover_auth_sent; /* True if we already asked for votes. */ uint64_t failover_auth_epoch; /* Epoch of the current election. */ /* The followign fields are uesd by masters to take state on elections. */ uint64_t last_vote_epoch; /* Epoch of the last vote granted. */ + int handle_slave_failover_asap; /* Call clusterHandleSlaveFailover() ASAP. 
*/ } clusterState; /* Redis cluster messages header */ @@ -1380,6 +1381,7 @@ void clusterCron(void); clusterNode *getNodeByQuery(redisClient *c, struct redisCommand *cmd, robj **argv, int argc, int *hashslot, int *ask); void clusterPropagatePublish(robj *channel, robj *message); void migrateCloseTimedoutSockets(void); +void clusterBeforeSleep(void); /* Sentinel */ void initSentinelConfig(void); From 8fa4e7817a4f0d16a83963e07280685ee0722833 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 27 Sep 2013 09:55:41 +0200 Subject: [PATCH 12/33] Cluster: update the node configEpoch when newer is detected. --- src/cluster.c | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index dd496be4c..7aad66542 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -914,14 +914,17 @@ int clusterProcessPacket(clusterLink *link) { if (totlen != explen) return 1; } - /* Check if the sender is known. - * If it is, update our currentEpoch to its epoch if greater than our. */ + /* Check if the sender is a known node. */ sender = clusterLookupNode(hdr->sender); if (sender && !(sender->flags & REDIS_NODE_HANDSHAKE)) { + /* Update our curretEpoch if we see a newer epoch in the cluster. */ senderCurrentEpoch = ntohu64(hdr->currentEpoch); senderConfigEpoch = ntohu64(hdr->configEpoch); if (senderCurrentEpoch > server.cluster->currentEpoch) server.cluster->currentEpoch = senderCurrentEpoch; + /* Update the sender configEpoch if it is publishing a newer one. */ + if (senderConfigEpoch > sender->configEpoch) + sender->configEpoch = senderConfigEpoch; } /* Process packets by type. */ @@ -1999,8 +2002,14 @@ void clusterUpdateState(void) { } /* If we can't reach at least half the masters, change the cluster state - * as FAIL, as we are not even able to mark nodes as FAIL in this side - * of the netsplit because of lack of majority. 
*/ + * to FAIL, as we are not even able to mark nodes as FAIL in this side + * of the netsplit because of lack of majority. + * + * TODO: when this condition is entered, we should not undo it for some + * (small) time after the majority is reachable again, to make sure that + * other nodes have enough time to inform this node of a configuration change. + * Otherwise a client with an old routing table may write to this node + * and later it may turn into a slave losing the write. */ { int needed_quorum = (server.cluster->size / 2) + 1; From 5d393adeac8251ff54dbf4eef4c199dde2862d5b Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 30 Sep 2013 10:13:07 +0200 Subject: [PATCH 13/33] Cluster: fsync data when saving the cluster config. --- src/cluster.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/cluster.c b/src/cluster.c index 7aad66542..4751c78f7 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -231,6 +231,7 @@ int clusterSaveConfig(void) { if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT|O_TRUNC,0644)) == -1) goto err; if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err; + fsync(fd); close(fd); sdsfree(ci); return 0; From 0b63dc2841984afe5a3d913e51aba9cb21c5f486 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 30 Sep 2013 10:13:33 +0200 Subject: [PATCH 14/33] Cluster: when updating the configEpoch for a node, save config on disk ASAP. --- src/cluster.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 4751c78f7..f0a6ddeb4 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -924,8 +924,10 @@ int clusterProcessPacket(clusterLink *link) { if (senderCurrentEpoch > server.cluster->currentEpoch) server.cluster->currentEpoch = senderCurrentEpoch; /* Update the sender configEpoch if it is publishing a newer one. 
*/ - if (senderConfigEpoch > sender->configEpoch) + if (senderConfigEpoch > sender->configEpoch) { sender->configEpoch = senderConfigEpoch; + clusterSaveConfigOrDie(); + } } /* Process packets by type. */ From 2a391b8bac975b0f547064010226b71278c4e22c Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 30 Sep 2013 10:13:58 +0200 Subject: [PATCH 15/33] Cluster: re-order failover operations to make it safer. We need to: 1) Increment the configEpoch. 2) Save it to disk and fsync the file. 3) Broadcast the PONG with the new configuration. If other nodes will receive the updated configuration we need to be sure to restart with this new config in the event of a crash. --- src/cluster.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index f0a6ddeb4..e8ee45c40 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1693,16 +1693,16 @@ void clusterHandleSlaveFailover(void) { } } - /* 3) Pong all the other nodes so that they can update the state - * accordingly and detect that we switched to master role. */ - clusterBroadcastPong(); - - /* 4) Update my configEpoch to the epoch of the election. */ + /* 3) Update my configEpoch to the epoch of the election. */ server.cluster->myself->configEpoch = server.cluster->failover_auth_epoch; - /* 5) Update state and save config. */ + /* 4) Update state and save config. */ clusterUpdateState(); clusterSaveConfigOrDie(); + + /* 5) Pong all the other nodes so that they can update the state + * accordingly and detect that we switched to master role. */ + clusterBroadcastPong(); } } From 1239f490659397dd4eaf3c73baa8baf06833af3e Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 30 Sep 2013 11:44:23 +0200 Subject: [PATCH 16/33] Cluster: detect cluster reconfiguration when master slots drop to 0. The old algorithm used a PROMOTED flag and explicitly checked for slave->master conversions. 
With the new cluster meta-data propagation algorithm we just look at the configEpoch to check if we need to reconfigure slots, then: 1) If a node is a master but it reaches zero served slots because of reconfiguration. 2) If a node is a slave but the master reaches zero served slots because of a reconfiguration. We switch to being a replica of the new slots owner. --- src/cluster.c | 77 ++++++++++++++++++++++++--------------------------- 1 file changed, 36 insertions(+), 41 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index e8ee45c40..a5faa1b86 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -135,8 +135,6 @@ int clusterLoadConfig(char *filename) { n->flags |= REDIS_NODE_HANDSHAKE; } else if (!strcasecmp(s,"noaddr")) { n->flags |= REDIS_NODE_NOADDR; - } else if (!strcasecmp(s,"promoted")) { - n->flags |= REDIS_NODE_PROMOTED; } else if (!strcasecmp(s,"noflags")) { /* nothing to do */ } else { @@ -755,7 +753,6 @@ void clusterProcessGossipSection(clusterMsg *hdr, clusterLink *link) { if (flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,"); if (flags & REDIS_NODE_HANDSHAKE) ci = sdscat(ci,"handshake,"); if (flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,"); - if (flags & REDIS_NODE_PROMOTED) ci = sdscat(ci,"promoted,"); if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' '; redisLog(REDIS_DEBUG,"GOSSIP %.40s %s:%d %s", @@ -1051,9 +1048,6 @@ int clusterProcessPacket(clusterLink *link) { { /* Node is a master. */ if (sender->flags & REDIS_NODE_SLAVE) { - /* Slave turned into master! */ - clusterNode *oldmaster = sender->slaveof; - /* Reconfigure node as master. */ if (sender->slaveof) clusterNodeRemoveSlave(sender->slaveof,sender); @@ -1061,29 +1055,6 @@ int clusterProcessPacket(clusterLink *link) { sender->flags |= REDIS_NODE_MASTER; sender->slaveof = NULL; - /* If this node used to be our slave, and now has the - * PROMOTED flag set. We'll turn ourself into a slave - * of the new master. 
*/ - if (flags & REDIS_NODE_PROMOTED && - oldmaster == server.cluster->myself) - { - redisLog(REDIS_WARNING,"One of my slaves took my place. Reconfiguring myself as a replica of %.40s", sender->name); - clusterDelNodeSlots(server.cluster->myself); - clusterSetMaster(sender); - } - - /* If we are a slave, and this node used to be a slave - * of our master, and now has the PROMOTED flag set, we - * need to switch our replication setup over it. */ - if (flags & REDIS_NODE_PROMOTED && - server.cluster->myself->flags & REDIS_NODE_SLAVE && - server.cluster->myself->slaveof == oldmaster) - { - redisLog(REDIS_WARNING,"One of the slaves failed over my master. Reconfiguring myself as a replica of %.40s", sender->name); - clusterDelNodeSlots(server.cluster->myself); - clusterSetMaster(sender); - } - /* Update config and state. */ update_state = 1; update_config = 1; @@ -1125,26 +1096,55 @@ int clusterProcessPacket(clusterLink *link) { changes = memcmp(sender->slots,hdr->myslots,sizeof(hdr->myslots)) != 0; if (changes) { + clusterNode *curmaster, *newmaster = NULL; + + /* Here we set curmaster to this node or the node this node + * replicates to if it's a slave. In the for loop we are + * interested to check if slots are taken away from curmaster. */ + if (server.cluster->myself->flags & REDIS_NODE_MASTER) + curmaster = server.cluster->myself; + else + curmaster = server.cluster->myself->slaveof; + for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) { if (bitmapTestBit(hdr->myslots,j)) { - /* If this slot was not served, or served by a node - * in FAIL state, update the table with the new node - * claiming to serve the slot. */ + /* We rebind the slot to the new node claiming it if: + * 1) The slot was unassigned. + * 2) The new node claims it with a greater configEpoch. 
*/ if (server.cluster->slots[j] == sender) continue; if (server.cluster->slots[j] == NULL || - server.cluster->slots[j]->flags & REDIS_NODE_FAIL) + server.cluster->slots[j]->configEpoch < + senderConfigEpoch) { + if (server.cluster->slots[j] == curmaster) + newmaster = sender; clusterDelSlot(j); clusterAddSlot(sender,j); update_state = update_config = 1; } } else { /* This node claims to no longer handling the slot, - * however we don't change our config as this is likely - * happening because a resharding is in progress, and - * it already knows where to redirect clients. */ + * however we don't change our config as this is likely: + * 1) Rehashing in progress. + * 2) Failover. + * In both cases we'll be informed about who is serving + * the slot eventually. In the meantime it's up to the + * original owner to try to redirect our clients to the + * right node. */ } } + + /* If at least one slot was reassigned from a node to another node + * with a greater configEpoch, it is possible that: + * 1) We are a master is left without slots. This means that we were + * failed over and we should turn into a replica of the new + * master. + * 2) We are a slave and our master is left without slots. We need + * to replicate to the new slots owner. */ + if (newmaster && curmaster->numslots == 0) { + redisLog(REDIS_WARNING,"Configuration change detected. 
Reconfiguring myself as a replica of %.40s", sender->name); + clusterSetMaster(sender); + } } } @@ -1681,7 +1681,6 @@ void clusterHandleSlaveFailover(void) { server.cluster->myself); server.cluster->myself->flags &= ~REDIS_NODE_SLAVE; server.cluster->myself->flags |= REDIS_NODE_MASTER; - server.cluster->myself->flags |= REDIS_NODE_PROMOTED; server.cluster->myself->slaveof = NULL; replicationUnsetMaster(); @@ -2109,9 +2108,6 @@ void clusterSetMaster(clusterNode *n) { myself->flags &= ~REDIS_NODE_MASTER; myself->flags |= REDIS_NODE_SLAVE; } - /* Clear the promoted flag anyway if we are a slave, to ensure it will - * be set only when the node turns into a master because of fail over. */ - myself->flags &= ~REDIS_NODE_PROMOTED; myself->slaveof = n; replicationSetMaster(n->ip, n->port); } @@ -2159,7 +2155,6 @@ sds clusterGenNodesDescription(int filter) { if (node->flags & REDIS_NODE_FAIL) ci = sdscat(ci,"fail,"); if (node->flags & REDIS_NODE_HANDSHAKE) ci =sdscat(ci,"handshake,"); if (node->flags & REDIS_NODE_NOADDR) ci = sdscat(ci,"noaddr,"); - if (node->flags & REDIS_NODE_PROMOTED) ci = sdscat(ci,"promoted,"); if (ci[sdslen(ci)-1] == ',') ci[sdslen(ci)-1] = ' '; /* Slave of... or just "-" */ From ec3bd0695b69bf7f1db43a099342e7281a9594b3 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 30 Sep 2013 11:48:09 +0200 Subject: [PATCH 17/33] Make clear that runids are not cluster node IDs. --- src/replication.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 81323cb60..0b75f4eeb 100644 --- a/src/replication.c +++ b/src/replication.c @@ -343,7 +343,7 @@ int masterTryPartialResynchronization(redisClient *c) { /* Run id "?" is used by slaves that want to force a full resync. 
*/ if (master_runid[0] != '?') { redisLog(REDIS_NOTICE,"Partial resynchronization not accepted: " - "Runid mismatch (Client asked for '%s', I'm '%s')", + "Runid mismatch (Client asked for runid '%s', my runid is '%s')", master_runid, server.runid); } else { redisLog(REDIS_NOTICE,"Full resync requested by slave."); From 60d4ae49be51a9be88d69162624a996e13e9969f Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 30 Sep 2013 11:51:58 +0200 Subject: [PATCH 18/33] Cluster: log message shortened. --- src/cluster.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index a5faa1b86..a174a964b 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1672,7 +1672,7 @@ void clusterHandleSlaveFailover(void) { clusterNode *oldmaster = server.cluster->myself->slaveof; redisLog(REDIS_WARNING, - "Failover election won: failing over my (failing) master."); + "Failover election won: I'm the new master."); /* We have the quorum, perform all the steps to correctly promote * this slave to a master. * From 0b3a8f20727e0b87d7f0a83ba3b147c49626016a Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 30 Sep 2013 11:53:18 +0200 Subject: [PATCH 19/33] Add REWRITE to CONFIG subcommands help message. --- src/config.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/config.c b/src/config.c index 3a14a6e94..2707bfaeb 100644 --- a/src/config.c +++ b/src/config.c @@ -1786,7 +1786,7 @@ void configCommand(redisClient *c) { } } else { addReplyError(c, - "CONFIG subcommand must be one of GET, SET, RESETSTAT"); + "CONFIG subcommand must be one of GET, SET, RESETSTAT, REWRITE"); } return; From 6ed0dee927f8476418f7330ab561dfa8e8753721 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 30 Sep 2013 16:19:44 +0200 Subject: [PATCH 20/33] Cluster: time field removed from cluster messages header. The new algorithm does not check replies time as checking for the currentEpoch in the reply ensures that the reply is about the current election process. 
--- src/cluster.c | 10 +++------- src/redis.h | 3 --- 2 files changed, 3 insertions(+), 10 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index a174a964b..42c4acb9b 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1543,14 +1543,11 @@ void clusterRequestFailoverAuth(void) { clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST); totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); hdr->totlen = htonl(totlen); - hdr->time = mstime(); clusterBroadcastMessage(buf,totlen); } -/* Send a FAILOVER_AUTH_ACK message to the specified node. - * Reqtime is the time field from the original failover auth request packet, - * so that the receiver is able to check the reply age. */ -void clusterSendFailoverAuth(clusterNode *node, uint64_t reqtime) { +/* Send a FAILOVER_AUTH_ACK message to the specified node. */ +void clusterSendFailoverAuth(clusterNode *node) { unsigned char buf[4096]; clusterMsg *hdr = (clusterMsg*) buf; uint32_t totlen; @@ -1559,7 +1556,6 @@ void clusterSendFailoverAuth(clusterNode *node, uint64_t reqtime) { clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK); totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); hdr->totlen = htonl(totlen); - hdr->time = reqtime; clusterSendMessage(node->link,buf,totlen); } @@ -1592,7 +1588,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { server.cluster_node_timeout * 2) return; /* We can vote for this slave. */ - clusterSendFailoverAuth(node,request->time); + clusterSendFailoverAuth(node); server.cluster->last_vote_epoch = server.cluster->currentEpoch; node->slaveof->voted_time = server.unixtime; } diff --git a/src/redis.h b/src/redis.h index 5883bd382..995198f66 100644 --- a/src/redis.h +++ b/src/redis.h @@ -715,9 +715,6 @@ typedef struct { uint32_t totlen; /* Total length of this message */ uint16_t type; /* Message type */ uint16_t count; /* Only used for some kind of messages. 
*/ - uint64_t time; /* Time at which this request was sent (in milliseconds), - this field is copied in reply messages so that the - original sender knows how old the reply is. */ uint64_t currentEpoch; /* The epoch accordingly to the sending node. */ uint64_t configEpoch; /* The config epoch if it's a master, or the last epoch advertised by its master if it is a slave. */ From 0000cfbf3888bf591d7ed49de1848470029bdfa5 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 1 Oct 2013 15:40:20 +0200 Subject: [PATCH 21/33] Cluster: fix typo in clusterProcessPacket() comment. --- src/cluster.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 42c4acb9b..e5aa043fa 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1190,7 +1190,7 @@ int clusterProcessPacket(clusterLink *link) { clusterSendFailoverAuthIfNeeded(sender,hdr); } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) { if (!sender) return 1; /* We don't know that node. */ - /* We consider this vote only if the sender if a master serving + /* We consider this vote only if the sender is a master serving * a non zero number of slots, with the currentEpoch that is equal * to our currentEpoch. */ if (sender->flags & REDIS_NODE_MASTER && From 3be5010adb1abded30ab48b4222f681db9455d72 Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 1 Oct 2013 17:21:28 +0200 Subject: [PATCH 22/33] Cluster: senderCurrentEpoch == node currentEpoch was too strict. We can accept a vote as long as its epoch is >= the epoch at which we started the voting process. There is no need for it to be exactly the same. --- src/cluster.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index e5aa043fa..ab5283933 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1191,11 +1191,11 @@ int clusterProcessPacket(clusterLink *link) { } else if (type == CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK) { if (!sender) return 1; /* We don't know that node. 
*/ /* We consider this vote only if the sender is a master serving - * a non zero number of slots, with the currentEpoch that is equal - * to our currentEpoch. */ + * a non zero number of slots, and its currentEpoch is greater or + * equal to epoch where this node started the election. */ if (sender->flags & REDIS_NODE_MASTER && sender->numslots > 0 && - senderCurrentEpoch == server.cluster->currentEpoch) + senderCurrentEpoch >= server.cluster->failover_auth_epoch) { server.cluster->failover_auth_count++; /* Maybe we reached a quorum here, set a flag to make sure From 90b06ab7b5b06cb098be5bd2727dd2f12b63053b Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 2 Oct 2013 09:42:35 +0200 Subject: [PATCH 23/33] Cluster: FAIL messages from unknown senders are handled better. Previously the event was not logged but instead the node reported an unknown packet type received. --- src/cluster.c | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index ab5283933..9328ab1be 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1154,20 +1154,26 @@ int clusterProcessPacket(clusterLink *link) { /* Update the cluster state if needed */ if (update_state) clusterUpdateState(); if (update_config) clusterSaveConfigOrDie(); - } else if (type == CLUSTERMSG_TYPE_FAIL && sender) { + } else if (type == CLUSTERMSG_TYPE_FAIL) { clusterNode *failing; - failing = clusterLookupNode(hdr->data.fail.about.nodename); - if (failing && !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF))) - { + if (sender) { + failing = clusterLookupNode(hdr->data.fail.about.nodename); + if (failing && !(failing->flags & (REDIS_NODE_FAIL|REDIS_NODE_MYSELF))) + { + redisLog(REDIS_NOTICE, + "FAIL message received from %.40s about %.40s", + hdr->sender, hdr->data.fail.about.nodename); + failing->flags |= REDIS_NODE_FAIL; + failing->fail_time = time(NULL); + failing->flags &= ~REDIS_NODE_PFAIL; + clusterUpdateState(); + clusterSaveConfigOrDie(); + } + 
} else { redisLog(REDIS_NOTICE, - "FAIL message received from %.40s about %.40s", + "Ignoring FAIL message from unknonw node %.40s about %.40s", hdr->sender, hdr->data.fail.about.nodename); - failing->flags |= REDIS_NODE_FAIL; - failing->fail_time = time(NULL); - failing->flags &= ~REDIS_NODE_PFAIL; - clusterUpdateState(); - clusterSaveConfigOrDie(); } } else if (type == CLUSTERMSG_TYPE_PUBLISH) { robj *channel, *message; From 5cbb913994afeb6117559bf21dd83aba46389d4c Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 2 Oct 2013 10:10:08 +0200 Subject: [PATCH 24/33] Cluster: bus messages stats in CLUSTER info. --- src/cluster.c | 10 +++++++++- src/redis.h | 2 ++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 9328ab1be..c90112460 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -259,6 +259,8 @@ void clusterInit(void) { server.cluster->failover_auth_count = 0; server.cluster->failover_auth_epoch = 0; server.cluster->last_vote_epoch = 0; + server.cluster->stats_bus_messages_sent = 0; + server.cluster->stats_bus_messages_received = 0; memset(server.cluster->migrating_slots_to,0, sizeof(server.cluster->migrating_slots_to)); memset(server.cluster->importing_slots_from,0, @@ -878,6 +880,7 @@ int clusterProcessPacket(clusterLink *link) { uint64_t senderCurrentEpoch, senderConfigEpoch; clusterNode *sender; + server.cluster->stats_bus_messages_received++; redisLog(REDIS_DEBUG,"--- Processing packet of type %d, %lu bytes", type, (unsigned long) totlen); @@ -1318,6 +1321,7 @@ void clusterSendMessage(clusterLink *link, unsigned char *msg, size_t msglen) { clusterWriteHandler,link); link->sndbuf = sdscatlen(link->sndbuf, msg, msglen); + server.cluster->stats_bus_messages_sent++; } /* Send a message to all the nodes that are part of the cluster having @@ -2449,6 +2453,8 @@ void clusterCommand(redisClient *c) { "cluster_known_nodes:%lu\r\n" "cluster_size:%d\r\n" "cluster_current_epoch:%llu\r\n" + 
"cluster_stats_messages_sent:%lld\r\n" + "cluster_stats_messages_received:%lld\r\n" , statestr[server.cluster->state], slots_assigned, slots_ok, @@ -2456,7 +2462,9 @@ void clusterCommand(redisClient *c) { slots_fail, dictSize(server.cluster->nodes), server.cluster->size, - (unsigned long long) server.cluster->currentEpoch + (unsigned long long) server.cluster->currentEpoch, + server.cluster->stats_bus_messages_sent, + server.cluster->stats_bus_messages_received ); addReplySds(c,sdscatprintf(sdsempty(),"$%lu\r\n", (unsigned long)sdslen(info))); diff --git a/src/redis.h b/src/redis.h index 995198f66..844c4a323 100644 --- a/src/redis.h +++ b/src/redis.h @@ -654,6 +654,8 @@ typedef struct { /* The followign fields are uesd by masters to take state on elections. */ uint64_t last_vote_epoch; /* Epoch of the last vote granted. */ int handle_slave_failover_asap; /* Call clusterHandleSlaveFailover() ASAP. */ + long long stats_bus_messages_sent; /* Num of msg sent via cluster bus. */ + long long stats_bus_messages_received; /* Num of msg received via cluster bus. */ } clusterState; /* Redis cluster messages header */ From 43f3df99c897533edf12dbe94ee998487dcea773 Mon Sep 17 00:00:00 2001 From: antirez Date: Wed, 2 Oct 2013 12:27:12 +0200 Subject: [PATCH 25/33] Cluster: update cluster config when slave changes master. --- src/cluster.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/cluster.c b/src/cluster.c index c90112460..35b6a5b45 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1086,6 +1086,9 @@ int clusterProcessPacket(clusterLink *link) { clusterNodeRemoveSlave(sender->slaveof,sender); clusterNodeAddSlave(master,sender); sender->slaveof = master; + + /* Update config. */ + update_config = 1; } } } From dbf6c85d5e569d9342bb2169f520acc550e10623 Mon Sep 17 00:00:00 2001 From: antirez Date: Thu, 3 Oct 2013 09:55:20 +0200 Subject: [PATCH 26/33] Cluster: new clusterDoBeforeSleep() API. 
The new API is able to remember operations to perform before returning to the event loop, such as checking if there is the failover quorum for a slave, saving and fsyncing the configuration file, and so forth. Because these operations are performed before returning to the event loop we are sure that messages that are sent in the same event loop run will be delivered *after* the configuration is already saved, which is sometimes a requirement. For instance we want to publish a new epoch only when it is already stored in nodes.conf in order to avoid going back in the logical clock when a node is restarted. This new API provides a big performance advantage compared to saving and possibly fsyncing the configuration file multiple times in the same event loop run, especially in the case of big clusters with tens or hundreds of nodes. --- src/cluster.c | 127 ++++++++++++++++++++++++++------------------------ src/redis.h | 8 +++- 2 files changed, 72 insertions(+), 63 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 35b6a5b45..403ccf387 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -55,6 +55,7 @@ int clusterNodeSetSlotBit(clusterNode *n, int slot); void clusterSetMaster(clusterNode *n); void clusterHandleSlaveFailover(void); int bitmapTestBit(unsigned char *bitmap, int pos); +void clusterDoBeforeSleep(int flags); /* ----------------------------------------------------------------------------- * Initialization @@ -222,14 +223,14 @@ fmterr: * * This function writes the node config and returns 0, on error -1 * is returned. 
*/ -int clusterSaveConfig(void) { +int clusterSaveConfig(int do_fsync) { sds ci = clusterGenNodesDescription(REDIS_NODE_HANDSHAKE); int fd; if ((fd = open(server.cluster_configfile,O_WRONLY|O_CREAT|O_TRUNC,0644)) == -1) goto err; if (write(fd,ci,sdslen(ci)) != (ssize_t)sdslen(ci)) goto err; - fsync(fd); + if (do_fsync) fsync(fd); close(fd); sdsfree(ci); return 0; @@ -239,8 +240,8 @@ err: return -1; } -void clusterSaveConfigOrDie(void) { - if (clusterSaveConfig() == -1) { +void clusterSaveConfigOrDie(int do_fsync) { + if (clusterSaveConfig(do_fsync) == -1) { redisLog(REDIS_WARNING,"Fatal: can't update cluster config file."); exit(1); } @@ -277,7 +278,7 @@ void clusterInit(void) { clusterAddNode(server.cluster->myself); saveconf = 1; } - if (saveconf) clusterSaveConfigOrDie(); + if (saveconf) clusterSaveConfigOrDie(1); /* We need a listening TCP port for our cluster messaging needs. */ server.cfd_count = 0; @@ -665,15 +666,13 @@ void markNodeAsFailingIfNeeded(clusterNode *node) { * reachable nodes to flag the node as FAIL. */ if (server.cluster->myself->flags & REDIS_NODE_MASTER) clusterSendFail(node->name); - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); } /* This function is called only if a node is marked as FAIL, but we are able * to reach it again. It checks if there are the conditions to undo the FAIL * state. */ void clearNodeFailureIfNeeded(clusterNode *node) { - int changes = 0; time_t now = time(NULL); redisAssert(node->flags & REDIS_NODE_FAIL); @@ -685,7 +684,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) { "Clear FAIL state for node %.40s: slave is already reachable.", node->name); node->flags &= ~REDIS_NODE_FAIL; - changes++; + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); } /* If it is a master and... 
@@ -705,13 +704,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) { "Clear FAIL state for node %.40s: is reachable again and nobody is serving its slots after some time.", node->name); node->flags &= ~REDIS_NODE_FAIL; - changes++; - } - - /* Update state and save config. */ - if (changes) { - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); } } @@ -926,13 +919,12 @@ int clusterProcessPacket(clusterLink *link) { /* Update the sender configEpoch if it is publishing a newer one. */ if (senderConfigEpoch > sender->configEpoch) { sender->configEpoch = senderConfigEpoch; - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_FSYNC_CONFIG); } } /* Process packets by type. */ if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_MEET) { - int update_config = 0; redisLog(REDIS_DEBUG,"Ping packet received: %p", (void*)link->node); /* Add this node if it is new for us and the msg type is MEET. @@ -946,7 +938,7 @@ int clusterProcessPacket(clusterLink *link) { nodeIp2String(node->ip,link); node->port = ntohs(hdr->port); clusterAddNode(node); - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } /* Get info from the gossip section */ @@ -954,18 +946,12 @@ int clusterProcessPacket(clusterLink *link) { /* Anyway reply with a PONG */ clusterSendPing(link,CLUSTERMSG_TYPE_PONG); - - /* Update config if needed */ - if (update_config) clusterSaveConfigOrDie(); } /* PING or PONG: process config information. */ if (type == CLUSTERMSG_TYPE_PING || type == CLUSTERMSG_TYPE_PONG || type == CLUSTERMSG_TYPE_MEET) { - int update_state = 0; - int update_config = 0; - redisLog(REDIS_DEBUG,"%s packet received: %p", type == CLUSTERMSG_TYPE_PING ? 
"ping" : "pong", (void*)link->node); @@ -978,8 +964,8 @@ int clusterProcessPacket(clusterLink *link) { "Handshake error: we already know node %.40s, updating the address if needed.", sender->name); if (nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port))) { - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE); } /* Free this node as we alrady have it. This will * cause the link to be freed as well. */ @@ -994,7 +980,7 @@ int clusterProcessPacket(clusterLink *link) { link->node->name); link->node->flags &= ~REDIS_NODE_HANDSHAKE; link->node->flags |= flags&(REDIS_NODE_MASTER|REDIS_NODE_SLAVE); - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } else if (memcmp(link->node->name,hdr->sender, REDIS_CLUSTER_NAMELEN) != 0) { @@ -1006,7 +992,7 @@ int clusterProcessPacket(clusterLink *link) { link->node->ip[0] = '\0'; link->node->port = 0; freeClusterLink(link); - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); /* FIXME: remove this node if we already have it. * * If we already have it but the IP is different, use @@ -1021,8 +1007,7 @@ int clusterProcessPacket(clusterLink *link) { !(sender->flags & REDIS_NODE_HANDSHAKE) && nodeUpdateAddressIfNeeded(sender,link,ntohs(hdr->port))) { - update_state = 1; - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE); } /* Update our info about the node */ @@ -1038,7 +1023,8 @@ int clusterProcessPacket(clusterLink *link) { * conditions detected by clearNodeFailureIfNeeded(). */ if (link->node->flags & REDIS_NODE_PFAIL) { link->node->flags &= ~REDIS_NODE_PFAIL; - update_state = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE); } else if (link->node->flags & REDIS_NODE_FAIL) { clearNodeFailureIfNeeded(link->node); } @@ -1059,8 +1045,8 @@ int clusterProcessPacket(clusterLink *link) { sender->slaveof = NULL; /* Update config and state. 
*/ - update_state = 1; - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE); } } else { /* Node is a slave. */ @@ -1076,8 +1062,8 @@ int clusterProcessPacket(clusterLink *link) { if (sender->numslaves) clusterNodeResetSlaves(sender); /* Update config and state. */ - update_state = 1; - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE); } /* Master node changed for this slave? */ @@ -1088,7 +1074,7 @@ int clusterProcessPacket(clusterLink *link) { sender->slaveof = master; /* Update config. */ - update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG); } } } @@ -1126,7 +1112,9 @@ int clusterProcessPacket(clusterLink *link) { newmaster = sender; clusterDelSlot(j); clusterAddSlot(sender,j); - update_state = update_config = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); } } else { /* This node claims to no longer handling the slot, @@ -1150,16 +1138,15 @@ int clusterProcessPacket(clusterLink *link) { if (newmaster && curmaster->numslots == 0) { redisLog(REDIS_WARNING,"Configuration change detected. 
Reconfiguring myself as a replica of %.40s", sender->name); clusterSetMaster(sender); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); } } } /* Get info from the gossip section */ clusterProcessGossipSection(hdr,link); - - /* Update the cluster state if needed */ - if (update_state) clusterUpdateState(); - if (update_config) clusterSaveConfigOrDie(); } else if (type == CLUSTERMSG_TYPE_FAIL) { clusterNode *failing; @@ -1173,8 +1160,7 @@ int clusterProcessPacket(clusterLink *link) { failing->flags |= REDIS_NODE_FAIL; failing->fail_time = time(NULL); failing->flags &= ~REDIS_NODE_PFAIL; - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG|CLUSTER_TODO_UPDATE_STATE); } } else { redisLog(REDIS_NOTICE, @@ -1185,7 +1171,8 @@ int clusterProcessPacket(clusterLink *link) { robj *channel, *message; uint32_t channel_len, message_len; - /* Don't bother creating useless objects if there are no Pub/Sub subscribers. */ + /* Don't bother creating useless objects if there are no + * Pub/Sub subscribers. */ if (dictSize(server.pubsub_channels) || listLength(server.pubsub_patterns)) { channel_len = ntohl(hdr->data.publish.msg.channel_len); message_len = ntohl(hdr->data.publish.msg.message_len); @@ -1212,7 +1199,7 @@ int clusterProcessPacket(clusterLink *link) { server.cluster->failover_auth_count++; /* Maybe we reached a quorum here, set a flag to make sure * we check ASAP. */ - server.cluster->handle_slave_failover_asap++; + clusterDoBeforeSleep(CLUSTER_TODO_HANDLE_FAILOVER); } } else { redisLog(REDIS_WARNING,"Received unknown packet type: %d", type); @@ -1673,6 +1660,9 @@ void clusterHandleSlaveFailover(void) { server.cluster->currentEpoch); clusterRequestFailoverAuth(); server.cluster->failover_auth_sent = 1; + clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG| + CLUSTER_TODO_UPDATE_STATE| + CLUSTER_TODO_FSYNC_CONFIG); return; /* Wait for replies. 
*/ } @@ -1706,7 +1696,7 @@ void clusterHandleSlaveFailover(void) { /* 4) Update state and save config. */ clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterSaveConfigOrDie(1); /* 5) Pong all the other nodes so that they can update the state * accordingly and detect that we switched to master role. */ @@ -1878,12 +1868,30 @@ void clusterCron(void) { /* This function is called before the event handler returns to sleep for * events. It is useful to perform operations that must be done ASAP in * reaction to events fired but that are not safe to perform inside event - * handlers. */ + * handlers, or to perform potentially expensive tasks that we need to do + * a single time before replying to clients. */ void clusterBeforeSleep(void) { - if (server.cluster->handle_slave_failover_asap) { + /* Handle failover, this is needed when it is likely that there is already + * the quorum from masters in order to react fast. */ + if (server.cluster->todo_before_sleep & CLUSTER_TODO_HANDLE_FAILOVER) clusterHandleSlaveFailover(); - server.cluster->handle_slave_failover_asap = 0; + + /* Update the cluster state. */ + if (server.cluster->todo_before_sleep & CLUSTER_TODO_UPDATE_STATE) + clusterUpdateState(); + + /* Save the config, possibly using fsync. */ + if (server.cluster->todo_before_sleep & CLUSTER_TODO_SAVE_CONFIG) { + int fsync = server.cluster->todo_before_sleep & CLUSTER_TODO_FSYNC_CONFIG; + clusterSaveConfigOrDie(fsync); } + + /* Reset our flags.
*/ + server.cluster->todo_before_sleep = 0; +} + +void clusterDoBeforeSleep(int flags) { + server.cluster->todo_before_sleep |= flags; } /* ----------------------------------------------------------------------------- @@ -2097,7 +2105,7 @@ int verifyClusterConfigWithData(void) { server.cluster->importing_slots_from[j] = server.cluster->slots[j]; } } - if (update_config) clusterSaveConfigOrDie(); + if (update_config) clusterSaveConfigOrDie(1); return REDIS_OK; } @@ -2296,8 +2304,7 @@ void clusterCommand(redisClient *c) { return; } clusterDelNodeSlots(server.cluster->myself); - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } else if ((!strcasecmp(c->argv[1]->ptr,"addslots") || !strcasecmp(c->argv[1]->ptr,"delslots")) && c->argc >= 3) @@ -2347,8 +2354,7 @@ void clusterCommand(redisClient *c) { } } zfree(slots); - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"setslot") && c->argc >= 4) { /* SETSLOT 10 MIGRATING */ @@ -2424,8 +2430,7 @@ void clusterCommand(redisClient *c) { addReplyError(c,"Invalid CLUSTER SETSLOT action or number of arguments"); return; } - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"info") && c->argc == 2) { /* CLUSTER INFO */ @@ -2474,7 +2479,7 @@ void clusterCommand(redisClient *c) { addReplySds(c,info); addReply(c,shared.crlf); } else if (!strcasecmp(c->argv[1]->ptr,"saveconfig") && c->argc == 2) { - int retval = clusterSaveConfig(); + int retval = clusterSaveConfig(1); if (retval == 0) addReply(c,shared.ok); @@ -2526,8 +2531,7 @@ void clusterCommand(redisClient *c) { return; } clusterDelNode(n); - clusterUpdateState(); - clusterSaveConfigOrDie(); + 
clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } else if (!strcasecmp(c->argv[1]->ptr,"replicate") && c->argc == 3) { /* CLUSTER REPLICATE */ @@ -2562,8 +2566,7 @@ void clusterCommand(redisClient *c) { /* Set the master. */ clusterSetMaster(n); - clusterUpdateState(); - clusterSaveConfigOrDie(); + clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); addReply(c,shared.ok); } else { addReplyError(c,"Wrong CLUSTER subcommand or number of arguments"); diff --git a/src/redis.h b/src/redis.h index 844c4a323..94decca9d 100644 --- a/src/redis.h +++ b/src/redis.h @@ -653,11 +653,17 @@ typedef struct { uint64_t failover_auth_epoch; /* Epoch of the current election. */ /* The followign fields are uesd by masters to take state on elections. */ uint64_t last_vote_epoch; /* Epoch of the last vote granted. */ - int handle_slave_failover_asap; /* Call clusterHandleSlaveFailover() ASAP. */ + int todo_before_sleep; /* Things to do in clusterBeforeSleep(). */ long long stats_bus_messages_sent; /* Num of msg sent via cluster bus. */ long long stats_bus_messages_received; /* Num of msg received via cluster bus. */ } clusterState; +/* clusterState todo_before_sleep flags. */ +#define CLUSTER_TODO_HANDLE_FAILOVER (1<<0) +#define CLUSTER_TODO_UPDATE_STATE (1<<1) +#define CLUSTER_TODO_SAVE_CONFIG (1<<2) +#define CLUSTER_TODO_FSYNC_CONFIG (1<<3) + /* Redis cluster messages header */ /* Note that the PING, PONG and MEET messages are actually the same exact From cd73a69c18c0b0c4c94a91cfb22da1ed2ece8c21 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 4 Oct 2013 12:25:09 +0200 Subject: [PATCH 27/33] PSYNC: safer handling of PSYNC requests. There was a bug that overestimated the amount of backlog available, however this could only happen when a slave was asking for an offset that was in the "future" compared to the master replication backlog. Now this case is handled well and logged as an incident in the master log file.
--- src/replication.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/replication.c b/src/replication.c index 0b75f4eeb..bcc80b110 100644 --- a/src/replication.c +++ b/src/replication.c @@ -356,10 +356,14 @@ int masterTryPartialResynchronization(redisClient *c) { REDIS_OK) goto need_full_resync; if (!server.repl_backlog || psync_offset < server.repl_backlog_off || - psync_offset >= (server.repl_backlog_off + server.repl_backlog_size)) + psync_offset > (server.repl_backlog_off + server.repl_backlog_histlen)) { redisLog(REDIS_NOTICE, "Unable to partial resync with the slave for lack of backlog (Slave request was: %lld).", psync_offset); + if (psync_offset > server.master_repl_offset) { + redisLog(REDIS_WARNING, + "Warning: slave tried to PSYNC with an offset that is greater than the master replication offset."); + } goto need_full_resync; } From cca9f8c4323c63141afbe8f99dbb657e0a0fea37 Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 4 Oct 2013 12:59:24 +0200 Subject: [PATCH 28/33] Replication: fix master timeout. Since we started sending REPLCONF ACK from slaves to masters, the lastinteraction field of the client structure is always refreshed as soon as there is room in the socket output buffer, so masters in timeout are detected with too much delay (the socket buffer takes a lot of time to be filled by small REPLCONF ACK entries). This commit only counts data received as interactions with a master, solving the issue. 
--- src/networking.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/networking.c b/src/networking.c index 23ef11dc8..1da5a5a58 100644 --- a/src/networking.c +++ b/src/networking.c @@ -829,7 +829,13 @@ void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) { return; } } - if (totwritten > 0) c->lastinteraction = server.unixtime; + if (totwritten > 0) { + /* For clients representing masters we don't count sending data + * as an interaction, since we always send REPLCONF ACK commands + * that take some time to just fill the socket output buffer. + * We just rely on data / pings received for timeout detection. */ + if (!(c->flags & REDIS_MASTER)) c->lastinteraction = server.unixtime; + } if (c->bufpos == 0 && listLength(c->reply) == 0) { c->sentlen = 0; aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE); From 8432ddcedbaaa08bc3a92e1aba581bd759ef947b Mon Sep 17 00:00:00 2001 From: antirez Date: Fri, 4 Oct 2013 16:12:25 +0200 Subject: [PATCH 29/33] Replication: install the write handler when reusing a cached master. Sometimes when we resurrect a cached master after a successful partial resynchronization attempt, there is pending data in the output buffers of the client structure representing the master (likely REPLCONF ACK commands). If we don't reinstall the write handler, it will never be installed again by addReply*() family functions as they'll assume that if there is already data pending, the write handler is already installed. This bug caused some slaves after a successful partial sync to never send REPLCONF ACK, and continuously being detected as timing out by the master, with a disconnection / reconnection loop. 
--- src/replication.c | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/replication.c b/src/replication.c index bcc80b110..8102fc2db 100644 --- a/src/replication.c +++ b/src/replication.c @@ -1375,6 +1375,16 @@ void replicationResurrectCachedMaster(int newfd) { redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the readable handler: %s", strerror(errno)); freeClientAsync(server.master); /* Close ASAP. */ } + + /* We may also need to install the write handler as well if there is + * pending data in the write buffers. */ + if (server.master->bufpos || listLength(server.master->reply)) { + if (aeCreateFileEvent(server.el, newfd, AE_WRITABLE, + sendReplyToClient, server.master)) { + redisLog(REDIS_WARNING,"Error resurrecting the cached master, impossible to add the writable handler: %s", strerror(errno)); + freeClientAsync(server.master); /* Close ASAP. */ + } + } } /* ------------------------- MIN-SLAVES-TO-WRITE --------------------------- */ From e9b8b30c81002be04a69d060520ee9d6a021e68f Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 7 Oct 2013 11:30:58 +0200 Subject: [PATCH 30/33] Cluster: slave nodes advertise master slots bitmap and configEpoch. --- src/cluster.c | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 403ccf387..bee1b7b5c 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1339,12 +1339,21 @@ void clusterBroadcastMessage(void *buf, size_t len) { /* Build the message header */ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { int totlen = 0; + clusterNode *master; + + /* If this node is a master, we send its slots bitmap and configEpoch. + * If this node is a slave we send the master's information instead (the + * node is flagged as slave so the receiver knows that it is NOT really + * in charge of these slots). */ + master = (server.cluster->myself->flags & REDIS_NODE_SLAVE && + server.cluster->myself->slaveof) ?
+ server.cluster->myself->slaveof : server.cluster->myself; memset(hdr,0,sizeof(*hdr)); hdr->type = htons(type); memcpy(hdr->sender,server.cluster->myself->name,REDIS_CLUSTER_NAMELEN); - memcpy(hdr->myslots,server.cluster->myself->slots, - sizeof(hdr->myslots)); + + memcpy(hdr->myslots,master->slots,sizeof(hdr->myslots)); memset(hdr->slaveof,0,REDIS_CLUSTER_NAMELEN); if (server.cluster->myself->slaveof != NULL) { memcpy(hdr->slaveof,server.cluster->myself->slaveof->name, @@ -1354,13 +1363,9 @@ void clusterBuildMessageHdr(clusterMsg *hdr, int type) { hdr->flags = htons(server.cluster->myself->flags); hdr->state = server.cluster->state; - /* Set the currentEpoch and configEpochs. Note that configEpoch is - * set to the master configEpoch if this node is a slave. */ + /* Set the currentEpoch and configEpochs. */ hdr->currentEpoch = htonu64(server.cluster->currentEpoch); - if (server.cluster->myself->flags & REDIS_NODE_SLAVE) - hdr->configEpoch = htonu64(server.cluster->myself->slaveof->configEpoch); - else - hdr->configEpoch = htonu64(server.cluster->myself->configEpoch); + hdr->configEpoch = htonu64(master->configEpoch); if (type == CLUSTERMSG_TYPE_FAIL) { totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData); From acd9ec222ee9388399ddd2a51232f41e12bef4e7 Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 7 Oct 2013 15:44:58 +0200 Subject: [PATCH 31/33] Cluster: log message improved when FAIL is cleared from a slave node. --- src/cluster.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index bee1b7b5c..6939bac32 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -681,7 +681,7 @@ void clearNodeFailureIfNeeded(clusterNode *node) { * node again. 
*/ if (node->flags & REDIS_NODE_SLAVE) { redisLog(REDIS_NOTICE, - "Clear FAIL state for node %.40s: slave is already reachable.", + "Clear FAIL state for node %.40s: slave is reachable again.", node->name); node->flags &= ~REDIS_NODE_FAIL; clusterDoBeforeSleep(CLUSTER_TODO_UPDATE_STATE|CLUSTER_TODO_SAVE_CONFIG); From 26ea55b7f50d68e203f57c9727aaa29bb56a83ed Mon Sep 17 00:00:00 2001 From: antirez Date: Mon, 7 Oct 2013 16:07:13 +0200 Subject: [PATCH 32/33] Cluster: fix slave data age computation when master is still connected. --- src/cluster.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/cluster.c b/src/cluster.c index 6939bac32..332120720 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1607,11 +1607,18 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { * 3) Perform the failover informing all the other nodes. */ void clusterHandleSlaveFailover(void) { - time_t data_age = server.unixtime - server.repl_down_since; + time_t data_age; mstime_t auth_age = mstime() - server.cluster->failover_auth_time; int needed_quorum = (server.cluster->size / 2) + 1; int j; + /* Set data_age to the number of seconds we are disconnected from the master. */ + if (server.repl_state == REDIS_REPL_CONNECTED) { + data_age = server.unixtime - server.master->lastinteraction; + } else { + data_age = server.unixtime - server.repl_down_since; + } + /* Pre conditions to run the function: * 1) We are a slave. * 2) Our master is flagged as FAIL. From 0f079966c73f2d00ea7c3ec80e81e744bed7793d Mon Sep 17 00:00:00 2001 From: antirez Date: Tue, 8 Oct 2013 12:45:35 +0200 Subject: [PATCH 33/33] Cluster: masters don't vote for a slave with stale config. When a slave requests our vote, the configEpoch it claims for its master and the set of served slots must be greater than or equal to the configEpoch of the nodes serving these slots in the current configuration of the master granting its vote.
In other terms, masters don't vote for slaves having a stale configuration for the slots they want to serve. --- src/cluster.c | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 332120720..23d4196d2 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1567,7 +1567,10 @@ void clusterSendFailoverAuth(clusterNode *node) { /* Vote for the node asking for our vote if there are the conditions. */ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { clusterNode *master = node->slaveof; - uint64_t requestEpoch = ntohu64(request->currentEpoch); + uint64_t requestCurrentEpoch = ntohu64(request->currentEpoch); + uint64_t requestConfigEpoch = ntohu64(request->configEpoch); + unsigned char *claimed_slots = request->myslots; + int j; /* IF we are not a master serving at least 1 slot, we don't have the * right to vote, as the cluster size in Redis Cluster is the number @@ -1576,7 +1579,7 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { if (server.cluster->myself->numslots == 0) return; /* Request epoch must be >= our currentEpoch. */ - if (requestEpoch < server.cluster->currentEpoch) return; + if (requestCurrentEpoch < server.cluster->currentEpoch) return; /* I already voted for this epoch? Return ASAP. */ if (server.cluster->last_vote_epoch == server.cluster->currentEpoch) return; @@ -1592,6 +1595,19 @@ void clusterSendFailoverAuthIfNeeded(clusterNode *node, clusterMsg *request) { if (server.unixtime - node->slaveof->voted_time < server.cluster_node_timeout * 2) return; + /* The slave requesting the vote must have a configEpoch for the claimed slots + * that is >= the one of the masters currently serving the same slots in the + * current configuration. 
*/ + for (j = 0; j < REDIS_CLUSTER_SLOTS; j++) { + if (bitmapTestBit(claimed_slots, j) == 0) continue; + if (server.cluster->slots[j] == NULL || + server.cluster->slots[j]->configEpoch <= requestConfigEpoch) continue; + /* If we reached this point we found a slot that in our current slots + * is served by a master with a greater configEpoch than the one claimed + * by the slave requesting our vote. Refuse to vote for this slave. */ + return; + } + /* We can vote for this slave. */ clusterSendFailoverAuth(node); server.cluster->last_vote_epoch = server.cluster->currentEpoch; @@ -1910,7 +1926,7 @@ void clusterDoBeforeSleep(int flags) { * Slots management * -------------------------------------------------------------------------- */ -/* Test bit 'pos' in a generic bitmap. Return 1 if the bit is zet, +/* Test bit 'pos' in a generic bitmap. Return 1 if the bit is set, * otherwise 0. */ int bitmapTestBit(unsigned char *bitmap, int pos) { off_t byte = pos/8;