Fix fast-sync perf issue while server is under load (batch size too small)

This commit is contained in:
John Sully 2022-03-07 16:40:01 -05:00
parent 5df39b56ac
commit 6fbf6f8ed1

View File

@ -995,6 +995,7 @@ need_full_resync:
} }
int checkClientOutputBufferLimits(client *c); int checkClientOutputBufferLimits(client *c);
void clientInstallAsyncWriteHandler(client *c);
class replicationBuffer { class replicationBuffer {
std::vector<client *> replicas; std::vector<client *> replicas;
clientReplyBlock *reply = nullptr; clientReplyBlock *reply = nullptr;
@ -1024,6 +1025,17 @@ public:
if (reply == nullptr) if (reply == nullptr)
return; return;
size_t written = reply->used; size_t written = reply->used;
for (auto replica : replicas) {
std::unique_lock<fastlock> ul(replica->lock);
while (checkClientOutputBufferLimits(replica)
&& (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) {
ul.unlock();
usleep(0);
ul.lock();
}
}
aeAcquireLock(); aeAcquireLock();
for (size_t ireplica = 0; ireplica < replicas.size(); ++ireplica) { for (size_t ireplica = 0; ireplica < replicas.size(); ++ireplica) {
auto replica = replicas[ireplica]; auto replica = replicas[ireplica];
@ -1031,18 +1043,23 @@ public:
replica->replstate = REPL_STATE_NONE; replica->replstate = REPL_STATE_NONE;
continue; continue;
} }
while (checkClientOutputBufferLimits(replica)
&& (replica->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP) == 0) {
aeReleaseLock();
usleep(10);
aeAcquireLock();
}
std::unique_lock<fastlock> lock(replica->lock); std::unique_lock<fastlock> lock(replica->lock);
addReplyProto(replica, reply->buf(), reply->used); if (ireplica == (replicas.size()-1) && replica->replyAsync == nullptr) {
replica->replyAsync = reply;
reply = nullptr;
if (!(replica->fPendingAsyncWrite)) clientInstallAsyncWriteHandler(replica);
} else {
addReplyProto(replica, reply->buf(), reply->used);
}
} }
ProcessPendingAsyncWrites(); ProcessPendingAsyncWrites();
for (auto c : replicas) {
if (c->flags & CLIENT_CLOSE_ASAP) {
std::unique_lock<fastlock> ul(c->lock);
c->replstate = REPL_STATE_NONE; // otherwise the client can't be free'd
}
}
replicas.erase(std::remove_if(replicas.begin(), replicas.end(), [](const client *c)->bool{ return c->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP;}), replicas.end()); replicas.erase(std::remove_if(replicas.begin(), replicas.end(), [](const client *c)->bool{ return c->flags.load(std::memory_order_relaxed) & CLIENT_CLOSE_ASAP;}), replicas.end());
aeReleaseLock(); aeReleaseLock();
if (reply != nullptr) { if (reply != nullptr) {
@ -1062,7 +1079,7 @@ public:
} }
if (reply == nullptr) { if (reply == nullptr) {
reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + std::max(size, (unsigned long)(PROTO_REPLY_CHUNK_BYTES*2))); reply = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + std::max(size, (unsigned long)(PROTO_REPLY_CHUNK_BYTES*64)));
reply->size = zmalloc_usable_size(reply) - sizeof(clientReplyBlock); reply->size = zmalloc_usable_size(reply) - sizeof(clientReplyBlock);
reply->used = 0; reply->used = 0;
} }