PSYNC production fixes

This commit is contained in:
John Sully 2022-04-22 22:43:20 +00:00
parent aefff904ef
commit 6bf79b69c6
6 changed files with 149 additions and 118 deletions

View File

@ -1218,37 +1218,15 @@ int rdbSaveInfoAuxFields(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
== -1) return -1;
if (rdbSaveAuxFieldStrInt(rdb,"repl-offset",rsi->master_repl_offset)
== -1) return -1;
if (g_pserver->fActiveReplica && listLength(g_pserver->masters) > 0) {
if (g_pserver->fActiveReplica) {
sdsstring val = sdsstring(sdsempty());
listNode *ln;
listIter li;
redisMaster* mi;
listRewind(g_pserver->masters,&li);
while ((ln = listNext(&li)) != NULL) {
mi = (redisMaster*)listNodeValue(ln);
if (!mi->master) {
// If master client is not available, use info from master struct - better than nothing
serverLog(LL_NOTICE, "saving master %s", mi->master_replid);
if (mi->master_replid[0] == 0) {
// if replid is null, there's no reason to save it
continue;
}
val = val.catfmt("%s:%I:%s:%i;", mi->master_replid,
mi->master_initial_offset,
mi->masterhost,
mi->masterport);
}
else {
serverLog(LL_NOTICE, "saving master %s", mi->master->replid);
if (mi->master->replid[0] == 0) {
// if replid is null, there's no reason to save it
continue;
}
val = val.catfmt("%s:%I:%s:%i;", mi->master->replid,
mi->master->reploff,
mi->masterhost,
mi->masterport);
}
for (auto &msi : rsi->vecmastersaveinfo) {
val = val.catfmt("%s:%I:%s:%i:%i;", msi.master_replid,
msi.master_initial_offset,
msi.masterhost.get(),
msi.masterport,
msi.selected_db);
}
if (rdbSaveAuxFieldStrStr(rdb, "repl-masters",val.get()) == -1) return -1;
}
@ -1661,11 +1639,12 @@ int launchRdbSaveThread(pthread_t &child, rdbSaveInfo *rsi)
return rdbSaveBackgroundFork(rsi);
} else
{
rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zmalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL);
rdbSaveThreadArgs *args = (rdbSaveThreadArgs*)zcalloc(sizeof(rdbSaveThreadArgs) + ((cserver.dbnum-1)*sizeof(redisDbPersistentDataSnapshot*)), MALLOC_LOCAL);
// Placement new
rdbSaveInfo rsiT;
if (rsi == nullptr)
rsi = &rsiT;
args->rsi = *(new (args) rdbSaveInfo(*rsi));
args->rsi = *rsi;
memcpy(&args->rsi.repl_id, g_pserver->replid, sizeof(g_pserver->replid));
args->rsi.master_repl_offset = g_pserver->master_repl_offset;
@ -2922,11 +2901,11 @@ public:
* snapshot taken by the master may not be reflected on the replica. */
bool fExpiredKey = iAmMaster() && !(this->rdbflags&RDBFLAGS_AOF_PREAMBLE) && job.expiretime != INVALID_EXPIRE && job.expiretime < this->now;
if (fStaleMvccKey || fExpiredKey) {
if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->masters != nullptr && this->rsi->masters->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) {
if (fStaleMvccKey && !fExpiredKey && this->rsi != nullptr && this->rsi->mi != nullptr && this->rsi->mi->staleKeyMap != nullptr && lookupKeyRead(job.db, &keyobj) == nullptr) {
// We have a key that we've already deleted and is not back in our database.
// We'll need to inform the sending master of the delete if it is also a replica of us
robj_sharedptr objKeyDup(createStringObject(job.key, sdslen(job.key)));
this->rsi->masters->staleKeyMap->operator[](job.db->id).push_back(objKeyDup);
this->rsi->mi->staleKeyMap->operator[](job.db->id).push_back(objKeyDup);
}
sdsfree(job.key);
job.key = nullptr;
@ -3242,19 +3221,26 @@ int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi) {
}
} else if (!strcasecmp(szFromObj(auxkey),"repl-masters")) {
if (rsi) {
struct redisMaster mi;
MasterSaveInfo msi;
char *masters = szFromObj(auxval);
char *entry = strtok(masters, ":");
char *saveptr;
char *entry = strtok_r(masters, ":", &saveptr);
while (entry != NULL) {
memcpy(mi.master_replid, entry, sizeof(mi.master_replid));
entry = strtok(NULL, ":");
mi.master_initial_offset = atoi(entry);
entry = strtok(NULL, ":");
mi.masterhost = entry;
entry = strtok(NULL, ";");
mi.masterport = atoi(entry);
entry = strtok(NULL, ":");
rsi->addMaster(mi);
memcpy(msi.master_replid, entry, sizeof(msi.master_replid));
entry = strtok_r(NULL, ":", &saveptr);
if (entry == nullptr) break;
msi.master_initial_offset = atoll(entry);
entry = strtok_r(NULL, ":", &saveptr);
if (entry == nullptr) break;
msi.masterhost = sdsstring(sdsnew(entry));
entry = strtok_r(NULL, ":", &saveptr);
if (entry == nullptr) break;
msi.masterport = atoi(entry);
entry = strtok_r(NULL, ";", &saveptr);
if (entry == nullptr) break;
msi.selected_db = atoi(entry);
entry = strtok_r(NULL, ":", &saveptr);
rsi->addMaster(msi);
}
}
} else if (!strcasecmp(szFromObj(auxkey),"repl-offset")) {
@ -3533,6 +3519,32 @@ eoferr:
return C_ERR;
}
void updateActiveReplicaMastersFromRsi(rdbSaveInfo *rsi) {
if (rsi != nullptr && g_pserver->fActiveReplica) {
serverLog(LL_NOTICE, "RDB contains information on %d masters", (int)rsi->numMasters());
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
while ((ln = listNext(&li)))
{
redisMaster *mi = (redisMaster*)listNodeValue(ln);
if (mi->master != nullptr) {
continue; //ignore connected masters
}
for (size_t i = 0; i < rsi->numMasters(); i++) {
if (!sdscmp(mi->masterhost, (sds)rsi->vecmastersaveinfo[i].masterhost.get()) && mi->masterport == rsi->vecmastersaveinfo[i].masterport) {
memcpy(mi->master_replid, rsi->vecmastersaveinfo[i].master_replid, sizeof(mi->master_replid));
mi->master_initial_offset = rsi->vecmastersaveinfo[i].master_initial_offset;
replicationCacheMasterUsingMaster(mi);
serverLog(LL_NOTICE, "Cached master recovered from RDB for %s:%d", mi->masterhost, mi->masterport);
break;
}
}
}
}
}
int rdbLoad(rdbSaveInfo *rsi, int rdbflags)
{
int err = C_ERR;
@ -3913,11 +3925,22 @@ void bgsaveCommand(client *c) {
* information. */
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
rdbSaveInfo rsi_init;
*rsi = std::move(rsi_init);
*rsi = rsi_init;
memcpy(rsi->repl_id, g_pserver->replid, sizeof(g_pserver->replid));
rsi->master_repl_offset = g_pserver->master_repl_offset;
if (g_pserver->fActiveReplica) {
listIter li;
listNode *ln = nullptr;
listRewind(g_pserver->masters, &li);
while ((ln = listNext(&li))) {
redisMaster *mi = (redisMaster*)listNodeValue(ln);
MasterSaveInfo msi(*mi);
rsi->addMaster(msi);
}
}
/* If the instance is a master, we can populate the replication info
* only when repl_backlog is not NULL. If the repl_backlog is NULL,
* it means that the instance isn't in any replication chains. In this
@ -3935,11 +3958,6 @@ rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi) {
return rsi;
}
if (listLength(g_pserver->masters) > 1)
{
// BUGBUG, warn user about this incomplete implementation
serverLog(LL_WARNING, "Warning: Only backing up first master's information in RDB");
}
struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? listNodeValue(listFirst(g_pserver->masters)) : NULL);
/* If the instance is a replica we need a connected master

View File

@ -2856,6 +2856,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi,
* gets promoted. */
return false;
}
if (g_pserver->fActiveReplica) updateActiveReplicaMastersFromRsi(&rsi);
/* RDB loading succeeded if we reach this point. */
if (g_pserver->repl_diskless_load == REPL_DISKLESS_LOAD_SWAPDB) {
@ -2934,7 +2935,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi,
mi->staleKeyMap->clear();
else
mi->staleKeyMap = new (MALLOC_LOCAL) std::map<int, std::vector<robj_sharedptr>>();
rsi.addMaster(*mi);
rsi.mi = mi;
}
if (rdbLoadFile(rdb_filename,&rsi,RDBFLAGS_REPLICATION) != C_OK) {
serverLog(LL_WARNING,
@ -2951,6 +2952,7 @@ bool readSyncBulkPayloadRdb(connection *conn, redisMaster *mi, rdbSaveInfo &rsi,
it'll be restarted when sync succeeds or replica promoted. */
return false;
}
if (g_pserver->fActiveReplica) updateActiveReplicaMastersFromRsi(&rsi);
/* Cleanup. */
if (g_pserver->rdb_del_sync_files && allPersistenceDisabled()) {

View File

@ -1141,7 +1141,7 @@ struct redisCommand redisCommandTable[] = {
0,NULL,0,0,0,0,0,0},
{"rreplay",replicaReplayCommand,-3,
"read-only fast noprop",
"read-only fast noprop ok-stale",
0,NULL,0,0,0,0,0,0},
{"keydb.cron",cronCommand,-5,
@ -4938,7 +4938,8 @@ int processCommand(client *c, int callFlags) {
if (FBrokenLinkToMaster() &&
g_pserver->repl_serve_stale_data == 0 &&
is_denystale_command &&
!(g_pserver->fActiveReplica && c->cmd->proc == syncCommand))
!(g_pserver->fActiveReplica && c->cmd->proc == syncCommand)
&& !FInReplicaReplay())
{
rejectCommand(c, shared.masterdownerr);
return C_OK;
@ -6962,31 +6963,15 @@ void loadDataFromDisk(void) {
g_pserver->master_repl_offset = rsi.repl_offset;
if (g_pserver->repl_batch_offStart >= 0)
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
while ((ln = listNext(&li)))
{
redisMaster *mi = (redisMaster*)listNodeValue(ln);
if (g_pserver->fActiveReplica) {
for (size_t i = 0; i < rsi.numMasters(); i++) {
if (!strcmp(mi->masterhost, rsi.masters[i].masterhost) && mi->masterport == rsi.masters[i].masterport) {
memcpy(mi->master_replid, rsi.masters[i].master_replid, sizeof(mi->master_replid));
mi->master_initial_offset = rsi.masters[i].master_initial_offset;
replicationCacheMasterUsingMaster(mi);
serverLog(LL_NOTICE, "Cached master recovered from RDB for %s:%d", mi->masterhost, mi->masterport);
}
}
}
else {
/* If we are a replica, create a cached master from this
* information, in order to allow partial resynchronizations
* with masters. */
replicationCacheMasterUsingMyself(mi);
selectDb(mi->cached_master,rsi.repl_stream_db);
}
}
}
updateActiveReplicaMastersFromRsi(&rsi);
if (!g_pserver->fActiveReplica && listLength(g_pserver->masters)) {
redisMaster *mi = (redisMaster*)listNodeValue(listFirst(g_pserver->masters));
/* If we are a replica, create a cached master from this
* information, in order to allow partial resynchronizations
* with masters. */
replicationCacheMasterUsingMyself(mi);
selectDb(mi->cached_master,rsi.repl_stream_db);
}
} else if (errno != ENOENT) {
serverLog(LL_WARNING,"Fatal error loading the DB: %s. Exiting.",strerror(errno));
@ -7293,6 +7278,7 @@ void *workerThreadMain(void *parg)
serverAssert(!GlobalLocksAcquired());
aeDeleteEventLoop(el);
tlsCleanupThread();
return NULL;
}

View File

@ -1887,6 +1887,40 @@ struct redisMaster {
int ielReplTransfer = -1;
};
struct MasterSaveInfo {
MasterSaveInfo() = default;
MasterSaveInfo(const redisMaster &mi) {
memcpy(master_replid, mi.master_replid, sizeof(mi.master_replid));
if (mi.master) {
master_initial_offset = mi.master->reploff;
selected_db = mi.master->db->id;
} else if (mi.cached_master) {
master_initial_offset = mi.cached_master->reploff;
selected_db = mi.cached_master->db->id;
} else {
master_initial_offset = -1;
selected_db = 0;
}
masterport = mi.masterport;
masterhost = sdsstring(sdsdup(mi.masterhost));
masterport = mi.masterport;
}
MasterSaveInfo &operator=(const MasterSaveInfo &other) {
masterhost = other.masterhost;
masterport = other.masterport;
memcpy(master_replid, other.master_replid, sizeof(master_replid));
master_initial_offset = other.master_initial_offset;
return *this;
}
sdsstring masterhost;
int masterport;
char master_replid[CONFIG_RUN_ID_SIZE+1];
long long master_initial_offset;
int selected_db;
};
/* This structure can be optionally passed to RDB save/load functions in
* order to implement additional functionalities, by storing and loading
* metadata to the RDB file.
@ -1904,8 +1938,6 @@ public:
repl_offset = -1;
fForceSetKey = TRUE;
mvccMinThreshold = 0;
masters = nullptr;
masterCount = 0;
}
rdbSaveInfo(const rdbSaveInfo &other) {
repl_stream_db = other.repl_stream_db;
@ -1914,45 +1946,31 @@ public:
repl_offset = other.repl_offset;
fForceSetKey = other.fForceSetKey;
mvccMinThreshold = other.mvccMinThreshold;
masters = (struct redisMaster*)malloc(sizeof(struct redisMaster) * other.masterCount);
memcpy(masters, other.masters, sizeof(struct redisMaster) * other.masterCount);
masterCount = other.masterCount;
vecmastersaveinfo = other.vecmastersaveinfo;
master_repl_offset = other.master_repl_offset;
mi = other.mi;
}
rdbSaveInfo(rdbSaveInfo &&other) : rdbSaveInfo() {
swap(*this, other);
}
rdbSaveInfo &operator=(rdbSaveInfo other) {
swap(*this, other);
rdbSaveInfo &operator=(const rdbSaveInfo &other) {
repl_stream_db = other.repl_stream_db;
repl_id_is_set = other.repl_id_is_set;
memcpy(repl_id, other.repl_id, sizeof(repl_id));
repl_offset = other.repl_offset;
fForceSetKey = other.fForceSetKey;
mvccMinThreshold = other.mvccMinThreshold;
vecmastersaveinfo = other.vecmastersaveinfo;
master_repl_offset = other.master_repl_offset;
mi = other.mi;
return *this;
}
~rdbSaveInfo() {
free(masters);
}
friend void swap(rdbSaveInfo &first, rdbSaveInfo &second) {
std::swap(first.repl_stream_db, second.repl_stream_db);
std::swap(first.repl_id_is_set, second.repl_id_is_set);
std::swap(first.repl_id, second.repl_id);
std::swap(first.repl_offset, second.repl_offset);
std::swap(first.fForceSetKey, second.fForceSetKey);
std::swap(first.mvccMinThreshold, second.mvccMinThreshold);
std::swap(first.masters, second.masters);
std::swap(first.masterCount, second.masterCount);
}
void addMaster(const struct redisMaster &mi) {
masterCount++;
if (masters == nullptr) {
masters = (struct redisMaster*)malloc(sizeof(struct redisMaster));
}
else {
masters = (struct redisMaster*)realloc(masters, sizeof(struct redisMaster) * masterCount);
}
memcpy(masters + masterCount - 1, &mi, sizeof(struct redisMaster));
void addMaster(const MasterSaveInfo &si) {
vecmastersaveinfo.push_back(si);
}
size_t numMasters() {
return masterCount;
return vecmastersaveinfo.size();
}
/* Used saving and loading. */
@ -1968,10 +1986,8 @@ public:
long long master_repl_offset;
uint64_t mvccMinThreshold;
struct redisMaster *masters;
private:
size_t masterCount;
std::vector<MasterSaveInfo> vecmastersaveinfo;
struct redisMaster *mi = nullptr;
};
struct malloc_stats {
@ -3853,6 +3869,8 @@ void lfenceCommand(client *c);
int FBrokenLinkToMaster(int *pconnectMasters = nullptr);
int FActiveMaster(client *c);
struct redisMaster *MasterInfoFromClient(client *c);
bool FInReplicaReplay();
void updateActiveReplicaMastersFromRsi(rdbSaveInfo *rsi);
/* MVCC */
uint64_t getMvccTstamp();
@ -3950,6 +3968,7 @@ void makeThreadKillable(void);
/* TLS stuff */
void tlsInit(void);
void tlsInitThread();
void tlsCleanupThread();
void tlsCleanup(void);
int tlsConfigure(redisTLSContextConfig *ctx_config);
void tlsReload(void);

View File

@ -56,7 +56,6 @@
void streamFreeCG(streamCG *cg);
void streamFreeNACK(streamNACK *na);
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
bool FInReplicaReplay();
int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);
int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq);

View File

@ -180,6 +180,12 @@ void tlsInitThread(void)
pending_list = listCreate();
}
void tlsCleanupThread(void)
{
if (pending_list)
listRelease(pending_list);
}
void tlsCleanup(void) {
if (redis_tls_ctx) {
SSL_CTX_free(redis_tls_ctx);
@ -1260,6 +1266,7 @@ int tlsProcessPendingData() {
}
void tlsInitThread() {}
void tlsCleanupThread(void) {}
sds connTLSGetPeerCert(connection *conn_) {
(void) conn_;