diff --git a/src/object.cpp b/src/object.cpp index e7f053a24..2a9c3f215 100644 --- a/src/object.cpp +++ b/src/object.cpp @@ -714,6 +714,34 @@ int getLongLongFromObject(robj *o, long long *target) { return C_OK; } +int getUnsignedLongLongFromObject(robj *o, uint64_t *target) { + uint64_t value; + + if (o == NULL) { + value = 0; + } else { + serverAssertWithInfo(NULL,o,o->type == OBJ_STRING); + if (sdsEncodedObject(o)) { + char *pchEnd = nullptr; + errno = 0; + value = strtoull(szFromObj(o), &pchEnd, 10); + if (value == 0) { + // potential error + if (errno != 0) + return C_ERR; + if (pchEnd == szFromObj(o)) + return C_ERR; + } + } else if (o->encoding == OBJ_ENCODING_INT) { + value = (long)ptrFromObj(o); + } else { + serverPanic("Unknown string encoding"); + } + } + if (target) *target = value; + return C_OK; +} + int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const char *msg) { long long value; if (getLongLongFromObject(o, &value) != C_OK) { diff --git a/src/rdb.cpp b/src/rdb.cpp index 476b2e2e4..1c5b25d16 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2115,12 +2115,19 @@ int rdbLoadRio(rio *rdb, rdbSaveInfo *rsi, int loading_aof) { if ((key = rdbLoadStringObject(rdb)) == NULL) goto eoferr; /* Read value */ if ((val = rdbLoadObject(type,rdb,key, mvcc_tstamp)) == NULL) goto eoferr; + bool fStaleMvccKey = val->mvcc_tstamp < rsi->mvccMinThreshold; /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was * received from the master. In the latter case, the master is * responsible for key expiry. If we would expire keys here, the * snapshot taken by the master may not be reflected on the replica. */ - if (listLength(g_pserver->masters) == 0 && !loading_aof && expiretime != -1 && expiretime < now) { + bool fExpiredKey = (listLength(g_pserver->masters) == 0 || g_pserver->fActiveReplica) && !loading_aof && expiretime != -1 && expiretime < now; + if (fStaleMvccKey || fExpiredKey) { + if (fStaleMvccKey && !fExpiredKey && rsi->mi != nullptr && rsi->mi->staleKeyMap != nullptr && lookupKeyRead(db, key) == nullptr) { + // We have a key that we've already deleted and is not back in our database. + // We'll need to inform the sending master of the delete if it is also a replica of us + rsi->mi->staleKeyMap->operator[](db - g_pserver->db).push_back(key); + } decrRefCount(key); key = nullptr; decrRefCount(val); diff --git a/src/replication.cpp b/src/replication.cpp index 89f73eb06..c912abba8 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -48,6 +48,7 @@ void replicationResurrectCachedMaster(redisMaster *mi, int newfd); void replicationSendAck(redisMaster *mi); void putSlaveOnline(client *replica); int cancelReplicationHandshake(redisMaster *mi); +static void propagateMasterStaleKeys(); /* --------------------------- Utility functions ---------------------------- */ @@ -129,6 +130,23 @@ static bool FAnyDisconnectedMasters() return false; } +client *replicaFromMaster(redisMaster *mi) +{ + if (mi->master == nullptr) + return nullptr; + + listIter liReplica; + listNode *lnReplica; + listRewind(g_pserver->slaves, &liReplica); + while ((lnReplica = listNext(&liReplica)) != nullptr) + { + client *replica = (client*)listNodeValue(lnReplica); + if (FSameHost(mi->master, replica)) + return replica; + } + return nullptr; +} + /* ---------------------------------- MASTER -------------------------------- */ void createReplicationBacklog(void) { @@ -325,12 +343,20 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { char uuid[40] = {'\0'}; uuid_unparse(cserver.uuid, uuid); char proto[1024]; - int cchProto = snprintf(proto, sizeof(proto), "*4\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); + int cchProto = snprintf(proto, sizeof(proto), "*5\r\n$7\r\nRREPLAY\r\n$%d\r\n%s\r\n$%lld\r\n", (int)strlen(uuid), uuid, cchbuf); cchProto = std::min((int)sizeof(proto), cchProto); long long master_repl_offset_start = g_pserver->master_repl_offset; char szDbNum[128]; - int cchDbNum = snprintf(szDbNum, sizeof(szDbNum), "$%d\r\n%d\r\n", (dictid/10)+1, dictid); + int cchDictIdNum = snprintf(szDbNum, sizeof(szDbNum), "%d", dictid); + int cchDbNum = snprintf(szDbNum, sizeof(szDbNum), "$%d\r\n%d\r\n", cchDictIdNum, dictid); + cchDbNum = std::min(cchDbNum, sizeof(szDbNum)); // snprintf is tricky like that + + char szMvcc[128]; + uint64_t mvccTstamp = getMvccTstamp(); + int cchMvccNum = snprintf(szMvcc, sizeof(szMvcc), "%lu", mvccTstamp); + int cchMvcc = snprintf(szMvcc, sizeof(szMvcc), "$%d\r\n%lu\r\n", cchMvccNum, mvccTstamp); + cchMvcc = std::min(cchMvcc, sizeof(szMvcc)); // tricky snprintf /* Write the command to the replication backlog if any. */ if (g_pserver->repl_backlog) @@ -374,6 +400,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { const char *crlf = "\r\n"; feedReplicationBacklog(crlf, 2); feedReplicationBacklog(szDbNum, cchDbNum); + feedReplicationBacklog(szMvcc, cchMvcc); } } @@ -409,6 +436,7 @@ void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { { addReplyAsync(replica,shared.crlf); addReplyProtoAsync(replica, szDbNum, cchDbNum); + addReplyProtoAsync(replica, szMvcc, cchMvcc); } } @@ -1587,6 +1615,15 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { aeDeleteFileEvent(el,mi->repl_transfer_s,AE_READABLE); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory"); rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + if (g_pserver->fActiveReplica) + { + rsi.mvccMinThreshold = mi->mvccLastSync; + if (mi->staleKeyMap != nullptr) + mi->staleKeyMap->clear(); + else + mi->staleKeyMap = new (MALLOC_LOCAL) std::map>(); + rsi.mi = mi; + } if (rdbLoadFile(rdb_filename, &rsi) != C_OK) { serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); cancelReplicationHandshake(mi); @@ -2382,6 +2419,7 @@ void freeMasterInfo(redisMaster *mi) { zfree(mi->masterauth); zfree(mi->masteruser); + delete mi->staleKeyMap; zfree(mi); } @@ -3215,6 +3253,8 @@ void replicationCron(void) { } } + propagateMasterStaleKeys(); + /* Refresh the number of slaves with lag <= min-slaves-max-lag. */ refreshGoodSlavesCount(); replication_cron_loops++; /* Incremented with frequency 1 HZ. */ @@ -3361,6 +3401,17 @@ void replicaReplayCommand(client *c) } } + uint64_t mvcc = 0; + if (c->argc >= 5) + { + if (getUnsignedLongLongFromObject(c->argv[4], &mvcc) != C_OK) + { + addReplyError(c, "Invalid MVCC Timestamp"); + s_pstate->Cancel(); + return; + } + } + if (FSameUuidNoNil(uuid, cserver.uuid)) { addReply(c, shared.ok); @@ -3387,6 +3438,11 @@ void replicaReplayCommand(client *c) { addReply(c, shared.ok); selectDb(c, cFake->db->id); + redisMaster *mi = MasterInfoFromClient(c); + if (mi != nullptr) // this should never be null but I'd prefer not to crash + { + mi->mvccLastSync = mvcc; + } } else { @@ -3421,3 +3477,43 @@ void updateMasterAuth() mi->masteruser = zstrdup(cserver.default_masteruser); } } + +static void propagateMasterStaleKeys() +{ + listIter li; + listNode *ln; + listRewind(g_pserver->masters, &li); + robj *rgobj[2]; + + rgobj[0] = createEmbeddedStringObject("DEL", 3); + + while ((ln = listNext(&li)) != nullptr) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + if (mi->staleKeyMap != nullptr) + { + if (mi->master != nullptr) + { + for (auto &pair : *mi->staleKeyMap) + { + if (pair.second.empty()) + continue; + + client *replica = replicaFromMaster(mi); + if (replica == nullptr) + continue; + + for (auto &spkey : pair.second) + { + rgobj[1] = spkey.get(); + replicationFeedSlave(replica, pair.first, rgobj, 2, false); + } + } + delete mi->staleKeyMap; + mi->staleKeyMap = nullptr; + } + } + } + + decrRefCount(rgobj[0]); +} \ No newline at end of file diff --git a/src/server.cpp b/src/server.cpp index 6fc44f387..3c8c588cd 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -2319,6 +2319,7 @@ void initMasterInfo(redisMaster *master) master->repl_state = REPL_STATE_NONE; master->repl_down_since = 0; /* Never connected, repl is down since EVER. */ + master->mvccLastSync = 0; } void initServerConfig(void) { diff --git a/src/server.h b/src/server.h index 81a5da8c3..ad7f2826c 100644 --- a/src/server.h +++ b/src/server.h @@ -54,6 +54,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { #include @@ -144,6 +145,87 @@ public: } }; +void decrRefCount(robj_roptr o); +void incrRefCount(robj_roptr o); +class robj_sharedptr +{ + redisObject *m_ptr; + +public: + robj_sharedptr() + : m_ptr(nullptr) + {} + robj_sharedptr(redisObject *ptr) + : m_ptr(ptr) + { + incrRefCount(ptr); + } + ~robj_sharedptr() + { + if (m_ptr) + decrRefCount(m_ptr); + } + robj_sharedptr(const robj_sharedptr& other) + { + m_ptr = other.m_ptr; + incrRefCount(m_ptr); + } + + robj_sharedptr(robj_sharedptr&& other) + { + m_ptr = other.m_ptr; + other.m_ptr = nullptr; + } + + robj_sharedptr &operator=(const robj_sharedptr& other) + { + if (m_ptr) + decrRefCount(m_ptr); + m_ptr = other.m_ptr; + incrRefCount(m_ptr); + return *this; + } + robj_sharedptr &operator=(redisObject *ptr) + { + if (m_ptr) + decrRefCount(m_ptr); + m_ptr = ptr; + incrRefCount(m_ptr); + return *this; + } + + bool operator==(const robj_sharedptr &other) const + { + return m_ptr == other.m_ptr; + } + + bool operator!=(const robj_sharedptr &other) const + { + return m_ptr != other.m_ptr; + } + + redisObject* operator->() const + { + return m_ptr; + } + + bool operator!() const + { + return !m_ptr; + } + + operator bool() const{ + return !!m_ptr; + } + + operator redisObject *() + { + return (redisObject*)m_ptr; + } + + redisObject *get() { return m_ptr; } +}; + /* Error codes */ #define C_OK 0 #define C_ERR -1 @@ -1391,9 +1473,11 @@ typedef struct rdbSaveInfo { char repl_id[CONFIG_RUN_ID_SIZE+1]; /* Replication ID. */ long long repl_offset; /* Replication offset. */ int fForceSetKey; + uint64_t mvccMinThreshold; + struct redisMaster *mi; } rdbSaveInfo; -#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1, TRUE} +#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1, TRUE, 0, nullptr} struct malloc_stats { size_t zmalloc_used; @@ -1467,6 +1551,9 @@ struct redisMaster { unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */ /* After we've connected with our master use the UUID in g_pserver->master */ + uint64_t mvccLastSync; + /* During a handshake the server may have stale keys, we track these here to share once a reciprocal connection is made */ + std::map> *staleKeyMap; }; // Const vars are not changed after worker threads are launched @@ -2156,6 +2243,7 @@ int getLongLongFromObjectOrReply(client *c, robj *o, long long *target, const ch int getDoubleFromObjectOrReply(client *c, robj *o, double *target, const char *msg); int getDoubleFromObject(const robj *o, double *target); int getLongLongFromObject(robj *o, long long *target); +int getUnsignedLongLongFromObject(robj *o, uint64_t *target); int getLongDoubleFromObject(robj *o, long double *target); int getLongDoubleFromObjectOrReply(client *c, robj *o, long double *target, const char *msg); const char *strEncoding(int encoding);