diff --git a/src/config.cpp b/src/config.cpp index 7ef995bbd..53d45cabb 100644 --- a/src/config.cpp +++ b/src/config.cpp @@ -2469,7 +2469,10 @@ static int updateJemallocBgThread(int val, int prev, const char **err) { static int updateReplBacklogSize(long long val, long long prev, const char **err) { /* resizeReplicationBacklog sets g_pserver->repl_backlog_size, and relies on * being able to tell when the size changes, so restore prev before calling it. */ - UNUSED(err); + if (cserver.repl_backlog_disk_size) { + *err = "Unable to dynamically resize the backlog because disk backlog is enabled"; + return 0; + } g_pserver->repl_backlog_size = prev; g_pserver->repl_backlog_config_size = val; resizeReplicationBacklog(val); @@ -2822,6 +2825,7 @@ standardConfig configs[] = { createLongLongConfig("proto-max-bulk-len", NULL, MODIFIABLE_CONFIG, 1024*1024, LLONG_MAX, g_pserver->proto_max_bulk_len, 512ll*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Bulk request max size */ createLongLongConfig("stream-node-max-entries", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->stream_node_max_entries, 100, INTEGER_CONFIG, NULL, NULL), createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, g_pserver->repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */ + createLongLongConfig("repl-backlog-disk-reserve", NULL, IMMUTABLE_CONFIG, 0, LLONG_MAX, cserver.repl_backlog_disk_size, 0, MEMORY_CONFIG, NULL, NULL), /* Unsigned Long Long configs */ createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory), diff --git a/src/replication.cpp b/src/replication.cpp index 91036723a..1f2c9043a 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -46,6 +46,7 @@ #include #include #include +#include void replicationDiscardCachedMaster(redisMaster *mi); void replicationResurrectCachedMaster(redisMaster *mi, connection *conn); @@ -184,8 +185,39 @@ int bg_unlink(const char *filename) { /* ---------------------------------- MASTER -------------------------------- */ +bool createDiskBacklog() { + // Lets create some disk backed pages and add them here + std::string path = "./repl-backlog-temp" + std::to_string(gettid()); + int fd = open(path.c_str(), O_CREAT | O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR); + if (fd < 0) { + return false; + } + size_t alloc = cserver.repl_backlog_disk_size; + int result = truncate(path.c_str(), alloc); + unlink(path.c_str()); // ensure the fd is the only ref + if (result == -1) { + close (fd); + return false; + } + + g_pserver->repl_backlog_disk = (char*)mmap(nullptr, alloc, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); + close(fd); + if (g_pserver->repl_backlog_disk == MAP_FAILED) { + g_pserver->repl_backlog_disk = nullptr; + return false; + } + + serverLog(LL_VERBOSE, "Disk Backed Replication Allocated"); + return true; +} + void createReplicationBacklog(void) { serverAssert(g_pserver->repl_backlog == NULL); + if (cserver.repl_backlog_disk_size) { + if (!createDiskBacklog()) { + serverLog(LL_WARNING, "Failed to create disk backlog, will use memory only"); + } + } g_pserver->repl_backlog = (char*)zmalloc(g_pserver->repl_backlog_size, MALLOC_LOCAL); g_pserver->repl_backlog_histlen = 0; g_pserver->repl_backlog_idx = 0; @@ -234,9 +266,22 @@ void resizeReplicationBacklog(long long newsize) { long long earliest_off = g_pserver->repl_lowest_off.load(); if (earliest_off != -1) { - // We need to keep critical data so we can't shrink less than the hot data in the buffer + char *backlog = nullptr; newsize = std::max(newsize, g_pserver->master_repl_offset - earliest_off); - char *backlog = (char*)zmalloc(newsize); + + if (cserver.repl_backlog_disk_size != 0) { + if (newsize > g_pserver->repl_backlog_config_size) { + if (g_pserver->repl_backlog == g_pserver->repl_backlog_disk) + return; // Can't do anything more + serverLog(LL_NOTICE, "Switching to disk backed replication backlog due to exceeding memory limits"); + backlog = g_pserver->repl_backlog_disk; + newsize = cserver.repl_backlog_disk_size; + } + } + + // We need to keep critical data so we can't shrink less than the hot data in the buffer + if (backlog == nullptr) + backlog = (char*)zmalloc(newsize); g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - earliest_off; long long earliest_idx = getReplIndexFromOffset(earliest_off); @@ -251,7 +296,10 @@ void resizeReplicationBacklog(long long newsize) { auto cbActiveBacklog = cbPhase1 + g_pserver->repl_backlog_idx; serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog); } - zfree(g_pserver->repl_backlog); + if (g_pserver->repl_backlog != g_pserver->repl_backlog_disk) + zfree(g_pserver->repl_backlog); + else + serverLog(LL_NOTICE, "Returning to memory backed replication backlog"); g_pserver->repl_backlog = backlog; g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen; if (g_pserver->repl_batch_idxStart >= 0) { @@ -261,7 +309,10 @@ void resizeReplicationBacklog(long long newsize) { } g_pserver->repl_backlog_start = earliest_off; } else { - zfree(g_pserver->repl_backlog); + if (g_pserver->repl_backlog != g_pserver->repl_backlog_disk) + zfree(g_pserver->repl_backlog); + else + serverLog(LL_NOTICE, "Returning to memory backed replication backlog"); g_pserver->repl_backlog = (char*)zmalloc(newsize); g_pserver->repl_backlog_histlen = 0; g_pserver->repl_backlog_idx = 0; @@ -311,6 +362,8 @@ void feedReplicationBacklog(const void *ptr, size_t len) { long long maxClientBuffer = (long long)cserver.client_obuf_limits[CLIENT_TYPE_SLAVE].hard_limit_bytes; if (maxClientBuffer <= 0) maxClientBuffer = LLONG_MAX; // infinite essentially + if (cserver.repl_backlog_disk_size) + maxClientBuffer = std::max(g_pserver->repl_backlog_size, cserver.repl_backlog_disk_size); long long min_offset = LLONG_MAX; int listening_replicas = 0; while ((ln = listNext(&li))) { diff --git a/src/server.cpp b/src/server.cpp index 4eb0f5a75..6827ad675 100644 --- a/src/server.cpp +++ b/src/server.cpp @@ -4005,6 +4005,9 @@ void InitServerLast() { g_pserver->initial_memory_usage = zmalloc_used_memory(); g_pserver->asyncworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads); + + // Allocate the repl backlog + } /* Parse the flags string description 'strflags' and set them to the diff --git a/src/server.h b/src/server.h index 5a067090b..a93d4008d 100644 --- a/src/server.h +++ b/src/server.h @@ -2142,6 +2142,7 @@ struct redisServerConst { char *storage_conf = nullptr; int fForkBgSave = false; int time_thread_priority = false; + long long repl_backlog_disk_size = 0; }; struct redisServer { @@ -2381,6 +2382,7 @@ struct redisServer { int replicaseldb; /* Last SELECTed DB in replication output */ int repl_ping_slave_period; /* Master pings the replica every N seconds */ char *repl_backlog; /* Replication backlog for partial syncs */ + char *repl_backlog_disk = nullptr; long long repl_backlog_size; /* Backlog circular buffer size */ long long repl_backlog_config_size; /* The repl backlog may grow but we want to know what the user set it to */ long long repl_backlog_histlen; /* Backlog actual data length */ diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index b562530f6..45dbd9838 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -123,6 +123,7 @@ start_server {tags {"introspection"}} { active-replica bind set-proc-title + repl-backlog-disk-reserve } if {!$::tls} {