Sentinel: changes to connection handling and redirection.
We now disconnect a Redis instance's hiredis links in a more robust way. We also change how the redirection for the +switch-master event is performed: it is now just an instance reset with an address change. Using the same mechanism we implement the new +redirect-to-master event, which is triggered when an instance that is configured as a master is found to be a slave at the first INFO reply. In that case we monitor its master instead, logging the incident as an event.
This commit is contained in:
parent
2179c26916
commit
75fb6e5b8a
134
src/sentinel.c
134
src/sentinel.c
@ -110,6 +110,10 @@ typedef struct sentinelAddr {
|
|||||||
#define SENTINEL_MASTER_LINK_STATUS_UP 0
|
#define SENTINEL_MASTER_LINK_STATUS_UP 0
|
||||||
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1
|
#define SENTINEL_MASTER_LINK_STATUS_DOWN 1
|
||||||
|
|
||||||
|
/* Generic flags that can be used with different functions. */
|
||||||
|
#define SENTINEL_NO_FLAGS 0
|
||||||
|
#define SENTINEL_GENERATE_EVENT 1
|
||||||
|
|
||||||
typedef struct sentinelRedisInstance {
|
typedef struct sentinelRedisInstance {
|
||||||
int flags; /* See SRI_... defines */
|
int flags; /* See SRI_... defines */
|
||||||
char *name; /* Master name from the point of view of this sentinel. */
|
char *name; /* Master name from the point of view of this sentinel. */
|
||||||
@ -284,6 +288,7 @@ char *sentinelGetSubjectiveLeader(sentinelRedisInstance *master);
|
|||||||
char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
|
char *sentinelGetObjectiveLeader(sentinelRedisInstance *master);
|
||||||
int yesnotoi(char *s);
|
int yesnotoi(char *s);
|
||||||
void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
|
void sentinelDisconnectInstanceFromContext(const redisAsyncContext *c);
|
||||||
|
void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c);
|
||||||
const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
|
const char *sentinelRedisInstanceTypeStr(sentinelRedisInstance *ri);
|
||||||
|
|
||||||
/* ========================= Dictionary types =============================== */
|
/* ========================= Dictionary types =============================== */
|
||||||
@ -595,18 +600,9 @@ void releaseSentinelRedisInstance(sentinelRedisInstance *ri) {
|
|||||||
dictRelease(ri->sentinels);
|
dictRelease(ri->sentinels);
|
||||||
dictRelease(ri->slaves);
|
dictRelease(ri->slaves);
|
||||||
|
|
||||||
/* Release hiredis connections. Note that redisAsyncFree() will call
|
/* Release hiredis connections. */
|
||||||
* the disconnection callback. */
|
if (ri->cc) sentinelKillLink(ri,ri->cc);
|
||||||
if (ri->cc) {
|
if (ri->pc) sentinelKillLink(ri,ri->pc);
|
||||||
ri->cc->data = NULL;
|
|
||||||
redisAsyncFree(ri->cc);
|
|
||||||
ri->cc = NULL;
|
|
||||||
}
|
|
||||||
if (ri->pc) {
|
|
||||||
ri->pc->data = NULL;
|
|
||||||
redisAsyncFree(ri->pc);
|
|
||||||
ri->pc = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Free other resources. */
|
/* Free other resources. */
|
||||||
sdsfree(ri->name);
|
sdsfree(ri->name);
|
||||||
@ -761,14 +757,14 @@ void sentinelDelFlagsToDictOfRedisInstances(dict *instances, int flags) {
|
|||||||
* 5) In the process of doing this undo the failover if in progress.
|
* 5) In the process of doing this undo the failover if in progress.
|
||||||
* 6) Disconnect the connections with the master (will reconnect automatically).
|
* 6) Disconnect the connections with the master (will reconnect automatically).
|
||||||
*/
|
*/
|
||||||
void sentinelResetMaster(sentinelRedisInstance *ri) {
|
void sentinelResetMaster(sentinelRedisInstance *ri, int flags) {
|
||||||
redisAssert(ri->flags & SRI_MASTER);
|
redisAssert(ri->flags & SRI_MASTER);
|
||||||
dictRelease(ri->slaves);
|
dictRelease(ri->slaves);
|
||||||
dictRelease(ri->sentinels);
|
dictRelease(ri->sentinels);
|
||||||
ri->slaves = dictCreate(&instancesDictType,NULL);
|
ri->slaves = dictCreate(&instancesDictType,NULL);
|
||||||
ri->sentinels = dictCreate(&instancesDictType,NULL);
|
ri->sentinels = dictCreate(&instancesDictType,NULL);
|
||||||
if (ri->cc) redisAsyncFree(ri->cc);
|
if (ri->cc) sentinelKillLink(ri,ri->cc);
|
||||||
if (ri->pc) redisAsyncFree(ri->pc);
|
if (ri->pc) sentinelKillLink(ri,ri->pc);
|
||||||
ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
|
ri->flags &= SRI_MASTER|SRI_CAN_FAILOVER|SRI_DISCONNECTED;
|
||||||
if (ri->leader) {
|
if (ri->leader) {
|
||||||
sdsfree(ri->leader);
|
sdsfree(ri->leader);
|
||||||
@ -778,12 +774,17 @@ void sentinelResetMaster(sentinelRedisInstance *ri) {
|
|||||||
ri->failover_state_change_time = 0;
|
ri->failover_state_change_time = 0;
|
||||||
ri->failover_start_time = 0;
|
ri->failover_start_time = 0;
|
||||||
ri->promoted_slave = NULL;
|
ri->promoted_slave = NULL;
|
||||||
|
sdsfree(ri->runid);
|
||||||
|
sdsfree(ri->slave_master_host);
|
||||||
|
ri->runid = NULL;
|
||||||
|
ri->slave_master_host = NULL;
|
||||||
|
if (flags & SENTINEL_GENERATE_EVENT)
|
||||||
sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
|
sentinelEvent(REDIS_WARNING,"+reset-master",ri,"%@");
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Call sentinelResetMaster() on every master with a name matching the specified
|
/* Call sentinelResetMaster() on every master with a name matching the specified
|
||||||
* pattern. */
|
* pattern. */
|
||||||
int sentinelResetMastersByPattern(char *pattern) {
|
int sentinelResetMastersByPattern(char *pattern, int flags) {
|
||||||
dictIterator *di;
|
dictIterator *di;
|
||||||
dictEntry *de;
|
dictEntry *de;
|
||||||
int reset = 0;
|
int reset = 0;
|
||||||
@ -794,7 +795,7 @@ int sentinelResetMastersByPattern(char *pattern) {
|
|||||||
|
|
||||||
if (ri->name) {
|
if (ri->name) {
|
||||||
if (stringmatch(pattern,ri->name,0)) {
|
if (stringmatch(pattern,ri->name,0)) {
|
||||||
sentinelResetMaster(ri);
|
sentinelResetMaster(ri,flags);
|
||||||
reset++;
|
reset++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -803,6 +804,32 @@ int sentinelResetMastersByPattern(char *pattern) {
|
|||||||
return reset;
|
return reset;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Reset the specified master with sentinelResetMaster(), and also change
|
||||||
|
* the ip:port address, but keep the name of the instance unmodified.
|
||||||
|
*
|
||||||
|
* This is used to handle the +switch-master and +redirect-to-master events.
* The reset is requested with SENTINEL_NO_FLAGS, i.e. without the
* SENTINEL_GENERATE_EVENT bit set.
|
||||||
|
*
|
||||||
|
* The function returns REDIS_ERR if the address can't be resolved for some
|
||||||
|
* reason. Otherwise REDIS_OK is returned.
* REDIS_ERR is returned when createSentinelAddr() yields NULL; in that case
* the function returns before the reset, so the master state and
* master->addr have not been modified.
|
||||||
|
*
|
||||||
|
* TODO: make this reset so that original sentinels are re-added with
|
||||||
|
* same ip / port / runid.
|
||||||
|
*/
|
||||||
|
|
||||||
|
int sentinelResetMasterAndChangeAddress(sentinelRedisInstance *master, char *ip, int port) {
|
||||||
|
sentinelAddr *oldaddr, *newaddr;
|
||||||
|
|
||||||
|
/* Build the new address before resetting: sentinelResetMaster() frees
* master->slave_master_host, which a caller may pass here as 'ip'. */
newaddr = createSentinelAddr(ip,port);
|
||||||
|
if (newaddr == NULL) return REDIS_ERR;
|
||||||
|
sentinelResetMaster(master,SENTINEL_NO_FLAGS);
|
||||||
|
oldaddr = master->addr;
|
||||||
|
master->addr = newaddr;
|
||||||
|
/* Release the old address at the end so we are safe even if the function
|
||||||
|
* gets the master->addr->ip and master->addr->port as arguments. */
|
||||||
|
releaseSentinelAddr(oldaddr);
|
||||||
|
return REDIS_OK;
|
||||||
|
}
|
||||||
|
|
||||||
/* ============================ Config handling ============================= */
|
/* ============================ Config handling ============================= */
|
||||||
char *sentinelHandleConfiguration(char **argv, int argc) {
|
char *sentinelHandleConfiguration(char **argv, int argc) {
|
||||||
sentinelRedisInstance *ri;
|
sentinelRedisInstance *ri;
|
||||||
@ -859,6 +886,15 @@ char *sentinelHandleConfiguration(char **argv, int argc) {
|
|||||||
|
|
||||||
/* ====================== hiredis connection handling ======================= */
|
/* ====================== hiredis connection handling ======================= */
|
||||||
|
|
||||||
|
/* Completely disconnect a hiredis link from an instance.
* 'c' is the context to drop: whichever of ri->cc / ri->pc points to it is
* set to NULL, the instance is flagged SRI_DISCONNECTED, and the context is
* released with redisAsyncFree(). */
|
||||||
|
void sentinelKillLink(sentinelRedisInstance *ri, redisAsyncContext *c) {
|
||||||
|
/* Forget the context on the instance side before freeing it. */
if (ri->cc == c) ri->cc = NULL;
|
||||||
|
if (ri->pc == c) ri->pc = NULL;
|
||||||
|
/* Detach the instance from the context before redisAsyncFree().
* NOTE(review): presumably this is so the disconnection callback fired by
* the free finds no instance to act on - confirm against
* sentinelDisconnectInstanceFromContext(). */
c->data = NULL;
|
||||||
|
ri->flags |= SRI_DISCONNECTED;
|
||||||
|
redisAsyncFree(c);
|
||||||
|
}
|
||||||
|
|
||||||
/* This function takes an hiredis context that is in an error condition
|
/* This function takes an hiredis context that is in an error condition
|
||||||
* and make sure to mark the instance as disconnected performing the
|
* and make sure to mark the instance as disconnected performing the
|
||||||
* cleanup needed.
|
* cleanup needed.
|
||||||
@ -894,6 +930,7 @@ void sentinelLinkEstablishedCallback(const redisAsyncContext *c, int status) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
|
void sentinelDisconnectCallback(const redisAsyncContext *c, int status) {
|
||||||
|
printf("DISCONNECT CALLBACK CALLED: %p (%p)\n", (void*)c, (void*)c->data);
|
||||||
sentinelDisconnectInstanceFromContext(c);
|
sentinelDisconnectInstanceFromContext(c);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -909,8 +946,7 @@ void sentinelReconnectInstance(sentinelRedisInstance *ri) {
|
|||||||
if (ri->cc->err) {
|
if (ri->cc->err) {
|
||||||
sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
|
sentinelEvent(REDIS_DEBUG,"-cmd-link-reconnection",ri,"%@ #%s",
|
||||||
ri->cc->errstr);
|
ri->cc->errstr);
|
||||||
redisAsyncFree(ri->cc);
|
sentinelKillLink(ri,ri->cc);
|
||||||
ri->cc = NULL;
|
|
||||||
} else {
|
} else {
|
||||||
ri->cc_conn_time = mstime();
|
ri->cc_conn_time = mstime();
|
||||||
ri->cc->data = ri;
|
ri->cc->data = ri;
|
||||||
@ -927,8 +963,7 @@ void sentinelReconnectInstance(sentinelRedisInstance *ri) {
|
|||||||
if (ri->pc->err) {
|
if (ri->pc->err) {
|
||||||
sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
|
sentinelEvent(REDIS_DEBUG,"-pubsub-link-reconnection",ri,"%@ #%s",
|
||||||
ri->pc->errstr);
|
ri->pc->errstr);
|
||||||
redisAsyncFree(ri->pc);
|
sentinelKillLink(ri,ri->pc);
|
||||||
ri->pc = NULL;
|
|
||||||
} else {
|
} else {
|
||||||
int retval;
|
int retval;
|
||||||
|
|
||||||
@ -946,8 +981,7 @@ void sentinelReconnectInstance(sentinelRedisInstance *ri) {
|
|||||||
if (retval != REDIS_OK) {
|
if (retval != REDIS_OK) {
|
||||||
/* If we can't subscribe, the Pub/Sub connection is useless
|
/* If we can't subscribe, the Pub/Sub connection is useless
|
||||||
* and we can simply disconnect it and try again. */
|
* and we can simply disconnect it and try again. */
|
||||||
redisAsyncFree(ri->pc);
|
sentinelKillLink(ri,ri->pc);
|
||||||
ri->pc = NULL;
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1056,19 +1090,35 @@ void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
|
|||||||
|
|
||||||
if (sentinel.tilt) return;
|
if (sentinel.tilt) return;
|
||||||
|
|
||||||
|
/* Act if a master turned into a slave. */
|
||||||
|
if ((ri->flags & SRI_MASTER) && role == SRI_SLAVE) {
|
||||||
|
if (first_runid && ri->slave_master_host) {
|
||||||
|
/* If it is the first time we receive INFO from it, but it's
|
||||||
|
* a slave while it was configured as a master, we want to monitor
|
||||||
|
* its master instead. */
|
||||||
|
sentinelEvent(REDIS_WARNING,"+redirect-to-master",ri,
|
||||||
|
"%s %s %d %s %d",
|
||||||
|
ri->name, ri->addr->ip, ri->addr->port,
|
||||||
|
ri->slave_master_host, ri->slave_master_port);
|
||||||
|
sentinelResetMasterAndChangeAddress(ri,ri->slave_master_host,
|
||||||
|
ri->slave_master_port);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Act if a slave turned into a master. */
|
/* Act if a slave turned into a master. */
|
||||||
if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
|
if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
|
||||||
if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
|
if (!(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
|
||||||
(runid_changed || first_runid))
|
(runid_changed || first_runid))
|
||||||
{
|
{
|
||||||
int retval;
|
|
||||||
|
|
||||||
/* If a slave turned into a master, but at the same time the
|
/* If a slave turned into a master, but at the same time the
|
||||||
* runid has changed, or it is simply the first time we see and
|
* runid has changed, or it is simply the first time we see and
|
||||||
* INFO output from this instance, this is a reboot with a wrong
|
* INFO output from this instance, this is a reboot with a wrong
|
||||||
* configuration.
|
* configuration.
|
||||||
*
|
*
|
||||||
* Log the event and remove the slave. */
|
* Log the event and remove the slave. */
|
||||||
|
int retval;
|
||||||
|
|
||||||
sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves");
|
sentinelEvent(REDIS_WARNING,"-slave-restart-as-master",ri,"%@ #removing it from the attached slaves");
|
||||||
retval = dictDelete(ri->master->slaves,ri->name);
|
retval = dictDelete(ri->master->slaves,ri->name);
|
||||||
redisAssert(retval == REDIS_OK);
|
redisAssert(retval == REDIS_OK);
|
||||||
@ -1581,7 +1631,7 @@ void sentinelCommand(redisClient *c) {
|
|||||||
} else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"reset")) {
|
||||||
/* SENTINEL RESET <pattern> */
|
/* SENTINEL RESET <pattern> */
|
||||||
if (c->argc != 3) goto numargserr;
|
if (c->argc != 3) goto numargserr;
|
||||||
addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr));
|
addReplyLongLong(c,sentinelResetMastersByPattern(c->argv[2]->ptr,SENTINEL_GENERATE_EVENT));
|
||||||
} else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
|
} else if (!strcasecmp(c->argv[1]->ptr,"get-master-addr-by-name")) {
|
||||||
/* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
|
/* SENTINEL GET-MASTER-ADDR-BY-NAME <master-name> */
|
||||||
sentinelRedisInstance *ri;
|
sentinelRedisInstance *ri;
|
||||||
@ -1626,7 +1676,7 @@ void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
|
|||||||
(mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
|
(mstime() - ri->cc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
|
||||||
(mstime() - ri->last_pong_time) > (ri->down_after_period/2))
|
(mstime() - ri->last_pong_time) > (ri->down_after_period/2))
|
||||||
{
|
{
|
||||||
redisAsyncFree(ri->cc); /* will call the disconnection callback */
|
sentinelKillLink(ri,ri->cc);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* 2) Check if the pubsub link seems connected, was connected not less
|
/* 2) Check if the pubsub link seems connected, was connected not less
|
||||||
@ -1638,7 +1688,7 @@ void sentinelCheckSubjectivelyDown(sentinelRedisInstance *ri) {
|
|||||||
(mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
|
(mstime() - ri->pc_conn_time) > SENTINEL_MIN_LINK_RECONNECT_PERIOD &&
|
||||||
(mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
|
(mstime() - ri->pc_last_activity) > (SENTINEL_PUBLISH_PERIOD*3))
|
||||||
{
|
{
|
||||||
redisAsyncFree(ri->pc); /* will call the disconnection callback */
|
sentinelKillLink(ri,ri->pc);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Update the subjectively down flag. */
|
/* Update the subjectively down flag. */
|
||||||
@ -2246,30 +2296,14 @@ void sentinelFailoverReconfNextSlave(sentinelRedisInstance *master) {
|
|||||||
* and re-add it with the same address to trigger a complete state
|
* and re-add it with the same address to trigger a complete state
|
||||||
* refresh. */
|
* refresh. */
|
||||||
void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
|
void sentinelFailoverSwitchToPromotedSlave(sentinelRedisInstance *master) {
|
||||||
sentinelRedisInstance *new, *ref = master->promoted_slave ?
|
sentinelRedisInstance *ref = master->promoted_slave ?
|
||||||
master->promoted_slave : master;
|
master->promoted_slave : master;
|
||||||
int quorum = ref->quorum, parallel_syncs = ref->parallel_syncs;
|
|
||||||
char *name = sdsnew(master->name);
|
|
||||||
char *ip = sdsnew(ref->addr->ip), *oldip = sdsnew(master->addr->ip);
|
|
||||||
int port = ref->addr->port, oldport = master->addr->port;
|
|
||||||
int retval, oldflags = master->flags;
|
|
||||||
mstime_t old_down_after_period = master->down_after_period;
|
|
||||||
mstime_t old_failover_timeout = master->failover_timeout;
|
|
||||||
|
|
||||||
retval = dictDelete(sentinel.masters,master->name);
|
sentinelEvent(REDIS_WARNING,"+switch-master",master,"%s %s %d %s %d",
|
||||||
redisAssert(retval == DICT_OK);
|
master->name, master->addr->ip, master->addr->port,
|
||||||
new = createSentinelRedisInstance(name,SRI_MASTER,ip,port,quorum,NULL);
|
ref->addr->ip, ref->addr->port);
|
||||||
redisAssert(new != NULL);
|
|
||||||
new->parallel_syncs = parallel_syncs;
|
sentinelResetMasterAndChangeAddress(master,ref->addr->ip,ref->addr->port);
|
||||||
new->flags |= (oldflags & SRI_CAN_FAILOVER);
|
|
||||||
new->down_after_period = old_down_after_period;
|
|
||||||
new->failover_timeout = old_failover_timeout;
|
|
||||||
/* TODO: ... set the scripts as well. */
|
|
||||||
sentinelEvent(REDIS_WARNING,"+switch-master",new,"%s %s %d %s %d",
|
|
||||||
name, oldip, oldport, ip, port);
|
|
||||||
sdsfree(name);
|
|
||||||
sdsfree(ip);
|
|
||||||
sdsfree(oldip);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
|
void sentinelFailoverStateMachine(sentinelRedisInstance *ri) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user