
# Improve memory efficiency of list keys

## Description of the feature

The new listpack encoding uses the old `list-max-listpack-size` config to decide when to convert. We can think of the listpack as a node inside a quicklist, but without the roughly 80 bytes of overhead (internal fragmentation included) of the quicklist and quicklistNode structs. For example, a list key with 5 items of 10 chars each now takes 128 bytes instead of the 208 it used to take.

## Conversion rules

* **Convert listpack to quicklist.** When the listpack length or size reaches the `list-max-listpack-size` limit, it is converted to a quicklist.
* **Convert quicklist to listpack.** When a quicklist has only one node, and its length or size shrinks to half of the `list-max-listpack-size` limit, it is converted to a listpack. The halved threshold avoids frequent conversions when we add or remove items right at the boundary size or length.

Both rules are sketched in code at the end of this description.

## Interface changes

1. Add a list entry param to `listTypeSetIteratorDirection`. When the list encoding is listpack, `listTypeIterator->lpi` points to the entry after the current one, so when changing direction we need to use the current node (`listTypeEntry->p`) to update `listTypeIterator->lpi` to the next node in the reverse direction (see the sketch at the end of this description).

## Benchmark

### Listpack vs. quicklist with one node

* LPUSH - roughly 0.3% improvement
* LRANGE - roughly 13% improvement

### Both are quicklist

* LRANGE - roughly 3% improvement
* LRANGE without pipeline - roughly 3% improvement

From the benchmark results we can see that:

1. When the list is quicklist-encoded, LRANGE improves performance by <5%.
2. When the list is listpack-encoded, LRANGE improves performance by ~13%; the main enhancement is brought by `addListListpackRangeReply()`.

## Memory usage

1M lists (key:0~key:1000000) with 5 items of 10 chars ("hellohello") each show memory usage down by 35.49%, from 214MB to 138MB.

## Note

1. A conversion callback was added to support doing some work before conversion. Since the quicklist iterator decompresses the current node when it is released, we can no longer decompress the quicklist after we convert the list.
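To make the conversion rules concrete, here is a minimal sketch of the two threshold checks, assuming a positive `list-max-listpack-size` (an entry-count limit). The helper names are hypothetical, and the size-in-bytes check and the negative size-class form of the config are omitted; `lpLength()` and the `quicklist` fields follow the existing Redis APIs, but this is illustrative, not the patch's actual code.

```c
#include "server.h" /* server.list_max_listpack_size, listpack/quicklist APIs */

/* Hypothetical check: convert listpack -> quicklist once the entry
 * count reaches the configured limit. */
static int listShouldConvertToQuicklist(unsigned char *lp) {
    return lpLength(lp) >= (unsigned long)server.list_max_listpack_size;
}

/* Hypothetical check: convert quicklist -> listpack only when a single
 * node remains and the list has shrunk to half the limit, so adding or
 * removing one element at the boundary cannot flip the encoding back
 * and forth. */
static int listShouldConvertToListpack(quicklist *ql) {
    return ql->len == 1 &&
           ql->count <= (unsigned long)(server.list_max_listpack_size / 2);
}
```

Similarly, a sketch of the iterator-direction change described under "Interface changes". `lpNext()`/`lpPrev()` are the existing listpack traversal functions; the struct field names follow the description above, and the quicklist branch is omitted.

```c
/* Sketch: when reversing a listpack iterator, reseed 'lpi' from the
 * current entry's pointer so iteration continues from the right place. */
void listTypeSetIteratorDirection(listTypeIterator *li, listTypeEntry *entry,
                                  unsigned char direction) {
    if (li->direction == direction) return;
    li->direction = direction;
    if (li->encoding == OBJ_ENCODING_LISTPACK) {
        unsigned char *lp = li->subject->ptr;
        li->lpi = (direction == LIST_TAIL) ? lpNext(lp, entry->p)
                                           : lpPrev(lp, entry->p);
    }
}
```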
#include "server.h"
|
|
#include "bio.h"
|
|
#include "atomicvar.h"
|
|
#include "functions.h"
|
|
|
|
static redisAtomic size_t lazyfree_objects = 0;
|
|
static redisAtomic size_t lazyfreed_objects = 0;
|
|
|

/* Release objects from the lazyfree thread. It just calls decrRefCount()
 * and updates the count of objects pending release. */
void lazyfreeFreeObject(void *args[]) {
    robj *o = (robj *) args[0];
    decrRefCount(o);
    atomicDecr(lazyfree_objects,1);
    atomicIncr(lazyfreed_objects,1);
}
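
/* Note on the pattern used throughout this file: each lazyfreeFreeX()
 * callback has a matching freeXAsync() submitter further down that
 * increments lazyfree_objects *before* queueing the job with
 * bioCreateLazyFreeJob(); the callback then decrements lazyfree_objects
 * and increments lazyfreed_objects once the memory is actually released. */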

/* Release a database from the lazyfree thread. The two dictionaries are
 * the main keyspace dict and the expires dict, which were substituted
 * with fresh ones in the main thread when the database was logically
 * deleted. */
void lazyfreeFreeDatabase(void *args[]) {
    dict *ht1 = (dict *) args[0];
    dict *ht2 = (dict *) args[1];

    size_t numkeys = dictSize(ht1);
    dictRelease(ht1);
    dictRelease(ht2);
    atomicDecr(lazyfree_objects,numkeys);
    atomicIncr(lazyfreed_objects,numkeys);
}

/* Release the key tracking table. */
void lazyFreeTrackingTable(void *args[]) {
    rax *rt = args[0];
    size_t len = rt->numele;
    freeTrackingRadixTree(rt);
    atomicDecr(lazyfree_objects,len);
    atomicIncr(lazyfreed_objects,len);
}

/* Release the lua_scripts dict. */
void lazyFreeLuaScripts(void *args[]) {
    dict *lua_scripts = args[0];
    long long len = dictSize(lua_scripts);
    dictRelease(lua_scripts);
    atomicDecr(lazyfree_objects,len);
    atomicIncr(lazyfreed_objects,len);
}

/* Release the functions ctx. */
void lazyFreeFunctionsCtx(void *args[]) {
    functionsLibCtx *functions_lib_ctx = args[0];
    size_t len = functionsLibCtxfunctionsLen(functions_lib_ctx);
    functionsLibCtxFree(functions_lib_ctx);
    atomicDecr(lazyfree_objects,len);
    atomicIncr(lazyfreed_objects,len);
}

/* Release replication backlog referencing memory. */
void lazyFreeReplicationBacklogRefMem(void *args[]) {
    list *blocks = args[0];
    rax *index = args[1];
    long long len = listLength(blocks);
    len += raxSize(index);
    listRelease(blocks);
    raxFree(index);
    atomicDecr(lazyfree_objects,len);
    atomicIncr(lazyfreed_objects,len);
}

/* Return the number of currently pending objects to free. */
size_t lazyfreeGetPendingObjectsCount(void) {
    size_t aux;
    atomicGet(lazyfree_objects,aux);
    return aux;
}

/* Return the number of objects that have been freed. */
size_t lazyfreeGetFreedObjectsCount(void) {
    size_t aux;
    atomicGet(lazyfreed_objects,aux);
    return aux;
}

/* Reset the lazyfreed objects stat counter. The pending counter is live
 * state, not a statistic, and is left untouched. */
void lazyfreeResetStats() {
    atomicSet(lazyfreed_objects,0);
}

/* Return the amount of work needed in order to free an object.
 * The return value is not always the actual number of allocations the
 * object is composed of, but a number proportional to it.
 *
 * For strings the function always returns 1.
 *
 * For aggregated objects represented by hash tables or other data structures
 * the function just returns the number of elements the object is composed of.
 *
 * Objects composed of single allocations are always reported as having a
 * single item even if they are actually logically composed of multiple
 * elements.
 *
 * For lists the function returns the number of elements in the quicklist
 * representing the list. */
size_t lazyfreeGetFreeEffort(robj *key, robj *obj, int dbid) {
    if (obj->type == OBJ_LIST && obj->encoding == OBJ_ENCODING_QUICKLIST) {
        quicklist *ql = obj->ptr;
        return ql->len;
    } else if (obj->type == OBJ_SET && obj->encoding == OBJ_ENCODING_HT) {
        dict *ht = obj->ptr;
        return dictSize(ht);
    } else if (obj->type == OBJ_ZSET && obj->encoding == OBJ_ENCODING_SKIPLIST){
        zset *zs = obj->ptr;
        return zs->zsl->length;
    } else if (obj->type == OBJ_HASH && obj->encoding == OBJ_ENCODING_HT) {
        dict *ht = obj->ptr;
        return dictSize(ht);
    } else if (obj->type == OBJ_STREAM) {
        size_t effort = 0;
        stream *s = obj->ptr;

        /* Make a best effort estimate to maintain constant runtime. Every
         * macro node in the Stream is one allocation. */
        effort += s->rax->numnodes;

        /* Every consumer group is an allocation and so are the entries in its
         * PEL. We use the size of the first group's PEL as an estimate for
         * all others. */
        if (s->cgroups && raxSize(s->cgroups)) {
            raxIterator ri;
            streamCG *cg;
            raxStart(&ri,s->cgroups);
            raxSeek(&ri,"^",NULL,0);
            /* There must be at least one group so the following should always
             * work. */
            serverAssert(raxNext(&ri));
            cg = ri.data;
            effort += raxSize(s->cgroups)*(1+raxSize(cg->pel));
            raxStop(&ri);
        }
        return effort;
    } else if (obj->type == OBJ_MODULE) {
        size_t effort = moduleGetFreeEffort(key, obj, dbid);
        /* If the module's free_effort returns 0, we will use asynchronous
         * free memory by default. */
        return effort == 0 ? ULONG_MAX : effort;
    } else {
        return 1; /* Everything else is a single allocation. */
    }
}

/* If there are enough allocations to free the value object asynchronously, it
 * may be put into a lazy free list instead of being freed synchronously. The
 * lazy free list will be reclaimed in a different bio.c thread. If the value
 * is composed of just a few allocations, freeing it lazily is actually just
 * slower... so under a certain limit we free the object synchronously. */
#define LAZYFREE_THRESHOLD 64

/* Free an object. If the object is big enough, free it asynchronously. */
void freeObjAsync(robj *key, robj *obj, int dbid) {
    size_t free_effort = lazyfreeGetFreeEffort(key,obj,dbid);
    /* Note that if the object is shared, it is not possible to reclaim it
     * now. This rarely happens, however sometimes the implementation
     * of parts of the Redis core may call incrRefCount() to protect
     * objects, and then call dbDelete(). */
    if (free_effort > LAZYFREE_THRESHOLD && obj->refcount == 1) {
        atomicIncr(lazyfree_objects,1);
        bioCreateLazyFreeJob(lazyfreeFreeObject,1,obj);
    } else {
        decrRefCount(obj);
    }
}
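
/* Usage sketch (illustrative, simplified from the keyspace delete path in
 * db.c; not code from this file): the caller hands the value to
 * freeObjAsync() and then clears the dict entry's value so the dict
 * destructor does not free it a second time:
 *
 *     robj *val = dictGetVal(de);
 *     freeObjAsync(key, val, db->id);
 *     dictSetVal(db->dict, de, NULL);
 *     dictDelete(db->dict, key->ptr);
 */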

/* Empty a Redis DB asynchronously. What the function actually does is
 * create a new empty set of hash tables and schedule the old ones for
 * lazy freeing. */
void emptyDbAsync(redisDb *db) {
    dict *oldht1 = db->dict, *oldht2 = db->expires;
    db->dict = dictCreate(&dbDictType);
    db->expires = dictCreate(&dbExpiresDictType);
    atomicIncr(lazyfree_objects,dictSize(oldht1));
    bioCreateLazyFreeJob(lazyfreeFreeDatabase,2,oldht1,oldht2);
}

/* Free the key tracking table.
 * If the table is big enough, free it asynchronously. */
void freeTrackingRadixTreeAsync(rax *tracking) {
    /* Because this rax has only keys and no values, we use numnodes. */
    if (tracking->numnodes > LAZYFREE_THRESHOLD) {
        atomicIncr(lazyfree_objects,tracking->numele);
        bioCreateLazyFreeJob(lazyFreeTrackingTable,1,tracking);
    } else {
        freeTrackingRadixTree(tracking);
    }
}

/* Free the lua_scripts dict. If the dict is big enough, free it
 * asynchronously. */
void freeLuaScriptsAsync(dict *lua_scripts) {
    if (dictSize(lua_scripts) > LAZYFREE_THRESHOLD) {
        atomicIncr(lazyfree_objects,dictSize(lua_scripts));
        bioCreateLazyFreeJob(lazyFreeLuaScripts,1,lua_scripts);
    } else {
        dictRelease(lua_scripts);
    }
}

/* Free the functions ctx. If it contains enough functions, free it
 * asynchronously. */
void freeFunctionsAsync(functionsLibCtx *functions_lib_ctx) {
    if (functionsLibCtxfunctionsLen(functions_lib_ctx) > LAZYFREE_THRESHOLD) {
        atomicIncr(lazyfree_objects,functionsLibCtxfunctionsLen(functions_lib_ctx));
        bioCreateLazyFreeJob(lazyFreeFunctionsCtx,1,functions_lib_ctx);
    } else {
        functionsLibCtxFree(functions_lib_ctx);
    }
}

/* Free replication backlog referencing buffer blocks and rax index. */
void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) {
    if (listLength(blocks) > LAZYFREE_THRESHOLD ||
        raxSize(index) > LAZYFREE_THRESHOLD)
    {
        atomicIncr(lazyfree_objects,listLength(blocks)+raxSize(index));
        bioCreateLazyFreeJob(lazyFreeReplicationBacklogRefMem,2,blocks,index);
    } else {
        listRelease(blocks);
        raxFree(index);
    }
}