Fix multi master bugs: 1. we fail to create the temp file. 2. We use a master RDB as our backup even though we merged databases (and therefore it is not representitive)
Former-commit-id: e776474f68a2824bb7d4082c41991a9a9f3a9c9d
This commit is contained in:
parent
c75e700a86
commit
4f19c5de9f
@ -2179,7 +2179,6 @@ eoferr: /* unexpected end of file is handled here with a fatal exit */
|
||||
return C_ERR; /* Just to avoid warning */
|
||||
}
|
||||
|
||||
int rdbLoadFile(char *filename, rdbSaveInfo *rsi);
|
||||
int rdbLoad(rdbSaveInfo *rsi)
|
||||
{
|
||||
int err = C_ERR;
|
||||
@ -2199,7 +2198,7 @@ int rdbLoad(rdbSaveInfo *rsi)
|
||||
*
|
||||
* If you pass an 'rsi' structure initialied with RDB_SAVE_OPTION_INIT, the
|
||||
* loading code will fiil the information fields in the structure. */
|
||||
int rdbLoadFile(char *filename, rdbSaveInfo *rsi) {
|
||||
int rdbLoadFile(const char *filename, rdbSaveInfo *rsi) {
|
||||
FILE *fp;
|
||||
rio rdb;
|
||||
int retval;
|
||||
|
@ -136,6 +136,7 @@ int rdbLoadLenByRef(rio *rdb, int *isencoded, uint64_t *lenptr);
|
||||
int rdbSaveObjectType(rio *rdb, robj_roptr o);
|
||||
int rdbLoadObjectType(rio *rdb);
|
||||
int rdbLoad(rdbSaveInfo *rsi);
|
||||
int rdbLoadFile(const char *filename, rdbSaveInfo *rsi);
|
||||
int rdbSaveBackground(rdbSaveInfo *rsi);
|
||||
int rdbSaveToSlavesSockets(rdbSaveInfo *rsi);
|
||||
void rdbRemoveTempFile(pid_t childpid);
|
||||
|
@ -41,6 +41,7 @@
|
||||
#include <mutex>
|
||||
#include <algorithm>
|
||||
#include <uuid/uuid.h>
|
||||
#include <chrono>
|
||||
|
||||
void replicationDiscardCachedMaster(redisMaster *mi);
|
||||
void replicationResurrectCachedMaster(redisMaster *mi, int newfd);
|
||||
@ -1219,6 +1220,24 @@ void changeReplicationId(void) {
|
||||
g_pserver->replid[CONFIG_RUN_ID_SIZE] = '\0';
|
||||
}
|
||||
|
||||
|
||||
int hexchToInt(char ch)
|
||||
{
|
||||
if (ch >= '0' && ch <= '9')
|
||||
return ch - '0';
|
||||
if (ch >= 'a' && ch <= 'f')
|
||||
return (ch - 'a') + 10;
|
||||
return (ch - 'A') + 10;
|
||||
}
|
||||
void mergeReplicationId(const char *id)
|
||||
{
|
||||
for (int i = 0; i < CONFIG_RUN_ID_SIZE; ++i)
|
||||
{
|
||||
const char *charset = "0123456789abcdef";
|
||||
g_pserver->replid[i] = charset[hexchToInt(g_pserver->replid[i]) ^ hexchToInt(id[i])];
|
||||
}
|
||||
}
|
||||
|
||||
/* Clear (invalidate) the secondary replication ID. This happens, for
|
||||
* example, after a full resynchronization, when we start a new replication
|
||||
* history. */
|
||||
@ -1492,12 +1511,19 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
killRDBChild();
|
||||
}
|
||||
|
||||
if (rename(mi->repl_transfer_tmpfile,g_pserver->rdb_filename) == -1) {
|
||||
serverLog(LL_WARNING,"Failed trying to rename the temp DB into %s in MASTER <-> REPLICA synchronization: %s",
|
||||
g_pserver->rdb_filename, strerror(errno));
|
||||
cancelReplicationHandshake(mi);
|
||||
return;
|
||||
const char *rdb_filename = mi->repl_transfer_tmpfile;
|
||||
|
||||
if (!fUpdate)
|
||||
{
|
||||
if (rename(mi->repl_transfer_tmpfile,g_pserver->rdb_filename) == -1) {
|
||||
serverLog(LL_WARNING,"Failed trying to rename the temp DB into %s in MASTER <-> REPLICA synchronization: %s",
|
||||
g_pserver->rdb_filename, strerror(errno));
|
||||
cancelReplicationHandshake(mi);
|
||||
return;
|
||||
}
|
||||
rdb_filename = g_pserver->rdb_filename;
|
||||
}
|
||||
|
||||
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: %s", fUpdate ? "Keeping old data" : "Flushing old data");
|
||||
/* We need to stop any AOFRW fork before flusing and parsing
|
||||
* RDB, otherwise we'll create a copy-on-write disaster. */
|
||||
@ -1518,7 +1544,7 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
aeDeleteFileEvent(el,mi->repl_transfer_s,AE_READABLE);
|
||||
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Loading DB in memory");
|
||||
rdbSaveInfo rsi = RDB_SAVE_INFO_INIT;
|
||||
if (rdbLoad(&rsi) != C_OK) {
|
||||
if (rdbLoadFile(rdb_filename, &rsi) != C_OK) {
|
||||
serverLog(LL_WARNING,"Failed trying to load the MASTER synchronization DB from disk");
|
||||
cancelReplicationHandshake(mi);
|
||||
/* Re-enable the AOF if we disabled it earlier, in order to restore
|
||||
@ -1532,11 +1558,18 @@ void readSyncBulkPayload(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
replicationCreateMasterClient(mi, mi->repl_transfer_s,rsi.repl_stream_db);
|
||||
mi->repl_state = REPL_STATE_CONNECTED;
|
||||
mi->repl_down_since = 0;
|
||||
/* After a full resynchroniziation we use the replication ID and
|
||||
* offset of the master. The secondary ID / offset are cleared since
|
||||
* we are starting a new history. */
|
||||
memcpy(g_pserver->replid,mi->master->replid,sizeof(g_pserver->replid));
|
||||
g_pserver->master_repl_offset = mi->master->reploff;
|
||||
if (fUpdate)
|
||||
{
|
||||
mergeReplicationId(mi->master->replid);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* After a full resynchroniziation we use the replication ID and
|
||||
* offset of the master. The secondary ID / offset are cleared since
|
||||
* we are starting a new history. */
|
||||
memcpy(g_pserver->replid,mi->master->replid,sizeof(g_pserver->replid));
|
||||
g_pserver->master_repl_offset = mi->master->reploff;
|
||||
}
|
||||
clearReplicationId2();
|
||||
/* Let's create the replication backlog if needed. Slaves need to
|
||||
* accumulate the backlog regardless of the fact they have sub-slaves
|
||||
@ -2123,8 +2156,10 @@ void syncWithMaster(aeEventLoop *el, int fd, void *privdata, int mask) {
|
||||
|
||||
/* Prepare a suitable temp file for bulk transfer */
|
||||
while(maxtries--) {
|
||||
auto dt = std::chrono::system_clock::now().time_since_epoch();
|
||||
auto dtMillisecond = std::chrono::duration_cast<std::chrono::milliseconds>(dt);
|
||||
snprintf(tmpfile,256,
|
||||
"temp-%d.%ld.rdb",(int)g_pserver->unixtime,(long int)getpid());
|
||||
"temp-%d.%ld.rdb",(int)dtMillisecond.count(),(long int)getpid());
|
||||
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
|
||||
if (dfd != -1) break;
|
||||
sleep(1);
|
||||
|
@ -2171,6 +2171,7 @@ long long getPsyncInitialOffset(void);
|
||||
int replicationSetupSlaveForFullResync(client *slave, long long offset);
|
||||
void changeReplicationId(void);
|
||||
void clearReplicationId2(void);
|
||||
void mergeReplicationId(const char *);
|
||||
void chopReplicationBacklog(void);
|
||||
void replicationCacheMasterUsingMyself(struct redisMaster *mi);
|
||||
void feedReplicationBacklog(const void *ptr, size_t len);
|
||||
|
Loading…
x
Reference in New Issue
Block a user