Respect replica output buffer limits when adding large commands to the ring buffer

Former-commit-id: 37ec01cfd8a8da1e895c7cdc358d382d35ad59dd
This commit is contained in:
John Sully 2021-05-03 16:33:16 +00:00
parent 95ae92a691
commit c58739bbcb
2 changed files with 43 additions and 2 deletions

View File

@ -279,9 +279,10 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
long long minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1; long long minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1;
if (minimumsize > g_pserver->repl_backlog_size) { if (minimumsize > g_pserver->repl_backlog_size) {
flushReplBacklogToClients(); flushReplBacklogToClients();
serverAssert(g_pserver->master_repl_offset == g_pserver->repl_batch_offStart);
minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1; minimumsize = g_pserver->master_repl_offset + len - g_pserver->repl_batch_offStart+1;
if (minimumsize > g_pserver->repl_backlog_size) { if (minimumsize > g_pserver->repl_backlog_size && minimumsize < (long long)cserver.client_obuf_limits[CLIENT_TYPE_SLAVE].hard_limit_bytes) {
// This is an emergency overflow, we better resize to fit // This is an emergency overflow, we better resize to fit
long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize); long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize);
serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize); serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize);
@ -4458,8 +4459,24 @@ void flushReplBacklogToClients()
if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) { if (g_pserver->repl_batch_offStart != g_pserver->master_repl_offset) {
bool fAsyncWrite = false; bool fAsyncWrite = false;
// Ensure no overflow
serverAssert(g_pserver->repl_batch_offStart < g_pserver->master_repl_offset); serverAssert(g_pserver->repl_batch_offStart < g_pserver->master_repl_offset);
if (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart > g_pserver->repl_backlog_size) {
// We overflowed
listIter li;
listNode *ln;
listRewind(g_pserver->slaves, &li);
while ((ln = listNext(&li))) {
client *c = (client*)listNodeValue(ln);
sds sdsClient = catClientInfoString(sdsempty(),c);
freeClientAsync(c);
serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP for overcoming of output buffer limits.", sdsClient);
sdsfree(sdsClient);
}
goto LDone;
}
// Ensure no overflow if we get here
serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size); serverAssert(g_pserver->master_repl_offset - g_pserver->repl_batch_offStart <= g_pserver->repl_backlog_size);
serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx); serverAssert(g_pserver->repl_batch_idxStart != g_pserver->repl_backlog_idx);
@ -4497,6 +4514,7 @@ void flushReplBacklogToClients()
if (fAsyncWrite) if (fAsyncWrite)
ProcessPendingAsyncWrites(); ProcessPendingAsyncWrites();
LDone:
// This may be called multiple times per "frame" so update with our progress flushing to clients // This may be called multiple times per "frame" so update with our progress flushing to clients
g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx; g_pserver->repl_batch_idxStart = g_pserver->repl_backlog_idx;
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset; g_pserver->repl_batch_offStart = g_pserver->master_repl_offset;

View File

@ -86,5 +86,28 @@ start_server {tags {"repl"}} {
} }
assert_equal [r debug digest] [r -1 debug digest] assert_equal [r debug digest] [r -1 debug digest]
} }
test {REPL Backlog handles large value} {
# initialize bigval to 64-bytes
r flushall
r config set repl-backlog-size 1K
r config set client-output-buffer-limit "replica 1024 1024 0"
set bigval "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
for {set i 0} { $i < 20 } { incr i } {
append bigval $bigval
}
r set bigkey $bigval
# We expect the replication to be disconnected so wait a bit
wait_for_condition 50 100 {
[s -1 master_link_status] eq {down}
} else {
fail "Memory limit exceeded but not detected"
}
wait_for_condition 50 100 {
[r debug digest] eq [r -1 debug digest]
} else {
fail "Replica did not reconnect"
}
}
} }
} }