From 788ff61b4b0403de63c4a847f5bd10f68cef9627 Mon Sep 17 00:00:00 2001 From: John Sully Date: Thu, 25 Mar 2021 23:14:48 +0000 Subject: [PATCH] Prevent unnecessary copies in replication scenarios Former-commit-id: 84c671cdafae7e84beef722fcc8a90b92918c89d --- src/networking.cpp | 10 ++++++++++ src/replication.cpp | 3 +++ 2 files changed, 13 insertions(+) diff --git a/src/networking.cpp b/src/networking.cpp index 15aa6f43a..0774694ae 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -312,6 +312,16 @@ int prepareClientToWrite(client *c) { * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- */ +void _clientAsyncReplyBufferReserve(client *c, size_t len) { + if (c->replyAsync != nullptr) + return; + size_t newsize = std::max(len, (size_t)PROTO_ASYNC_REPLY_CHUNK_BYTES); + clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize); + replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock); + replyNew->used = 0; + c->replyAsync = replyNew; +} + /* Attempts to add the reply to the static buffer in the client struct. * Returns C_ERR if the buffer is full, or the reply list is not empty, * in which case the reply must be added to the reply list. */ diff --git a/src/replication.cpp b/src/replication.cpp index 9f13dcda6..2533bae52 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -4449,6 +4449,7 @@ void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long sdsfree(szFromObj(&objTtl)); } +void _clientAsyncReplyBufferReserve(client *c, size_t len); void flushReplBacklogToClients() { serverAssert(GlobalLocksAcquired()); @@ -4486,6 +4487,8 @@ void flushReplBacklogToClients() addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy); } else { auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart; + if (fAsyncWrite) + _clientAsyncReplyBufferReserve(replica, cbPhase1 + g_pserver->repl_backlog_idx); addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1); addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx); serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart));