rebuild replication backlog index when master restart (#9720)
After PR #9166 , replication backlog is not a real block of memory, just contains a reference points to replication buffer's block and the blocks index (to accelerate search offset when partial sync), so we need update both replication buffer's block's offset and replication backlog blocks index's offset when master restart from RDB, since the `server.master_repl_offset` is changed. The implications of this bug was just a slow search, but not a replication failure.
This commit is contained in:
parent
58a1d16ff6
commit
d08f0552ee
@ -159,6 +159,38 @@ void freeReplicationBacklog(void) {
|
||||
server.repl_backlog = NULL;
|
||||
}
|
||||
|
||||
/* To make search offset from replication buffer blocks quickly
|
||||
* when replicas ask partial resynchronization, we create one index
|
||||
* block every REPL_BACKLOG_INDEX_PER_BLOCKS blocks. */
|
||||
void createReplicationBacklogIndex(listNode *ln) {
|
||||
server.repl_backlog->unindexed_count++;
|
||||
if (server.repl_backlog->unindexed_count >= REPL_BACKLOG_INDEX_PER_BLOCKS) {
|
||||
replBufBlock *o = listNodeValue(ln);
|
||||
uint64_t encoded_offset = htonu64(o->repl_offset);
|
||||
raxInsert(server.repl_backlog->blocks_index,
|
||||
(unsigned char*)&encoded_offset, sizeof(uint64_t),
|
||||
ln, NULL);
|
||||
server.repl_backlog->unindexed_count = 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* Rebase replication buffer blocks' offset since the initial
|
||||
* setting offset starts from 0 when master restart. */
|
||||
void rebaseReplicationBuffer(long long base_repl_offset) {
|
||||
raxFree(server.repl_backlog->blocks_index);
|
||||
server.repl_backlog->blocks_index = raxNew();
|
||||
server.repl_backlog->unindexed_count = 0;
|
||||
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(server.repl_buffer_blocks, &li);
|
||||
while ((ln = listNext(&li))) {
|
||||
replBufBlock *o = listNodeValue(ln);
|
||||
o->repl_offset += base_repl_offset;
|
||||
createReplicationBacklogIndex(ln);
|
||||
}
|
||||
}
|
||||
|
||||
void resetReplicationBuffer(void) {
|
||||
server.repl_buffer_mem = 0;
|
||||
server.repl_buffer_blocks = listCreate();
|
||||
@ -373,17 +405,7 @@ void feedReplicationBuffer(char *s, size_t len) {
|
||||
serverAssert(add_new_block == 1 && start_pos == 0);
|
||||
}
|
||||
if (add_new_block) {
|
||||
/* To make search offset from replication buffer blocks quickly
|
||||
* when replicas ask partial resynchronization, we create one index
|
||||
* block every REPL_BACKLOG_INDEX_PER_BLOCKS blocks. */
|
||||
server.repl_backlog->unindexed_count++;
|
||||
if (server.repl_backlog->unindexed_count >= REPL_BACKLOG_INDEX_PER_BLOCKS) {
|
||||
uint64_t encoded_offset = htonu64(tail->repl_offset);
|
||||
raxInsert(server.repl_backlog->blocks_index,
|
||||
(unsigned char*)&encoded_offset, sizeof(uint64_t),
|
||||
listLast(server.repl_buffer_blocks), NULL);
|
||||
server.repl_backlog->unindexed_count = 0;
|
||||
}
|
||||
createReplicationBacklogIndex(listLast(server.repl_buffer_blocks));
|
||||
}
|
||||
/* Try to trim replication backlog since replication backlog may exceed
|
||||
* our setting when we add replication stream. Note that it is important to
|
||||
|
11
src/server.c
11
src/server.c
@ -7611,17 +7611,8 @@ void loadDataFromDisk(void) {
|
||||
serverAssert(server.repl_backlog);
|
||||
server.repl_backlog->offset = server.master_repl_offset -
|
||||
server.repl_backlog->histlen + 1;
|
||||
rebaseReplicationBuffer(rsi.repl_offset);
|
||||
server.repl_no_slaves_since = time(NULL);
|
||||
|
||||
/* Rebase replication buffer blocks' offset since the previous
|
||||
* setting offset starts from 0. */
|
||||
listIter li;
|
||||
listNode *ln;
|
||||
listRewind(server.repl_buffer_blocks, &li);
|
||||
while ((ln = listNext(&li))) {
|
||||
replBufBlock *o = listNodeValue(ln);
|
||||
o->repl_offset += rsi.repl_offset;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else if (errno != ENOENT) {
|
||||
|
@ -2326,6 +2326,7 @@ void replicationCacheMasterUsingMyself(void);
|
||||
void feedReplicationBacklog(void *ptr, size_t len);
|
||||
void incrementalTrimReplicationBacklog(size_t blocks);
|
||||
int canFeedReplicaReplBuffer(client *replica);
|
||||
void rebaseReplicationBuffer(long long base_repl_offset);
|
||||
void showLatestBacklog(void);
|
||||
void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
|
||||
void rdbPipeWriteHandlerConnRemoved(struct connection *conn);
|
||||
|
Loading…
x
Reference in New Issue
Block a user