Fix SORT STORE quicklist with the right options (#13042)

We forgot to call quicklistSetOptions after createQuicklistObject,
in the sort store scenario, we will create a quicklist with default
fill or compress options.

This PR adds fill and depth parameters to createQuicklistObject to
specify that options need to be set after creating a quicklist.

This closes #12871.

release notes:
> Fix lists created by SORT STORE to respect list compression and
packing configs.
This commit is contained in:
Binbin 2024-02-08 20:36:11 +08:00 committed by GitHub
parent 1e8dc1da0d
commit 813327b231
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 24 additions and 18 deletions

View File

@ -232,8 +232,8 @@ robj *dupStringObject(const robj *o) {
} }
} }
robj *createQuicklistObject(void) { robj *createQuicklistObject(int fill, int compress) {
quicklist *l = quicklistCreate(); quicklist *l = quicklistNew(fill, compress);
robj *o = createObject(OBJ_LIST,l); robj *o = createObject(OBJ_LIST,l);
o->encoding = OBJ_ENCODING_QUICKLIST; o->encoding = OBJ_ENCODING_QUICKLIST;
return o; return o;

View File

@ -161,9 +161,9 @@ void quicklistSetFill(quicklist *quicklist, int fill) {
quicklist->fill = fill; quicklist->fill = fill;
} }
void quicklistSetOptions(quicklist *quicklist, int fill, int depth) { void quicklistSetOptions(quicklist *quicklist, int fill, int compress) {
quicklistSetFill(quicklist, fill); quicklistSetFill(quicklist, fill);
quicklistSetCompressDepth(quicklist, depth); quicklistSetCompressDepth(quicklist, compress);
} }
/* Create a new quicklist with some default parameters. */ /* Create a new quicklist with some default parameters. */

View File

@ -155,9 +155,9 @@ typedef struct quicklistEntry {
/* Prototypes */ /* Prototypes */
quicklist *quicklistCreate(void); quicklist *quicklistCreate(void);
quicklist *quicklistNew(int fill, int compress); quicklist *quicklistNew(int fill, int compress);
void quicklistSetCompressDepth(quicklist *quicklist, int depth); void quicklistSetCompressDepth(quicklist *quicklist, int compress);
void quicklistSetFill(quicklist *quicklist, int fill); void quicklistSetFill(quicklist *quicklist, int fill);
void quicklistSetOptions(quicklist *quicklist, int fill, int depth); void quicklistSetOptions(quicklist *quicklist, int fill, int compress);
void quicklistRelease(quicklist *quicklist); void quicklistRelease(quicklist *quicklist);
int quicklistPushHead(quicklist *quicklist, void *value, const size_t sz); int quicklistPushHead(quicklist *quicklist, void *value, const size_t sz);
int quicklistPushTail(quicklist *quicklist, void *value, const size_t sz); int quicklistPushTail(quicklist *quicklist, void *value, const size_t sz);

View File

@ -1862,9 +1862,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
if (len == 0) goto emptykey; if (len == 0) goto emptykey;
o = createQuicklistObject(); o = createQuicklistObject(server.list_max_listpack_size, server.list_compress_depth);
quicklistSetOptions(o->ptr, server.list_max_listpack_size,
server.list_compress_depth);
/* Load every single element of the list */ /* Load every single element of the list */
while(len--) { while(len--) {
@ -2174,9 +2172,7 @@ robj *rdbLoadObject(int rdbtype, rio *rdb, sds key, int dbid, int *error) {
if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL; if ((len = rdbLoadLen(rdb,NULL)) == RDB_LENERR) return NULL;
if (len == 0) goto emptykey; if (len == 0) goto emptykey;
o = createQuicklistObject(); o = createQuicklistObject(server.list_max_listpack_size, server.list_compress_depth);
quicklistSetOptions(o->ptr, server.list_max_listpack_size,
server.list_compress_depth);
uint64_t container = QUICKLIST_NODE_CONTAINER_PACKED; uint64_t container = QUICKLIST_NODE_CONTAINER_PACKED;
while (len--) { while (len--) {
unsigned char *lp; unsigned char *lp;

View File

@ -2771,7 +2771,7 @@ robj *createStringObjectFromLongLong(long long value);
robj *createStringObjectFromLongLongForValue(long long value); robj *createStringObjectFromLongLongForValue(long long value);
robj *createStringObjectFromLongLongWithSds(long long value); robj *createStringObjectFromLongLongWithSds(long long value);
robj *createStringObjectFromLongDouble(long double value, int humanfriendly); robj *createStringObjectFromLongDouble(long double value, int humanfriendly);
robj *createQuicklistObject(void); robj *createQuicklistObject(int fill, int compress);
robj *createListListpackObject(void); robj *createListListpackObject(void);
robj *createSetObject(void); robj *createSetObject(void);
robj *createIntsetObject(void); robj *createIntsetObject(void);

View File

@ -304,8 +304,7 @@ void sortCommandGeneric(client *c, int readonly) {
if (sortval) if (sortval)
incrRefCount(sortval); incrRefCount(sortval);
else else
sortval = createQuicklistObject(); sortval = createQuicklistObject(server.list_max_listpack_size, server.list_compress_depth);
/* When sorting a set with no sort specified, we must sort the output /* When sorting a set with no sort specified, we must sort the output
* so the result is consistent across scripting and replication. * so the result is consistent across scripting and replication.
@ -557,7 +556,7 @@ void sortCommandGeneric(client *c, int readonly) {
} else { } else {
/* We can't predict the size and encoding of the stored list, we /* We can't predict the size and encoding of the stored list, we
* assume it's a large list and then convert it at the end if needed. */ * assume it's a large list and then convert it at the end if needed. */
robj *sobj = createQuicklistObject(); robj *sobj = createQuicklistObject(server.list_max_listpack_size, server.list_compress_depth);
/* STORE option specified, set the sorting result as a List object */ /* STORE option specified, set the sorting result as a List object */
for (j = start; j <= end; j++) { for (j = start; j <= end; j++) {

View File

@ -62,8 +62,7 @@ static void listTypeTryConvertListpack(robj *o, robj **argv, int start, int end,
/* Invoke callback before conversion. */ /* Invoke callback before conversion. */
if (fn) fn(data); if (fn) fn(data);
quicklist *ql = quicklistCreate(); quicklist *ql = quicklistNew(server.list_max_listpack_size, server.list_compress_depth);
quicklistSetOptions(ql, server.list_max_listpack_size, server.list_compress_depth);
/* Append listpack to quicklist if it's not empty, otherwise release it. */ /* Append listpack to quicklist if it's not empty, otherwise release it. */
if (lpLength(o->ptr)) if (lpLength(o->ptr))

View File

@ -356,6 +356,18 @@ foreach command {SORT SORT_RO} {
} }
} }
} }
test {SORT STORE quicklist with the right options} {
set origin_config [config_get_set list-max-listpack-size -1]
r del lst{t} lst_dst{t}
r config set list-max-listpack-size -1
r config set list-compress-depth 12
r lpush lst{t} {*}[split [string repeat "1" 6000] ""]
r sort lst{t} store lst_dst{t}
assert_encoding quicklist lst_dst{t}
assert_match "*ql_listpack_max:-1 ql_compressed:1*" [r debug object lst_dst{t}]
config_set list-max-listpack-size $origin_config
} {} {needs:debug}
} }
start_cluster 1 0 {tags {"external:skip cluster sort"}} { start_cluster 1 0 {tags {"external:skip cluster sort"}} {