Do not resize replica buffer past the max client limit
Former-commit-id: ba116500ca4fd53e4e40f04fc33981e60bb21ab7
This commit is contained in:
parent
29ef9708fa
commit
48b30ec34a
@ -303,19 +303,56 @@ void feedReplicationBacklog(const void *ptr, size_t len) {
|
|||||||
if (lower_bound == -1)
|
if (lower_bound == -1)
|
||||||
lower_bound = g_pserver->repl_batch_offStart;
|
lower_bound = g_pserver->repl_batch_offStart;
|
||||||
long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
|
long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
|
||||||
|
|
||||||
if (minimumsize > g_pserver->repl_backlog_size) {
|
if (minimumsize > g_pserver->repl_backlog_size) {
|
||||||
flushReplBacklogToClients();
|
listIter li;
|
||||||
lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst);
|
listNode *ln;
|
||||||
if (lower_bound == -1)
|
listRewind(g_pserver->slaves, &li);
|
||||||
lower_bound = g_pserver->repl_batch_offStart;
|
long long maxClientBuffer = (long long)cserver.client_obuf_limits[CLIENT_TYPE_SLAVE].hard_limit_bytes;
|
||||||
|
if (maxClientBuffer <= 0)
|
||||||
|
maxClientBuffer = LLONG_MAX; // infinite essentially
|
||||||
|
long long min_offset = LLONG_MAX;
|
||||||
|
int listening_replicas = 0;
|
||||||
|
while ((ln = listNext(&li))) {
|
||||||
|
client *replica = (client*)listNodeValue(ln);
|
||||||
|
if (!canFeedReplicaReplBuffer(replica)) continue;
|
||||||
|
if (replica->flags & CLIENT_CLOSE_ASAP) continue;
|
||||||
|
|
||||||
minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1;
|
std::unique_lock<fastlock> ul(replica->lock);
|
||||||
|
|
||||||
if (minimumsize > g_pserver->repl_backlog_size) {
|
// Would this client overflow? If so close it
|
||||||
|
long long neededBuffer = g_pserver->master_repl_offset + len - replica->repl_curr_off + 1;
|
||||||
|
if (neededBuffer > maxClientBuffer) {
|
||||||
|
|
||||||
|
sds clientInfo = catClientInfoString(sdsempty(),replica);
|
||||||
|
freeClientAsync(replica);
|
||||||
|
serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP due to exceeding output buffer hard limit.", clientInfo);
|
||||||
|
sdsfree(clientInfo);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
min_offset = std::min(min_offset, replica->repl_curr_off);
|
||||||
|
++listening_replicas;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (min_offset == LLONG_MAX) {
|
||||||
|
min_offset = g_pserver->repl_batch_offStart;
|
||||||
|
g_pserver->repl_lowest_off = -1;
|
||||||
|
} else {
|
||||||
|
g_pserver->repl_lowest_off = min_offset;
|
||||||
|
}
|
||||||
|
|
||||||
|
minimumsize = g_pserver->master_repl_offset + len - min_offset + 1;
|
||||||
|
serverAssert(listening_replicas == 0 || minimumsize <= maxClientBuffer);
|
||||||
|
|
||||||
|
if (minimumsize > g_pserver->repl_backlog_size && listening_replicas) {
|
||||||
// This is an emergency overflow, we better resize to fit
|
// This is an emergency overflow, we better resize to fit
|
||||||
long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize);
|
long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize);
|
||||||
serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize);
|
serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld bytes", newsize);
|
||||||
resizeReplicationBacklog(newsize);
|
resizeReplicationBacklog(newsize);
|
||||||
|
} else if (!listening_replicas) {
|
||||||
|
// We need to update a few variables or later asserts will notice we dropped data
|
||||||
|
g_pserver->repl_batch_offStart = g_pserver->master_repl_offset + len;
|
||||||
|
g_pserver->repl_lowest_off = -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user