From 6fbf6f8ed1d7f2259dcf8251a566fd399b28f1c0 Mon Sep 17 00:00:00 2001 From: John Sully Date: Mon, 7 Mar 2022 16:40:01 -0500 Subject: [PATCH] Fix fast-sync perf issue while server is under load (batch size too small) --- src/replication.cpp | 35 ++++++++++++++++++++++++++--------- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/src/replication.cpp b/src/replication.cpp index 3a506a62a..4e8d81a91 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -995,6 +995,7 @@ need_full_resync: } int checkClientOutputBufferLimits(client *c); +void clientInstallAsyncWriteHandler(client *c); class replicationBuffer { std::vector replicas; clientReplyBlock *reply = nullptr; @@ -1024,6 +1025,17 @@ public: if (reply == nullptr) return; size_t written = reply->used; + + for (auto replica : replicas) { + std::unique_lock ul(replica->lock); + while (checkClientOutputBufferLimits(replica) + && (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) { + ul.unlock(); + usleep(0); + ul.lock(); + } + } + aeAcquireLock(); for (size_t ireplica = 0; ireplica < replicas.size(); ++ireplica) { auto replica = replicas[ireplica]; @@ -1031,18 +1043,23 @@ public: replica->replstate = REPL_STATE_NONE; continue; } - - while (checkClientOutputBufferLimits(replica) - && (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) { - aeReleaseLock(); - usleep(10); - aeAcquireLock(); - } std::unique_lock lock(replica->lock); - addReplyProto(replica, reply->buf(), reply->used); + if (ireplica == (replicas.size()-1) && replica->replyAsync == nullptr) { + replica->replyAsync = reply; + reply = nullptr; + if (!(replica->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(replica); + } else { + addReplyProto(replica, reply->buf(), reply->used); + } } ProcessPendingAsyncWrites(); + for (auto c : replicas) { + if (c->flags & CLIENT_CLOSE_ASAP) { + std::unique_lock ul(c->lock); + c->replstate = REPL_STATE_NONE; // otherwise the client can't be free'd + } + } replicas.erase(std::remove_if(replicas.begin(), replicas.end(), [](const client *c)->bool{ return c->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP;}), replicas.end()); aeReleaseLock(); if (reply != nullptr) { @@ -1062,7 +1079,7 @@ public: } if (reply == nullptr) { - reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + std::max(size, (unsigned long)(PROTO_REPLY_CHUNK_BYTES*2))); + reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + std::max(size, (unsigned long)(PROTO_REPLY_CHUNK_BYTES*64))); reply->size = zmalloc_usable_size(reply) - sizeof(clientReplyBlock); reply->used = 0; }