diff --git a/src/replication.c b/src/replication.c
index 0629a4ca9..17cbaa4cc 100644
--- a/src/replication.c
+++ b/src/replication.c
@@ -159,6 +159,38 @@ void freeReplicationBacklog(void) {
     server.repl_backlog = NULL;
 }
 
+/* To let a replica's partial resynchronization request find its offset
+ * quickly among the replication buffer blocks, we index one block out of
+ * every REPL_BACKLOG_INDEX_PER_BLOCKS blocks passed to this function. */
+void createReplicationBacklogIndex(listNode *ln) {
+    server.repl_backlog->unindexed_count++;
+    if (server.repl_backlog->unindexed_count >= REPL_BACKLOG_INDEX_PER_BLOCKS) {
+        replBufBlock *o = listNodeValue(ln);
+        uint64_t encoded_offset = htonu64(o->repl_offset); /* index key */
+        raxInsert(server.repl_backlog->blocks_index,
+                  (unsigned char*)&encoded_offset, sizeof(uint64_t),
+                  ln, NULL); /* the list node itself is the stored value */
+        server.repl_backlog->unindexed_count = 0;
+    }
+}
+
+/* Add base_repl_offset to every replication buffer block's offset (they start
+ * from 0 when the master restarts) and rebuild the blocks index to match. */
+void rebaseReplicationBuffer(long long base_repl_offset) {
+    raxFree(server.repl_backlog->blocks_index); /* keys use old offsets */
+    server.repl_backlog->blocks_index = raxNew();
+    server.repl_backlog->unindexed_count = 0; /* restart the index cadence */
+
+    listIter li;
+    listNode *ln;
+    listRewind(server.repl_buffer_blocks, &li);
+    while ((ln = listNext(&li))) {
+        replBufBlock *o = listNodeValue(ln);
+        o->repl_offset += base_repl_offset;
+        createReplicationBacklogIndex(ln); /* must see the rebased offset */
+    }
+}
+
 void resetReplicationBuffer(void) {
     server.repl_buffer_mem = 0;
     server.repl_buffer_blocks = listCreate();
@@ -373,17 +405,7 @@ void feedReplicationBuffer(char *s, size_t len) {
         serverAssert(add_new_block == 1 && start_pos == 0);
     }
     if (add_new_block) {
-        /* To make search offset from replication buffer blocks quickly
-         * when replicas ask partial resynchronization, we create one index
-         * block every REPL_BACKLOG_INDEX_PER_BLOCKS blocks. */
-        server.repl_backlog->unindexed_count++;
-        if (server.repl_backlog->unindexed_count >= REPL_BACKLOG_INDEX_PER_BLOCKS) {
-            uint64_t encoded_offset = htonu64(tail->repl_offset);
-            raxInsert(server.repl_backlog->blocks_index,
-                      (unsigned char*)&encoded_offset, sizeof(uint64_t),
-                      listLast(server.repl_buffer_blocks), NULL);
-            server.repl_backlog->unindexed_count = 0;
-        }
+        createReplicationBacklogIndex(listLast(server.repl_buffer_blocks));
     }
     /* Try to trim replication backlog since replication backlog may exceed
      * our setting when we add replication stream. Note that it is important to
diff --git a/src/server.c b/src/server.c
index 5f76b9612..fc6606b69 100644
--- a/src/server.c
+++ b/src/server.c
@@ -7611,17 +7611,8 @@ void loadDataFromDisk(void) {
                     serverAssert(server.repl_backlog);
                     server.repl_backlog->offset = server.master_repl_offset -
                               server.repl_backlog->histlen + 1;
+                    rebaseReplicationBuffer(rsi.repl_offset);
                     server.repl_no_slaves_since = time(NULL);
-
-                    /* Rebase replication buffer blocks' offset since the previous
-                     * setting offset starts from 0. */
-                    listIter li;
-                    listNode *ln;
-                    listRewind(server.repl_buffer_blocks, &li);
-                    while ((ln = listNext(&li))) {
-                        replBufBlock *o = listNodeValue(ln);
-                        o->repl_offset += rsi.repl_offset;
-                    }
                 }
             }
         } else if (errno != ENOENT) {
diff --git a/src/server.h b/src/server.h
index 9619e10a6..fd5762db9 100644
--- a/src/server.h
+++ b/src/server.h
@@ -2326,6 +2326,7 @@ void replicationCacheMasterUsingMyself(void);
 void feedReplicationBacklog(void *ptr, size_t len);
 void incrementalTrimReplicationBacklog(size_t blocks);
 int canFeedReplicaReplBuffer(client *replica);
+void rebaseReplicationBuffer(long long base_repl_offset);
 void showLatestBacklog(void);
 void rdbPipeReadHandler(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
 void rdbPipeWriteHandlerConnRemoved(struct connection *conn);