Fix issue #107, active replicas do their own expires

Former-commit-id: 8e4f323439df29a5e8c0de9db7a848291721fd07
This commit is contained in:
John Sully 2019-11-20 19:44:31 -05:00
parent f136492c45
commit 633b7398e1
6 changed files with 51 additions and 8 deletions

View File

@ -1374,7 +1374,9 @@ void propagateExpire(redisDb *db, robj *key, int lazy) {
if (g_pserver->aof_state != AOF_OFF)
feedAppendOnlyFile(cserver.delCommand,db->id,argv,2);
replicationFeedSlaves(g_pserver->slaves,db->id,argv,2);
// Active replicas do their own expiries, do not propogate
if (!g_pserver->fActiveReplica)
replicationFeedSlaves(g_pserver->slaves,db->id,argv,2);
decrRefCount(argv[0]);
decrRefCount(argv[1]);
@ -1442,7 +1444,7 @@ int expireIfNeeded(redisDb *db, robj *key) {
* Still we try to return the right information to the caller,
* that is, 0 if we think the key should be still valid, 1 if
* we think the key is expired at this time. */
if (listLength(g_pserver->masters)) return 1;
if (listLength(g_pserver->masters) && !g_pserver->fActiveReplica) return 1;
/* Delete the key */
g_pserver->stat_expiredkeys++;

View File

@ -535,7 +535,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) {
*
* Instead we take the other branch of the IF statement setting an expire
* (possibly in the past) and wait for an explicit DEL from the master. */
if (when <= mstime() && !g_pserver->loading && !listLength(g_pserver->masters)) {
if (when <= mstime() && !g_pserver->loading && (!listLength(g_pserver->masters) || g_pserver->fActiveReplica)) {
robj *aux;
int deleted = g_pserver->lazyfree_lazy_expire ? dbAsyncDelete(c->db,key) :

View File

@ -669,6 +669,7 @@ int masterTryPartialResynchronization(client *c) {
c->repl_ack_time = g_pserver->unixtime;
c->repl_put_online_on_ack = 0;
listAddNodeTail(g_pserver->slaves,c);
/* We can't use the connection buffers since they are used to accumulate
* new commands at this stage. But we are sure the socket send buffer is
* empty so this write will never fail actually. */
@ -1002,6 +1003,8 @@ void replconfCommand(client *c) {
c->slave_capa |= SLAVE_CAPA_EOF;
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]),"psync2"))
c->slave_capa |= SLAVE_CAPA_PSYNC2;
else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire"))
c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE;
} else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) {
/* REPLCONF ACK is used by replica to inform the master the amount
* of replication stream that it processed so far. It is an
@ -1071,6 +1074,14 @@ void putSlaveOnline(client *replica) {
refreshGoodSlavesCount();
serverLog(LL_NOTICE,"Synchronization with replica %s succeeded",
replicationGetSlaveName(replica));
if (!(replica->slave_capa & SLAVE_CAPA_ACTIVE_EXPIRE) && g_pserver->fActiveReplica)
{
serverLog(LL_WARNING, "Warning: replica %s does not support active expiration. This client may not correctly process key expirations."
"\n\tThis is OK if you are in the process of an active upgrade.", replicationGetSlaveName(replica));
serverLog(LL_WARNING, "Connections between active replicas and traditional replicas is deprecated. This will be refused in future versions."
"\n\tPlease fix your replica topology");
}
}
void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) {
@ -2094,8 +2105,16 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
*
* The master will ignore capabilities it does not understand. */
if (mi->repl_state == REPL_STATE_SEND_CAPA) {
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF",
"capa","eof","capa","psync2",NULL);
if (g_pserver->fActiveReplica)
{
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF",
"capa","eof","capa","psync2","capa","activeExpire",NULL);
}
else
{
err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF",
"capa","eof","capa","psync2",NULL);
}
if (err) goto write_error;
sdsfree(err);
mi->repl_state = REPL_STATE_RECEIVE_CAPA;

View File

@ -1703,9 +1703,9 @@ void clientsCron(int iel) {
void databasesCron(void) {
/* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */
if (g_pserver->active_expire_enabled && listLength(g_pserver->masters) == 0) {
if (g_pserver->active_expire_enabled && (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica)) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else if (listLength(g_pserver->masters)) {
} else if (listLength(g_pserver->masters) && !g_pserver->fActiveReplica) {
expireSlaveKeys();
}
@ -2105,7 +2105,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) {
/* Run a fast expire cycle (the called function will return
* ASAP if a fast cycle is not needed). */
if (g_pserver->active_expire_enabled && listLength(g_pserver->masters) == 0)
if (g_pserver->active_expire_enabled && (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica))
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST);
/* Send all the slaves an ACK request if at least one client blocked

View File

@ -433,6 +433,7 @@ public:
#define SLAVE_CAPA_NONE 0
#define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */
#define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */
#define SLAVE_CAPA_ACTIVE_EXPIRE (1<<2) /* Will the slave perform its own expirations? (Don't send delete) */
/* Synchronous read timeout - replica side */
#define CONFIG_REPL_SYNCIO_TIMEOUT 5

View File

@ -9,6 +9,7 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
set master [srv 0 client]
set master_host [srv 0 host]
set master_port [srv 0 port]
set master_pid [s process_id]
# Use a short replication timeout on the slave, so that if there
# are no bugs the timeout is triggered in a reasonable amount
@ -94,6 +95,26 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} {
assert_equal {0} [$slave del testkey1]
}
test {Active replica expire propogates when source is down} {
$slave flushall
$slave set testkey2 foo
$slave set testkey1 foo
wait_for_condition 50 1000 {
[string match *foo* [$master get testkey1]]
} else {
fail "Replication failed to propogate"
}
$slave expire testkey1 2
assert_equal {1} [$slave wait 1 500] { "value should propogate
within 0.5 seconds" }
exec kill -SIGSTOP $slave_pid
after 3000
# Ensure testkey1 is gone. Note, we can't do this directly as the normal commands lie to us
# about what is actually in the dict. The only way to know is with a count from info
assert_equal {1} [expr [string first {keys=1} [$master info keyspace]] >= 0] {"slave expired"}
}
exec kill -SIGCONT $slave_pid
test {Active replica different databases} {
$master select 3
$master set testkey abcd