diff --git a/src/cluster.c b/src/cluster.c index 1954b603e..149faf047 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -63,6 +63,7 @@ int clusterDelNodeSlots(clusterNode *node); int clusterNodeSetSlotBit(clusterNode *n, int slot); void clusterSetMaster(clusterNode *n); void clusterHandleSlaveFailover(void); +void clusterHandleSlaveMigration(int max_slaves); int bitmapTestBit(unsigned char *bitmap, int pos); void clusterDoBeforeSleep(int flags); void clusterSendUpdate(clusterLink *link, clusterNode *node); @@ -579,6 +580,14 @@ void clusterNodeResetSlaves(clusterNode *n) { n->slaves = NULL; } +int clusterCountNonFailingSlaves(clusterNode *n) { + int j, okslaves = 0; + + for (j = 0; j < n->numslaves; j++) + if (!nodeFailed(n->slaves[j])) okslaves++; + return okslaves; +} + void freeClusterNode(clusterNode *n) { sds nodename; @@ -2096,6 +2105,90 @@ void clusterHandleSlaveFailover(void) { } } +/* ----------------------------------------------------------------------------- + * CLUSTER slave migration + * + * Slave migration is the process that allows a slave of a master that is + * already covered by at least another slave, to "migrate" to a master that + * is orpaned, that is, left with no working slaves. + * -------------------------------------------------------------------------- */ + +/* This function is responsible to decide if this replica should be migrated + * to a different (orphaned) master. It is called by the clusterCron() function + * only if: + * + * 1) We are a slave node. + * 2) It was detected that there is at least one orphaned master in + * the cluster. + * 3) We are a slave of one of the masters with the greatest number of + * slaves. + * + * This checks are performed by the caller since it requires to iterate + * the nodes anyway, so we spend time into clusterHandleSlaveMigration() + * if definitely needed. + * + * The fuction is called with a pre-computed max_slaves, that is the max + * number of working (not in FAIL state) slaves for a single master. + * + * Additional conditions for migration are examined inside the function. + */ +void clusterHandleSlaveMigration(int max_slaves) { + int j, okslaves = 0; + clusterNode *mymaster = myself->slaveof, *target = NULL, *candidate = NULL; + dictIterator *di; + dictEntry *de; + + /* Step 1: Don't migrate if the cluster state is not ok. */ + if (server.cluster->state != REDIS_CLUSTER_OK) return; + + /* Step 2: Don't migrate if my master has just me as working slave. */ + if (mymaster == NULL) return; + for (j = 0; j < mymaster->numslaves; j++) + if (!nodeFailed(mymaster->slaves[j]) && + !nodeTimedOut(mymaster->slaves[j])) okslaves++; + if (okslaves == 1) return; + + /* Step 3: Idenitfy a candidate for migration, and check if among the + * masters with the greatest number of ok slaves, I'm the one with the + * smaller node ID. + * + * Note that this means that eventually a replica migration will occurr + * since slaves that are reachable again always have their FAIL flag + * cleared. At the same time this does not mean that there are no + * race conditions possible (two slaves migrating at the same time), but + * this is extremely unlikely to happen, and harmless. */ + candidate = myself; + di = dictGetSafeIterator(server.cluster->nodes); + while((de = dictNext(di)) != NULL) { + clusterNode *node = dictGetVal(de); + int okslaves; + + /* Only iterate over working masters. */ + if (nodeIsSlave(node) || nodeFailed(node)) continue; + okslaves = clusterCountNonFailingSlaves(node); + + if (okslaves == 0 && target == NULL) target = node; + if (okslaves == max_slaves) { + for (j = 0; j < node->numslaves; j++) { + if (memcmp(node->slaves[j]->name, + candidate->name, + REDIS_CLUSTER_NAMELEN) < 0) + { + candidate = node->slaves[j]; + } + } + } + } + + /* Step 4: perform the migration if there is a target, and if I'm the + * candidate. */ + if (target && candidate == myself) { + redisLog(REDIS_WARNING,"Migrating to orphaned master %.40s", + target->name); + clusterSetMaster(target); + } +} + /* ----------------------------------------------------------------------------- * CLUSTER cron job * -------------------------------------------------------------------------- */ @@ -2104,7 +2197,10 @@ void clusterHandleSlaveFailover(void) { void clusterCron(void) { dictIterator *di; dictEntry *de; - int j, update_state = 0; + int update_state = 0; + int orphaned_masters; /* How many masters there are without ok slaves. */ + int max_slaves; /* Max number of ok slaves for a single master. */ + int this_slaves; /* Number of ok slaves for our master (if we are slave). */ mstime_t min_pong = 0, now = mstime(); clusterNode *min_pong_node = NULL; static unsigned long long iteration = 0; @@ -2175,6 +2271,8 @@ void clusterCron(void) { /* Ping some random node 1 time every 10 iterations, so that we usually ping * one random node every second. */ if (!(iteration % 10)) { + int j; + /* Check a few random nodes and ping the one with the oldest * pong_received time. */ for (j = 0; j < 5; j++) { @@ -2195,7 +2293,15 @@ void clusterCron(void) { } } - /* Iterate nodes to check if we need to flag something as failing */ + /* Iterate nodes to check if we need to flag something as failing. + * This loop is also responsible to: + * 1) Check if there are orphaned masters (masters without non failing + * slaves). + * 2) Count the max number of non failing slaves for a single master. + * 3) Count the number of slaves for our master, if we are a slave. */ + orphaned_masters = 0; + max_slaves = 0; + this_slaves = 0; di = dictGetSafeIterator(server.cluster->nodes); while((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); @@ -2251,6 +2357,17 @@ void clusterCron(void) { update_state = 1; } } + + /* Orphaned master check, useful only if the current instance + * is a slave that may migrate to another master. */ + if (nodeIsSlave(myself) && nodeIsMaster(node) && !nodeFailed(node)) { + int okslaves = clusterCountNonFailingSlaves(node); + + if (okslaves == 0) orphaned_masters++; + if (okslaves > max_slaves) max_slaves = okslaves; + if (nodeIsSlave(myself) && myself->slaveof == node) + this_slaves = okslaves; + } } dictReleaseIterator(di); @@ -2265,7 +2382,17 @@ void clusterCron(void) { replicationSetMaster(myself->slaveof->ip, myself->slaveof->port); } - clusterHandleSlaveFailover(); + if (nodeIsSlave(myself)) { + clusterHandleSlaveFailover(); + /* If there are orphaned slaves, and we are a slave among the masters + * with the max number of non-failing slaves, consider migrating to + * the orphaned masters. Note that it does not make sense to try + * a migration if there is no master with at least *two* working + * slaves. */ + if (orphaned_masters && max_slaves >= 2 && this_slaves == max_slaves) + clusterHandleSlaveMigration(max_slaves); + } + if (update_state || server.cluster->state == REDIS_CLUSTER_FAIL) clusterUpdateState(); }