Reduce async write copies

Former-commit-id: ed369d722335ed4105748ef2bed5d64f3f32c433
This commit is contained in:
John Sully 2020-10-15 23:10:17 +00:00
parent 665a9ed7c6
commit a9f4c37604
2 changed files with 51 additions and 41 deletions

View File

@ -172,9 +172,7 @@ client *createClient(connection *conn, int iel) {
c->pubsub_patterns = listCreate();
c->peerid = NULL;
c->client_list_node = NULL;
c->bufAsync = NULL;
c->buflenAsync = 0;
c->bufposAsync = 0;
c->replyAsync = NULL;
c->client_tracking_redirection = 0;
c->casyncOpsPending = 0;
c->master_error = 0;
@ -299,15 +297,27 @@ int _addReplyToBuffer(client *c, const char *s, size_t len, bool fAsync) {
if (fAsync)
{
serverAssert(GlobalLocksAcquired());
if ((c->buflenAsync - c->bufposAsync) < (int)len)
if (c->replyAsync == nullptr || (c->replyAsync->size - c->replyAsync->used) < len)
{
int minsize = len + c->bufposAsync;
c->buflenAsync = std::max(minsize, c->buflenAsync*2 - c->buflenAsync);
c->bufAsync = (char*)zrealloc(c->bufAsync, c->buflenAsync, MALLOC_LOCAL);
c->buflenAsync = zmalloc_usable(c->bufAsync);
if (c->replyAsync == nullptr) {
size_t newsize = std::max(len, (size_t)PROTO_ASYNC_REPLY_CHUNK_BYTES);
clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize);
replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock);
replyNew->used = 0;
c->replyAsync = replyNew;
} else {
size_t newsize = std::max(c->replyAsync->used + len, c->replyAsync->size*2);
clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize);
replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock);
replyNew->used = c->replyAsync->used;
memcpy(replyNew->buf(), c->replyAsync->buf(), c->replyAsync->used);
zfree(c->replyAsync);
c->replyAsync = replyNew;
}
}
memcpy(c->bufAsync+c->bufposAsync,s,len);
c->bufposAsync += len;
memcpy(c->replyAsync->buf() + c->replyAsync->used,s,len);
c->replyAsync->used += len;
}
else
{
@ -633,7 +643,7 @@ void *addReplyDeferredLenAsync(client *c) {
if (FCorrectThread(c))
return addReplyDeferredLen(c);
return (void*)((ssize_t)c->bufposAsync);
return (void*)((ssize_t)(c->replyAsync ? c->replyAsync->used : 0));
}
/* Populate the length object and try gluing it to the next chunk. */
@ -689,17 +699,22 @@ void setDeferredAggregateLenAsync(client *c, void *node, long length, char prefi
char lenstr[128];
int lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length);
ssize_t idxSplice = (ssize_t)node;
serverAssert(idxSplice <= c->bufposAsync);
if (c->buflenAsync < (c->bufposAsync + lenstr_len))
size_t idxSplice = (size_t)node;
serverAssert(idxSplice <= c->replyAsync->used);
if (c->replyAsync->size < (c->replyAsync->used + lenstr_len))
{
c->buflenAsync = std::max((int)(c->bufposAsync+lenstr_len), c->buflenAsync*2 - c->buflenAsync);
c->bufAsync = (char*)zrealloc(c->bufAsync, c->buflenAsync, MALLOC_LOCAL);
int newsize = std::max(c->replyAsync->used + lenstr_len, c->replyAsync->size*2);
clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize);
replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock);
replyNew->used = c->replyAsync->used;
memcpy(replyNew->buf(), c->replyAsync->buf(), c->replyAsync->used);
zfree(c->replyAsync);
c->replyAsync = replyNew;
}
memmove(c->bufAsync + idxSplice + lenstr_len, c->bufAsync + idxSplice, c->bufposAsync - idxSplice);
memcpy(c->bufAsync + idxSplice, lenstr, lenstr_len);
c->bufposAsync += lenstr_len;
memmove(c->replyAsync->buf() + idxSplice + lenstr_len, c->replyAsync->buf() + idxSplice, c->replyAsync->used - idxSplice);
memcpy(c->replyAsync->buf() + idxSplice, lenstr, lenstr_len);
c->replyAsync->used += lenstr_len;
}
void setDeferredArrayLen(client *c, void *node, long length) {
@ -1640,7 +1655,7 @@ bool freeClient(client *c) {
/* Release other dynamically allocated client structure fields,
* and finally release the client structure itself. */
zfree(c->bufAsync);
zfree(c->replyAsync);
if (c->name) decrRefCount(c->name);
zfree(c->argv);
freeClientMultiState(c);
@ -1846,29 +1861,25 @@ void ProcessPendingAsyncWrites()
serverAssert(c->fPendingAsyncWrite);
if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY))
{
c->bufposAsync = 0;
c->buflenAsync = 0;
zfree(c->bufAsync);
c->bufAsync = nullptr;
zfree(c->replyAsync);
c->replyAsync = nullptr;
c->fPendingAsyncWrite = FALSE;
continue;
}
// TODO: Append to end of reply block?
int size = c->replyAsync->used;
size_t size = c->bufposAsync;
clientReplyBlock *reply = (clientReplyBlock*)zmalloc(size + sizeof(clientReplyBlock), MALLOC_LOCAL);
/* take over the allocation's internal fragmentation */
reply->size = zmalloc_usable(reply) - sizeof(clientReplyBlock);
reply->used = c->bufposAsync;
memcpy(reply->buf(), c->bufAsync, c->bufposAsync);
listAddNodeTail(c->reply, reply);
c->reply_bytes += reply->size;
if (listLength(c->reply) == 0 && size <= (PROTO_REPLY_CHUNK_BYTES - c->bufpos)) {
memcpy(c->buf + c->bufpos, c->replyAsync->buf(), size);
c->bufpos += size;
} else {
c->reply_bytes += c->replyAsync->size;
listAddNodeTail(c->reply, c->replyAsync);
c->replyAsync = nullptr;
}
c->bufposAsync = 0;
c->buflenAsync = 0;
zfree(c->bufAsync);
c->bufAsync = nullptr;
zfree(c->replyAsync);
c->replyAsync = nullptr;
c->fPendingAsyncWrite = FALSE;
// Now install the write event handler
@ -3241,7 +3252,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) {
* enforcing the client output length limits. */
unsigned long getClientOutputBufferMemoryUsage(client *c) {
unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock);
return c->reply_bytes + (list_item_size*listLength(c->reply)) + c->buflenAsync;
return c->reply_bytes + (list_item_size*listLength(c->reply)) + (c->replyAsync ? c->replyAsync->size : 0);
}
/* Get the class of a client, used in order to enforce limits to different

View File

@ -327,6 +327,7 @@ inline bool operator!=(const void *p, const robj_sharedptr &rhs)
#define PROTO_MAX_QUERYBUF_LEN (1024*1024*1024) /* 1GB max query buffer. */
#define PROTO_IOBUF_LEN (1024*16) /* Generic I/O buffer size */
#define PROTO_REPLY_CHUNK_BYTES (16*1024) /* 16k output buffer */
#define PROTO_ASYNC_REPLY_CHUNK_BYTES (1024)
#define PROTO_INLINE_MAX_SIZE (1024*64) /* Max size of inline reads */
#define PROTO_MBULK_BIG_ARG (1024*32)
#define LONG_STR_SIZE 21 /* Bytes needed for long -> str + '\0' */
@ -1145,9 +1146,7 @@ typedef struct client {
char buf[PROTO_REPLY_CHUNK_BYTES];
/* Async Response Buffer - other threads write here */
int bufposAsync;
int buflenAsync;
char *bufAsync;
clientReplyBlock *replyAsync;
int iel; /* the event loop index we're registered with */
struct fastlock lock;