Fix partial sync corruption with FLASH

Former-commit-id: 532f58c0539b775c040c0dd9a2ad3dc349faf87a
This commit is contained in:
John Sully 2021-12-23 00:04:28 -05:00
parent b13134c501
commit 373f584465
4 changed files with 29 additions and 9 deletions

View File

@ -2155,12 +2155,16 @@ void updateSlavesWaitingBgsave(int bgsaveerr, int type)
/* Save the replid of yourself and any connected masters to storage. /* Save the replid of yourself and any connected masters to storage.
* Returns if no storage provider is used. */ * Returns if no storage provider is used. */
void saveMasterStatusToStorage() void saveMasterStatusToStorage(bool fShutdown)
{ {
long long tmp = -1;
if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return; if (!g_pserver->m_pstorageFactory || !g_pserver->metadataDb) return;
g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true); g_pserver->metadataDb->insert("repl-id", 7, g_pserver->replid, sizeof(g_pserver->replid), true);
g_pserver->metadataDb->insert("repl-offset", 11, &g_pserver->master_repl_offset, sizeof(g_pserver->master_repl_offset), true); if (fShutdown)
g_pserver->metadataDb->insert("repl-offset", 11, &g_pserver->master_repl_offset, sizeof(g_pserver->master_repl_offset), true);
else
g_pserver->metadataDb->insert("repl-offset", 11, &tmp, sizeof(g_pserver->master_repl_offset), true);
if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) { if (g_pserver->fActiveReplica || (!listLength(g_pserver->masters) && g_pserver->repl_backlog)) {
g_pserver->metadataDb->insert("repl-stream-db", 14, g_pserver->replicaseldb == -1 ? 0 : &g_pserver->replicaseldb, g_pserver->metadataDb->insert("repl-stream-db", 14, g_pserver->replicaseldb == -1 ? 0 : &g_pserver->replicaseldb,
g_pserver->replicaseldb == -1 ? 0 : sizeof(g_pserver->replicaseldb), true); g_pserver->replicaseldb == -1 ? 0 : sizeof(g_pserver->replicaseldb), true);
@ -2223,7 +2227,7 @@ void saveMasterStatusToStorage()
void changeReplicationId(void) { void changeReplicationId(void) {
getRandomHexChars(g_pserver->replid,CONFIG_RUN_ID_SIZE); getRandomHexChars(g_pserver->replid,CONFIG_RUN_ID_SIZE);
g_pserver->replid[CONFIG_RUN_ID_SIZE] = '\0'; g_pserver->replid[CONFIG_RUN_ID_SIZE] = '\0';
saveMasterStatusToStorage(); saveMasterStatusToStorage(false);
} }
@ -3015,7 +3019,7 @@ void readSyncBulkPayload(connection *conn) {
g_pserver->master_repl_offset = mi->master->reploff; g_pserver->master_repl_offset = mi->master->reploff;
if (g_pserver->repl_batch_offStart >= 0) if (g_pserver->repl_batch_offStart >= 0)
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
saveMasterStatusToStorage(); saveMasterStatusToStorage(false);
} }
clearReplicationId2(); clearReplicationId2();
@ -3984,7 +3988,7 @@ struct redisMaster *replicationAddMaster(char *ip, int port) {
mi->masterhost, mi->masterport); mi->masterhost, mi->masterport);
connectWithMaster(mi); connectWithMaster(mi);
} }
saveMasterStatusToStorage(); saveMasterStatusToStorage(false);
return mi; return mi;
} }
@ -4069,7 +4073,7 @@ void replicationUnsetMaster(redisMaster *mi) {
* we were still a slave. */ * we were still a slave. */
if (g_pserver->aof_enabled && g_pserver->aof_state == AOF_OFF) restartAOFAfterSYNC(); if (g_pserver->aof_enabled && g_pserver->aof_state == AOF_OFF) restartAOFAfterSYNC();
saveMasterStatusToStorage(); saveMasterStatusToStorage(false);
} }
/* This function is called when the replica lose the connection with the /* This function is called when the replica lose the connection with the
@ -4102,7 +4106,7 @@ void replicationHandleMasterDisconnection(redisMaster *mi) {
connectWithMaster(mi); connectWithMaster(mi);
} }
saveMasterStatusToStorage(); saveMasterStatusToStorage(false);
} }
} }

View File

@ -3395,6 +3395,16 @@ int restartServer(int flags, mstime_t delay) {
} }
} }
if (flags & RESTART_SERVER_GRACEFULLY) {
if (g_pserver->m_pstorageFactory) {
saveMasterStatusToStorage(true);
for (int idb = 0; idb < cserver.dbnum; ++idb) {
g_pserver->db[idb]->storageProviderDelete();
}
delete g_pserver->metadataDb;
}
}
/* Execute the server with the original command line. */ /* Execute the server with the original command line. */
if (delay) usleep(delay*1000); if (delay) usleep(delay*1000);
zfree(cserver.exec_argv[0]); zfree(cserver.exec_argv[0]);
@ -5163,6 +5173,7 @@ int prepareForShutdown(int flags) {
if (g_pserver->db[idb]->processChanges(false)) if (g_pserver->db[idb]->processChanges(false))
g_pserver->db[idb]->commitChanges(); g_pserver->db[idb]->commitChanges();
} }
saveMasterStatusToStorage(true);
} }
/* Fire the shutdown modules event. */ /* Fire the shutdown modules event. */
@ -7591,9 +7602,13 @@ int main(int argc, char **argv) {
g_pserver->shutdown_asap = true; // flag that we're in shutdown g_pserver->shutdown_asap = true; // flag that we're in shutdown
if (!fLockAcquired) if (!fLockAcquired)
g_fInCrash = true; // We don't actually crash right away, because we want to sync any storage providers g_fInCrash = true; // We don't actually crash right away, because we want to sync any storage providers
saveMasterStatusToStorage(true);
for (int idb = 0; idb < cserver.dbnum; ++idb) { for (int idb = 0; idb < cserver.dbnum; ++idb) {
g_pserver->db[idb]->storageProviderDelete(); g_pserver->db[idb]->storageProviderDelete();
} }
delete g_pserver->metadataDb;
// If we couldn't acquire the global lock it means something wasn't shutdown and we'll probably deadlock // If we couldn't acquire the global lock it means something wasn't shutdown and we'll probably deadlock
serverAssert(fLockAcquired); serverAssert(fLockAcquired);

View File

@ -2827,6 +2827,7 @@ int redisSetProcTitle(const char *title);
int validateProcTitleTemplate(const char *_template); int validateProcTitleTemplate(const char *_template);
int redisCommunicateSystemd(const char *sd_notify_msg); int redisCommunicateSystemd(const char *sd_notify_msg);
void redisSetCpuAffinity(const char *cpulist); void redisSetCpuAffinity(const char *cpulist);
void saveMasterStatusToStorage(bool fShutdown);
/* networking.c -- Networking and Client related operations */ /* networking.c -- Networking and Client related operations */
client *createClient(connection *conn, int iel); client *createClient(connection *conn, int iel);

View File

@ -9,7 +9,7 @@
# reconnect with the master, otherwise just the initial synchronization is # reconnect with the master, otherwise just the initial synchronization is
# checked for consistency. # checked for consistency.
proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} { proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reconnect} {
start_server {tags {"repl"}} { start_server [list tags {"repl"} overrides [list storage-provider {flash .rocks.db.m} repl-backlog-size 1m]] {
start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.db} delete-on-evict no storage-flush-period 10]] { start_server [list tags {flash} overrides [list storage-provider {flash ./rocks.db} delete-on-evict no storage-flush-period 10]] {
set master [srv -1 client] set master [srv -1 client]
@ -84,7 +84,7 @@ proc test_psync {descr duration backlog_size backlog_ttl delay cond mdl sdl reco
fail "Slave still not connected after some time" fail "Slave still not connected after some time"
} }
set retry 10 set retry 20
while {$retry && ([$master debug digest] ne [$slave debug digest])}\ while {$retry && ([$master debug digest] ne [$slave debug digest])}\
{ {
after 1000 after 1000