Prevent unnecessary copies in replication scenarios

Former-commit-id: b152a9bd88c081ce98eebe9a7af49649e60e5523
This commit is contained in:
John Sully 2021-03-25 23:14:48 +00:00
parent 84e07e5d24
commit a0ea81d682
2 changed files with 13 additions and 0 deletions

View File

@ -312,6 +312,16 @@ int prepareClientToWrite(client *c) {
* Low level functions to add more data to output buffers.
* -------------------------------------------------------------------------- */
void _clientAsyncReplyBufferReserve(client *c, size_t len) {
if (c->replyAsync != nullptr)
return;
size_t newsize = std::max(len, (size_t)PROTO_ASYNC_REPLY_CHUNK_BYTES);
clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize);
replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock);
replyNew->used = 0;
c->replyAsync = replyNew;
}
/* Attempts to add the reply to the static buffer in the client struct.
* Returns C_ERR if the buffer is full, or the reply list is not empty,
* in which case the reply must be added to the reply list. */

View File

@ -4449,6 +4449,7 @@ void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long
sdsfree(szFromObj(&objTtl));
}
void _clientAsyncReplyBufferReserve(client *c, size_t len);
void flushReplBacklogToClients()
{
serverAssert(GlobalLocksAcquired());
@ -4486,6 +4487,8 @@ void flushReplBacklogToClients()
addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy);
} else {
auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart;
if (fAsyncWrite)
_clientAsyncReplyBufferReserve(replica, cbPhase1 + g_pserver->repl_backlog_idx);
addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1);
addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx);
serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart));