diff --git a/src/aof.cpp b/src/aof.cpp index 5ee11877b..9fa9290bd 100644 --- a/src/aof.cpp +++ b/src/aof.cpp @@ -884,7 +884,7 @@ int loadAppendOnlyFile(char *filename) { } else { /* RDB preamble. Pass loading the RDB functions. */ rio rdb; - rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + rdbSaveInfo rsi; serverLog(LL_NOTICE,"Reading RDB preamble from AOF file..."); if (fseek(fp,0,SEEK_SET) == -1) goto readerr; diff --git a/src/rdb.cpp b/src/rdb.cpp index 8e0bca922..702ed9097 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1218,6 +1218,40 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",rsi->master_repl_offset) == -1) return -1; + if (g_pserver->fActiveReplica && listLength(g_pserver->masters) > 0) { + sdsstring val = sdsstring(sdsempty()); + listNode *ln; + listIter li; + redisMaster* mi; + listRewind(g_pserver->masters,&li); + while ((ln = listNext(&li)) != NULL) { + mi = (redisMaster*)listNodeValue(ln); + if (!mi->master) { + // If master client is not available, use info from master struct - better than nothing + serverLog(LL_NOTICE, "saving master %s", mi->master_replid); + if (mi->master_replid[0] == 0) { + // if replid is null, there's no reason to save it + continue; + } + val = val.catfmt("%s:%I:%s:%i;", mi->master_replid, + mi->master_initial_offset, + mi->masterhost, + mi->masterport); + } + else { + serverLog(LL_NOTICE, "saving master %s", mi->master->replid); + if (mi->master->replid[0] == 0) { + // if replid is null, there's no reason to save it + continue; + } + val = val.catfmt("%s:%I:%s:%i;", mi->master->replid, + mi->master->reploff, + mi->masterhost, + mi->masterport); + } + } + if (rdbSaveAuxFieldStrStr(rdb, "repl-masters",val.get()) == -1) return -1; + } } if (rdbSaveAuxFieldStrInt(rdb,"aof-preamble",aof_preamble) == -1) return -1; return 1; @@ -1628,7 +1662,7 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi) } else { rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL); - rdbSaveInfo rsiT = RDB_SAVE_INFO_INIT; + rdbSaveInfo rsiT; if (rsi == nullptr) rsi = &rsiT; memcpy(&args->rsi, rsi, sizeof(rdbSaveInfo)); @@ -2888,11 +2922,11 @@ public: * snapshot taken by the master may not be reflected on the replica. */ bool fExpiredKey = iAmMaster() && !(this->rdbflags&RDBFLAGS_AOF_PREAMBLE) && job.expiretime != INVALID_EXPIRE && job.expiretime < this->now; if (fStaleMvccKey || fExpiredKey) { - if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->mi != nullptr && this->rsi->mi->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) { + if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->masters != nullptr && this->rsi->masters->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) { // We have a key that we've already deleted and is not back in our database. // We'll need to inform the sending master of the delete if it is also a replica of us robj_sharedptr objKeyDup(createStringObject(job.key, sdslen(job.key))); - this->rsi->mi->staleKeyMap->operator[](job.db->id).push_back(objKeyDup); + this->rsi->masters->staleKeyMap->operator[](job.db->id).push_back(objKeyDup); } sdsfree(job.key); job.key = nullptr; @@ -3206,6 +3240,23 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { memcpy(rsi->repl_id,ptrFromObj(auxval),CONFIG_RUN_ID_SIZE+1); rsi->repl_id_is_set = 1; } + } else if (!strcasecmp(szFromObj(auxkey),"repl-masters")) { + if (rsi) { + struct redisMaster mi; + char *masters = szFromObj(auxval); + char *entry = strtok(masters, ":"); + while (entry != NULL) { + memcpy(mi.master_replid, entry, sizeof(mi.master_replid)); + entry = strtok(NULL, ":"); + mi.master_initial_offset = atoi(entry); + entry = strtok(NULL, ":"); + mi.masterhost = entry; + entry = strtok(NULL, ";"); + mi.masterport = atoi(entry); + entry = strtok(NULL, ":"); + rsi->addMaster(mi); + } + } } else if (!strcasecmp(szFromObj(auxkey),"repl-offset")) { if (rsi) rsi->repl_offset = strtoll(szFromObj(auxval),NULL,10); } else if (!strcasecmp(szFromObj(auxkey),"lua")) { @@ -3861,8 +3912,8 @@ void bgsaveCommand(client *c) { * is returned, and the RDB saving will not persist any replication related * information. */ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) { - rdbSaveInfo rsi_init = RDB_SAVE_INFO_INIT; - *rsi = rsi_init; + rdbSaveInfo rsi_init; + *rsi = std::move(rsi_init); memcpy(rsi->repl_id, g_pserver->replid, sizeof(g_pserver->replid)); rsi->master_repl_offset = g_pserver->master_repl_offset; diff --git a/src/replication.cpp b/src/replication.cpp index c68e3f09c..858e9d55d 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2954,7 +2954,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, mi->staleKeyMap->clear(); else mi->staleKeyMap = new (MALLOC_LOCAL) std::map>(); - rsi.mi = mi; + rsi.addMaster(*mi); } if (rdbLoadFile(rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) { serverLog(LL_WARNING, @@ -2994,7 +2994,7 @@ error: } void readSyncBulkPayload(connection *conn) { - rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + rdbSaveInfo rsi; redisMaster *mi = (redisMaster*)connGetPrivateData(conn); static int usemark = 0; if (mi == nullptr) { @@ -3043,7 +3043,7 @@ void readSyncBulkPayload(connection *conn) { { mergeReplicationId(mi->master->replid); } - else + else if (!g_pserver->fActiveReplica) { /* After a full resynchroniziation we use the replication ID and * offset of the master. The secondary ID / offset are cleared since @@ -3238,7 +3238,7 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read * client structure representing the master into g_pserver->master. */ mi->master_initial_offset = -1; - if (mi->cached_master && !g_pserver->fActiveReplica) { + if (mi->cached_master) { psync_replid = mi->cached_master->replid; snprintf(psync_offset,sizeof(psync_offset),"%lld", mi->cached_master->reploff+1); serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset); @@ -3337,14 +3337,15 @@ int slaveTryPartialResynchronization(redisMaster *mi, connection *conn, int read sizeof(g_pserver->replid2)); g_pserver->second_replid_offset = g_pserver->master_repl_offset+1; - /* Update the cached master ID and our own primary ID to the - * new one. */ - memcpy(g_pserver->replid,sznew,sizeof(g_pserver->replid)); - memcpy(mi->cached_master->replid,sznew,sizeof(g_pserver->replid)); + if (!g_pserver->fActiveReplica) { + /* Update the cached master ID and our own primary ID to the + * new one. */ + memcpy(g_pserver->replid,sznew,sizeof(g_pserver->replid)); + memcpy(mi->cached_master->replid,sznew,sizeof(g_pserver->replid)); - /* Disconnect all the sub-slaves: they need to be notified. */ - if (!g_pserver->fActiveReplica) + /* Disconnect all the sub-slaves: they need to be notified. */ disconnectSlaves(); + } } } @@ -3725,18 +3726,6 @@ retry_connect: disconnectSlavesExcept(mi->master_uuid); /* Force our slaves to resync with us as well. */ freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ } - else - { - if (listLength(g_pserver->slaves)) - { - changeReplicationId(); - clearReplicationId2(); - } - else - { - freeReplicationBacklog(); /* Don't allow our chained slaves to PSYNC. */ - } - } /* Fall back to SYNC if needed. Otherwise psync_result == PSYNC_FULLRESYNC * and the g_pserver->master_replid and master_initial_offset are @@ -4405,6 +4394,26 @@ void replicationCacheMasterUsingMyself(redisMaster *mi) { mi->master = NULL; } +/* This function is called when reloading master info from an RDB in Active Replica mode. + * It creates a cached master client using the info contained in the redisMaster struct. + * + * Assumes that the passed struct contains valid master info. */ +void replicationCacheMasterUsingMaster(redisMaster *mi) { + if (mi->cached_master) { + freeClient(mi->cached_master); + } + + replicationCreateMasterClient(mi, NULL, -1); + std::lock_guardmaster->lock)> lock(mi->master->lock); + + memcpy(mi->master->replid, mi->master_replid, sizeof(mi->master_replid)); + mi->master->reploff = mi->master_initial_offset; + + unlinkClient(mi->master); + mi->cached_master = mi->master; + mi->master = NULL; +} + /* Free a cached master, called when there are no longer the conditions for * a partial resync on reconnection. */ void replicationDiscardCachedMaster(redisMaster *mi) { diff --git a/src/sds.h b/src/sds.h index 26fe16225..1c983cdc1 100644 --- a/src/sds.h +++ b/src/sds.h @@ -429,6 +429,21 @@ public: return *this; } + sdsstring &operator=(sdsstring &&other) + { + sds tmp = m_str; + m_str = other.m_str; + other.m_str = tmp; + return *this; + } + + template + sdsstring catfmt(const char *fmt, Args... args) + { + m_str = sdscatfmt(m_str, fmt, args...); + return *this; + } + sds release() { sds sdsT = m_str; m_str = nullptr; diff --git a/src/server.cpp b/src/server.cpp index 1f94566b4..e3ed33beb 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -6940,7 +6940,7 @@ void loadDataFromDisk(void) { if (loadAppendOnlyFile(g_pserver->aof_filename) == C_OK) serverLog(LL_NOTICE,"DB loaded from append only file: %.3f seconds",(float)(ustime()-start)/1000000); } else if (g_pserver->rdb_filename != NULL || g_pserver->rdb_s3bucketpath != NULL) { - rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + rdbSaveInfo rsi; rsi.fForceSetKey = false; errno = 0; /* Prevent a stale value from affecting error checking */ if (rdbLoad(&rsi,RDBFLAGS_NONE) == C_OK) { @@ -6969,11 +6969,23 @@ void loadDataFromDisk(void) { while ((ln = listNext(&li))) { redisMaster *mi = (redisMaster*)listNodeValue(ln); - /* If we are a replica, create a cached master from this - * information, in order to allow partial resynchronizations - * with masters. */ - replicationCacheMasterUsingMyself(mi); - selectDb(mi->cached_master,rsi.repl_stream_db); + if (g_pserver->fActiveReplica) { + for (size_t i = 0; i < rsi.numMasters(); i++) { + if (!strcmp(mi->masterhost, rsi.masters[i].masterhost) && mi->masterport == rsi.masters[i].masterport) { + memcpy(mi->master_replid, rsi.masters[i].master_replid, sizeof(mi->master_replid)); + mi->master_initial_offset = rsi.masters[i].master_initial_offset; + replicationCacheMasterUsingMaster(mi); + serverLog(LL_NOTICE, "Cached master recovered from RDB for %s:%d", mi->masterhost, mi->masterport); + } + } + } + else { + /* If we are a replica, create a cached master from this + * information, in order to allow partial resynchronizations + * with masters. */ + replicationCacheMasterUsingMyself(mi); + selectDb(mi->cached_master,rsi.repl_stream_db); + } } } } else if (errno != ENOENT) { @@ -7596,7 +7608,7 @@ int main(int argc, char **argv) { __AFL_INIT(); #endif rio rdb; - rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; + rdbSaveInfo rsi; startLoadingFile(stdin, (char*)"stdin", 0); rioInitWithFile(&rdb,stdin); rdbLoadRio(&rdb,0,&rsi); diff --git a/src/server.h b/src/server.h index e3d7e90ae..900c42f94 100644 --- a/src/server.h +++ b/src/server.h @@ -1850,6 +1850,43 @@ struct redisMemOverhead { } *db; }; + +struct redisMaster { + char *masteruser; /* AUTH with this user and masterauth with master */ + char *masterauth; /* AUTH with this password with master */ + char *masterhost; /* Hostname of master */ + int masterport; /* Port of master */ + client *cached_master; /* Cached master to be reused for PSYNC. */ + client *master; + /* The following two fields is where we store master PSYNC replid/offset + * while the PSYNC is in progress. At the end we'll copy the fields into + * the server->master client structure. */ + char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */ + long long master_initial_offset; /* Master PSYNC offset. */ + + bool isActive = false; + bool isRocksdbSnapshotRepl = false; + int repl_state; /* Replication status if the instance is a replica */ + off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ + off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ + off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */ + connection *repl_transfer_s; /* Slave -> Master SYNC socket */ + int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */ + char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */ + time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ + time_t repl_down_since; /* Unix time at which link with master went down */ + + class SnapshotPayloadParseState *parseState; + sds bulkreadBuffer = nullptr; + + unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */ + /* After we've connected with our master use the UUID in g_pserver->master */ + uint64_t mvccLastSync; + /* During a handshake the server may have stale keys, we track these here to share once a reciprocal connection is made */ + std::map> *staleKeyMap; + int ielReplTransfer = -1; +}; + /* This structure can be optionally passed to RDB save/load functions in * order to implement additional functionalities, by storing and loading * metadata to the RDB file. @@ -1858,7 +1895,66 @@ struct redisMemOverhead { * replication in order to make sure that chained slaves (slaves of slaves) * select the correct DB and are able to accept the stream coming from the * top-level master. */ -typedef struct rdbSaveInfo { +class rdbSaveInfo { +public: + rdbSaveInfo() { + repl_stream_db = -1; + repl_id_is_set = 0; + memcpy(repl_id, "0000000000000000000000000000000000000000", sizeof(repl_id)); + repl_offset = -1; + fForceSetKey = TRUE; + mvccMinThreshold = 0; + masters = nullptr; + masterCount = 0; + } + rdbSaveInfo(const rdbSaveInfo &other) { + repl_stream_db = other.repl_stream_db; + repl_id_is_set = other.repl_id_is_set; + memcpy(repl_id, other.repl_id, sizeof(repl_id)); + repl_offset = other.repl_offset; + fForceSetKey = other.fForceSetKey; + mvccMinThreshold = other.mvccMinThreshold; + masters = (struct redisMaster*)malloc(sizeof(struct redisMaster) * other.masterCount); + memcpy(masters, other.masters, sizeof(struct redisMaster) * other.masterCount); + masterCount = other.masterCount; + } + rdbSaveInfo(rdbSaveInfo &&other) : rdbSaveInfo() { + swap(*this, other); + } + rdbSaveInfo &operator=(rdbSaveInfo other) { + swap(*this, other); + return *this; + } + ~rdbSaveInfo() { + free(masters); + } + friend void swap(rdbSaveInfo &first, rdbSaveInfo &second) { + std::swap(first.repl_stream_db, second.repl_stream_db); + std::swap(first.repl_id_is_set, second.repl_id_is_set); + std::swap(first.repl_id, second.repl_id); + std::swap(first.repl_offset, second.repl_offset); + std::swap(first.fForceSetKey, second.fForceSetKey); + std::swap(first.mvccMinThreshold, second.mvccMinThreshold); + std::swap(first.masters, second.masters); + std::swap(first.masterCount, second.masterCount); + + } + + void addMaster(const struct redisMaster &mi) { + masterCount++; + if (masters == nullptr) { + masters = (struct redisMaster*)malloc(sizeof(struct redisMaster)); + } + else { + masters = (struct redisMaster*)realloc(masters, sizeof(struct redisMaster) * masterCount); + } + memcpy(masters + masterCount - 1, &mi, sizeof(struct redisMaster)); + } + + size_t numMasters() { + return masterCount; + } + /* Used saving and loading. */ int repl_stream_db; /* DB to select in g_pserver->master client. */ @@ -1872,10 +1968,11 @@ typedef struct rdbSaveInfo { long long master_repl_offset; uint64_t mvccMinThreshold; - struct redisMaster *mi; -} rdbSaveInfo; + struct redisMaster *masters; -#define RDB_SAVE_INFO_INIT {-1,0,"0000000000000000000000000000000000000000",-1, TRUE, 0, 0, nullptr} +private: + size_t masterCount; +}; struct malloc_stats { size_t zmalloc_used; @@ -2081,42 +2178,6 @@ private: int rdb_key_save_delay = -1; // thread local cache }; -struct redisMaster { - char *masteruser; /* AUTH with this user and masterauth with master */ - char *masterauth; /* AUTH with this password with master */ - char *masterhost; /* Hostname of master */ - int masterport; /* Port of master */ - client *cached_master; /* Cached master to be reused for PSYNC. */ - client *master; - /* The following two fields is where we store master PSYNC replid/offset - * while the PSYNC is in progress. At the end we'll copy the fields into - * the server->master client structure. */ - char master_replid[CONFIG_RUN_ID_SIZE+1]; /* Master PSYNC runid. */ - long long master_initial_offset; /* Master PSYNC offset. */ - - bool isActive = false; - bool isRocksdbSnapshotRepl = false; - int repl_state; /* Replication status if the instance is a replica */ - off_t repl_transfer_size; /* Size of RDB to read from master during sync. */ - off_t repl_transfer_read; /* Amount of RDB read from master during sync. */ - off_t repl_transfer_last_fsync_off; /* Offset when we fsync-ed last time. */ - connection *repl_transfer_s; /* Slave -> Master SYNC socket */ - int repl_transfer_fd; /* Slave -> Master SYNC temp file descriptor */ - char *repl_transfer_tmpfile; /* Slave-> master SYNC temp file name */ - time_t repl_transfer_lastio; /* Unix time of the latest read, for timeout */ - time_t repl_down_since; /* Unix time at which link with master went down */ - - class SnapshotPayloadParseState *parseState; - sds bulkreadBuffer = nullptr; - - unsigned char master_uuid[UUID_BINARY_LEN]; /* Used during sync with master, this is our master's UUID */ - /* After we've connected with our master use the UUID in g_pserver->master */ - uint64_t mvccLastSync; - /* During a handshake the server may have stale keys, we track these here to share once a reciprocal connection is made */ - std::map> *staleKeyMap; - int ielReplTransfer = -1; -}; - // Const vars are not changed after worker threads are launched struct redisServerConst { pid_t pid; /* Main process pid. */ @@ -3101,6 +3162,7 @@ void clearReplicationId2(void); void mergeReplicationId(const char *); void chopReplicationBacklog(void); void replicationCacheMasterUsingMyself(struct redisMaster *mi); +void replicationCacheMasterUsingMaster(struct redisMaster *mi); void feedReplicationBacklog(const void *ptr, size_t len); void updateMasterAuth(); void showLatestBacklog();