save replid from storage on load

Former-commit-id: 8e5d0cb7035db30f35ead36aab52df07ab3c9bee
This commit is contained in:
christianEQ 2021-11-08 19:34:30 +00:00
parent 6ef6c917ec
commit 4c3d5c83e5
2 changed files with 81 additions and 56 deletions

View File

@ -2096,6 +2096,64 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
} }
} }
/* Save the replid of yourself and any connected masters to storage.
* Returns if no storage provider is used. */
void saveMasterStatusToStorage()
{
if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return;
g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true);
g_pserver->metadataDb->insert("repl-offset", 11, &g_pserver->master_repl_offset, sizeof(g_pserver->master_repl_offset), true);
if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) {
g_pserver->metadataDb->insert("repl-stream-db", 14, g_pserver->replicaseldb == -1 ? 0 : &g_pserver->replicaseldb,
g_pserver->replicaseldb == -1 ? 0 : sizeof(g_pserver->replicaseldb), true);
}
struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? listNodeValue(listFirst(g_pserver->masters)) : NULL);
if (miFirst && miFirst->master) {
g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->master->db->id, sizeof(miFirst->master->db->id), true);
}
else if (miFirst && miFirst->cached_master) {
g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->cached_master->db->id, sizeof(miFirst->cached_master->db->id), true);
}
if (listLength(g_pserver->masters) == 0) {
g_pserver->metadataDb->insert("repl-masters", 12, (void*)"", 0, true);
return;
}
sds val = sds(sdsempty());
listNode *ln;
listIter li;
redisMaster *mi;
listRewind(g_pserver->masters,&li);
while((ln = listNext(&li)) != NULL) {
mi = (redisMaster*)listNodeValue(ln);
if (!mi->master) {
// If master client is not available, use info from master struct - better than nothing
if (mi->master_replid[0] == 0) {
// if replid is null, there's no reason to save it
continue;
}
val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master_replid,
mi->master_initial_offset,
mi->masterhost,
mi->masterport);
}
else {
if (mi->master->replid[0] == 0) {
// if replid is null, there's no reason to save it
continue;
}
val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master->replid,
mi->master->reploff,
mi->masterhost,
mi->masterport);
}
}
g_pserver->metadataDb->insert("repl-masters", 12, (void*)val, sdslen(val), true);
}
/* Change the current instance replication ID with a new, random one. /* Change the current instance replication ID with a new, random one.
* This will prevent successful PSYNCs between this master and other * This will prevent successful PSYNCs between this master and other
* slaves, so the command should be called when something happens that * slaves, so the command should be called when something happens that
@ -2103,6 +2161,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
void changeReplicationId(void) { void changeReplicationId(void) {
getRandomHexChars(g_pserver->replid,CONFIG_RUN_ID_SIZE); getRandomHexChars(g_pserver->replid,CONFIG_RUN_ID_SIZE);
g_pserver->replid[CONFIG_RUN_ID_SIZE] = '\0'; g_pserver->replid[CONFIG_RUN_ID_SIZE] = '\0';
saveMasterStatusToStorage();
} }
@ -2894,6 +2953,7 @@ void readSyncBulkPayload(connection *conn) {
g_pserver->master_repl_offset = mi->master->reploff; g_pserver->master_repl_offset = mi->master->reploff;
if (g_pserver->repl_batch_offStart >= 0) if (g_pserver->repl_batch_offStart >= 0)
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
saveMasterStatusToStorage();
} }
clearReplicationId2(); clearReplicationId2();
@ -3783,61 +3843,6 @@ void disconnectMaster(redisMaster *mi)
} }
} }
void saveMasterStatusToStorage()
{
if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return;
g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true);
if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) {
g_pserver->metadataDb->insert("repl-stream-db", 14, g_pserver->replicaseldb == -1 ? 0 : &g_pserver->replicaseldb,
g_pserver->replicaseldb == -1 ? 0 : sizeof(g_pserver->replicaseldb), true);
}
struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? listNodeValue(listFirst(g_pserver->masters)) : NULL);
if (miFirst && miFirst->master) {
g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->master->db->id, sizeof(miFirst->master->db->id), true);
}
else if (miFirst && miFirst->cached_master) {
g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->cached_master->db->id, sizeof(miFirst->cached_master->db->id), true);
}
if (listLength(g_pserver->masters) == 0) {
g_pserver->metadataDb->insert("repl-masters", 12, (void*)"", 0, true);
return;
}
sds val = sds(sdsempty());
listNode *ln;
listIter li;
redisMaster *mi;
listRewind(g_pserver->masters,&li);
while((ln = listNext(&li)) != NULL) {
mi = (redisMaster*)listNodeValue(ln);
if (!mi->master) {
// If master client is not available, use info from master struct - better than nothing
if (mi->master_replid[0] == 0) {
// if replid is null, there's no reason to save it
continue;
}
val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master_replid,
mi->master_initial_offset,
mi->masterhost,
mi->masterport);
}
else {
if (mi->master->replid[0] == 0) {
// if replid is null, there's no reason to save it
continue;
}
val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master->replid,
mi->master->reploff,
mi->masterhost,
mi->masterport);
}
}
g_pserver->metadataDb->insert("repl-masters", 12, (void*)val, sdslen(val), true);
}
/* Set replication to the specified master address and port. */ /* Set replication to the specified master address and port. */
struct redisMaster *replicationAddMaster(char *ip, int port) { struct redisMaster *replicationAddMaster(char *ip, int port) {
// pre-reqs: We must not already have a replica in the list with the same tuple // pre-reqs: We must not already have a replica in the list with the same tuple

View File

@ -3989,10 +3989,30 @@ void initServer(void) {
if (g_pserver->metadataDb) { if (g_pserver->metadataDb) {
g_pserver->metadataDb->retrieve("repl-id", 7, [&](const char *, size_t, const void *data, size_t cb){ g_pserver->metadataDb->retrieve("repl-id", 7, [&](const char *, size_t, const void *data, size_t cb){
if (cb == sizeof(g_pserver->replid)) { if (cb == sizeof(g_pserver->replid)) {
serverLog(LL_NOTICE, "Retrieved repl-id: %s", (const char*)data);
memcpy(g_pserver->replid, data, cb); memcpy(g_pserver->replid, data, cb);
} }
}); });
g_pserver->metadataDb->retrieve("repl-offset", 11, [&](const char *, size_t, const void *data, size_t cb){
if (cb == sizeof(g_pserver->replid)) {
g_pserver->master_repl_offset = *(long long*)data;
}
});
listIter li;
listNode *ln;
listRewind(g_pserver->masters, &li);
while ((ln = listNext(&li)))
{
redisMaster *mi = (redisMaster*)listNodeValue(ln);
/* If we are a replica, create a cached master from this
* information, in order to allow partial resynchronizations
* with masters. */
replicationCacheMasterUsingMyself(mi);
g_pserver->metadataDb->retrieve("repl-stream-db", 14, [&](const char *, size_t, const void *data, size_t cb){
selectDb(mi->cached_master, *(int*)data);
});
}
} }
} }