diff --git a/src/replication.cpp b/src/replication.cpp
index e78df9a62..d0cd15b37 100644
--- a/src/replication.cpp
+++ b/src/replication.cpp
@@ -279,9 +279,10 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
         long long minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1;
         if (minimumsize > g_pserver->repl_backlog_size) {
             flushReplBacklogToClients();
+            serverAssert(g_pserver->master_repl_offset == g_pserver->repl_batch_offStart);
             minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1;
 
-            if (minimumsize > g_pserver->repl_backlog_size) {
+            if (minimumsize > g_pserver->repl_backlog_size && minimumsize < (long long)cserver.client_obuf_limits[CLIENT_TYPE_SLAVE].hard_limit_bytes) {
                 // This is an emergency overflow, we better resize to fit
                 long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize);
                 serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize);
@@ -4458,8 +4459,24 @@ void flushReplBacklogToClients()
 
     if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) {
         bool fAsyncWrite = false;
-        // Ensure no overflow
+        serverAssert(g_pserver->repl_batch_offStart < g_pserver->master_repl_offset);
+        if (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart > g_pserver->repl_backlog_size) {
+            // We overflowed
+            listIter li;
+            listNode *ln;
+            listRewind(g_pserver->slaves, &li);
+            while ((ln = listNext(&li))) {
+                client *c = (client*)listNodeValue(ln);
+                sds sdsClient = catClientInfoString(sdsempty(),c);
+                freeClientAsync(c);
+                serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", sdsClient);
+                sdsfree(sdsClient);
+            }
+            goto LDone;
+        }
+
+        // Ensure no overflow if we get here
         serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size);
         serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx);
 
@@ -4497,6 +4514,7 @@ void flushReplBacklogToClients()
         if (fAsyncWrite)
            ProcessPendingAsyncWrites();
 
+LDone:
         // This may be called multiple times per "frame" so update with our progress flushing to clients
         g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
         g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;
diff --git a/tests/integration/replication-2.tcl b/tests/integration/replication-2.tcl
index 08905f11e..02687c619 100644
--- a/tests/integration/replication-2.tcl
+++ b/tests/integration/replication-2.tcl
@@ -86,5 +86,28 @@ start_server {tags {"repl"}} {
         }
         assert_equal [r debug digest] [r -1 debug digest]
     }
+
+    test {REPL Backlog handles large value} {
+        # initialize bigval to 64-bytes
+        r flushall
+        r config set repl-backlog-size 1K
+        r config set client-output-buffer-limit "replica 1024 1024 0"
+        set bigval "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
+        for {set i 0} { $i < 20 } { incr i } {
+            append bigval $bigval
+        }
+        r set bigkey $bigval
+        # We expect the replication to be disconnected so wait a bit
+        wait_for_condition 50 100 {
+            [s -1 master_link_status] eq {down}
+        } else {
+            fail "Memory limit exceeded but not detected"
+        }
+        wait_for_condition 50 100 {
+            [r debug digest] eq [r -1 debug digest]
+        } else {
+            fail "Replica did not reconnect"
+        }
+    }
 }
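
A minimal standalone sketch (not part of the patch, and not KeyDB code) of the two checks the diff introduces, using hypothetical names (`BacklogState`, `mayResizeBacklog`, `batchOverflowed`) in place of the real server globals; it mirrors only the arithmetic of the guards, not the actual client teardown or backlog resize, and it ignores the convention that a hard limit of 0 means "unlimited".

```cpp
// Sketch of the patched logic:
//  1. feedReplicationBacklog() now grows the backlog only while the pending
//     batch would still fit under the replica output-buffer hard limit.
//  2. flushReplBacklogToClients() now detects a batch that has outgrown the
//     backlog and drops the replicas instead of feeding them a corrupted window.
#include <cstdint>
#include <iostream>

struct BacklogState {                 // hypothetical stand-in for the g_pserver fields
    int64_t master_repl_offset;       // total bytes written to the replication stream
    int64_t repl_batch_offStart;      // offset where the current un-flushed batch began
    int64_t repl_backlog_size;        // current backlog capacity in bytes
    int64_t replica_hard_limit_bytes; // replica client-output-buffer hard limit
};

// Mirrors the new condition in feedReplicationBacklog(): resize only if the
// batch needs more room *and* is still below the replica hard limit.
bool mayResizeBacklog(const BacklogState &s, int64_t len) {
    int64_t minimumsize = s.master_repl_offset + len - s.repl_batch_offStart + 1;
    return minimumsize > s.repl_backlog_size &&
           minimumsize < s.replica_hard_limit_bytes;
}

// Mirrors the new early-out in flushReplBacklogToClients(): a batch larger than
// the backlog means old data was already overwritten, so replicas must be dropped.
bool batchOverflowed(const BacklogState &s) {
    return s.master_repl_offset - s.repl_batch_offStart > s.repl_backlog_size;
}

int main() {
    // 1 KiB backlog with a 4 KiB replica hard limit (hypothetical numbers).
    BacklogState s{0, 0, 1024, 4096};

    s.master_repl_offset = 900;   // batch still under the hard limit: resizing is allowed
    std::cout << mayResizeBacklog(s, 200) << ' ' << batchOverflowed(s) << '\n'; // 1 0

    s.master_repl_offset = 5000;  // batch past both limits: no resize, replicas get dropped
    std::cout << mayResizeBacklog(s, 200) << ' ' << batchOverflowed(s) << '\n'; // 0 1
}
```

Note that the new test pins `repl-backlog-size` to 1K and the replica hard limit to 1024 bytes, so the resize branch can never satisfy `minimumsize < hard_limit_bytes`; writing a value larger than the backlog therefore has to take the new disconnect path, and the test then waits for the replica to reconnect and resync.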