diff --git a/src/db.cpp b/src/db.cpp index d8aa0dc47..605baaf40 100644 --- a/src/db.cpp +++ b/src/db.cpp @@ -1374,7 +1374,9 @@ void propagateExpire(redisDb *db, robj *key, int lazy) { if (g_pserver->aof_state != AOF_OFF) feedAppendOnlyFile(cserver.delCommand,db->id,argv,2); - replicationFeedSlaves(g_pserver->slaves,db->id,argv,2); + // Active replicas do their own expiries, do not propagate + if (!g_pserver->fActiveReplica) + replicationFeedSlaves(g_pserver->slaves,db->id,argv,2); decrRefCount(argv[0]); decrRefCount(argv[1]); @@ -1442,7 +1444,7 @@ int expireIfNeeded(redisDb *db, robj *key) { * Still we try to return the right information to the caller, * that is, 0 if we think the key should be still valid, 1 if * we think the key is expired at this time. */ - if (listLength(g_pserver->masters)) return 1; + if (listLength(g_pserver->masters) && !g_pserver->fActiveReplica) return 1; /* Delete the key */ g_pserver->stat_expiredkeys++; diff --git a/src/expire.cpp b/src/expire.cpp index 62b5e7a6e..fdad83638 100644 --- a/src/expire.cpp +++ b/src/expire.cpp @@ -535,7 +535,7 @@ void expireGenericCommand(client *c, long long basetime, int unit) { * * Instead we take the other branch of the IF statement setting an expire * (possibly in the past) and wait for an explicit DEL from the master. */ - if (when <= mstime() && !g_pserver->loading && !listLength(g_pserver->masters)) { + if (when <= mstime() && !g_pserver->loading && (!listLength(g_pserver->masters) || g_pserver->fActiveReplica)) { robj *aux; int deleted = g_pserver->lazyfree_lazy_expire ? 
dbAsyncDelete(c->db,key) : diff --git a/src/replication.cpp b/src/replication.cpp index d2f948567..89f73eb06 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -669,6 +669,7 @@ int masterTryPartialResynchronization(client *c) { c->repl_ack_time = g_pserver->unixtime; c->repl_put_online_on_ack = 0; listAddNodeTail(g_pserver->slaves,c); + /* We can't use the connection buffers since they are used to accumulate * new commands at this stage. But we are sure the socket send buffer is * empty so this write will never fail actually. */ @@ -1002,6 +1003,8 @@ void replconfCommand(client *c) { c->slave_capa |= SLAVE_CAPA_EOF; else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]),"psync2")) c->slave_capa |= SLAVE_CAPA_PSYNC2; + else if (!strcasecmp((const char*)ptrFromObj(c->argv[j+1]), "activeExpire")) + c->slave_capa |= SLAVE_CAPA_ACTIVE_EXPIRE; } else if (!strcasecmp((const char*)ptrFromObj(c->argv[j]),"ack")) { /* REPLCONF ACK is used by replica to inform the master the amount * of replication stream that it processed so far. It is an @@ -1071,6 +1074,14 @@ void putSlaveOnline(client *replica) { refreshGoodSlavesCount(); serverLog(LL_NOTICE,"Synchronization with replica %s succeeded", replicationGetSlaveName(replica)); + + if (!(replica->slave_capa & SLAVE_CAPA_ACTIVE_EXPIRE) && g_pserver->fActiveReplica) + { + serverLog(LL_WARNING, "Warning: replica %s does not support active expiration. This client may not correctly process key expirations." + "\n\tThis is OK if you are in the process of an active upgrade.", replicationGetSlaveName(replica)); + serverLog(LL_WARNING, "Connections between active replicas and traditional replicas is deprecated. This will be refused in future versions." 
+ "\n\tPlease fix your replica topology"); + } } void sendBulkToSlave(aeEventLoop *el, int fd, void *privdata, int mask) { @@ -2094,8 +2105,16 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { * * The master will ignore capabilities it does not understand. */ if (mi->repl_state == REPL_STATE_SEND_CAPA) { - err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF", - "capa","eof","capa","psync2",NULL); + if (g_pserver->fActiveReplica) + { + err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF", + "capa","eof","capa","psync2","capa","activeExpire",NULL); + } + else + { + err = sendSynchronousCommand(mi, SYNC_CMD_WRITE,fd,"REPLCONF", + "capa","eof","capa","psync2",NULL); + } if (err) goto write_error; sdsfree(err); mi->repl_state = REPL_STATE_RECEIVE_CAPA; diff --git a/src/server.cpp b/src/server.cpp index f01c8bb00..6fc44f387 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1703,9 +1703,9 @@ void clientsCron(int iel) { void databasesCron(void) { /* Expire keys by random sampling. Not required for slaves * as master will synthesize DELs for us. */ - if (g_pserver->active_expire_enabled && listLength(g_pserver->masters) == 0) { + if (g_pserver->active_expire_enabled && (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica)) { activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW); - } else if (listLength(g_pserver->masters)) { + } else if (listLength(g_pserver->masters) && !g_pserver->fActiveReplica) { expireSlaveKeys(); } @@ -2105,7 +2105,7 @@ void beforeSleep(struct aeEventLoop *eventLoop) { /* Run a fast expire cycle (the called function will return * ASAP if a fast cycle is not needed). 
*/ - if (g_pserver->active_expire_enabled && listLength(g_pserver->masters) == 0) + if (g_pserver->active_expire_enabled && (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica)) activeExpireCycle(ACTIVE_EXPIRE_CYCLE_FAST); /* Send all the slaves an ACK request if at least one client blocked diff --git a/src/server.h b/src/server.h index 29c2db301..81a5da8c3 100644 --- a/src/server.h +++ b/src/server.h @@ -433,6 +433,7 @@ public: #define SLAVE_CAPA_NONE 0 #define SLAVE_CAPA_EOF (1<<0) /* Can parse the RDB EOF streaming format. */ #define SLAVE_CAPA_PSYNC2 (1<<1) /* Supports PSYNC2 protocol. */ +#define SLAVE_CAPA_ACTIVE_EXPIRE (1<<2) /* Will the slave perform its own expirations? (Don't send delete) */ /* Synchronous read timeout - replica side */ #define CONFIG_REPL_SYNCIO_TIMEOUT 5 diff --git a/tests/integration/replication-active.tcl b/tests/integration/replication-active.tcl index 2ba761766..4f37f2adf 100644 --- a/tests/integration/replication-active.tcl +++ b/tests/integration/replication-active.tcl @@ -9,6 +9,7 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { set master [srv 0 client] set master_host [srv 0 host] set master_port [srv 0 port] + set master_pid [s process_id] # Use a short replication timeout on the slave, so that if there # are no bugs the timeout is triggered in a reasonable amount @@ -94,6 +95,26 @@ start_server {tags {"active-repl"} overrides {active-replica yes}} { assert_equal {0} [$slave del testkey1] } + test {Active replica expire propogates when source is down} { + $slave flushall + $slave set testkey2 foo + $slave set testkey1 foo + wait_for_condition 50 1000 { + [string match *foo* [$master get testkey1]] + } else { + fail "Replication failed to propogate" + } + $slave expire testkey1 2 + assert_equal {1} [$slave wait 1 500] { "value should propogate + within 0.5 seconds" } + exec kill -SIGSTOP $slave_pid + after 3000 + # Ensure testkey1 is gone. 
Note, we can't do this directly as the normal commands lie to us + # about what is actually in the dict. The only way to know is with a count from info + assert_equal {1} [expr [string first {keys=1} [$master info keyspace]] >= 0] {"slave expired"} + } + exec kill -SIGCONT $slave_pid + test {Active replica different databases} { $master select 3 $master set testkey abcd