Merge branch 'fastsync_collab2' into 'fastsync_collab'
fix dataloss on aof load with storage provider See merge request external-collab/keydb-pro-6!7 Former-commit-id: a3aecc171b45308b68eb922947f8dd31be7e3566
This commit is contained in:
commit
af96f8deb2
10
src/aof.cpp
10
src/aof.cpp
@ -870,6 +870,11 @@ int loadAppendOnlyFile(char *filename) {
|
|||||||
fakeClient = createAOFClient();
|
fakeClient = createAOFClient();
|
||||||
startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE);
|
startLoadingFile(fp, filename, RDBFLAGS_AOF_PREAMBLE);
|
||||||
|
|
||||||
|
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
||||||
|
{
|
||||||
|
g_pserver->db[idb]->trackChanges(true);
|
||||||
|
}
|
||||||
|
|
||||||
/* Check if this AOF file has an RDB preamble. In that case we need to
|
/* Check if this AOF file has an RDB preamble. In that case we need to
|
||||||
* load the RDB file and later continue loading the AOF tail. */
|
* load the RDB file and later continue loading the AOF tail. */
|
||||||
char sig[5]; /* "REDIS" */
|
char sig[5]; /* "REDIS" */
|
||||||
@ -892,11 +897,6 @@ int loadAppendOnlyFile(char *filename) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int idb = 0; idb < cserver.dbnum; ++idb)
|
|
||||||
{
|
|
||||||
g_pserver->db[idb]->trackChanges(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Read the actual AOF file, in REPL format, command by command. */
|
/* Read the actual AOF file, in REPL format, command by command. */
|
||||||
while(1) {
|
while(1) {
|
||||||
int argc, j;
|
int argc, j;
|
||||||
|
@ -975,14 +975,8 @@ public:
|
|||||||
aeAcquireLock();
|
aeAcquireLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ireplica == replicas.size()-1 && replica->replyAsync == nullptr) {
|
std::unique_lock<fastlock> lock(replica->lock);
|
||||||
if (prepareClientToWrite(replica) == C_OK) {
|
addReplyProto(replica, reply->buf(), reply->used);
|
||||||
replica->replyAsync = reply;
|
|
||||||
reply = nullptr;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
addReplyProto(replica, reply->buf(), reply->size);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
ProcessPendingAsyncWrites();
|
ProcessPendingAsyncWrites();
|
||||||
replicas.erase(std::remove_if(replicas.begin(), replicas.end(), [](const client *c)->bool{ return c->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP;}), replicas.end());
|
replicas.erase(std::remove_if(replicas.begin(), replicas.end(), [](const client *c)->bool{ return c->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP;}), replicas.end());
|
||||||
@ -1089,7 +1083,7 @@ int rdbSaveSnapshotForReplication(struct rdbSaveInfo *rsi) {
|
|||||||
spreplBuf->addLong(rsi->repl_stream_db);
|
spreplBuf->addLong(rsi->repl_stream_db);
|
||||||
spreplBuf->addArrayLen(2);
|
spreplBuf->addArrayLen(2);
|
||||||
spreplBuf->addString("repl-id", 7);
|
spreplBuf->addString("repl-id", 7);
|
||||||
spreplBuf->addString(rsi->repl_id, CONFIG_RUN_ID_SIZE+1);
|
spreplBuf->addString(rsi->repl_id, CONFIG_RUN_ID_SIZE);
|
||||||
spreplBuf->addArrayLen(2);
|
spreplBuf->addArrayLen(2);
|
||||||
spreplBuf->addString("repl-offset", 11);
|
spreplBuf->addString("repl-offset", 11);
|
||||||
spreplBuf->addLong(rsi->master_repl_offset);
|
spreplBuf->addLong(rsi->master_repl_offset);
|
||||||
@ -1387,7 +1381,7 @@ void syncCommand(client *c) {
|
|||||||
|
|
||||||
/* CASE 0: Fast Sync */
|
/* CASE 0: Fast Sync */
|
||||||
if ((c->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && g_pserver->m_pstorageFactory) {
|
if ((c->slave_capa & SLAVE_CAPA_ROCKSDB_SNAPSHOT) && g_pserver->m_pstorageFactory) {
|
||||||
startBgsaveForReplication(c->slave_capa);
|
serverLog(LL_NOTICE,"Fast SYNC on next replication cycle");
|
||||||
/* CASE 1: BGSAVE is in progress, with disk target. */
|
/* CASE 1: BGSAVE is in progress, with disk target. */
|
||||||
} else if (g_pserver->FRdbSaveInProgress() &&
|
} else if (g_pserver->FRdbSaveInProgress() &&
|
||||||
g_pserver->rdb_child_type == RDB_CHILD_TYPE_DISK)
|
g_pserver->rdb_child_type == RDB_CHILD_TYPE_DISK)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user