diff --git a/src/listpack.c b/src/listpack.c index b403a1200..a2255f0d7 100644 --- a/src/listpack.c +++ b/src/listpack.c @@ -219,9 +219,12 @@ int lpStringToInt64(const char *s, unsigned long slen, int64_t *value) { } /* Create a new, empty listpack. - * On success the new listpack is returned, otherwise an error is returned. */ -unsigned char *lpNew(void) { - unsigned char *lp = lp_malloc(LP_HDR_SIZE+1); + * On success the new listpack is returned, otherwise an error is returned. + * Pre-allocate at least `capacity` bytes of memory, + * over-allocated memory can be shrinked by `lpShrinkToFit`. + * */ +unsigned char *lpNew(size_t capacity) { + unsigned char *lp = lp_malloc(capacity > LP_HDR_SIZE+1 ? capacity : LP_HDR_SIZE+1); if (lp == NULL) return NULL; lpSetTotalBytes(lp,LP_HDR_SIZE+1); lpSetNumElements(lp,0); @@ -234,6 +237,16 @@ void lpFree(unsigned char *lp) { lp_free(lp); } +/* Shrink the memory to fit. */ +unsigned char* lpShrinkToFit(unsigned char *lp) { + size_t size = lpGetTotalBytes(lp); + if (size < lp_malloc_size(lp)) { + return lp_realloc(lp, size); + } else { + return lp; + } +} + /* Given an element 'ele' of size 'size', determine if the element can be * represented inside the listpack encoded as integer, and returns * LP_ENCODING_INT if so. Otherwise returns LP_ENCODING_STR if no integer @@ -702,7 +715,8 @@ unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, un unsigned char *dst = lp + poff; /* May be updated after reallocation. */ /* Realloc before: we need more room. */ - if (new_listpack_bytes > old_listpack_bytes) { + if (new_listpack_bytes > old_listpack_bytes && + new_listpack_bytes > lp_malloc_size(lp)) { if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL; dst = lp + poff; } diff --git a/src/listpack.h b/src/listpack.h index e8375628b..f87622c18 100644 --- a/src/listpack.h +++ b/src/listpack.h @@ -35,6 +35,7 @@ #ifndef __LISTPACK_H #define __LISTPACK_H +#include #include #define LP_INTBUF_SIZE 21 /* 20 digits of -2^63 + 1 null term = 21. */ @@ -44,8 +45,9 @@ #define LP_AFTER 1 #define LP_REPLACE 2 -unsigned char *lpNew(void); +unsigned char *lpNew(size_t capacity); void lpFree(unsigned char *lp); +unsigned char* lpShrinkToFit(unsigned char *lp); unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp); unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size); unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp); diff --git a/src/listpack_malloc.h b/src/listpack_malloc.h index 401ab6f74..3a9050052 100644 --- a/src/listpack_malloc.h +++ b/src/listpack_malloc.h @@ -42,4 +42,5 @@ #define lp_malloc zmalloc #define lp_realloc zrealloc #define lp_free zfree +#define lp_malloc_size zmalloc_usable_size #endif diff --git a/src/t_stream.c b/src/t_stream.c index 7b4ffe3c4..da43fce18 100644 --- a/src/t_stream.c +++ b/src/t_stream.c @@ -43,6 +43,10 @@ * avoid malloc allocation.*/ #define STREAMID_STATIC_VECTOR_LEN 8 +/* Max pre-allocation for listpack. This is done to avoid abuse of a user + * setting stream_node_max_bytes to a huge number. */ +#define STREAM_LISTPACK_MAX_PRE_ALLOCATE 4096 + void streamFreeCG(streamCG *cg); void streamFreeNACK(streamNACK *na); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); @@ -509,7 +513,13 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_ lp = NULL; } else if (server.stream_node_max_entries) { int64_t count = lpGetInteger(lpFirst(lp)); - if (count >= server.stream_node_max_entries) lp = NULL; + if (count >= server.stream_node_max_entries) { + /* Shrink extra pre-allocated memory */ + lp = lpShrinkToFit(lp); + if (ri.data != lp) + raxInsert(s->rax,ri.key,ri.key_len,lp,NULL); + lp = NULL; + } } } @@ -517,8 +527,17 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_ if (lp == NULL) { master_id = id; streamEncodeID(rax_key,&id); - /* Create the listpack having the master entry ID and fields. */ - lp = lpNew(); + /* Create the listpack having the master entry ID and fields. + * Pre-allocate some bytes when creating listpack to avoid realloc on + * every XADD. Since listpack.c uses malloc_size, it'll grow in steps, + * and won't realloc on every XADD. + * When listpack reaches max number of entries, we'll shrink the + * allocation to fit the data. */ + size_t prealloc = STREAM_LISTPACK_MAX_PRE_ALLOCATE; + if (server.stream_node_max_bytes > 0 && server.stream_node_max_bytes < prealloc) { + prealloc = server.stream_node_max_bytes; + } + lp = lpNew(prealloc); lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */ lp = lpAppendInteger(lp,0); /* Zero deleted so far. */ lp = lpAppendInteger(lp,numfields);