diff --git a/src/quicklist.c b/src/quicklist.c index 301a2166e..f8cb62e23 100644 --- a/src/quicklist.c +++ b/src/quicklist.c @@ -104,6 +104,9 @@ quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name) quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node); void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm); +quicklistNode *_quicklistSplitNode(quicklistNode *node, int offset, int after); +void _quicklistMergeNodes(quicklist *quicklist, quicklistNode *center); + /* Simple way to give quicklistEntry structs default values with one call. */ #define initEntry(e) \ do { \ @@ -529,19 +532,25 @@ REDIS_STATIC int _quicklistNodeAllowMerge(const quicklistNode *a, (node)->sz = lpBytes((node)->entry); \ } while (0) -static quicklistNode* __quicklistCreatePlainNode(void *value, size_t sz) { +static quicklistNode* __quicklistCreateNode(int container, void *value, size_t sz) { quicklistNode *new_node = quicklistCreateNode(); - new_node->entry = zmalloc(sz); - new_node->container = QUICKLIST_NODE_CONTAINER_PLAIN; - memcpy(new_node->entry, value, sz); + new_node->container = container; + if (container == QUICKLIST_NODE_CONTAINER_PLAIN) { + new_node->entry = zmalloc(sz); + memcpy(new_node->entry, value, sz); + } else { + new_node->entry = lpPrepend(lpNew(0), value, sz); + } new_node->sz = sz; new_node->count++; return new_node; } static void __quicklistInsertPlainNode(quicklist *quicklist, quicklistNode *old_node, - void *value, size_t sz, int after) { - __quicklistInsertNode(quicklist, old_node, __quicklistCreatePlainNode(value, sz), after); + void *value, size_t sz, int after) +{ + quicklistNode *new_node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, value, sz); + __quicklistInsertNode(quicklist, old_node, new_node, after); quicklist->count++; } @@ -741,9 +750,13 @@ void quicklistReplaceEntry(quicklistIter *iter, quicklistEntry *entry, void *data, size_t sz) { quicklist* quicklist = iter->quicklist; + quicklistNode *node = entry->node; + unsigned char *newentry; - if (likely(!QL_NODE_IS_PLAIN(entry->node) && !isLargeElement(sz))) { - entry->node->entry = lpReplace(entry->node->entry, &entry->zi, data, sz); + if (likely(!QL_NODE_IS_PLAIN(entry->node) && !isLargeElement(sz) && + (newentry = lpReplace(entry->node->entry, &entry->zi, data, sz)) != NULL)) + { + entry->node->entry = newentry; quicklistNodeUpdateSz(entry->node); /* quicklistNext() and quicklistGetIteratorEntryAtIdx() provide an uncompressed node */ quicklistCompress(quicklist, entry->node); @@ -758,15 +771,30 @@ void quicklistReplaceEntry(quicklistIter *iter, quicklistEntry *entry, quicklistInsertAfter(iter, entry, data, sz); __quicklistDelNode(quicklist, entry->node); } - } else { - entry->node->dont_compress = 1; /* Prevent compression in quicklistInsertAfter() */ - quicklistInsertAfter(iter, entry, data, sz); + } else { /* The node is full or data is a large element */ + quicklistNode *split_node = NULL, *new_node; + node->dont_compress = 1; /* Prevent compression in __quicklistInsertNode() */ + + /* If the entry is not at the tail, split the node at the entry's offset. */ + if (entry->offset != node->count - 1 && entry->offset != -1) + split_node = _quicklistSplitNode(node, entry->offset, 1); + + /* Create a new node and insert it after the original node. + * If the original node was split, insert the split node after the new node. */ + new_node = __quicklistCreateNode(isLargeElement(sz) ? + QUICKLIST_NODE_CONTAINER_PLAIN : QUICKLIST_NODE_CONTAINER_PACKED, data, sz); + __quicklistInsertNode(quicklist, node, new_node, 1); + if (split_node) __quicklistInsertNode(quicklist, new_node, split_node, 1); + quicklist->count++; + + /* Delete the replaced element. */ if (entry->node->count == 1) { __quicklistDelNode(quicklist, entry->node); } else { unsigned char *p = lpSeek(entry->node->entry, -1); quicklistDelIndex(quicklist, entry->node, &p); entry->node->dont_compress = 0; /* Re-enable compression */ + _quicklistMergeNodes(quicklist, entry->node); quicklistCompress(quicklist, entry->node); quicklistCompress(quicklist, entry->node->next); } @@ -1002,7 +1030,7 @@ REDIS_STATIC void _quicklistInsert(quicklistIter *iter, quicklistEntry *entry, } else { quicklistDecompressNodeForUse(node); new_node = _quicklistSplitNode(node, entry->offset, after); - quicklistNode *entry_node = __quicklistCreatePlainNode(value, sz); + quicklistNode *entry_node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, value, sz); __quicklistInsertNode(quicklist, node, entry_node, after); __quicklistInsertNode(quicklist, entry_node, new_node, after); quicklist->count++; @@ -3224,7 +3252,7 @@ int quicklistTest(int argc, char *argv[], int flags) { memcpy(s, "helloworld", 10); memcpy(s + sz - 10, "1234567890", 10); - quicklistNode *node = __quicklistCreatePlainNode(s, sz); + quicklistNode *node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, s, sz); /* Just to avoid triggering the assertion in __quicklistCompressNode(), * it disables the passing of quicklist head or tail node. */ diff --git a/tests/unit/type/list.tcl b/tests/unit/type/list.tcl index 0dece6cb7..75c8c8259 100644 --- a/tests/unit/type/list.tcl +++ b/tests/unit/type/list.tcl @@ -220,6 +220,7 @@ start_server [list overrides [list save ""] ] { # checking LSET in case ziplist needs to be split test {Test LSET with packed is split in the middle} { + set original_config [config_get_set list-max-listpack-size 4] r flushdb r debug quicklist-packed-threshold 5b r RPUSH lst "aa" @@ -227,6 +228,7 @@ start_server [list overrides [list save ""] ] { r RPUSH lst "cc" r RPUSH lst "dd" r RPUSH lst "ee" + assert_encoding quicklist lst r lset lst 2 [string repeat e 10] assert_equal [r lpop lst] "aa" assert_equal [r lpop lst] "bb" @@ -234,6 +236,7 @@ start_server [list overrides [list save ""] ] { assert_equal [r lpop lst] "dd" assert_equal [r lpop lst] "ee" r debug quicklist-packed-threshold 0 + r config set list-max-listpack-size $original_config } {OK} {needs:debug} @@ -381,6 +384,16 @@ if {[lindex [r config get proto-max-bulk-len] 1] == 10000000000} { assert_equal [read_big_bulk {r rpop lst}] $str_length } {} {large-memory} + test {Test LSET on plain nodes with large elements under packed_threshold over 4GB} { + r flushdb + r rpush lst a b c d e + for {set i 0} {$i < 5} {incr i} { + r write "*4\r\n\$4\r\nlset\r\n\$3\r\nlst\r\n\$1\r\n$i\r\n" + write_big_bulk 1000000000 + } + r ping + } {PONG} {large-memory} + test {Test LMOVE on plain nodes over 4GB} { r flushdb r RPUSH lst2{t} "aa"