From 8b40f686fb72358e600b238b8e8da5af2227d3de Mon Sep 17 00:00:00 2001 From: Oran Agra Date: Fri, 24 Apr 2020 17:20:28 +0300 Subject: [PATCH 1/2] optimize memory usage of deferred replies - fixed When deffered reply is added the previous reply node cannot be used so all the extra space we allocated in it is wasted. in case someone uses deffered replies in a loop, each time adding a small reply, each of these reply nodes (the small string reply) would have consumed a 16k block. now when we add anther diferred reply node, we trim the unused portion of the previous reply block. see #7123 cherry picked from commit 4ed5b7cb74caf5bef6606909603e371af0da4f9b with fix to handle a crash with LIBC allocator, which apparently can return the same pointer despite changing it's size. i.e. shrinking an allocation of 16k into 56 bytes without changing the pointer. --- src/networking.c | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/networking.c b/src/networking.c index c4a277e0a..d62533e3e 100644 --- a/src/networking.c +++ b/src/networking.c @@ -436,6 +436,34 @@ void addReplyStatusFormat(client *c, const char *fmt, ...) { sdsfree(s); } +/* Sometimes we are forced to create a new reply node, and we can't append to + * the previous one, when that happens, we wanna try to trim the unused space + * at the end of the last reply node which we won't use anymore. */ +void trimReplyUnusedTailSpace(client *c) { + listNode *ln = listLast(c->reply); + clientReplyBlock *tail = ln? listNodeValue(ln): NULL; + + /* Note that 'tail' may be NULL even if we have a tail node, becuase when + * addDeferredMultiBulkLength() is used */ + if (!tail) return; + + /* We only try to trim the space is relatively high (more than a 1/4 of the + * allocation), otherwise there's a high chance realloc will NOP. + * Also, to avoid large memmove which happens as part of realloc, we only do + * that if the used part is small. */ + if (tail->size - tail->used > tail->size / 4 && + tail->used < PROTO_REPLY_CHUNK_BYTES) + { + size_t old_size = tail->size; + tail = zrealloc(tail, tail->used + sizeof(clientReplyBlock)); + /* take over the allocation's internal fragmentation (at least for + * memory usage tracking) */ + tail->size = zmalloc_usable(tail) - sizeof(clientReplyBlock); + c->reply_bytes += tail->size - old_size; + listNodeValue(ln) = tail; + } +} + /* Adds an empty object to the reply list that will contain the multi bulk * length, which is not known when this function is called. */ void *addReplyDeferredLen(client *c) { @@ -443,6 +471,7 @@ void *addReplyDeferredLen(client *c) { * ready to be sent, since we are sure that before returning to the * event loop setDeferredAggregateLen() will be called. */ if (prepareClientToWrite(c) != C_OK) return NULL; + trimReplyUnusedTailSpace(c); listAddNodeTail(c->reply,NULL); /* NULL is our placeholder. */ return listLast(c->reply); } From 3843eb300d9da39dbb75d7c9f29d4948d305af8c Mon Sep 17 00:00:00 2001 From: Guy Benoish Date: Sun, 3 May 2020 16:49:45 +0300 Subject: [PATCH 2/2] XPENDING should not update consumer's seen-time Same goes for XGROUP DELCONSUMER (But in this case, it doesn't have any visible effect) --- src/blocked.c | 7 ++++--- src/rdb.c | 4 ++-- src/stream.h | 7 ++++++- src/t_stream.c | 33 ++++++++++++++++++++------------- 4 files changed, 32 insertions(+), 19 deletions(-) diff --git a/src/blocked.c b/src/blocked.c index 045369e93..92f1cee65 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -371,9 +371,10 @@ void serveClientsBlockedOnStreamKey(robj *o, readyList *rl) { int noack = 0; if (group) { - consumer = streamLookupConsumer(group, - receiver->bpop.xread_consumer->ptr, - 1); + consumer = + streamLookupConsumer(group, + receiver->bpop.xread_consumer->ptr, + SLC_NONE); noack = receiver->bpop.xread_group_noack; } diff --git a/src/rdb.c b/src/rdb.c index 6e4af3987..c882efcb4 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1851,8 +1851,8 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key) { decrRefCount(o); return NULL; } - streamConsumer *consumer = streamLookupConsumer(cgroup,cname, - 1); + streamConsumer *consumer = + streamLookupConsumer(cgroup,cname,SLC_NONE); sdsfree(cname); consumer->seen_time = rdbLoadMillisecondTime(rdb,RDB_VERSION); if (rioGetReadError(rdb)) { diff --git a/src/stream.h b/src/stream.h index b69073994..0d3bf63fc 100644 --- a/src/stream.h +++ b/src/stream.h @@ -96,6 +96,11 @@ typedef struct streamPropInfo { /* Prototypes of exported APIs. */ struct client; +/* Flags for streamLookupConsumer */ +#define SLC_NONE 0 +#define SLC_NOCREAT (1<<0) /* Do not create the consumer if it doesn't exist */ +#define SLC_NOREFRESH (1<<1) /* Do not update consumer's seen-time */ + stream *streamNew(void); void freeStream(stream *s); unsigned long streamLength(const robj *subject); @@ -105,7 +110,7 @@ int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields); void streamIteratorGetField(streamIterator *si, unsigned char **fieldptr, unsigned char **valueptr, int64_t *fieldlen, int64_t *valuelen); void streamIteratorStop(streamIterator *si); streamCG *streamLookupCG(stream *s, sds groupname); -streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create); +streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags); streamCG *streamCreateCG(stream *s, char *name, size_t namelen, streamID *id); streamNACK *streamCreateNACK(streamConsumer *consumer); void streamDecodeID(void *buf, streamID *id); diff --git a/src/t_stream.c b/src/t_stream.c index 5c1b9a523..676ddd9bb 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -1570,7 +1570,8 @@ void xreadCommand(client *c) { addReplyBulk(c,c->argv[streams_arg+i]); streamConsumer *consumer = NULL; if (groups) consumer = streamLookupConsumer(groups[i], - consumername->ptr,1); + consumername->ptr, + SLC_NONE); streamPropInfo spi = {c->argv[i+streams_arg],groupname}; int flags = 0; if (noack) flags |= STREAM_RWR_NOACK; @@ -1706,7 +1707,9 @@ streamCG *streamLookupCG(stream *s, sds groupname) { * consumer does not exist it is automatically created as a side effect * of calling this function, otherwise its last seen time is updated and * the existing consumer reference returned. */ -streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { +streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) { + int create = !(flags & SLC_NOCREAT); + int refresh = !(flags & SLC_NOREFRESH); streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, sdslen(name)); if (consumer == raxNotFound) { @@ -1717,7 +1720,7 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { raxInsert(cg->consumers,(unsigned char*)name,sdslen(name), consumer,NULL); } - consumer->seen_time = mstime(); + if (refresh) consumer->seen_time = mstime(); return consumer; } @@ -1725,7 +1728,8 @@ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int create) { * may have pending messages: they are removed from the PEL, and the number * of pending messages "lost" is returned. */ uint64_t streamDelConsumer(streamCG *cg, sds name) { - streamConsumer *consumer = streamLookupConsumer(cg,name,0); + streamConsumer *consumer = + streamLookupConsumer(cg,name,SLC_NOCREAT|SLC_NOREFRESH); if (consumer == NULL) return 0; uint64_t retval = raxSize(consumer->pel); @@ -2068,15 +2072,18 @@ void xpendingCommand(client *c) { } /* XPENDING [] variant. */ else { - streamConsumer *consumer = consumername ? - streamLookupConsumer(group,consumername->ptr,0): - NULL; + streamConsumer *consumer = NULL; + if (consumername) { + consumer = streamLookupConsumer(group, + consumername->ptr, + SLC_NOCREAT|SLC_NOREFRESH); - /* If a consumer name was mentioned but it does not exist, we can - * just return an empty array. */ - if (consumername && consumer == NULL) { - addReplyArrayLen(c,0); - return; + /* If a consumer name was mentioned but it does not exist, we can + * just return an empty array. */ + if (consumer == NULL) { + addReplyArrayLen(c,0); + return; + } } rax *pel = consumer ? consumer->pel : group->pel; @@ -2338,7 +2345,7 @@ void xclaimCommand(client *c) { raxRemove(nack->consumer->pel,buf,sizeof(buf),NULL); /* Update the consumer and idle time. */ if (consumer == NULL) - consumer = streamLookupConsumer(group,c->argv[3]->ptr,1); + consumer = streamLookupConsumer(group,c->argv[3]->ptr,SLC_NONE); nack->consumer = consumer; nack->delivery_time = deliverytime; /* Set the delivery attempts counter if given, otherwise