From 4f19c5de9fb6a4fa3cbafb3b2de83fa1d02e2edf Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 26 Sep 2019 20:35:51 -0400 Subject: [PATCH] Fix multi master bugs: 1. we fail to create the temp file. 2. We use a master RDB as our backup even though we merged databases (and therefore it is not representative) Former-commit-id: e776474f68a2824bb7d4082c41991a9a9f3a9c9d --- src/rdb.cpp | 3 +-- src/rdb.h | 1 + src/replication.cpp | 59 ++++++++++++++++++++++++++++++++++++--------- src/server.h | 1 + 4 files changed, 50 insertions(+), 14 deletions(-) diff --git a/src/rdb.cpp b/src/rdb.cpp index 97ade6d1f..b983167a4 100644 --- a/src/rdb.cpp +++ b/src/rdb.cpp @@ -2179,7 +2179,6 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */ return C_ERR; /* Just to avoid warning */ } -int rdbLoadFile(char *filename, rdbSaveInfo *rsi); int rdbLoad(rdbSaveInfo *rsi) { int err = C_ERR; @@ -2199,7 +2198,7 @@ int rdbLoad(rdbSaveInfo *rsi) * * If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the * loading code will fiil the information fields in the structure. 
*/ -int rdbLoadFile(char *filename, rdbSaveInfo *rsi) { +int rdbLoadFile(const char *filename, rdbSaveInfo *rsi) { FILE *fp; rio rdb; int retval; diff --git a/src/rdb.h b/src/rdb.h index 0ee2cad92..edf43d422 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -136,6 +136,7 @@ int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr); int rdbSaveObjectType(rio *rdb, robj_roptr o); int rdbLoadObjectType(rio *rdb); int rdbLoad(rdbSaveInfo *rsi); +int rdbLoadFile(const char *filename, rdbSaveInfo *rsi); int rdbSaveBackground(rdbSaveInfo *rsi); int rdbSaveToSlavesSockets(rdbSaveInfo *rsi); void rdbRemoveTempFile(pid_t childpid); diff --git a/src/replication.cpp b/src/replication.cpp index 8ea96da1c..e817e6000 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -41,6 +41,7 @@ #include #include #include +#include <chrono> void replicationDiscardCachedMaster(redisMaster *mi); void replicationResurrectCachedMaster(redisMaster *mi, int newfd); @@ -1219,6 +1220,24 @@ void changeReplicationId(void) { g_pserver->replid[CONFIG_RUN_ID_SIZE] = '\0'; } + +int hexchToInt(char ch) +{ + if (ch >= '0' && ch <= '9') + return ch - '0'; + if (ch >= 'a' && ch <= 'f') + return (ch - 'a') + 10; + return (ch - 'A') + 10; +} +void mergeReplicationId(const char *id) +{ + for (int i = 0; i < CONFIG_RUN_ID_SIZE; ++i) + { + const char *charset = "0123456789abcdef"; + g_pserver->replid[i] = charset[hexchToInt(g_pserver->replid[i]) ^ hexchToInt(id[i])]; + } +} + /* Clear (invalidate) the secondary replication ID. This happens, for * example, after a full resynchronization, when we start a new replication * history. 
*/ @@ -1492,12 +1511,19 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { killRDBChild(); } - if (rename(mi->repl_transfer_tmpfile,g_pserver->rdb_filename) == -1) { - serverLog(LL_WARNING,"Failed trying to rename the temp DB into %s in MASTER <-> REPLICA synchronization: %s", - g_pserver->rdb_filename, strerror(errno)); - cancelReplicationHandshake(mi); - return; + const char *rdb_filename = mi->repl_transfer_tmpfile; + + if (!fUpdate) + { + if (rename(mi->repl_transfer_tmpfile,g_pserver->rdb_filename) == -1) { + serverLog(LL_WARNING,"Failed trying to rename the temp DB into %s in MASTER <-> REPLICA synchronization: %s", + g_pserver->rdb_filename, strerror(errno)); + cancelReplicationHandshake(mi); + return; + } + rdb_filename = g_pserver->rdb_filename; } + serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: %s", fUpdate ? "Keeping old data" : "Flushing old data"); /* We need to stop any AOFRW fork before flusing and parsing * RDB, otherwise we'll create a copy-on-write disaster. */ @@ -1518,7 +1544,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { aeDeleteFileEvent(el,mi->repl_transfer_s,AE_READABLE); serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory"); rdbSaveInfo rsi = RDB_SAVE_INFO_INIT; - if (rdbLoad(&rsi) != C_OK) { + if (rdbLoadFile(rdb_filename, &rsi) != C_OK) { serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk"); cancelReplicationHandshake(mi); /* Re-enable the AOF if we disabled it earlier, in order to restore @@ -1532,11 +1558,18 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) { replicationCreateMasterClient(mi, mi->repl_transfer_s,rsi.repl_stream_db); mi->repl_state = REPL_STATE_CONNECTED; mi->repl_down_since = 0; - /* After a full resynchroniziation we use the replication ID and - * offset of the master. The secondary ID / offset are cleared since - * we are starting a new history. 
*/ - memcpy(g_pserver->replid,mi->master->replid,sizeof(g_pserver->replid)); - g_pserver->master_repl_offset = mi->master->reploff; + if (fUpdate) + { + mergeReplicationId(mi->master->replid); + } + else + { + /* After a full resynchroniziation we use the replication ID and + * offset of the master. The secondary ID / offset are cleared since + * we are starting a new history. */ + memcpy(g_pserver->replid,mi->master->replid,sizeof(g_pserver->replid)); + g_pserver->master_repl_offset = mi->master->reploff; + } clearReplicationId2(); /* Let's create the replication backlog if needed. Slaves need to * accumulate the backlog regardless of the fact they have sub-slaves @@ -2123,8 +2156,10 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) { /* Prepare a suitable temp file for bulk transfer */ while(maxtries--) { + auto dt = std::chrono::system_clock::now().time_since_epoch(); + auto dtMillisecond = std::chrono::duration_cast<std::chrono::milliseconds>(dt); snprintf(tmpfile,256, - "temp-%d.%ld.rdb",(int)g_pserver->unixtime,(long int)getpid()); + "temp-%d.%ld.rdb",(int)dtMillisecond.count(),(long int)getpid()); dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); if (dfd != -1) break; sleep(1); diff --git a/src/server.h b/src/server.h index 94e679dc2..dbcff38f5 100644 --- a/src/server.h +++ b/src/server.h @@ -2171,6 +2171,7 @@ long long getPsyncInitialOffset(void); int replicationSetupSlaveForFullResync(client *slave, long long offset); void changeReplicationId(void); void clearReplicationId2(void); +void mergeReplicationId(const char *); void chopReplicationBacklog(void); void replicationCacheMasterUsingMyself(struct redisMaster *mi); void feedReplicationBacklog(const void *ptr, size_t len);