diff --git a/src/rdb.cpp b/src/rdb.cpp index 5d158eec5..320ab918b 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -1218,37 +1218,15 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { == -1) return -1; if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",rsi->master_repl_offset) == -1) return -1; - if (g_pserver->fActiveReplica && listLength(g_pserver->masters) > 0) { + if (g_pserver->fActiveReplica) { sdsstring val = sdsstring(sdsempty()); - listNode *ln; - listIter li; - redisMaster* mi; - listRewind(g_pserver->masters,&li); - while ((ln = listNext(&li)) != NULL) { - mi = (redisMaster*)listNodeValue(ln); - if (!mi->master) { - // If master client is not available, use info from master struct - better than nothing - serverLog(LL_NOTICE, "saving master %s", mi->master_replid); - if (mi->master_replid[0] == 0) { - // if replid is null, there's no reason to save it - continue; - } - val = val.catfmt("%s:%I:%s:%i;", mi->master_replid, - mi->master_initial_offset, - mi->masterhost, - mi->masterport); - } - else { - serverLog(LL_NOTICE, "saving master %s", mi->master->replid); - if (mi->master->replid[0] == 0) { - // if replid is null, there's no reason to save it - continue; - } - val = val.catfmt("%s:%I:%s:%i;", mi->master->replid, - mi->master->reploff, - mi->masterhost, - mi->masterport); - } + + for (auto &msi : rsi->vecmastersaveinfo) { + val = val.catfmt("%s:%I:%s:%i:%i;", msi.master_replid, + msi.master_initial_offset, + msi.masterhost.get(), + msi.masterport, + msi.selected_db); } if (rdbSaveAuxFieldStrStr(rdb, "repl-masters",val.get()) == -1) return -1; } @@ -1661,11 +1639,12 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi) return rdbSaveBackgroundFork(rsi); } else { - rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL); + rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zcalloc(sizeof(rdbSaveThreadArgs) + 
((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL); + // Placement new rdbSaveInfo rsiT; if (rsi == nullptr) rsi = &rsiT; - args->rsi = *(new (args) rdbSaveInfo(*rsi)); + args->rsi = *rsi; memcpy(&args->rsi.repl_id, g_pserver->replid, sizeof(g_pserver->replid)); args->rsi.master_repl_offset = g_pserver->master_repl_offset; @@ -2922,11 +2901,11 @@ public: * snapshot taken by the master may not be reflected on the replica. */ bool fExpiredKey = iAmMaster() && !(this->rdbflags&RDBFLAGS_AOF_PREAMBLE) && job.expiretime != INVALID_EXPIRE && job.expiretime < this->now; if (fStaleMvccKey || fExpiredKey) { - if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->masters != nullptr && this->rsi->masters->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) { + if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->mi != nullptr && this->rsi->mi->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) { // We have a key that we've already deleted and is not back in our database. 
// We'll need to inform the sending master of the delete if it is also a replica of us robj_sharedptr objKeyDup(createStringObject(job.key, sdslen(job.key))); - this->rsi->masters->staleKeyMap->operator[](job.db->id).push_back(objKeyDup); + this->rsi->mi->staleKeyMap->operator[](job.db->id).push_back(objKeyDup); } sdsfree(job.key); job.key = nullptr; @@ -3242,19 +3221,26 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) { } } else if (!strcasecmp(szFromObj(auxkey),"repl-masters")) { if (rsi) { - struct redisMaster mi; + MasterSaveInfo msi; char *masters = szFromObj(auxval); - char *entry = strtok(masters, ":"); + char *saveptr; + char *entry = strtok_r(masters, ":", &saveptr); while (entry != NULL) { - memcpy(mi.master_replid, entry, sizeof(mi.master_replid)); - entry = strtok(NULL, ":"); - mi.master_initial_offset = atoi(entry); - entry = strtok(NULL, ":"); - mi.masterhost = entry; - entry = strtok(NULL, ";"); - mi.masterport = atoi(entry); - entry = strtok(NULL, ":"); - rsi->addMaster(mi); + memcpy(msi.master_replid, entry, sizeof(msi.master_replid)); + entry = strtok_r(NULL, ":", &saveptr); + if (entry == nullptr) break; + msi.master_initial_offset = atoll(entry); + entry = strtok_r(NULL, ":", &saveptr); + if (entry == nullptr) break; + msi.masterhost = sdsstring(sdsnew(entry)); + entry = strtok_r(NULL, ":", &saveptr); + if (entry == nullptr) break; + msi.masterport = atoi(entry); + entry = strtok_r(NULL, ";", &saveptr); + if (entry == nullptr) break; + msi.selected_db = atoi(entry); + entry = strtok_r(NULL, ":", &saveptr); + rsi->addMaster(msi); } } } else if (!strcasecmp(szFromObj(auxkey),"repl-offset")) { @@ -3533,6 +3519,32 @@ eoferr: return C_ERR; }
+void updateActiveReplicaMastersFromRsi(rdbSaveInfo *rsi) {
+    /* Nothing to restore without save info, or when not running as an active replica. */
+    if (rsi == nullptr || !g_pserver->fActiveReplica)
+        return;
+    serverLog(LL_NOTICE, "RDB contains information on %d masters", (int)rsi->numMasters());
+
+    listIter iter;
+    listRewind(g_pserver->masters, &iter);
+    for (listNode *node = listNext(&iter); node != nullptr; node = listNext(&iter)) {
+        redisMaster *mi = (redisMaster*)listNodeValue(node);
+        if (mi->master != nullptr)
+            continue; //ignore connected masters
+        /* Adopt the replid/offset of the first saved entry with the same host and port. */
+        for (auto &saved : rsi->vecmastersaveinfo) {
+            const bool fSameHost = sdscmp(mi->masterhost, (sds)saved.masterhost.get()) == 0;
+            if (!fSameHost || mi->masterport != saved.masterport)
+                continue;
+            memcpy(mi->master_replid, saved.master_replid, sizeof(mi->master_replid));
+            mi->master_initial_offset = saved.master_initial_offset;
+            replicationCacheMasterUsingMaster(mi);
+            serverLog(LL_NOTICE, "Cached master recovered from RDB for %s:%d", mi->masterhost, mi->masterport);
+            break;
+        }
+    }
+}
+
 int rdbLoad(rdbSaveInfo *rsi, int rdbflags) { int err = C_ERR; @@ -3913,11 +3925,22 @@ void bgsaveCommand(client *c) { * information. */ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) { rdbSaveInfo rsi_init; - *rsi = std::move(rsi_init); + *rsi = rsi_init; memcpy(rsi->repl_id, g_pserver->replid, sizeof(g_pserver->replid)); rsi->master_repl_offset = g_pserver->master_repl_offset; + if (g_pserver->fActiveReplica) { + listIter li; + listNode *ln = nullptr; + listRewind(g_pserver->masters, &li); + while ((ln = listNext(&li))) { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + MasterSaveInfo msi(*mi); + rsi->addMaster(msi); + } + } + /* If the instance is a master, we can populate the replication info * only when repl_backlog is not NULL. If the repl_backlog is NULL, * it means that the instance isn't in any replication chains. In this @@ -3935,11 +3958,6 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) { return rsi; } - if (listLength(g_pserver->masters) > 1) - { - // BUGBUG, warn user about this incomplete implementation - serverLog(LL_WARNING, "Warning: Only backing up first master's information in RDB"); - } struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? 
listNodeValue(listFirst(g_pserver->masters)) : NULL); /* If the instance is a replica we need a connected master diff --git a/src/replication.cpp b/src/replication.cpp index 7ca2a647e..fd713e55b 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2856,6 +2856,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, * gets promoted. */ return false; } + if (g_pserver->fActiveReplica) updateActiveReplicaMastersFromRsi(&rsi); /* RDB loading succeeded if we reach this point. */ if (g_pserver->repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) { @@ -2934,7 +2935,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, mi->staleKeyMap->clear(); else mi->staleKeyMap = new (MALLOC_LOCAL) std::map>(); - rsi.addMaster(*mi); + rsi.mi = mi; } if (rdbLoadFile(rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) { serverLog(LL_WARNING, @@ -2951,6 +2952,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi, it'll be restarted when sync succeeds or replica promoted. */ return false; } + if (g_pserver->fActiveReplica) updateActiveReplicaMastersFromRsi(&rsi); /* Cleanup. 
*/ if (g_pserver->rdb_del_sync_files && allPersistenceDisabled()) { diff --git a/src/server.cpp b/src/server.cpp index 3cac24e97..373b20337 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -1141,7 +1141,7 @@ struct redisCommand redisCommandTable[] = { 0,NULL,0,0,0,0,0,0}, {"rreplay",replicaReplayCommand,-3, - "read-only fast noprop", + "read-only fast noprop ok-stale", 0,NULL,0,0,0,0,0,0}, {"keydb.cron",cronCommand,-5, @@ -4938,7 +4938,8 @@ int processCommand(client *c, int callFlags) { if (FBrokenLinkToMaster() && g_pserver->repl_serve_stale_data == 0 && is_denystale_command && - !(g_pserver->fActiveReplica && c->cmd->proc == syncCommand)) + !(g_pserver->fActiveReplica && c->cmd->proc == syncCommand) + && !FInReplicaReplay()) { rejectCommand(c, shared.masterdownerr); return C_OK; @@ -6962,31 +6963,15 @@ void loadDataFromDisk(void) { g_pserver->master_repl_offset = rsi.repl_offset; if (g_pserver->repl_batch_offStart >= 0) g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; - listIter li; - listNode *ln; - - listRewind(g_pserver->masters, &li); - while ((ln = listNext(&li))) - { - redisMaster *mi = (redisMaster*)listNodeValue(ln); - if (g_pserver->fActiveReplica) { - for (size_t i = 0; i < rsi.numMasters(); i++) { - if (!strcmp(mi->masterhost, rsi.masters[i].masterhost) && mi->masterport == rsi.masters[i].masterport) { - memcpy(mi->master_replid, rsi.masters[i].master_replid, sizeof(mi->master_replid)); - mi->master_initial_offset = rsi.masters[i].master_initial_offset; - replicationCacheMasterUsingMaster(mi); - serverLog(LL_NOTICE, "Cached master recovered from RDB for %s:%d", mi->masterhost, mi->masterport); - } - } - } - else { - /* If we are a replica, create a cached master from this - * information, in order to allow partial resynchronizations - * with masters. 
*/ - replicationCacheMasterUsingMyself(mi); - selectDb(mi->cached_master,rsi.repl_stream_db); - } - } + } + updateActiveReplicaMastersFromRsi(&rsi); + if (!g_pserver->fActiveReplica && listLength(g_pserver->masters)) { + redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters)); + /* If we are a replica, create a cached master from this + * information, in order to allow partial resynchronizations + * with masters. */ + replicationCacheMasterUsingMyself(mi); + selectDb(mi->cached_master,rsi.repl_stream_db); } } else if (errno != ENOENT) { serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno)); @@ -7293,6 +7278,7 @@ void *workerThreadMain(void *parg) serverAssert(!GlobalLocksAcquired()); aeDeleteEventLoop(el); + tlsCleanupThread(); return NULL; } diff --git a/src/server.h b/src/server.h index 0a7d7c1e6..f96482612 100644 --- a/src/server.h +++ b/src/server.h @@ -1887,6 +1887,40 @@ struct redisMaster { int ielReplTransfer = -1; };
+struct MasterSaveInfo { /* Per-master replication state kept in rdbSaveInfo::vecmastersaveinfo */
+    MasterSaveInfo() = default;
+    MasterSaveInfo(const redisMaster &mi) { /* snapshot mi: use mi.master if set, else mi.cached_master */
+        memcpy(master_replid, mi.master_replid, sizeof(mi.master_replid));
+        if (mi.master) {
+            master_initial_offset = mi.master->reploff;
+            selected_db = mi.master->db->id;
+        } else if (mi.cached_master) {
+            master_initial_offset = mi.cached_master->reploff;
+            selected_db = mi.cached_master->db->id;
+        } else {
+            master_initial_offset = -1; /* neither client exists to read an offset from */
+            selected_db = 0;
+        }
+        masterhost = sdsstring(sdsdup(mi.masterhost));
+        masterport = mi.masterport;
+    }
+
+    MasterSaveInfo &operator=(const MasterSaveInfo &other) {
+        masterhost = other.masterhost;
+        masterport = other.masterport;
+        memcpy(master_replid, other.master_replid, sizeof(master_replid));
+        master_initial_offset = other.master_initial_offset;
+        selected_db = other.selected_db; /* every data member must be copied */
+        return *this;
+    }
+
+    sdsstring masterhost;
+    int masterport = 0;
+    char master_replid[CONFIG_RUN_ID_SIZE+1] = {0};
+    long long master_initial_offset = -1;
+    int selected_db = 0;
+};
+
 /* This structure can 
be optionally passed to RDB save/load functions in * order to implement additional functionalities, by storing and loading * metadata to the RDB file. @@ -1904,8 +1938,6 @@ public: repl_offset = -1; fForceSetKey = TRUE; mvccMinThreshold = 0; - masters = nullptr; - masterCount = 0; } rdbSaveInfo(const rdbSaveInfo &other) { repl_stream_db = other.repl_stream_db; @@ -1914,45 +1946,31 @@ public: repl_offset = other.repl_offset; fForceSetKey = other.fForceSetKey; mvccMinThreshold = other.mvccMinThreshold; - masters = (struct redisMaster*)malloc(sizeof(struct redisMaster) * other.masterCount); - memcpy(masters, other.masters, sizeof(struct redisMaster) * other.masterCount); - masterCount = other.masterCount; + vecmastersaveinfo = other.vecmastersaveinfo; + master_repl_offset = other.master_repl_offset; + mi = other.mi; } - rdbSaveInfo(rdbSaveInfo &&other) : rdbSaveInfo() { - swap(*this, other); - } - rdbSaveInfo &operator=(rdbSaveInfo other) { - swap(*this, other); + + rdbSaveInfo &operator=(const rdbSaveInfo &other) { + repl_stream_db = other.repl_stream_db; + repl_id_is_set = other.repl_id_is_set; + memcpy(repl_id, other.repl_id, sizeof(repl_id)); + repl_offset = other.repl_offset; + fForceSetKey = other.fForceSetKey; + mvccMinThreshold = other.mvccMinThreshold; + vecmastersaveinfo = other.vecmastersaveinfo; + master_repl_offset = other.master_repl_offset; + mi = other.mi; + return *this; } - ~rdbSaveInfo() { - free(masters); - } - friend void swap(rdbSaveInfo &first, rdbSaveInfo &second) { - std::swap(first.repl_stream_db, second.repl_stream_db); - std::swap(first.repl_id_is_set, second.repl_id_is_set); - std::swap(first.repl_id, second.repl_id); - std::swap(first.repl_offset, second.repl_offset); - std::swap(first.fForceSetKey, second.fForceSetKey); - std::swap(first.mvccMinThreshold, second.mvccMinThreshold); - std::swap(first.masters, second.masters); - std::swap(first.masterCount, second.masterCount); - } - - void addMaster(const struct redisMaster &mi) { - 
masterCount++; - if (masters == nullptr) { - masters = (struct redisMaster*)malloc(sizeof(struct redisMaster)); - } - else { - masters = (struct redisMaster*)realloc(masters, sizeof(struct redisMaster) * masterCount); - } - memcpy(masters + masterCount - 1, &mi, sizeof(struct redisMaster)); + void addMaster(const MasterSaveInfo &si) { + vecmastersaveinfo.push_back(si); } size_t numMasters() { - return masterCount; + return vecmastersaveinfo.size(); } /* Used saving and loading. */ @@ -1968,10 +1986,8 @@ public: long long master_repl_offset; uint64_t mvccMinThreshold; - struct redisMaster *masters; - -private: - size_t masterCount; + std::vector vecmastersaveinfo; + struct redisMaster *mi = nullptr; }; struct malloc_stats { @@ -3853,6 +3869,8 @@ void lfenceCommand(client *c); int FBrokenLinkToMaster(int *pconnectMasters = nullptr); int FActiveMaster(client *c); struct redisMaster *MasterInfoFromClient(client *c); +bool FInReplicaReplay(); +void updateActiveReplicaMastersFromRsi(rdbSaveInfo *rsi); /* MVCC */ uint64_t getMvccTstamp(); @@ -3950,6 +3968,7 @@ void makeThreadKillable(void); /* TLS stuff */ void tlsInit(void); void tlsInitThread(); +void tlsCleanupThread(); void tlsCleanup(void); int tlsConfigure(redisTLSContextConfig *ctx_config); void tlsReload(void); diff --git a/src/t_stream.cpp b/src/t_stream.cpp index 010e9b65b..b005cf600 100644 --- a/src/t_stream.cpp +++ b/src/t_stream.cpp @@ -56,7 +56,6 @@ void streamFreeCG(streamCG *cg); void streamFreeNACK(streamNACK *na); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); -bool FInReplicaReplay(); int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); diff --git a/src/tls.cpp b/src/tls.cpp index 9b3a0415c..68651bfbb 100644 --- a/src/tls.cpp +++ b/src/tls.cpp @@ -180,6 +180,12 @@ void tlsInitThread(void) 
pending_list = listCreate(); }
+void tlsCleanupThread(void)
+{ /* Release the pending list that tlsInitThread() created. */
+    if (pending_list) listRelease(pending_list);
+    pending_list = NULL; /* avoid a dangling pointer on later use or repeated cleanup */
+}
+
 void tlsCleanup(void) { if (redis_tls_ctx) { SSL_CTX_free(redis_tls_ctx); @@ -1260,6 +1266,7 @@ int tlsProcessPendingData() { } void tlsInitThread() {} +void tlsCleanupThread(void) {} sds connTLSGetPeerCert(connection *conn_) { (void) conn_;