diff --git a/src/networking.cpp b/src/networking.cpp index 14b6a4428..94f1c21c9 100644 --- a/src/networking.cpp +++ b/src/networking.cpp @@ -172,9 +172,7 @@ client *createClient(connection *conn, int iel) { c->pubsub_patterns = listCreate(); c->peerid = NULL; c->client_list_node = NULL; - c->bufAsync = NULL; - c->buflenAsync = 0; - c->bufposAsync = 0; + c->replyAsync = NULL; c->client_tracking_redirection = 0; c->casyncOpsPending = 0; c->master_error = 0; @@ -299,15 +297,27 @@ int _addReplyToBuffer(client *c, const char *s, size_t len, bool fAsync) { if (fAsync) { serverAssert(GlobalLocksAcquired()); - if ((c->buflenAsync - c->bufposAsync) < (int)len) + if (c->replyAsync == nullptr || (c->replyAsync->size - c->replyAsync->used) < len) { - int minsize = len + c->bufposAsync; - c->buflenAsync = std::max(minsize, c->buflenAsync*2 - c->buflenAsync); - c->bufAsync = (char*)zrealloc(c->bufAsync, c->buflenAsync, MALLOC_LOCAL); - c->buflenAsync = zmalloc_usable(c->bufAsync); + if (c->replyAsync == nullptr) { + size_t newsize = std::max(len, (size_t)PROTO_ASYNC_REPLY_CHUNK_BYTES); + + clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize); + replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock); + replyNew->used = 0; + c->replyAsync = replyNew; + } else { + size_t newsize = std::max(c->replyAsync->used + len, c->replyAsync->size*2); + clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize); + replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock); + replyNew->used = c->replyAsync->used; + memcpy(replyNew->buf(), c->replyAsync->buf(), c->replyAsync->used); + zfree(c->replyAsync); + c->replyAsync = replyNew; + } } - memcpy(c->bufAsync+c->bufposAsync,s,len); - c->bufposAsync += len; + memcpy(c->replyAsync->buf() + c->replyAsync->used,s,len); + c->replyAsync->used += len; } else { @@ -633,7 +643,7 @@ void *addReplyDeferredLenAsync(client *c) { if (FCorrectThread(c)) return addReplyDeferredLen(c); - return (void*)((ssize_t)c->bufposAsync); + return (void*)((ssize_t)(c->replyAsync ? c->replyAsync->used : 0)); } /* Populate the length object and try gluing it to the next chunk. */ @@ -689,17 +699,22 @@ void setDeferredAggregateLenAsync(client *c, void *node, long length, char prefi char lenstr[128]; int lenstr_len = sprintf(lenstr, "%c%ld\r\n", prefix, length); - ssize_t idxSplice = (ssize_t)node; - serverAssert(idxSplice <= c->bufposAsync); - if (c->buflenAsync < (c->bufposAsync + lenstr_len)) + size_t idxSplice = (size_t)node; + serverAssert(idxSplice <= c->replyAsync->used); + if (c->replyAsync->size < (c->replyAsync->used + lenstr_len)) { - c->buflenAsync = std::max((int)(c->bufposAsync+lenstr_len), c->buflenAsync*2 - c->buflenAsync); - c->bufAsync = (char*)zrealloc(c->bufAsync, c->buflenAsync, MALLOC_LOCAL); + int newsize = std::max(c->replyAsync->used + lenstr_len, c->replyAsync->size*2); + clientReplyBlock *replyNew = (clientReplyBlock*)zmalloc(sizeof(clientReplyBlock) + newsize); + replyNew->size = zmalloc_usable(replyNew) - sizeof(clientReplyBlock); + replyNew->used = c->replyAsync->used; + memcpy(replyNew->buf(), c->replyAsync->buf(), c->replyAsync->used); + zfree(c->replyAsync); + c->replyAsync = replyNew; } - memmove(c->bufAsync + idxSplice + lenstr_len, c->bufAsync + idxSplice, c->bufposAsync - idxSplice); - memcpy(c->bufAsync + idxSplice, lenstr, lenstr_len); - c->bufposAsync += lenstr_len; + memmove(c->replyAsync->buf() + idxSplice + lenstr_len, c->replyAsync->buf() + idxSplice, c->replyAsync->used - idxSplice); + memcpy(c->replyAsync->buf() + idxSplice, lenstr, lenstr_len); + c->replyAsync->used += lenstr_len; } void setDeferredArrayLen(client *c, void *node, long length) { @@ -1640,7 +1655,7 @@ bool freeClient(client *c) { /* Release other dynamically allocated client structure fields, * and finally release the client structure itself. */ - zfree(c->bufAsync); + zfree(c->replyAsync); if (c->name) decrRefCount(c->name); zfree(c->argv); freeClientMultiState(c); @@ -1846,29 +1861,25 @@ void ProcessPendingAsyncWrites() serverAssert(c->fPendingAsyncWrite); if (c->flags & (CLIENT_CLOSE_ASAP | CLIENT_CLOSE_AFTER_REPLY)) { - c->bufposAsync = 0; - c->buflenAsync = 0; - zfree(c->bufAsync); - c->bufAsync = nullptr; + zfree(c->replyAsync); + c->replyAsync = nullptr; c->fPendingAsyncWrite = FALSE; continue; } - // TODO: Append to end of reply block? + int size = c->replyAsync->used; - size_t size = c->bufposAsync; - clientReplyBlock *reply = (clientReplyBlock*)zmalloc(size + sizeof(clientReplyBlock), MALLOC_LOCAL); - /* take over the allocation's internal fragmentation */ - reply->size = zmalloc_usable(reply) - sizeof(clientReplyBlock); - reply->used = c->bufposAsync; - memcpy(reply->buf(), c->bufAsync, c->bufposAsync); - listAddNodeTail(c->reply, reply); - c->reply_bytes += reply->size; + if (listLength(c->reply) == 0 && size <= (PROTO_REPLY_CHUNK_BYTES - c->bufpos)) { + memcpy(c->buf + c->bufpos, c->replyAsync->buf(), size); + c->bufpos += size; + } else { + c->reply_bytes += c->replyAsync->size; + listAddNodeTail(c->reply, c->replyAsync); + c->replyAsync = nullptr; + } - c->bufposAsync = 0; - c->buflenAsync = 0; - zfree(c->bufAsync); - c->bufAsync = nullptr; + zfree(c->replyAsync); + c->replyAsync = nullptr; c->fPendingAsyncWrite = FALSE; // Now install the write event handler @@ -3241,7 +3252,7 @@ void rewriteClientCommandArgument(client *c, int i, robj *newval) { * enforcing the client output length limits. */ unsigned long getClientOutputBufferMemoryUsage(client *c) { unsigned long list_item_size = sizeof(listNode) + sizeof(clientReplyBlock); - return c->reply_bytes + (list_item_size*listLength(c->reply)) + c->buflenAsync; + return c->reply_bytes + (list_item_size*listLength(c->reply)) + (c->replyAsync ? c->replyAsync->size : 0); } /* Get the class of a client, used in order to enforce limits to different diff --git a/src/server.h b/src/server.h index 3705b199a..3e0c48f77 100644 --- a/src/server.h +++ b/src/server.h @@ -327,6 +327,7 @@ inline bool operator!=(const void *p, const robj_sharedptr &rhs) #define PROTO_MAX_QUERYBUF_LEN (1024*1024*1024) /* 1GB max query buffer. */ #define PROTO_IOBUF_LEN (1024*16) /* Generic I/O buffer size */ #define PROTO_REPLY_CHUNK_BYTES (16*1024) /* 16k output buffer */ +#define PROTO_ASYNC_REPLY_CHUNK_BYTES (1024) #define PROTO_INLINE_MAX_SIZE (1024*64) /* Max size of inline reads */ #define PROTO_MBULK_BIG_ARG (1024*32) #define LONG_STR_SIZE 21 /* Bytes needed for long -> str + '\0' */ @@ -1145,9 +1146,7 @@ typedef struct client { char buf[PROTO_REPLY_CHUNK_BYTES]; /* Async Response Buffer - other threads write here */ - int bufposAsync; - int buflenAsync; - char *bufAsync; + clientReplyBlock *replyAsync; int iel; /* the event loop index we're registered with */ struct fastlock lock;