diff --git a/src/t_zset.c b/src/t_zset.c index 86d241883..7c60a45bd 100644 --- a/src/t_zset.c +++ b/src/t_zset.c @@ -66,6 +66,7 @@ int zslLexValueGteMin(sds value, zlexrangespec *spec); int zslLexValueLteMax(sds value, zlexrangespec *spec); +void zsetConvertAndExpand(robj *zobj, int encoding, unsigned long cap); /* Create a skiplist node with the specified number of levels. * The SDS string 'ele' is referenced by the node after the call. */ @@ -1165,7 +1166,45 @@ unsigned long zsetLength(const robj *zobj) { return length; } +/* Factory method to return a zset. + * + * The size hint indicates approximately how many items will be added, + * and the value len hint indicates the approximate individual size of the added elements, + * they are used to determine the initial representation. + * + * If the hints are not known, and underestimation or 0 is suitable. */ +robj *zsetTypeCreate(size_t size_hint, size_t val_len_hint) { + if (size_hint <= server.zset_max_listpack_entries && + val_len_hint <= server.zset_max_listpack_value) + { + return createZsetListpackObject(); + } + + robj *zobj = createZsetObject(); + zset *zs = zobj->ptr; + dictExpand(zs->dict, size_hint); + return zobj; +} + +/* Check if the existing zset should be converted to another encoding based off the + * the size hint. */ +void zsetTypeMaybeConvert(robj *zobj, size_t size_hint) { + if (zobj->encoding == OBJ_ENCODING_LISTPACK && + size_hint > server.zset_max_listpack_entries) + { + zsetConvertAndExpand(zobj, OBJ_ENCODING_SKIPLIST, size_hint); + } +} + +/* Convert the zset to specified encoding. The zset dict (when converting + * to a skiplist) is presized to hold the number of elements in the original + * zset. */ void zsetConvert(robj *zobj, int encoding) { + zsetConvertAndExpand(zobj, encoding, zsetLength(zobj)); +} + +/* Converts a zset to the specified encoding, pre-sizing it for 'cap' elements. */ +void zsetConvertAndExpand(robj *zobj, int encoding, unsigned long cap) { zset *zs; zskiplistNode *node, *next; sds ele; @@ -1186,6 +1225,9 @@ void zsetConvert(robj *zobj, int encoding) { zs->dict = dictCreate(&zsetDictType); zs->zsl = zslCreate(); + /* Presize the dict to avoid rehashing */ + dictExpand(zs->dict, cap); + eptr = lpSeek(zl,0); if (eptr != NULL) { sptr = lpNext(zl,eptr); @@ -1375,7 +1417,7 @@ int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, dou sdslen(ele) > server.zset_max_listpack_value || !lpSafeToAdd(zobj->ptr, sdslen(ele))) { - zsetConvert(zobj,OBJ_ENCODING_SKIPLIST); + zsetConvertAndExpand(zobj, OBJ_ENCODING_SKIPLIST, zsetLength(zobj) + 1); } else { zobj->ptr = zzlInsert(zobj->ptr,ele,score); if (newscore) *newscore = score; @@ -1749,14 +1791,10 @@ void zaddGenericCommand(client *c, int flags) { if (checkType(c,zobj,OBJ_ZSET)) goto cleanup; if (zobj == NULL) { if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */ - if (server.zset_max_listpack_entries == 0 || - server.zset_max_listpack_value < sdslen(c->argv[scoreidx+1]->ptr)) - { - zobj = createZsetObject(); - } else { - zobj = createZsetListpackObject(); - } + zobj = zsetTypeCreate(elements, sdslen(c->argv[scoreidx+1]->ptr)); dbAdd(c->db,key,zobj); + } else { + zsetTypeMaybeConvert(zobj, elements); } for (j = 0; j < elements; j++) { @@ -2955,10 +2993,7 @@ static void zrangeResultFinalizeClient(zrange_result_handler *handler, /* Result handler methods for storing the ZRANGESTORE to a zset. */ static void zrangeResultBeginStore(zrange_result_handler *handler, long length) { - if (length > (long)server.zset_max_listpack_entries) - handler->dstobj = createZsetObject(); - else - handler->dstobj = createZsetListpackObject(); + handler->dstobj = zsetTypeCreate(length, 0); } static void zrangeResultEmitCBufferForStore(zrange_result_handler *handler, diff --git a/tests/unit/type/zset.tcl b/tests/unit/type/zset.tcl index d1da02778..f3630aeb2 100644 --- a/tests/unit/type/zset.tcl +++ b/tests/unit/type/zset.tcl @@ -2285,7 +2285,9 @@ start_server {tags {"zset"}} { r config set zset-max-listpack-entries 0 r del z1{t} z2{t} r zadd z1{t} 1 a + assert_encoding skiplist z1{t} assert_equal 1 [r zrangestore z2{t} z1{t} 0 -1] + assert_encoding skiplist z2{t} r config set zset-max-listpack-entries $original_max } @@ -2616,4 +2618,37 @@ start_server {tags {"zset"}} { r config set set-max-listpack-entries 128 r config set zset-max-listpack-entries 128 } + + foreach type {single multiple single_multiple} { + test "ZADD overflows the maximum allowed elements in a listpack - $type" { + r del myzset + + set max_entries 64 + set original_max [lindex [r config get zset-max-listpack-entries] 1] + r config set zset-max-listpack-entries $max_entries + + if {$type == "single"} { + # All are single zadd commands. + for {set i 0} {$i < $max_entries} {incr i} { r zadd myzset $i $i } + } elseif {$type == "multiple"} { + # One zadd command to add all elements. + set args {} + for {set i 0} {$i < $max_entries * 2} {incr i} { lappend args $i } + r zadd myzset {*}$args + } elseif {$type == "single_multiple"} { + # First one zadd adds an element (creates a key) and then one zadd adds all elements. + r zadd myzset 1 1 + set args {} + for {set i 0} {$i < $max_entries * 2} {incr i} { lappend args $i } + r zadd myzset {*}$args + } + + assert_encoding listpack myzset + assert_equal $max_entries [r zcard myzset] + assert_equal 1 [r zadd myzset 1 b] + assert_encoding skiplist myzset + + r config set zset-max-listpack-entries $original_max + } + } }