From 373f5844655b95ab571597a5eca641751e3d5630 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 23 Dec 2021 00:04:28 -0500 Subject: [PATCH] Fix partial sync corruption with FLASH Former-commit-id: 532f58c0539b775c040c0dd9a2ad3dc349faf87a --- src/replication.cpp | 18 +++++++++++------- src/server.cpp | 15 +++++++++++++++ src/server.h | 1 + tests/integration/replication-psync-flash.tcl | 4 ++-- 4 files changed, 29 insertions(+), 9 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index 7a5280c0d..84c0459d6 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -2155,12 +2155,16 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type) /* Save the replid of yourself and any connected masters to storage. * Returns if no storage provider is used. */ -void saveMasterStatusToStorage() +void saveMasterStatusToStorage(bool fShutdown) { + long long tmp = -1; if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return; g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true); - g_pserver->metadataDb->insert("repl-offset", 11, &g_pserver->master_repl_offset, sizeof(g_pserver->master_repl_offset), true); + if (fShutdown) + g_pserver->metadataDb->insert("repl-offset", 11, &g_pserver->master_repl_offset, sizeof(g_pserver->master_repl_offset), true); + else + g_pserver->metadataDb->insert("repl-offset", 11, &tmp, sizeof(g_pserver->master_repl_offset), true); if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) { g_pserver->metadataDb->insert("repl-stream-db", 14, g_pserver->replicaseldb == -1 ? 0 : &g_pserver->replicaseldb, g_pserver->replicaseldb == -1 ? 0 : sizeof(g_pserver->replicaseldb), true); @@ -2223,7 +2227,7 @@ void saveMasterStatusToStorage() void changeReplicationId(void) { getRandomHexChars(g_pserver->replid,CONFIG_RUN_ID_SIZE); g_pserver->replid[CONFIG_RUN_ID_SIZE] = '\0'; - saveMasterStatusToStorage(); + saveMasterStatusToStorage(false); } @@ -3015,7 +3019,7 @@ void readSyncBulkPayload(connection *conn) { g_pserver->master_repl_offset = mi->master->reploff; if (g_pserver->repl_batch_offStart >= 0) g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; - saveMasterStatusToStorage(); + saveMasterStatusToStorage(false); } clearReplicationId2(); @@ -3984,7 +3988,7 @@ struct redisMaster *replicationAddMaster(char *ip, int port) { mi->masterhost, mi->masterport); connectWithMaster(mi); } - saveMasterStatusToStorage(); + saveMasterStatusToStorage(false); return mi; } @@ -4069,7 +4073,7 @@ void replicationUnsetMaster(redisMaster *mi) { * we were still a slave. */ if (g_pserver->aof_enabled && g_pserver->aof_state == AOF_OFF) restartAOFAfterSYNC(); - saveMasterStatusToStorage(); + saveMasterStatusToStorage(false); } /* This function is called when the replica lose the connection with the @@ -4102,7 +4106,7 @@ void replicationHandleMasterDisconnection(redisMaster *mi) { connectWithMaster(mi); } - saveMasterStatusToStorage(); + saveMasterStatusToStorage(false); } } diff --git a/src/server.cpp b/src/server.cpp index 69dea0e82..8c0f9315b 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -3395,6 +3395,16 @@ int restartServer(int flags, mstime_t delay) { } } + if (flags & RESTART_SERVER_GRACEFULLY) { + if (g_pserver->m_pstorageFactory) { + saveMasterStatusToStorage(true); + for (int idb = 0; idb < cserver.dbnum; ++idb) { + g_pserver->db[idb]->storageProviderDelete(); + } + delete g_pserver->metadataDb; + } + } + /* Execute the server with the original command line. */ if (delay) usleep(delay*1000); zfree(cserver.exec_argv[0]); @@ -5163,6 +5173,7 @@ int prepareForShutdown(int flags) { if (g_pserver->db[idb]->processChanges(false)) g_pserver->db[idb]->commitChanges(); } + saveMasterStatusToStorage(true); } /* Fire the shutdown modules event. */ @@ -7591,9 +7602,13 @@ int main(int argc, char **argv) { g_pserver->shutdown_asap = true; // flag that we're in shutdown if (!fLockAcquired) g_fInCrash = true; // We don't actually crash right away, because we want to sync any storage providers + + saveMasterStatusToStorage(true); for (int idb = 0; idb < cserver.dbnum; ++idb) { g_pserver->db[idb]->storageProviderDelete(); } + delete g_pserver->metadataDb; + // If we couldn't acquire the global lock it means something wasn't shutdown and we'll probably deadlock serverAssert(fLockAcquired); diff --git a/src/server.h b/src/server.h index 8aee961dc..ef277e1dd 100644 --- a/src/server.h +++ b/src/server.h @@ -2827,6 +2827,7 @@ int redisSetProcTitle(const char *title); int validateProcTitleTemplate(const char *_template); int redisCommunicateSystemd(const char *sd_notify_msg); void redisSetCpuAffinity(const char *cpulist); +void saveMasterStatusToStorage(bool fShutdown); /* networking.c -- Networking and Client related operations */ client *createClient(connection *conn, int iel); diff --git a/tests/integration/replication-psync-flash.tcl b/tests/integration/replication-psync-flash.tcl index 5cad127f3..8438a878c 100644 --- a/tests/integration/replication-psync-flash.tcl +++ b/tests/integration/replication-psync-flash.tcl @@ -9,7 +9,7 @@ # reconnect with the master, otherwise just the initial synchronization is # checked for consistency. proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} { - start_server {tags {"repl"}} { + start_server [list tags {"repl"} overrides [list storage-provider {flash .rocks.db.m} repl-backlog-size 1m]] { start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.db} delete-on-evict no storage-flush-period 10]] { set master [srv -1 client] @@ -84,7 +84,7 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco fail "Slave still not connected after some time" } - set retry 10 + set retry 20 while {$retry && ([$master debug digest] ne [$slave debug digest])}\ { after 1000