From c195ee4453f1123e1779e5210e18b8bdc0fe507f Mon Sep 17 00:00:00 2001 From: christianEQ Date: Mon, 1 Nov 2021 03:17:13 +0000 Subject: [PATCH 1/9] added createMetadataDb to IStorageFactory + implementations Former-commit-id: a2acf75484d2af93aad9d03a20bd402893044860 --- src/IStorage.h | 1 + src/server.cpp | 4 ++++ src/server.h | 1 + src/storage/rocksdbfactor_internal.h | 1 + src/storage/rocksdbfactory.cpp | 6 ++++++ src/storage/teststorageprovider.cpp | 5 +++++ src/storage/teststorageprovider.h | 1 + 7 files changed, 19 insertions(+) diff --git a/src/IStorage.h b/src/IStorage.h index e060d849f..196ae8cef 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -9,6 +9,7 @@ public: virtual ~IStorageFactory() {} virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) = 0; + virtual class IStorage *createMetadataDb() = 0; virtual const char *name() const = 0; virtual size_t totalDiskspaceUsed() const = 0; virtual bool FSlow() const = 0; diff --git a/src/server.cpp b/src/server.cpp index 0dcae55b0..64bfeb86e 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3984,6 +3984,10 @@ void initServer(void) { slowlogInit(); latencyMonitorInit(); + if (g_pserver->m_pstorageFactory) { + g_pserver->metadataDb = g_pserver->m_pstorageFactory->createMetadataDb(); + } + /* We have to initialize storage providers after the cluster has been initialized */ for (int idb = 0; idb < cserver.dbnum; ++idb) { diff --git a/src/server.h b/src/server.h index 656560868..6b00b27a3 100644 --- a/src/server.h +++ b/src/server.h @@ -2170,6 +2170,7 @@ struct redisServer { mode_t umask; /* The umask value of the process on startup */ std::atomic hz; /* serverCron() calls frequency in hertz */ int in_fork_child; /* indication that this is a fork child */ + IStorage *metadataDb = nullptr; redisDb **db = nullptr; dict *commands; /* Command table */ dict *orig_commands; /* Command table before command renaming. 
*/ diff --git a/src/storage/rocksdbfactor_internal.h b/src/storage/rocksdbfactor_internal.h index beb71d87e..8a7df2cf5 100644 --- a/src/storage/rocksdbfactor_internal.h +++ b/src/storage/rocksdbfactor_internal.h @@ -14,6 +14,7 @@ public: ~RocksDBStorageFactory(); virtual IStorage *create(int db, key_load_iterator iter, void *privdata) override; + virtual IStorage *createMetadataDb() override; virtual const char *name() const override; virtual size_t totalDiskspaceUsed() const override; diff --git a/src/storage/rocksdbfactory.cpp b/src/storage/rocksdbfactory.cpp index 7beda21f0..7fd41b349 100644 --- a/src/storage/rocksdbfactory.cpp +++ b/src/storage/rocksdbfactory.cpp @@ -32,6 +32,7 @@ IStorageFactory *CreateRocksDBStorageFactory(const char *path, int dbnum, const RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, const char *rgchConfig, size_t cchConfig) : m_path(dbfile) { + dbnum++; // create an extra db for metadata // Get the count of column families in the actual database std::vector vecT; auto status = rocksdb::DB::ListColumnFamilies(rocksdb::Options(), dbfile, &vecT); @@ -134,6 +135,11 @@ std::string RocksDBStorageFactory::getTempFolder() return path; } +IStorage *RocksDBStorageFactory::createMetadataDb() +{ + return this->create(-1, nullptr, nullptr); +} + IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter, void *privdata) { ++db; // skip default col family diff --git a/src/storage/teststorageprovider.cpp b/src/storage/teststorageprovider.cpp index 188a339f3..3739c403d 100644 --- a/src/storage/teststorageprovider.cpp +++ b/src/storage/teststorageprovider.cpp @@ -6,6 +6,11 @@ IStorage *TestStorageFactory::create(int, key_load_iterator, void *) return new (MALLOC_LOCAL) TestStorageProvider(); } +IStorage *TestStorageFactory::createMetadataDb() +{ + return new (MALLOC_LOCAL) TestStorageProvider(); +} + const char *TestStorageFactory::name() const { return "TEST Storage Provider"; diff --git 
a/src/storage/teststorageprovider.h b/src/storage/teststorageprovider.h index d5e956c7e..2b2b1a38d 100644 --- a/src/storage/teststorageprovider.h +++ b/src/storage/teststorageprovider.h @@ -5,6 +5,7 @@ class TestStorageFactory : public IStorageFactory { virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) override; + virtual class IStorage *createMetadataDb() override; virtual const char *name() const override; virtual size_t totalDiskspaceUsed() const override { return 0; } virtual bool FSlow() const { return false; } From b24e18601cbd9c13b95e25e3c77f5aa3699dfa62 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Tue, 2 Nov 2021 15:37:29 +0000 Subject: [PATCH 2/9] add KEYDB_METADATA_ID to metadata tables when created Former-commit-id: 7fe8a184db62eab171935c20498bdb4f30ee6b1d --- src/IStorage.h | 2 ++ src/storage/rocksdbfactory.cpp | 4 +++- src/storage/teststorageprovider.cpp | 4 +++- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/IStorage.h b/src/IStorage.h index 196ae8cef..ad956beb6 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -2,6 +2,8 @@ #include #include "sds.h" +#define METADATA_DB_IDENTIFIER "c299fde0-6d42-4ec4-b939-34f680ffe39f" + class IStorageFactory { public: diff --git a/src/storage/rocksdbfactory.cpp b/src/storage/rocksdbfactory.cpp index 7fd41b349..518944b38 100644 --- a/src/storage/rocksdbfactory.cpp +++ b/src/storage/rocksdbfactory.cpp @@ -137,7 +137,9 @@ std::string RocksDBStorageFactory::getTempFolder() IStorage *RocksDBStorageFactory::createMetadataDb() { - return this->create(-1, nullptr, nullptr); + IStorage *metadataDb = this->create(-1, nullptr, nullptr); + metadataDb->insert("KEYDB_METADATA_ID", strlen("KEYDB_METADATA_ID"), (void*)METADATA_DB_IDENTIFIER, strlen(METADATA_DB_IDENTIFIER), false); + return metadataDb; } IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter, void *privdata) diff --git a/src/storage/teststorageprovider.cpp b/src/storage/teststorageprovider.cpp index 
3739c403d..a287397c7 100644 --- a/src/storage/teststorageprovider.cpp +++ b/src/storage/teststorageprovider.cpp @@ -8,7 +8,9 @@ IStorage *TestStorageFactory::create(int, key_load_iterator, void *) IStorage *TestStorageFactory::createMetadataDb() { - return new (MALLOC_LOCAL) TestStorageProvider(); + IStorage *metadataDb = new (MALLOC_LOCAL) TestStorageProvider(); + metadataDb->insert("KEYDB_METADATA_ID", strlen("KEYDB_METADATA_ID"), (void*)METADATA_DB_IDENTIFIER, strlen(METADATA_DB_IDENTIFIER), false); + return metadataDb; } const char *TestStorageFactory::name() const From 0d87e54d10478ae9dc85ab8374e03f93827ac345 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Wed, 3 Nov 2021 00:59:17 +0000 Subject: [PATCH 3/9] save and recognize metadata table identifier Former-commit-id: f06ef757c24ecc50df0e7abf5201a5499ff28c53 --- src/storage/rocksdb.h | 1 + src/storage/rocksdbfactory.cpp | 9 ++++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/src/storage/rocksdb.h b/src/storage/rocksdb.h index 2a980cf64..0936c585a 100644 --- a/src/storage/rocksdb.h +++ b/src/storage/rocksdb.h @@ -8,6 +8,7 @@ #define INTERNAL_KEY_PREFIX "\x00\x04\x03\x00\x05\x02\x04" static const char count_key[] = INTERNAL_KEY_PREFIX "__keydb__count\1"; static const char version_key[] = INTERNAL_KEY_PREFIX "__keydb__version\1"; +static const char meta_key[] = INTERNAL_KEY_PREFIX "__keydb__metadata\1"; class RocksDBStorageFactory; class RocksDBStorageProvider : public IStorage diff --git a/src/storage/rocksdbfactory.cpp b/src/storage/rocksdbfactory.cpp index 518944b38..b16033250 100644 --- a/src/storage/rocksdbfactory.cpp +++ b/src/storage/rocksdbfactory.cpp @@ -90,6 +90,13 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons m_spdb = std::shared_ptr(db); for (auto handle : handles) { + std::string metaId; + + auto idStatus = m_spdb->Get(rocksdb::ReadOptions(), handle, rocksdb::Slice(meta_key, sizeof(meta_key)), &metaId); + if (status.ok() && 
!strcmp(metaId.c_str(), METADATA_DB_IDENTIFIER)) { + printf("Recognized metadata table\r\n"); + } + std::string strVersion; auto status = m_spdb->Get(rocksdb::ReadOptions(), handle, rocksdb::Slice(version_key, sizeof(version_key)), &strVersion); if (!status.ok()) @@ -138,7 +145,7 @@ std::string RocksDBStorageFactory::getTempFolder() IStorage *RocksDBStorageFactory::createMetadataDb() { IStorage *metadataDb = this->create(-1, nullptr, nullptr); - metadataDb->insert("KEYDB_METADATA_ID", strlen("KEYDB_METADATA_ID"), (void*)METADATA_DB_IDENTIFIER, strlen(METADATA_DB_IDENTIFIER), false); + metadataDb->insert(meta_key, sizeof(meta_key), (void*)METADATA_DB_IDENTIFIER, strlen(METADATA_DB_IDENTIFIER), false); return metadataDb; } From 5f7fdb7c9eefdce7c7603913cce5e1104f8da434 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Wed, 3 Nov 2021 02:25:57 +0000 Subject: [PATCH 4/9] save master status to storage when masters change Former-commit-id: 4989926a0028aed7d7700fd1d1f4ed27c20277cc --- src/IStorage.h | 1 + src/replication.cpp | 44 +++++++++++++++++++++++++++++ src/server.cpp | 4 +++ src/storage/rocksdbfactory.cpp | 10 +++++-- src/storage/teststorageprovider.cpp | 5 ++++ src/storage/teststorageprovider.h | 1 + 6 files changed, 63 insertions(+), 2 deletions(-) diff --git a/src/IStorage.h b/src/IStorage.h index ad956beb6..cd5215b6d 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -12,6 +12,7 @@ public: virtual ~IStorageFactory() {} virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) = 0; virtual class IStorage *createMetadataDb() = 0; + virtual std::string getMetadata() const = 0; virtual const char *name() const = 0; virtual size_t totalDiskspaceUsed() const = 0; virtual bool FSlow() const = 0; diff --git a/src/replication.cpp b/src/replication.cpp index 3651b0113..4a6e8d5c8 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3783,6 +3783,45 @@ void disconnectMaster(redisMaster *mi) } } +void saveMasterStatusToStorage() +{ + if 
(!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return; + if (listLength(g_pserver->masters) == 0) { + g_pserver->metadataDb->insert("repl_masters", 12, (void*)"", 0, true); + return; + } + sds val = sds(sdsempty()); + listNode *ln; + listIter li; + redisMaster *mi; + listRewind(g_pserver->masters,&li); + while((ln = listNext(&li)) != NULL) { + mi = (redisMaster*)listNodeValue(ln); + if (!mi->master) { + // If master client is not available, use info from master struct - better than nothing + if (mi->master_replid[0] == 0) { + // if replid is null, there's no reason to save it + continue; + } + val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master_replid, + mi->master_initial_offset, + mi->masterhost, + mi->masterport); + } + else { + if (mi->master->replid[0] == 0) { + // if replid is null, there's no reason to save it + continue; + } + val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master->replid, + mi->master->reploff, + mi->masterhost, + mi->masterport); + } + } + g_pserver->metadataDb->insert("repl_masters", 12, (void*)val, sdslen(val), true); +} + /* Set replication to the specified master address and port. */ struct redisMaster *replicationAddMaster(char *ip, int port) { // pre-reqs: We must not already have a replica in the list with the same tuple @@ -3855,6 +3894,7 @@ struct redisMaster *replicationAddMaster(char *ip, int port) { mi->masterhost, mi->masterport); connectWithMaster(mi); } + saveMasterStatusToStorage(); return mi; } @@ -3938,6 +3978,8 @@ void replicationUnsetMaster(redisMaster *mi) { /* Restart the AOF subsystem in case we shut it down during a sync when * we were still a slave. 
*/ if (g_pserver->aof_enabled && g_pserver->aof_state == AOF_OFF) restartAOFAfterSYNC(); + + saveMasterStatusToStorage(); } /* This function is called when the replica lose the connection with the @@ -3969,6 +4011,8 @@ void replicationHandleMasterDisconnection(redisMaster *mi) { mi->masterhost, mi->masterport); connectWithMaster(mi); } + + saveMasterStatusToStorage(); } } diff --git a/src/server.cpp b/src/server.cpp index 64bfeb86e..d807f4d04 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3985,6 +3985,10 @@ void initServer(void) { latencyMonitorInit(); if (g_pserver->m_pstorageFactory) { + std::string repl_masters = g_pserver->m_pstorageFactory->getMetadata(); + if (!repl_masters.empty()) { + serverLog(LL_NOTICE, "Loaded repl-masters from storage provider: %s", repl_masters); + } g_pserver->metadataDb = g_pserver->m_pstorageFactory->createMetadataDb(); } diff --git a/src/storage/rocksdbfactory.cpp b/src/storage/rocksdbfactory.cpp index b16033250..78ed09e51 100644 --- a/src/storage/rocksdbfactory.cpp +++ b/src/storage/rocksdbfactory.cpp @@ -93,8 +93,9 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons std::string metaId; auto idStatus = m_spdb->Get(rocksdb::ReadOptions(), handle, rocksdb::Slice(meta_key, sizeof(meta_key)), &metaId); - if (status.ok() && !strcmp(metaId.c_str(), METADATA_DB_IDENTIFIER)) { - printf("Recognized metadata table\r\n"); + if (idStatus.ok() && !strcmp(metaId.c_str(), METADATA_DB_IDENTIFIER)) { + auto metaStatus = m_spdb->Get(rocksdb::ReadOptions(), handle, rocksdb::Slice("repl-masters", 12), &this->metadata); + if (!metaStatus.ok()) this->metadata = ""; } std::string strVersion; @@ -190,6 +191,11 @@ IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter, void *pr return new RocksDBStorageProvider(this, m_spdb, spcolfamily, nullptr, count); } +std::string RocksDBStorageFactory::getMetadata() const +{ + return metadata; +} + const char *RocksDBStorageFactory::name() const { return "flash"; 
diff --git a/src/storage/teststorageprovider.cpp b/src/storage/teststorageprovider.cpp index a287397c7..73a7ef01d 100644 --- a/src/storage/teststorageprovider.cpp +++ b/src/storage/teststorageprovider.cpp @@ -13,6 +13,11 @@ IStorage *TestStorageFactory::createMetadataDb() return metadataDb; } +std::string TestStorageFactory::getMetadata() const +{ + return ""; +} + const char *TestStorageFactory::name() const { return "TEST Storage Provider"; diff --git a/src/storage/teststorageprovider.h b/src/storage/teststorageprovider.h index 2b2b1a38d..40f4a0f9d 100644 --- a/src/storage/teststorageprovider.h +++ b/src/storage/teststorageprovider.h @@ -6,6 +6,7 @@ class TestStorageFactory : public IStorageFactory { virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) override; virtual class IStorage *createMetadataDb() override; + virtual std::string getMetadata() const override; virtual const char *name() const override; virtual size_t totalDiskspaceUsed() const override { return 0; } virtual bool FSlow() const { return false; } From 6ef6c917ecab9e90fe6638d2c5051ad189266bc0 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Sat, 6 Nov 2021 01:05:47 +0000 Subject: [PATCH 5/9] retrieve replid from IStorage rather than at load time Former-commit-id: c25323439ce400ca91b2193aa2f464e7b09978fd --- src/IStorage.h | 1 - src/replication.cpp | 20 ++++++++++++++++++-- src/server.cpp | 12 ++++++++---- src/storage/rocksdbfactory.cpp | 5 ----- src/storage/teststorageprovider.cpp | 5 ----- src/storage/teststorageprovider.h | 1 - 6 files changed, 26 insertions(+), 18 deletions(-) diff --git a/src/IStorage.h b/src/IStorage.h index cd5215b6d..ad956beb6 100644 --- a/src/IStorage.h +++ b/src/IStorage.h @@ -12,7 +12,6 @@ public: virtual ~IStorageFactory() {} virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) = 0; virtual class IStorage *createMetadataDb() = 0; - virtual std::string getMetadata() const = 0; virtual const char *name() const = 0; 
virtual size_t totalDiskspaceUsed() const = 0; virtual bool FSlow() const = 0; diff --git a/src/replication.cpp b/src/replication.cpp index 4a6e8d5c8..b6607235c 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -3786,8 +3786,24 @@ void disconnectMaster(redisMaster *mi) void saveMasterStatusToStorage() { if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return; + + g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true); + if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) { + g_pserver->metadataDb->insert("repl-stream-db", 14, g_pserver->replicaseldb == -1 ? 0 : &g_pserver->replicaseldb, + g_pserver->replicaseldb == -1 ? 0 : sizeof(g_pserver->replicaseldb), true); + } + + struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? listNodeValue(listFirst(g_pserver->masters)) : NULL); + + if (miFirst && miFirst->master) { + g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->master->db->id, sizeof(miFirst->master->db->id), true); + } + else if (miFirst && miFirst->cached_master) { + g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->cached_master->db->id, sizeof(miFirst->cached_master->db->id), true); + } + if (listLength(g_pserver->masters) == 0) { - g_pserver->metadataDb->insert("repl_masters", 12, (void*)"", 0, true); + g_pserver->metadataDb->insert("repl-masters", 12, (void*)"", 0, true); return; } sds val = sds(sdsempty()); @@ -3819,7 +3835,7 @@ void saveMasterStatusToStorage() mi->masterport); } } - g_pserver->metadataDb->insert("repl_masters", 12, (void*)val, sdslen(val), true); + g_pserver->metadataDb->insert("repl-masters", 12, (void*)val, sdslen(val), true); } /* Set replication to the specified master address and port. 
*/ diff --git a/src/server.cpp b/src/server.cpp index d807f4d04..a3fe4773a 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3985,11 +3985,15 @@ void initServer(void) { latencyMonitorInit(); if (g_pserver->m_pstorageFactory) { - std::string repl_masters = g_pserver->m_pstorageFactory->getMetadata(); - if (!repl_masters.empty()) { - serverLog(LL_NOTICE, "Loaded repl-masters from storage provider: %s", repl_masters); - } g_pserver->metadataDb = g_pserver->m_pstorageFactory->createMetadataDb(); + if (g_pserver->metadataDb) { + g_pserver->metadataDb->retrieve("repl-id", 7, [&](const char *, size_t, const void *data, size_t cb){ + if (cb == sizeof(g_pserver->replid)) { + serverLog(LL_NOTICE, "Retrieved repl-id: %s", (const char*)data); + memcpy(g_pserver->replid, data, cb); + } + }); + } } /* We have to initialize storage providers after the cluster has been initialized */ diff --git a/src/storage/rocksdbfactory.cpp b/src/storage/rocksdbfactory.cpp index 78ed09e51..bef8cdbaa 100644 --- a/src/storage/rocksdbfactory.cpp +++ b/src/storage/rocksdbfactory.cpp @@ -191,11 +191,6 @@ IStorage *RocksDBStorageFactory::create(int db, key_load_iterator iter, void *pr return new RocksDBStorageProvider(this, m_spdb, spcolfamily, nullptr, count); } -std::string RocksDBStorageFactory::getMetadata() const -{ - return metadata; -} - const char *RocksDBStorageFactory::name() const { return "flash"; diff --git a/src/storage/teststorageprovider.cpp b/src/storage/teststorageprovider.cpp index 73a7ef01d..a287397c7 100644 --- a/src/storage/teststorageprovider.cpp +++ b/src/storage/teststorageprovider.cpp @@ -13,11 +13,6 @@ IStorage *TestStorageFactory::createMetadataDb() return metadataDb; } -std::string TestStorageFactory::getMetadata() const -{ - return ""; -} - const char *TestStorageFactory::name() const { return "TEST Storage Provider"; diff --git a/src/storage/teststorageprovider.h b/src/storage/teststorageprovider.h index 40f4a0f9d..2b2b1a38d 100644 --- 
a/src/storage/teststorageprovider.h +++ b/src/storage/teststorageprovider.h @@ -6,7 +6,6 @@ class TestStorageFactory : public IStorageFactory { virtual class IStorage *create(int db, key_load_iterator itr, void *privdata) override; virtual class IStorage *createMetadataDb() override; - virtual std::string getMetadata() const override; virtual const char *name() const override; virtual size_t totalDiskspaceUsed() const override { return 0; } virtual bool FSlow() const { return false; } From 4c3d5c83e5cb52abfba9aa3324838ed14b0ff63a Mon Sep 17 00:00:00 2001 From: christianEQ Date: Mon, 8 Nov 2021 19:34:30 +0000 Subject: [PATCH 6/9] save replid from storage on load Former-commit-id: 8e5d0cb7035db30f35ead36aab52df07ab3c9bee --- src/replication.cpp | 115 +++++++++++++++++++++++--------------------- src/server.cpp | 22 ++++++++- 2 files changed, 81 insertions(+), 56 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index b6607235c..4756801e4 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2096,6 +2096,64 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) } } +/* Save the replid of yourself and any connected masters to storage. + * Returns if no storage provider is used. */ +void saveMasterStatusToStorage() +{ + if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return; + + g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true); + g_pserver->metadataDb->insert("repl-offset", 11, &g_pserver->master_repl_offset, sizeof(g_pserver->master_repl_offset), true); + if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) { + g_pserver->metadataDb->insert("repl-stream-db", 14, g_pserver->replicaseldb == -1 ? 0 : &g_pserver->replicaseldb, + g_pserver->replicaseldb == -1 ? 0 : sizeof(g_pserver->replicaseldb), true); + } + + struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? 
listNodeValue(listFirst(g_pserver->masters)) : NULL); + + if (miFirst && miFirst->master) { + g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->master->db->id, sizeof(miFirst->master->db->id), true); + } + else if (miFirst && miFirst->cached_master) { + g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->cached_master->db->id, sizeof(miFirst->cached_master->db->id), true); + } + + if (listLength(g_pserver->masters) == 0) { + g_pserver->metadataDb->insert("repl-masters", 12, (void*)"", 0, true); + return; + } + sds val = sds(sdsempty()); + listNode *ln; + listIter li; + listRewind(g_pserver->masters,&li); + while((ln = listNext(&li)) != NULL) { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + if (!mi->master) { + // If master client is not available, use info from master struct - better than nothing + if (mi->master_replid[0] == 0) { + // if replid is null, there's no reason to save it + continue; + } + val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master_replid, + mi->master_initial_offset, + mi->masterhost, + mi->masterport); + } + else { + if (mi->master->replid[0] == 0) { + // if replid is null, there's no reason to save it + continue; + } + val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master->replid, + mi->master->reploff, + mi->masterhost, + mi->masterport); + } + } + g_pserver->metadataDb->insert("repl-masters", 12, (void*)val, sdslen(val), true); + sdsfree(val); // val is a local scratch string, unused past this point (assumes insert() copies its input - TODO confirm) +} + /* Change the current instance replication ID with a new, random one. 
* This will prevent successful PSYNCs between this master and other * slaves, so the command should be called when something happens that @@ -2103,6 +2161,7 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) void changeReplicationId(void) { getRandomHexChars(g_pserver->replid,CONFIG_RUN_ID_SIZE); g_pserver->replid[CONFIG_RUN_ID_SIZE] = '\0'; + saveMasterStatusToStorage(); } @@ -2894,6 +2953,7 @@ void readSyncBulkPayload(connection *conn) { g_pserver->master_repl_offset = mi->master->reploff; if (g_pserver->repl_batch_offStart >= 0) g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; + saveMasterStatusToStorage(); } clearReplicationId2(); @@ -3783,61 +3843,6 @@ void disconnectMaster(redisMaster *mi) } } -void saveMasterStatusToStorage() -{ - if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return; - - g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true); - if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) { - g_pserver->metadataDb->insert("repl-stream-db", 14, g_pserver->replicaseldb == -1 ? 0 : &g_pserver->replicaseldb, - g_pserver->replicaseldb == -1 ? 0 : sizeof(g_pserver->replicaseldb), true); - } - - struct redisMaster *miFirst = (redisMaster*)(listLength(g_pserver->masters) ? 
listNodeValue(listFirst(g_pserver->masters)) : NULL); - - if (miFirst && miFirst->master) { - g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->master->db->id, sizeof(miFirst->master->db->id), true); - } - else if (miFirst && miFirst->cached_master) { - g_pserver->metadataDb->insert("repl-stream-db", 14, &miFirst->cached_master->db->id, sizeof(miFirst->cached_master->db->id), true); - } - - if (listLength(g_pserver->masters) == 0) { - g_pserver->metadataDb->insert("repl-masters", 12, (void*)"", 0, true); - return; - } - sds val = sds(sdsempty()); - listNode *ln; - listIter li; - redisMaster *mi; - listRewind(g_pserver->masters,&li); - while((ln = listNext(&li)) != NULL) { - mi = (redisMaster*)listNodeValue(ln); - if (!mi->master) { - // If master client is not available, use info from master struct - better than nothing - if (mi->master_replid[0] == 0) { - // if replid is null, there's no reason to save it - continue; - } - val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master_replid, - mi->master_initial_offset, - mi->masterhost, - mi->masterport); - } - else { - if (mi->master->replid[0] == 0) { - // if replid is null, there's no reason to save it - continue; - } - val = sdscatfmt(val, "%s:%I:%s:%i;", mi->master->replid, - mi->master->reploff, - mi->masterhost, - mi->masterport); - } - } - g_pserver->metadataDb->insert("repl-masters", 12, (void*)val, sdslen(val), true); -} - /* Set replication to the specified master address and port. 
*/ struct redisMaster *replicationAddMaster(char *ip, int port) { // pre-reqs: We must not already have a replica in the list with the same tuple diff --git a/src/server.cpp b/src/server.cpp index a3fe4773a..c74cc179b 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3989,10 +3989,30 @@ void initServer(void) { if (g_pserver->metadataDb) { g_pserver->metadataDb->retrieve("repl-id", 7, [&](const char *, size_t, const void *data, size_t cb){ if (cb == sizeof(g_pserver->replid)) { - serverLog(LL_NOTICE, "Retrieved repl-id: %s", (const char*)data); memcpy(g_pserver->replid, data, cb); } }); + g_pserver->metadataDb->retrieve("repl-offset", 11, [&](const char *, size_t, const void *data, size_t cb){ + if (cb == sizeof(g_pserver->master_repl_offset)) { // must match the size saveMasterStatusToStorage() writes for "repl-offset" + memcpy(&g_pserver->master_repl_offset, data, cb); // copy instead of cast-dereference: data's alignment is not known here + } + }); + + listIter li; + listNode *ln; + + listRewind(g_pserver->masters, &li); + while ((ln = listNext(&li))) + { + redisMaster *mi = (redisMaster*)listNodeValue(ln); + /* If we are a replica, create a cached master from this + * information, in order to allow partial resynchronizations + * with masters. 
*/ + replicationCacheMasterUsingMyself(mi); + g_pserver->metadataDb->retrieve("repl-stream-db", 14, [&](const char *, size_t, const void *data, size_t cb){ + selectDb(mi->cached_master, *(int*)data); + }); + } } } From bde5ec5b45107896f326abbe6f9f657c422a1d43 Mon Sep 17 00:00:00 2001 From: christianEQ Date: Thu, 11 Nov 2021 01:41:28 +0000 Subject: [PATCH 7/9] added tests for flash psync Former-commit-id: 79f7376a4fea2de9bd06c06d96f3f98ee7308874 --- tests/integration/replication-psync-flash.tcl | 135 ++++++++++++++++++ tests/test_helper.tcl | 1 + 2 files changed, 136 insertions(+) create mode 100644 tests/integration/replication-psync-flash.tcl diff --git a/tests/integration/replication-psync-flash.tcl b/tests/integration/replication-psync-flash.tcl new file mode 100644 index 000000000..5cad127f3 --- /dev/null +++ b/tests/integration/replication-psync-flash.tcl @@ -0,0 +1,135 @@ +# Creates a master-slave pair and breaks the link continuously to force +# partial resync attempts, all this while flooding the master with +# write queries. +# +# You can specify backlog size, ttl, delay before reconnection, test duration +# in seconds, and an additional condition to verify at the end. +# +# If reconnect is > 0, the test actually tries to break the connection and +# reconnect with the master, otherwise just the initial synchronization is +# checked for consistency. 
+proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} { + start_server {tags {"repl"}} { + start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.db} delete-on-evict no storage-flush-period 10]] { + + set master [srv -1 client] + set master_host [srv -1 host] + set master_port [srv -1 port] + set slave [srv 0 client] + + set load_handle0 [start_bg_complex_data $master_host $master_port 9 100000] + set load_handle1 [start_bg_complex_data $master_host $master_port 11 100000] + set load_handle2 [start_bg_complex_data $master_host $master_port 12 100000] + + test {Slave should be able to synchronize with the master} { + $slave slaveof $master_host $master_port + wait_for_condition 50 100 { + [lindex [r role] 0] eq {slave} && + [lindex [r role] 3] eq {connected} + } else { + fail "Replication not started." + } + } + + # Check that the background clients are actually writing. + test {Detect write load to master} { + wait_for_condition 50 1000 { + [$master dbsize] > 100 + } else { + fail "Can't detect write load from background clients." + } + } + + test "Test replication partial resync: $descr (diskless: $mdl, $sdl, reconnect: $reconnect)" { + # Now while the clients are writing data, break the master-slave + # link multiple times. + if ($reconnect) { + for {set j 0} {$j < $duration*10} {incr j} { + after 100 + # catch {puts "MASTER [$master dbsize] keys, REPLICA [$slave dbsize] keys"} + + if {($j % 20) == 0} { + catch { + $slave debug restart + } + } + } + } + stop_bg_complex_data $load_handle0 + stop_bg_complex_data $load_handle1 + stop_bg_complex_data $load_handle2 + + # Wait for the slave to reach the "online" + # state from the POV of the master. 
+ set retry 5000 + while {$retry} { + set info [$master info] + if {[string match {*slave0:*state=online*} $info]} { + break + } else { + incr retry -1 + after 100 + } + } + if {$retry == 0} { + error "assertion:Slave not correctly synchronized" + } + + # Wait that slave acknowledge it is online so + # we are sure that DBSIZE and DEBUG DIGEST will not + # fail because of timing issues. (-LOADING error) + wait_for_condition 5000 100 { + [lindex [$slave role] 3] eq {connected} + } else { + fail "Slave still not connected after some time" + } + + set retry 10 + while {$retry && ([$master debug digest] ne [$slave debug digest])}\ + { + after 1000 + incr retry -1 + } + assert {[$master dbsize] > 0} + + if {[$master debug digest] ne [$slave debug digest]} { + set csv1 [csvdump r] + set csv2 [csvdump {r -1}] + set fd [open /tmp/repldump1.txt w] + puts -nonewline $fd $csv1 + close $fd + set fd [open /tmp/repldump2.txt w] + puts -nonewline $fd $csv2 + close $fd + puts "Master - Replica inconsistency" + puts "Run diff -u against /tmp/repldump*.txt for more info" + } + assert_equal [r debug digest] [r -1 debug digest] + eval $cond + } + } + } +} + +foreach mdl {no yes} { + foreach sdl {disabled swapdb} { + test_psync {no reconnection, just sync} 6 1000000 3600 0 { + } $mdl $sdl 0 + + test_psync {ok psync} 6 100000000 3600 0 { + assert {[s -1 sync_partial_ok] > 0} + } $mdl $sdl 1 + + test_psync {no backlog} 6 100 3600 0.5 { + assert {[s -1 sync_partial_err] > 0} + } $mdl $sdl 1 + + test_psync {ok after delay} 3 100000000 3600 3 { + assert {[s -1 sync_partial_ok] > 0} + } $mdl $sdl 1 + + test_psync {backlog expired} 3 100000000 1 3 { + assert {[s -1 sync_partial_err] > 0} + } $mdl $sdl 1 + } +} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index 2e200c35a..2ac904d20 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -47,6 +47,7 @@ set ::all_tests { integration/replication-3 integration/replication-4 integration/replication-psync + 
integration/replication-psync-flash integration/replication-active integration/replication-multimaster integration/replication-multimaster-connect From 642b1b9941f7be0a68bfb09fc64f90c7f33a9bfd Mon Sep 17 00:00:00 2001 From: christianEQ Date: Thu, 11 Nov 2021 13:12:46 +0000 Subject: [PATCH 8/9] removed unnecessary code to check for metadata Former-commit-id: 69ffc89d7c27a4eef04aa5cb59ffd1c2b9b8eb20 --- src/storage/rocksdbfactory.cpp | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/storage/rocksdbfactory.cpp b/src/storage/rocksdbfactory.cpp index bef8cdbaa..c7f45b977 100644 --- a/src/storage/rocksdbfactory.cpp +++ b/src/storage/rocksdbfactory.cpp @@ -90,14 +90,6 @@ RocksDBStorageFactory::RocksDBStorageFactory(const char *dbfile, int dbnum, cons m_spdb = std::shared_ptr(db); for (auto handle : handles) { - std::string metaId; - - auto idStatus = m_spdb->Get(rocksdb::ReadOptions(), handle, rocksdb::Slice(meta_key, sizeof(meta_key)), &metaId); - if (idStatus.ok() && !strcmp(metaId.c_str(), METADATA_DB_IDENTIFIER)) { - auto metaStatus = m_spdb->Get(rocksdb::ReadOptions(), handle, rocksdb::Slice("repl-masters", 12), &this->metadata); - if (!metaStatus.ok()) this->metadata = ""; - } - std::string strVersion; auto status = m_spdb->Get(rocksdb::ReadOptions(), handle, rocksdb::Slice(version_key, sizeof(version_key)), &strVersion); if (!status.ok()) From 5e8fc5256bdc6673f2793ab9cd03c215131444fe Mon Sep 17 00:00:00 2001 From: christianEQ Date: Thu, 11 Nov 2021 13:15:21 +0000 Subject: [PATCH 9/9] removed unused vars Former-commit-id: e2bb6242e8be53cdad6def6acdd7e90e0f7d9852 --- src/server.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server.cpp b/src/server.cpp index c74cc179b..d13bc6f00 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4009,7 +4009,7 @@ void initServer(void) { * information, in order to allow partial resynchronizations * with masters. 
*/ replicationCacheMasterUsingMyself(mi); - g_pserver->metadataDb->retrieve("repl-stream-db", 14, [&](const char *, size_t, const void *data, size_t cb){ + g_pserver->metadataDb->retrieve("repl-stream-db", 14, [&](const char *, size_t, const void *data, size_t){ selectDb(mi->cached_master, *(int*)data); }); }