Implement the disk backed backlog functionality
Former-commit-id: 759cc01c6ba05f9a865c11580cc4975b5f1bd1d6
This commit is contained in:
parent
fcbeb394bd
commit
2ed6d3f03a
@ -2469,7 +2469,10 @@ static int updateJemallocBgThread(int val, int prev, const char **err) {
|
||||
static int updateReplBacklogSize(long long val, long long prev, const char **err) {
|
||||
/* resizeReplicationBacklog sets g_pserver->repl_backlog_size, and relies on
|
||||
* being able to tell when the size changes, so restore prev before calling it. */
|
||||
UNUSED(err);
|
||||
if (cserver.repl_backlog_disk_size) {
|
||||
*err = "Unable to dynamically resize the backlog because disk backlog is enabled";
|
||||
return 0;
|
||||
}
|
||||
g_pserver->repl_backlog_size = prev;
|
||||
g_pserver->repl_backlog_config_size = val;
|
||||
resizeReplicationBacklog(val);
|
||||
@ -2822,6 +2825,7 @@ standardConfig configs[] = {
|
||||
createLongLongConfig("proto-max-bulk-len", NULL, MODIFIABLE_CONFIG, 1024*1024, LLONG_MAX, g_pserver->proto_max_bulk_len, 512ll*1024*1024, MEMORY_CONFIG, NULL, NULL), /* Bulk request max size */
|
||||
createLongLongConfig("stream-node-max-entries", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->stream_node_max_entries, 100, INTEGER_CONFIG, NULL, NULL),
|
||||
createLongLongConfig("repl-backlog-size", NULL, MODIFIABLE_CONFIG, 1, LLONG_MAX, g_pserver->repl_backlog_size, 1024*1024, MEMORY_CONFIG, NULL, updateReplBacklogSize), /* Default: 1mb */
|
||||
createLongLongConfig("repl-backlog-disk-reserve", NULL, IMMUTABLE_CONFIG, 0, LLONG_MAX, cserver.repl_backlog_disk_size, 0, MEMORY_CONFIG, NULL, NULL),
|
||||
|
||||
/* Unsigned Long Long configs */
|
||||
createULongLongConfig("maxmemory", NULL, MODIFIABLE_CONFIG, 0, LLONG_MAX, g_pserver->maxmemory, 0, MEMORY_CONFIG, NULL, updateMaxmemory),
|
||||
|
@ -46,6 +46,7 @@
|
||||
#include <chrono>
|
||||
#include <unordered_map>
|
||||
#include <string>
|
||||
#include <sys/mman.h>
|
||||
|
||||
void replicationDiscardCachedMaster(redisMaster *mi);
|
||||
void replicationResurrectCachedMaster(redisMaster *mi, connection *conn);
|
||||
@ -184,8 +185,39 @@ int bg_unlink(const char *filename) {
|
||||
|
||||
/* ---------------------------------- MASTER -------------------------------- */
|
||||
|
||||
bool createDiskBacklog() {
|
||||
// Lets create some disk backed pages and add them here
|
||||
std::string path = "./repl-backlog-temp" + std::to_string(gettid());
|
||||
int fd = open(path.c_str(), O_CREAT | O_RDWR | O_LARGEFILE, S_IRUSR | S_IWUSR);
|
||||
if (fd < 0) {
|
||||
return false;
|
||||
}
|
||||
size_t alloc = cserver.repl_backlog_disk_size;
|
||||
int result = truncate(path.c_str(), alloc);
|
||||
unlink(path.c_str()); // ensure the fd is the only ref
|
||||
if (result == -1) {
|
||||
close (fd);
|
||||
return false;
|
||||
}
|
||||
|
||||
g_pserver->repl_backlog_disk = (char*)mmap(nullptr, alloc, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
|
||||
close(fd);
|
||||
if (g_pserver->repl_backlog_disk == MAP_FAILED) {
|
||||
g_pserver->repl_backlog_disk = nullptr;
|
||||
return false;
|
||||
}
|
||||
|
||||
serverLog(LL_VERBOSE, "Disk Backed Replication Allocated");
|
||||
return true;
|
||||
}
|
||||
|
||||
void createReplicationBacklog(void) {
|
||||
serverAssert(g_pserver->repl_backlog == NULL);
|
||||
if (cserver.repl_backlog_disk_size) {
|
||||
if (!createDiskBacklog()) {
|
||||
serverLog(LL_WARNING, "Failed to create disk backlog, will use memory only");
|
||||
}
|
||||
}
|
||||
g_pserver->repl_backlog = (char*)zmalloc(g_pserver->repl_backlog_size, MALLOC_LOCAL);
|
||||
g_pserver->repl_backlog_histlen = 0;
|
||||
g_pserver->repl_backlog_idx = 0;
|
||||
@ -234,9 +266,22 @@ void resizeReplicationBacklog(long long newsize) {
|
||||
long long earliest_off = g_pserver->repl_lowest_off.load();
|
||||
|
||||
if (earliest_off != -1) {
|
||||
// We need to keep critical data so we can't shrink less than the hot data in the buffer
|
||||
char *backlog = nullptr;
|
||||
newsize = std::max(newsize, g_pserver->master_repl_offset - earliest_off);
|
||||
char *backlog = (char*)zmalloc(newsize);
|
||||
|
||||
if (cserver.repl_backlog_disk_size != 0) {
|
||||
if (newsize > g_pserver->repl_backlog_config_size) {
|
||||
if (g_pserver->repl_backlog == g_pserver->repl_backlog_disk)
|
||||
return; // Can't do anything more
|
||||
serverLog(LL_NOTICE, "Switching to disk backed replication backlog due to exceeding memory limits");
|
||||
backlog = g_pserver->repl_backlog_disk;
|
||||
newsize = cserver.repl_backlog_disk_size;
|
||||
}
|
||||
}
|
||||
|
||||
// We need to keep critical data so we can't shrink less than the hot data in the buffer
|
||||
if (backlog == nullptr)
|
||||
backlog = (char*)zmalloc(newsize);
|
||||
g_pserver->repl_backlog_histlen = g_pserver->master_repl_offset - earliest_off;
|
||||
long long earliest_idx = getReplIndexFromOffset(earliest_off);
|
||||
|
||||
@ -251,7 +296,10 @@ void resizeReplicationBacklog(long long newsize) {
|
||||
auto cbActiveBacklog = cbPhase1 + g_pserver->repl_backlog_idx;
|
||||
serverAssert(g_pserver->repl_backlog_histlen == cbActiveBacklog);
|
||||
}
|
||||
zfree(g_pserver->repl_backlog);
|
||||
if (g_pserver->repl_backlog != g_pserver->repl_backlog_disk)
|
||||
zfree(g_pserver->repl_backlog);
|
||||
else
|
||||
serverLog(LL_NOTICE, "Returning to memory backed replication backlog");
|
||||
g_pserver->repl_backlog = backlog;
|
||||
g_pserver->repl_backlog_idx = g_pserver->repl_backlog_histlen;
|
||||
if (g_pserver->repl_batch_idxStart >= 0) {
|
||||
@ -261,7 +309,10 @@ void resizeReplicationBacklog(long long newsize) {
|
||||
}
|
||||
g_pserver->repl_backlog_start = earliest_off;
|
||||
} else {
|
||||
zfree(g_pserver->repl_backlog);
|
||||
if (g_pserver->repl_backlog != g_pserver->repl_backlog_disk)
|
||||
zfree(g_pserver->repl_backlog);
|
||||
else
|
||||
serverLog(LL_NOTICE, "Returning to memory backed replication backlog");
|
||||
g_pserver->repl_backlog = (char*)zmalloc(newsize);
|
||||
g_pserver->repl_backlog_histlen = 0;
|
||||
g_pserver->repl_backlog_idx = 0;
|
||||
@ -311,6 +362,8 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
|
||||
long long maxClientBuffer = (long long)cserver.client_obuf_limits[CLIENT_TYPE_SLAVE].hard_limit_bytes;
|
||||
if (maxClientBuffer <= 0)
|
||||
maxClientBuffer = LLONG_MAX; // infinite essentially
|
||||
if (cserver.repl_backlog_disk_size)
|
||||
maxClientBuffer = std::max(g_pserver->repl_backlog_size, cserver.repl_backlog_disk_size);
|
||||
long long min_offset = LLONG_MAX;
|
||||
int listening_replicas = 0;
|
||||
while ((ln = listNext(&li))) {
|
||||
|
@ -4005,6 +4005,9 @@ void InitServerLast() {
|
||||
g_pserver->initial_memory_usage = zmalloc_used_memory();
|
||||
|
||||
g_pserver->asyncworkqueue = new (MALLOC_LOCAL) AsyncWorkQueue(cserver.cthreads);
|
||||
|
||||
// Allocate the repl backlog
|
||||
|
||||
}
|
||||
|
||||
/* Parse the flags string description 'strflags' and set them to the
|
||||
|
@ -2142,6 +2142,7 @@ struct redisServerConst {
|
||||
char *storage_conf = nullptr;
|
||||
int fForkBgSave = false;
|
||||
int time_thread_priority = false;
|
||||
long long repl_backlog_disk_size = 0;
|
||||
};
|
||||
|
||||
struct redisServer {
|
||||
@ -2381,6 +2382,7 @@ struct redisServer {
|
||||
int replicaseldb; /* Last SELECTed DB in replication output */
|
||||
int repl_ping_slave_period; /* Master pings the replica every N seconds */
|
||||
char *repl_backlog; /* Replication backlog for partial syncs */
|
||||
char *repl_backlog_disk = nullptr;
|
||||
long long repl_backlog_size; /* Backlog circular buffer size */
|
||||
long long repl_backlog_config_size; /* The repl backlog may grow but we want to know what the user set it to */
|
||||
long long repl_backlog_histlen; /* Backlog actual data length */
|
||||
|
@ -123,6 +123,7 @@ start_server {tags {"introspection"}} {
|
||||
active-replica
|
||||
bind
|
||||
set-proc-title
|
||||
repl-backlog-disk-reserve
|
||||
}
|
||||
|
||||
if {!$::tls} {
|
||||
|
Loading…
x
Reference in New Issue
Block a user