diff --git a/src/networking.cpp b/src/networking.cpp index 15aa6f43a..0774694ae 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -312,6 +312,16 @@ int prepareClientToWrite(client *c) { * Low level functions to add more data to output buffers. * -------------------------------------------------------------------------- */ +void _clientAsyncReplyBufferReserve(client *c, size_t len) { + if (c->replyAsync != nullptr) + return; + size_t newsize = std::max(len, (size_t)PROTO_ASYNC_REPLY_CHUNK_BYTES); + clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize); + replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock); + replyNew->used = 0; + c->replyAsync = replyNew; +} + /* Attempts to add the reply to the static buffer in the client struct. * Returns C_ERR if the buffer is full, or the reply list is not empty, * in which case the reply must be added to the reply list. */ diff --git a/src/replication.cpp b/src/replication.cpp index 9f13dcda6..2533bae52 100644 --- a/src/replication.cpp +++ b/src/replication.cpp @@ -4449,6 +4449,7 @@ void replicateSubkeyExpire(redisDb *db, robj_roptr key, robj_roptr subkey, long sdsfree(szFromObj(&objTtl)); } +void _clientAsyncReplyBufferReserve(client *c, size_t len); void flushReplBacklogToClients() { serverAssert(GlobalLocksAcquired()); @@ -4486,6 +4487,8 @@ void flushReplBacklogToClients() addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbCopy); } else { auto cbPhase1 = g_pserver->repl_backlog_size - g_pserver->repl_batch_idxStart; + if (fAsyncWrite) + _clientAsyncReplyBufferReserve(replica, cbPhase1 + g_pserver->repl_backlog_idx); addReplyProto(replica, g_pserver->repl_backlog + g_pserver->repl_batch_idxStart, cbPhase1); addReplyProto(replica, g_pserver->repl_backlog, g_pserver->repl_backlog_idx); serverAssert((cbPhase1 + g_pserver->repl_backlog_idx) == (g_pserver->master_repl_offset - g_pserver->repl_batch_offStart));