Optimize listpack for stream usage to avoid repeated reallocs (#6281)
Avoid repeated reallocs growing the listpack while entries are being added. This is done by pre-allocating the listpack to near maximum size, and using malloc_size to check if it needs realloc or not. When the listpack reaches the maximum number of entries, we shrink it to fit it's used size. Co-authored-by: Viktor Söderqvist <viktor@zuiderkwast.se> Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
parent
a8d6c6493a
commit
ae4d2fe1b3
@ -219,9 +219,12 @@ int lpStringToInt64(const char *s, unsigned long slen, int64_t *value) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Create a new, empty listpack.
|
/* Create a new, empty listpack.
|
||||||
* On success the new listpack is returned, otherwise an error is returned. */
|
* On success the new listpack is returned, otherwise an error is returned.
|
||||||
unsigned char *lpNew(void) {
|
* Pre-allocate at least `capacity` bytes of memory,
|
||||||
unsigned char *lp = lp_malloc(LP_HDR_SIZE+1);
|
* over-allocated memory can be shrinked by `lpShrinkToFit`.
|
||||||
|
* */
|
||||||
|
unsigned char *lpNew(size_t capacity) {
|
||||||
|
unsigned char *lp = lp_malloc(capacity > LP_HDR_SIZE+1 ? capacity : LP_HDR_SIZE+1);
|
||||||
if (lp == NULL) return NULL;
|
if (lp == NULL) return NULL;
|
||||||
lpSetTotalBytes(lp,LP_HDR_SIZE+1);
|
lpSetTotalBytes(lp,LP_HDR_SIZE+1);
|
||||||
lpSetNumElements(lp,0);
|
lpSetNumElements(lp,0);
|
||||||
@ -234,6 +237,16 @@ void lpFree(unsigned char *lp) {
|
|||||||
lp_free(lp);
|
lp_free(lp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Shrink the memory to fit. */
|
||||||
|
unsigned char* lpShrinkToFit(unsigned char *lp) {
|
||||||
|
size_t size = lpGetTotalBytes(lp);
|
||||||
|
if (size < lp_malloc_size(lp)) {
|
||||||
|
return lp_realloc(lp, size);
|
||||||
|
} else {
|
||||||
|
return lp;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Given an element 'ele' of size 'size', determine if the element can be
|
/* Given an element 'ele' of size 'size', determine if the element can be
|
||||||
* represented inside the listpack encoded as integer, and returns
|
* represented inside the listpack encoded as integer, and returns
|
||||||
* LP_ENCODING_INT if so. Otherwise returns LP_ENCODING_STR if no integer
|
* LP_ENCODING_INT if so. Otherwise returns LP_ENCODING_STR if no integer
|
||||||
@ -702,7 +715,8 @@ unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, un
|
|||||||
unsigned char *dst = lp + poff; /* May be updated after reallocation. */
|
unsigned char *dst = lp + poff; /* May be updated after reallocation. */
|
||||||
|
|
||||||
/* Realloc before: we need more room. */
|
/* Realloc before: we need more room. */
|
||||||
if (new_listpack_bytes > old_listpack_bytes) {
|
if (new_listpack_bytes > old_listpack_bytes &&
|
||||||
|
new_listpack_bytes > lp_malloc_size(lp)) {
|
||||||
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
|
if ((lp = lp_realloc(lp,new_listpack_bytes)) == NULL) return NULL;
|
||||||
dst = lp + poff;
|
dst = lp + poff;
|
||||||
}
|
}
|
||||||
|
@ -35,6 +35,7 @@
|
|||||||
#ifndef __LISTPACK_H
|
#ifndef __LISTPACK_H
|
||||||
#define __LISTPACK_H
|
#define __LISTPACK_H
|
||||||
|
|
||||||
|
#include <stdlib.h>
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
|
||||||
#define LP_INTBUF_SIZE 21 /* 20 digits of -2^63 + 1 null term = 21. */
|
#define LP_INTBUF_SIZE 21 /* 20 digits of -2^63 + 1 null term = 21. */
|
||||||
@ -44,8 +45,9 @@
|
|||||||
#define LP_AFTER 1
|
#define LP_AFTER 1
|
||||||
#define LP_REPLACE 2
|
#define LP_REPLACE 2
|
||||||
|
|
||||||
unsigned char *lpNew(void);
|
unsigned char *lpNew(size_t capacity);
|
||||||
void lpFree(unsigned char *lp);
|
void lpFree(unsigned char *lp);
|
||||||
|
unsigned char* lpShrinkToFit(unsigned char *lp);
|
||||||
unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp);
|
unsigned char *lpInsert(unsigned char *lp, unsigned char *ele, uint32_t size, unsigned char *p, int where, unsigned char **newp);
|
||||||
unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size);
|
unsigned char *lpAppend(unsigned char *lp, unsigned char *ele, uint32_t size);
|
||||||
unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp);
|
unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp);
|
||||||
|
@ -42,4 +42,5 @@
|
|||||||
#define lp_malloc zmalloc
|
#define lp_malloc zmalloc
|
||||||
#define lp_realloc zrealloc
|
#define lp_realloc zrealloc
|
||||||
#define lp_free zfree
|
#define lp_free zfree
|
||||||
|
#define lp_malloc_size zmalloc_usable_size
|
||||||
#endif
|
#endif
|
||||||
|
@ -43,6 +43,10 @@
|
|||||||
* avoid malloc allocation.*/
|
* avoid malloc allocation.*/
|
||||||
#define STREAMID_STATIC_VECTOR_LEN 8
|
#define STREAMID_STATIC_VECTOR_LEN 8
|
||||||
|
|
||||||
|
/* Max pre-allocation for listpack. This is done to avoid abuse of a user
|
||||||
|
* setting stream_node_max_bytes to a huge number. */
|
||||||
|
#define STREAM_LISTPACK_MAX_PRE_ALLOCATE 4096
|
||||||
|
|
||||||
void streamFreeCG(streamCG *cg);
|
void streamFreeCG(streamCG *cg);
|
||||||
void streamFreeNACK(streamNACK *na);
|
void streamFreeNACK(streamNACK *na);
|
||||||
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
|
size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer);
|
||||||
@ -509,7 +513,13 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
|
|||||||
lp = NULL;
|
lp = NULL;
|
||||||
} else if (server.stream_node_max_entries) {
|
} else if (server.stream_node_max_entries) {
|
||||||
int64_t count = lpGetInteger(lpFirst(lp));
|
int64_t count = lpGetInteger(lpFirst(lp));
|
||||||
if (count >= server.stream_node_max_entries) lp = NULL;
|
if (count >= server.stream_node_max_entries) {
|
||||||
|
/* Shrink extra pre-allocated memory */
|
||||||
|
lp = lpShrinkToFit(lp);
|
||||||
|
if (ri.data != lp)
|
||||||
|
raxInsert(s->rax,ri.key,ri.key_len,lp,NULL);
|
||||||
|
lp = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -517,8 +527,17 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
|
|||||||
if (lp == NULL) {
|
if (lp == NULL) {
|
||||||
master_id = id;
|
master_id = id;
|
||||||
streamEncodeID(rax_key,&id);
|
streamEncodeID(rax_key,&id);
|
||||||
/* Create the listpack having the master entry ID and fields. */
|
/* Create the listpack having the master entry ID and fields.
|
||||||
lp = lpNew();
|
* Pre-allocate some bytes when creating listpack to avoid realloc on
|
||||||
|
* every XADD. Since listpack.c uses malloc_size, it'll grow in steps,
|
||||||
|
* and won't realloc on every XADD.
|
||||||
|
* When listpack reaches max number of entries, we'll shrink the
|
||||||
|
* allocation to fit the data. */
|
||||||
|
size_t prealloc = STREAM_LISTPACK_MAX_PRE_ALLOCATE;
|
||||||
|
if (server.stream_node_max_bytes > 0 && server.stream_node_max_bytes < prealloc) {
|
||||||
|
prealloc = server.stream_node_max_bytes;
|
||||||
|
}
|
||||||
|
lp = lpNew(prealloc);
|
||||||
lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */
|
lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */
|
||||||
lp = lpAppendInteger(lp,0); /* Zero deleted so far. */
|
lp = lpAppendInteger(lp,0); /* Zero deleted so far. */
|
||||||
lp = lpAppendInteger(lp,numfields);
|
lp = lpAppendInteger(lp,numfields);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user