diff --git a/src/replication.cpp b/src/replication.cpp index c3b902db4..12142f9a5 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -303,19 +303,56 @@ void feedReplicationBacklog(const void *ptr, size_t len) { if (lower_bound == -1) lower_bound = g_pserver->repl_batch_offStart; long long minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1; + if (minimumsize > g_pserver->repl_backlog_size) { - flushReplBacklogToClients(); - lower_bound = g_pserver->repl_lowest_off.load(std::memory_order_seq_cst); - if (lower_bound == -1) - lower_bound = g_pserver->repl_batch_offStart; + listIter li; + listNode *ln; + listRewind(g_pserver->slaves, &li); + long long maxClientBuffer = (long long)cserver.client_obuf_limits[CLIENT_TYPE_SLAVE].hard_limit_bytes; + if (maxClientBuffer <= 0) + maxClientBuffer = LLONG_MAX; // infinite essentially + long long min_offset = LLONG_MAX; + int listening_replicas = 0; + while ((ln = listNext(&li))) { + client *replica = (client*)listNodeValue(ln); + if (!canFeedReplicaReplBuffer(replica)) continue; + if (replica->flags & CLIENT_CLOSE_ASAP) continue; - minimumsize = g_pserver->master_repl_offset + len - lower_bound + 1; + std::unique_lock ul(replica->lock); - if (minimumsize > g_pserver->repl_backlog_size) { + // Would this client overflow? If so close it + long long neededBuffer = g_pserver->master_repl_offset + len - replica->repl_curr_off + 1; + if (neededBuffer > maxClientBuffer) { + + sds clientInfo = catClientInfoString(sdsempty(),replica); + freeClientAsync(replica); + serverLog(LL_WARNING,"Client %s scheduled to be closed ASAP due to exceeding output buffer hard limit.", clientInfo); + sdsfree(clientInfo); + continue; + } + min_offset = std::min(min_offset, replica->repl_curr_off); + ++listening_replicas; + } + + if (min_offset == LLONG_MAX) { + min_offset = g_pserver->repl_batch_offStart; + g_pserver->repl_lowest_off = -1; + } else { + g_pserver->repl_lowest_off = min_offset; + } + + minimumsize = g_pserver->master_repl_offset + len - min_offset + 1; + serverAssert(listening_replicas == 0 || minimumsize <= maxClientBuffer); + + if (minimumsize > g_pserver->repl_backlog_size && listening_replicas) { // This is an emergency overflow, we better resize to fit long long newsize = std::max(g_pserver->repl_backlog_size*2, minimumsize); - serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld", newsize); + serverLog(LL_WARNING, "Replication backlog is too small, resizing to: %lld bytes", newsize); resizeReplicationBacklog(newsize); + } else if (!listening_replicas) { + // We need to update a few variables or later asserts will notice we dropped data + g_pserver->repl_batch_offStart = g_pserver->master_repl_offset + len; + g_pserver->repl_lowest_off = -1; } } }