From d08f0552ee13fc9feaa7afbb2c493d1bf92ee6ac Mon Sep 17 00:00:00 2001 From: "zhaozhao.zz" <276441700@qq.com> Date: Tue, 2 Nov 2021 16:53:52 +0800 Subject: [PATCH] rebuild replication backlog index when master restart (#9720) After PR #9166 , replication backlog is not a real block of memory, just contains a reference points to replication buffer's block and the blocks index (to accelerate search offset when partial sync), so we need update both replication buffer's block's offset and replication backlog blocks index's offset when master restart from RDB, since the `server.master_repl_offset` is changed. The implications of this bug was just a slow search, but not a replication failure. --- src/replication.c | 44 +++++++++++++++++++++++++++++++++----------- src/server.c | 11 +---------- src/server.h | 1 + 3 files changed, 35 insertions(+), 21 deletions(-) diff --git a/src/replication.c b/src/replication.c index 0629a4ca9..17cbaa4cc 100644 --- a/src/replication.c +++ b/src/replication.c @@ -159,6 +159,38 @@ void freeReplicationBacklog(void) { server.repl_backlog = NULL; } +/* To make search offset from replication buffer blocks quickly + * when replicas ask partial resynchronization, we create one index + * block every REPL_BACKLOG_INDEX_PER_BLOCKS blocks. */ +void createReplicationBacklogIndex(listNode *ln) { + server.repl_backlog->unindexed_count++; + if (server.repl_backlog->unindexed_count >= REPL_BACKLOG_INDEX_PER_BLOCKS) { + replBufBlock *o = listNodeValue(ln); + uint64_t encoded_offset = htonu64(o->repl_offset); + raxInsert(server.repl_backlog->blocks_index, + (unsigned char*)&encoded_offset, sizeof(uint64_t), + ln, NULL); + server.repl_backlog->unindexed_count = 0; + } +} + +/* Rebase replication buffer blocks' offset since the initial + * setting offset starts from 0 when master restart. */ +void rebaseReplicationBuffer(long long base_repl_offset) { + raxFree(server.repl_backlog->blocks_index); + server.repl_backlog->blocks_index = raxNew(); + server.repl_backlog->unindexed_count = 0; + + listIter li; + listNode *ln; + listRewind(server.repl_buffer_blocks, &li); + while ((ln = listNext(&li))) { + replBufBlock *o = listNodeValue(ln); + o->repl_offset += base_repl_offset; + createReplicationBacklogIndex(ln); + } +} + void resetReplicationBuffer(void) { server.repl_buffer_mem = 0; server.repl_buffer_blocks = listCreate(); @@ -373,17 +405,7 @@ void feedReplicationBuffer(char *s, size_t len) { serverAssert(add_new_block == 1 && start_pos == 0); } if (add_new_block) { - /* To make search offset from replication buffer blocks quickly - * when replicas ask partial resynchronization, we create one index - * block every REPL_BACKLOG_INDEX_PER_BLOCKS blocks. */ - server.repl_backlog->unindexed_count++; - if (server.repl_backlog->unindexed_count >= REPL_BACKLOG_INDEX_PER_BLOCKS) { - uint64_t encoded_offset = htonu64(tail->repl_offset); - raxInsert(server.repl_backlog->blocks_index, - (unsigned char*)&encoded_offset, sizeof(uint64_t), - listLast(server.repl_buffer_blocks), NULL); - server.repl_backlog->unindexed_count = 0; - } + createReplicationBacklogIndex(listLast(server.repl_buffer_blocks)); } /* Try to trim replication backlog since replication backlog may exceed * our setting when we add replication stream. Note that it is important to diff --git a/src/server.c b/src/server.c index 5f76b9612..fc6606b69 100644 --- a/src/server.c +++ b/src/server.c @@ -7611,17 +7611,8 @@ void loadDataFromDisk(void) { serverAssert(server.repl_backlog); server.repl_backlog->offset = server.master_repl_offset - server.repl_backlog->histlen + 1; + rebaseReplicationBuffer(rsi.repl_offset); server.repl_no_slaves_since = time(NULL); - - /* Rebase replication buffer blocks' offset since the previous - * setting offset starts from 0. */ - listIter li; - listNode *ln; - listRewind(server.repl_buffer_blocks, &li); - while ((ln = listNext(&li))) { - replBufBlock *o = listNodeValue(ln); - o->repl_offset += rsi.repl_offset; - } } } } else if (errno != ENOENT) { diff --git a/src/server.h b/src/server.h index 9619e10a6..fd5762db9 100644 --- a/src/server.h +++ b/src/server.h @@ -2326,6 +2326,7 @@ void replicationCacheMasterUsingMyself(void); void feedReplicationBacklog(void *ptr, size_t len); void incrementalTrimReplicationBacklog(size_t blocks); int canFeedReplicaReplBuffer(client *replica); +void rebaseReplicationBuffer(long long base_repl_offset); void showLatestBacklog(void); void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask); void rdbPipeWriteHandlerConnRemoved(struct connection *conn);