Support read-only replicas attaching to active replicas (Bug #229)
Former-commit-id: 676644f2db1634f97dd557a6e321234914a5e5d3
This commit is contained in:
parent
28fdf8d7d4
commit
c373c2b652
@ -2260,7 +2260,7 @@ int processMultibulkBuffer(client *c) {
|
|||||||
* 1. The client is reset unless there are reasons to avoid doing it.
|
* 1. The client is reset unless there are reasons to avoid doing it.
|
||||||
* 2. In the case of master clients, the replication offset is updated.
|
* 2. In the case of master clients, the replication offset is updated.
|
||||||
* 3. Propagate commands we got from our master to replicas down the line. */
|
* 3. Propagate commands we got from our master to replicas down the line. */
|
||||||
void commandProcessed(client *c) {
|
void commandProcessed(client *c, int flags) {
|
||||||
long long prev_offset = c->reploff;
|
long long prev_offset = c->reploff;
|
||||||
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
|
if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) {
|
||||||
/* Update the applied replication offset of our master. */
|
/* Update the applied replication offset of our master. */
|
||||||
@ -2288,7 +2288,7 @@ void commandProcessed(client *c) {
|
|||||||
ae.arm(c);
|
ae.arm(c);
|
||||||
long long applied = c->reploff - prev_offset;
|
long long applied = c->reploff - prev_offset;
|
||||||
if (applied) {
|
if (applied) {
|
||||||
if (!g_pserver->fActiveReplica)
|
if (!g_pserver->fActiveReplica && (flags & CMD_CALL_PROPAGATE))
|
||||||
{
|
{
|
||||||
replicationFeedSlavesFromMasterStream(g_pserver->slaves,
|
replicationFeedSlavesFromMasterStream(g_pserver->slaves,
|
||||||
c->pending_querybuf, applied);
|
c->pending_querybuf, applied);
|
||||||
@ -2312,7 +2312,7 @@ int processCommandAndResetClient(client *c, int flags) {
|
|||||||
serverAssert(GlobalLocksAcquired());
|
serverAssert(GlobalLocksAcquired());
|
||||||
|
|
||||||
if (processCommand(c, flags) == C_OK) {
|
if (processCommand(c, flags) == C_OK) {
|
||||||
commandProcessed(c);
|
commandProcessed(c, flags);
|
||||||
}
|
}
|
||||||
if (serverTL->current_client == NULL) deadclient = 1;
|
if (serverTL->current_client == NULL) deadclient = 1;
|
||||||
serverTL->current_client = NULL;
|
serverTL->current_client = NULL;
|
||||||
|
@ -1166,6 +1166,7 @@ LError:
|
|||||||
* full resync. */
|
* full resync. */
|
||||||
void replconfCommand(client *c) {
|
void replconfCommand(client *c) {
|
||||||
int j;
|
int j;
|
||||||
|
bool fCapaCommand = false;
|
||||||
|
|
||||||
if ((c->argc % 2) == 0) {
|
if ((c->argc % 2) == 0) {
|
||||||
/* Number of arguments must be odd to make sure that every
|
/* Number of arguments must be odd to make sure that every
|
||||||
@ -1176,6 +1177,7 @@ void replconfCommand(client *c) {
|
|||||||
|
|
||||||
/* Process every option-value pair. */
|
/* Process every option-value pair. */
|
||||||
for (j = 1; j < c->argc; j+=2) {
|
for (j = 1; j < c->argc; j+=2) {
|
||||||
|
fCapaCommand = false;
|
||||||
if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"listening-port")) {
|
if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"listening-port")) {
|
||||||
long port;
|
long port;
|
||||||
|
|
||||||
@ -1200,6 +1202,8 @@ void replconfCommand(client *c) {
|
|||||||
c->slave_capa |= SLAVE_CAPA_PSYNC2;
|
c->slave_capa |= SLAVE_CAPA_PSYNC2;
|
||||||
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire"))
|
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire"))
|
||||||
c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE;
|
c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE;
|
||||||
|
|
||||||
|
fCapaCommand = true;
|
||||||
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) {
|
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) {
|
||||||
/* REPLCONF ACK is used by replica to inform the master the amount
|
/* REPLCONF ACK is used by replica to inform the master the amount
|
||||||
* of replication stream that it processed so far. It is an
|
* of replication stream that it processed so far. It is an
|
||||||
@ -1242,7 +1246,16 @@ void replconfCommand(client *c) {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (fCapaCommand) {
|
||||||
|
sds reply = sdsnew("+OK");
|
||||||
|
if (g_pserver->fActiveReplica)
|
||||||
|
reply = sdscat(reply, " active-replica");
|
||||||
|
reply = sdscat(reply, "\r\n");
|
||||||
|
addReplySds(c, reply);
|
||||||
|
} else {
|
||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* This function puts a replica in the online state, and should be called just
|
/* This function puts a replica in the online state, and should be called just
|
||||||
@ -2557,6 +2570,30 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read
|
|||||||
return PSYNC_NOT_SUPPORTED;
|
return PSYNC_NOT_SUPPORTED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Parse a master's successful reply to "REPLCONF capa ..." and record whether
 * the master advertised itself as an active replica.
 *
 * mi      - master descriptor whose isActive flag is updated.
 * strcapa - the reply line; expected to start with '+' and to contain
 *           space separated words (e.g. "+OK active-replica").
 *
 * If the reply is empty or does not start with '+', mi is left untouched.
 * Otherwise isActive is reset to false and set to true only when one of the
 * words is exactly "active-replica". The comparison is an exact, whole-word
 * match: empty words (from repeated or trailing spaces) and words that are
 * merely a prefix of "active-replica" are ignored. */
void parseMasterCapa(redisMaster *mi, sds strcapa)
{
    static const char szActiveReplica[] = "active-replica";
    const size_t cchActiveReplica = sizeof(szActiveReplica) - 1;

    if (sdslen(strcapa) < 1 || strcapa[0] != '+')
        return;

    const char *szStart = strcapa + 1;  // skip the +
    const char *pchEnd = szStart;

    mi->isActive = false;
    for (;;)
    {
        if (*pchEnd == ' ' || *pchEnd == '\0') {
            // [szStart, pchEnd) is one complete word; require an exact length
            // match so that strncmp can't succeed on a (possibly empty) prefix.
            size_t cchWord = (size_t)(pchEnd - szStart);
            if (cchWord == cchActiveReplica &&
                strncmp(szStart, szActiveReplica, cchWord) == 0) {
                mi->isActive = true;
            }

            if (*pchEnd == '\0')
                break;

            // Next word begins right after the separating space.
            szStart = pchEnd + 1;
        }
        ++pchEnd;
    }
}
|
||||||
|
|
||||||
/* This handler fires when the non blocking connect was able to
|
/* This handler fires when the non blocking connect was able to
|
||||||
* establish a connection with the master. */
|
* establish a connection with the master. */
|
||||||
void syncWithMaster(connection *conn) {
|
void syncWithMaster(connection *conn) {
|
||||||
@ -2750,16 +2787,8 @@ void syncWithMaster(connection *conn) {
|
|||||||
*
|
*
|
||||||
* The master will ignore capabilities it does not understand. */
|
* The master will ignore capabilities it does not understand. */
|
||||||
if (mi->repl_state == REPL_STATE_SEND_CAPA) {
|
if (mi->repl_state == REPL_STATE_SEND_CAPA) {
|
||||||
if (g_pserver->fActiveReplica)
|
|
||||||
{
|
|
||||||
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF",
|
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF",
|
||||||
"capa","eof","capa","psync2","capa","activeExpire",NULL);
|
"capa","eof","capa","psync2","capa","activeExpire",NULL);
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,conn,"REPLCONF",
|
|
||||||
"capa","eof","capa","psync2",NULL);
|
|
||||||
}
|
|
||||||
if (err) goto write_error;
|
if (err) goto write_error;
|
||||||
sdsfree(err);
|
sdsfree(err);
|
||||||
mi->repl_state = REPL_STATE_RECEIVE_CAPA;
|
mi->repl_state = REPL_STATE_RECEIVE_CAPA;
|
||||||
@ -2774,6 +2803,8 @@ void syncWithMaster(connection *conn) {
|
|||||||
if (err[0] == '-') {
|
if (err[0] == '-') {
|
||||||
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
|
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
|
||||||
"REPLCONF capa: %s", err);
|
"REPLCONF capa: %s", err);
|
||||||
|
} else {
|
||||||
|
parseMasterCapa(mi, err);
|
||||||
}
|
}
|
||||||
sdsfree(err);
|
sdsfree(err);
|
||||||
mi->repl_state = REPL_STATE_SEND_PSYNC;
|
mi->repl_state = REPL_STATE_SEND_PSYNC;
|
||||||
|
@ -1735,6 +1735,18 @@ void clientsCron(int iel) {
|
|||||||
freeClientsInAsyncFreeQueue(iel);
|
freeClientsInAsyncFreeQueue(iel);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool expireOwnKeys()
|
||||||
|
{
|
||||||
|
if (iAmMaster()) {
|
||||||
|
return true;
|
||||||
|
} else if (!g_pserver->fActiveReplica && (listLength(g_pserver->masters) == 1)) {
|
||||||
|
redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters));
|
||||||
|
if (mi->isActive)
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/* This function handles 'background' operations we are required to do
|
/* This function handles 'background' operations we are required to do
|
||||||
* incrementally in Redis databases, such as active key expiring, resizing,
|
* incrementally in Redis databases, such as active key expiring, resizing,
|
||||||
* rehashing. */
|
* rehashing. */
|
||||||
@ -1742,7 +1754,7 @@ void databasesCron(void) {
|
|||||||
/* Expire keys by random sampling. Not required for slaves
|
/* Expire keys by random sampling. Not required for slaves
|
||||||
* as master will synthesize DELs for us. */
|
* as master will synthesize DELs for us. */
|
||||||
if (g_pserver->active_expire_enabled) {
|
if (g_pserver->active_expire_enabled) {
|
||||||
if (iAmMaster()) {
|
if (expireOwnKeys()) {
|
||||||
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
|
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
|
||||||
} else {
|
} else {
|
||||||
expireSlaveKeys();
|
expireSlaveKeys();
|
||||||
@ -2461,6 +2473,7 @@ void initMasterInfo(redisMaster *master)
|
|||||||
master->cached_master = NULL;
|
master->cached_master = NULL;
|
||||||
master->master_initial_offset = -1;
|
master->master_initial_offset = -1;
|
||||||
|
|
||||||
|
master->isActive = false;
|
||||||
|
|
||||||
master->repl_state = REPL_STATE_NONE;
|
master->repl_state = REPL_STATE_NONE;
|
||||||
master->repl_down_since = 0; /* Never connected, repl is down since EVER. */
|
master->repl_down_since = 0; /* Never connected, repl is down since EVER. */
|
||||||
@ -3551,7 +3564,7 @@ void call(client *c, int flags) {
|
|||||||
!(flags & CMD_CALL_PROPAGATE_AOF))
|
!(flags & CMD_CALL_PROPAGATE_AOF))
|
||||||
propagate_flags &= ~PROPAGATE_AOF;
|
propagate_flags &= ~PROPAGATE_AOF;
|
||||||
|
|
||||||
if (c->cmd->flags & CMD_SKIP_PROPOGATE)
|
if ((c->cmd->flags & CMD_SKIP_PROPOGATE) && g_pserver->fActiveReplica)
|
||||||
propagate_flags &= ~PROPAGATE_REPL;
|
propagate_flags &= ~PROPAGATE_REPL;
|
||||||
|
|
||||||
/* Call propagate() only if at least one of AOF / replication
|
/* Call propagate() only if at least one of AOF / replication
|
||||||
|
@ -1371,6 +1371,7 @@ struct redisMaster {
|
|||||||
char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */
|
char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */
|
||||||
long long master_initial_offset; /* Master PSYNC offset. */
|
long long master_initial_offset; /* Master PSYNC offset. */
|
||||||
|
|
||||||
|
bool isActive = false;
|
||||||
int repl_state; /* Replication status if the instance is a replica */
|
int repl_state; /* Replication status if the instance is a replica */
|
||||||
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
|
off_t repl_transfer_size; /* Size of RDB to read from master during sync. */
|
||||||
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
|
off_t repl_transfer_read; /* Amount of RDB read from master during sync. */
|
||||||
|
Loading…
x
Reference in New Issue
Block a user