From 4c3d5c83e5cb52abfba9aa3324838ed14b0ff63a Mon Sep 17 00:00:00 2001 From: christianEQ Date: Mon, 8 Nov 2021 19:34:30 +0000 Subject: [PATCH] save replid from storage on load Former-commit-id: 8e5d0cb7035db30f35ead36aab52df07ab3c9bee --- src/replication.cpp | 115 +++++++++++++++++++++++--------------------- src/server.cpp | 22 ++++++++- 2 files changed, 81 insertions(+), 56 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index b6607235c..4756801e4 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2096,6 +2096,64 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) } } +/* Save the replid of yourself and any connected masters to storage. + * Returns if no storage provider is used. */ +void saveMasterStatusToStorage() +{ + if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return; + + g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true); + g_pserver->metadataDb->insert("repl-offset", 11, &g_pserver->master_repl_offset, sizeof(g_pserver->master_repl_offset), true); + if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) { + g_pserver->metadataDb->insert("repl-stream-db", 14, g_pserver->replicaseldb == -1 ? 0 : &g_pserver->replicaseldb, + g_pserver->replicaseldb == -1 ? 0 : sizeof(g_pserver->replicaseldb), true); + } + + struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? listNodeValue(listFirst(g_pserver->masters)) : NULL); + + if (miFirst && miFirst->master) { + g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->master->db->id, sizeof(miFirst->master->db->id), true); + } + else if (miFirst && miFirst->cached_master) { + g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->cached_master->db->id, sizeof(miFirst->cached_master->db->id), true); + } + + if (listLength(g_pserver->masters) == 0) { + g_pserver->metadataDb->insert("repl-masters", 12, (void*)"", 0, true); + return; + } + sds val = sds(sdsempty()); + listNode *ln; + listIter li; + redisMaster *mi; + listRewind(g_pserver->masters,&li); + while((ln = listNext(&li)) != NULL) { + mi = (redisMaster*)listNodeValue(ln); + if (!mi->master) { + // If master client is not available, use info from master struct - better than nothing + if (mi->master_replid[0] == 0) { + // if replid is null, there's no reason to save it + continue; + } + val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master_replid, + mi->master_initial_offset, + mi->masterhost, + mi->masterport); + } + else { + if (mi->master->replid[0] == 0) { + // if replid is null, there's no reason to save it + continue; + } + val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master->replid, + mi->master->reploff, + mi->masterhost, + mi->masterport); + } + } + g_pserver->metadataDb->insert("repl-masters", 12, (void*)val, sdslen(val), true); +} + /* Change the current instance replication ID with a new, random one. * This will prevent successful PSYNCs between this master and other * slaves, so the command should be called when something happens that @@ -2103,6 +2161,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) void changeReplicationId(void) { getRandomHexChars(g_pserver->replid,CONFIG_RUN_ID_SIZE); g_pserver->replid[CONFIG_RUN_ID_SIZE] = '\0'; + saveMasterStatusToStorage(); } @@ -2894,6 +2953,7 @@ void readSyncBulkPayload(connection *conn) { g_pserver->master_repl_offset = mi->master->reploff; if (g_pserver->repl_batch_offStart >= 0) g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; + saveMasterStatusToStorage(); } clearReplicationId2(); @@ -3783,61 +3843,6 @@ void disconnectMaster(redisMaster *mi) } } -void saveMasterStatusToStorage() -{ - if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return; - - g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true); - if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) { - g_pserver->metadataDb->insert("repl-stream-db", 14, g_pserver->replicaseldb == -1 ? 0 : &g_pserver->replicaseldb, - g_pserver->replicaseldb == -1 ? 0 : sizeof(g_pserver->replicaseldb), true); - } - - struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? listNodeValue(listFirst(g_pserver->masters)) : NULL); - - if (miFirst && miFirst->master) { - g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->master->db->id, sizeof(miFirst->master->db->id), true); - } - else if (miFirst && miFirst->cached_master) { - g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->cached_master->db->id, sizeof(miFirst->cached_master->db->id), true); - } - - if (listLength(g_pserver->masters) == 0) { - g_pserver->metadataDb->insert("repl-masters", 12, (void*)"", 0, true); - return; - } - sds val = sds(sdsempty()); - listNode *ln; - listIter li; - redisMaster *mi; - listRewind(g_pserver->masters,&li); - while((ln = listNext(&li)) != NULL) { - mi = (redisMaster*)listNodeValue(ln); - if (!mi->master) { - // If master client is not available, use info from master struct - better than nothing - if (mi->master_replid[0] == 0) { - // if replid is null, there's no reason to save it - continue; - } - val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master_replid, - mi->master_initial_offset, - mi->masterhost, - mi->masterport); - } - else { - if (mi->master->replid[0] == 0) { - // if replid is null, there's no reason to save it - continue; - } - val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master->replid, - mi->master->reploff, - mi->masterhost, - mi->masterport); - } - } - g_pserver->metadataDb->insert("repl-masters", 12, (void*)val, sdslen(val), true); -} - /* Set replication to the specified master address and port. */ struct redisMaster *replicationAddMaster(char *ip, int port) { // pre-reqs: We must not already have a replica in the list with the same tuple diff --git a/src/server.cpp b/src/server.cpp index a3fe4773a..c74cc179b 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3989,10 +3989,30 @@ void initServer(void) { if (g_pserver->metadataDb) { g_pserver->metadataDb->retrieve("repl-id", 7, [&](const char *, size_t, const void *data, size_t cb){ if (cb == sizeof(g_pserver->replid)) { - serverLog(LL_NOTICE, "Retrieved repl-id: %s", (const char*)data); memcpy(g_pserver->replid, data, cb); } }); + g_pserver->metadataDb->retrieve("repl-offset", 11, [&](const char *, size_t, const void *data, size_t cb){ + if (cb == sizeof(g_pserver->replid)) { + g_pserver->master_repl_offset = *(long long*)data; + } + }); + + listIter li; + listNode *ln; + + listRewind(g_pserver->masters, &li); + while ((ln = listNext(&li))) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + /* If we are a replica, create a cached master from this + * information, in order to allow partial resynchronizations + * with masters. */ + replicationCacheMasterUsingMyself(mi); + g_pserver->metadataDb->retrieve("repl-stream-db", 14, [&](const char *, size_t, const void *data, size_t cb){ + selectDb(mi->cached_master, *(int*)data); + }); + } } }