Prevent LSET command from causing quicklist plain node size to exceed 4GB (#12955)
Fix #12864 The main reason for this crash is that when replacing a element of a quicklist packed node with lpReplace() method, if the final size is larger than 4GB, lpReplace() will fail and returns NULL, causing `node->entry` to be incorrectly set to NULL. Since the inserted data is not a large element, we can't just replace it like a large element, first quicklistInsertAfter() and then quicklistDelIndex(), because the current node may be merged and invalidated in quicklistInsertAfter(). The solution of this PR: When replacing a node fails (listpack exceeds 4GB), split the current node, create a new node to put in the middle, and try to merge them. This is the same as inserting a large element. In the worst case, its size will not exceed 4GB. Signed-off-by: Ping Xie <pingxie@google.com>
This commit is contained in:
parent
e4b88bb10f
commit
b685fadb77
@ -104,6 +104,9 @@ quicklistBookmark *_quicklistBookmarkFindByName(quicklist *ql, const char *name)
|
||||
quicklistBookmark *_quicklistBookmarkFindByNode(quicklist *ql, quicklistNode *node);
|
||||
void _quicklistBookmarkDelete(quicklist *ql, quicklistBookmark *bm);
|
||||
|
||||
quicklistNode *_quicklistSplitNode(quicklistNode *node, int offset, int after);
|
||||
void _quicklistMergeNodes(quicklist *quicklist, quicklistNode *center);
|
||||
|
||||
/* Simple way to give quicklistEntry structs default values with one call. */
|
||||
#define initEntry(e) \
|
||||
do { \
|
||||
@ -529,19 +532,25 @@ REDIS_STATIC int _quicklistNodeAllowMerge(const quicklistNode *a,
|
||||
(node)->sz = lpBytes((node)->entry); \
|
||||
} while (0)
|
||||
|
||||
static quicklistNode* __quicklistCreatePlainNode(void *value, size_t sz) {
|
||||
static quicklistNode* __quicklistCreateNode(int container, void *value, size_t sz) {
|
||||
quicklistNode *new_node = quicklistCreateNode();
|
||||
new_node->entry = zmalloc(sz);
|
||||
new_node->container = QUICKLIST_NODE_CONTAINER_PLAIN;
|
||||
memcpy(new_node->entry, value, sz);
|
||||
new_node->container = container;
|
||||
if (container == QUICKLIST_NODE_CONTAINER_PLAIN) {
|
||||
new_node->entry = zmalloc(sz);
|
||||
memcpy(new_node->entry, value, sz);
|
||||
} else {
|
||||
new_node->entry = lpPrepend(lpNew(0), value, sz);
|
||||
}
|
||||
new_node->sz = sz;
|
||||
new_node->count++;
|
||||
return new_node;
|
||||
}
|
||||
|
||||
static void __quicklistInsertPlainNode(quicklist *quicklist, quicklistNode *old_node,
|
||||
void *value, size_t sz, int after) {
|
||||
__quicklistInsertNode(quicklist, old_node, __quicklistCreatePlainNode(value, sz), after);
|
||||
void *value, size_t sz, int after)
|
||||
{
|
||||
quicklistNode *new_node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, value, sz);
|
||||
__quicklistInsertNode(quicklist, old_node, new_node, after);
|
||||
quicklist->count++;
|
||||
}
|
||||
|
||||
@ -741,9 +750,13 @@ void quicklistReplaceEntry(quicklistIter *iter, quicklistEntry *entry,
|
||||
void *data, size_t sz)
|
||||
{
|
||||
quicklist* quicklist = iter->quicklist;
|
||||
quicklistNode *node = entry->node;
|
||||
unsigned char *newentry;
|
||||
|
||||
if (likely(!QL_NODE_IS_PLAIN(entry->node) && !isLargeElement(sz))) {
|
||||
entry->node->entry = lpReplace(entry->node->entry, &entry->zi, data, sz);
|
||||
if (likely(!QL_NODE_IS_PLAIN(entry->node) && !isLargeElement(sz) &&
|
||||
(newentry = lpReplace(entry->node->entry, &entry->zi, data, sz)) != NULL))
|
||||
{
|
||||
entry->node->entry = newentry;
|
||||
quicklistNodeUpdateSz(entry->node);
|
||||
/* quicklistNext() and quicklistGetIteratorEntryAtIdx() provide an uncompressed node */
|
||||
quicklistCompress(quicklist, entry->node);
|
||||
@ -758,15 +771,30 @@ void quicklistReplaceEntry(quicklistIter *iter, quicklistEntry *entry,
|
||||
quicklistInsertAfter(iter, entry, data, sz);
|
||||
__quicklistDelNode(quicklist, entry->node);
|
||||
}
|
||||
} else {
|
||||
entry->node->dont_compress = 1; /* Prevent compression in quicklistInsertAfter() */
|
||||
quicklistInsertAfter(iter, entry, data, sz);
|
||||
} else { /* The node is full or data is a large element */
|
||||
quicklistNode *split_node = NULL, *new_node;
|
||||
node->dont_compress = 1; /* Prevent compression in __quicklistInsertNode() */
|
||||
|
||||
/* If the entry is not at the tail, split the node at the entry's offset. */
|
||||
if (entry->offset != node->count - 1 && entry->offset != -1)
|
||||
split_node = _quicklistSplitNode(node, entry->offset, 1);
|
||||
|
||||
/* Create a new node and insert it after the original node.
|
||||
* If the original node was split, insert the split node after the new node. */
|
||||
new_node = __quicklistCreateNode(isLargeElement(sz) ?
|
||||
QUICKLIST_NODE_CONTAINER_PLAIN : QUICKLIST_NODE_CONTAINER_PACKED, data, sz);
|
||||
__quicklistInsertNode(quicklist, node, new_node, 1);
|
||||
if (split_node) __quicklistInsertNode(quicklist, new_node, split_node, 1);
|
||||
quicklist->count++;
|
||||
|
||||
/* Delete the replaced element. */
|
||||
if (entry->node->count == 1) {
|
||||
__quicklistDelNode(quicklist, entry->node);
|
||||
} else {
|
||||
unsigned char *p = lpSeek(entry->node->entry, -1);
|
||||
quicklistDelIndex(quicklist, entry->node, &p);
|
||||
entry->node->dont_compress = 0; /* Re-enable compression */
|
||||
_quicklistMergeNodes(quicklist, entry->node);
|
||||
quicklistCompress(quicklist, entry->node);
|
||||
quicklistCompress(quicklist, entry->node->next);
|
||||
}
|
||||
@ -1002,7 +1030,7 @@ REDIS_STATIC void _quicklistInsert(quicklistIter *iter, quicklistEntry *entry,
|
||||
} else {
|
||||
quicklistDecompressNodeForUse(node);
|
||||
new_node = _quicklistSplitNode(node, entry->offset, after);
|
||||
quicklistNode *entry_node = __quicklistCreatePlainNode(value, sz);
|
||||
quicklistNode *entry_node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, value, sz);
|
||||
__quicklistInsertNode(quicklist, node, entry_node, after);
|
||||
__quicklistInsertNode(quicklist, entry_node, new_node, after);
|
||||
quicklist->count++;
|
||||
@ -3224,7 +3252,7 @@ int quicklistTest(int argc, char *argv[], int flags) {
|
||||
memcpy(s, "helloworld", 10);
|
||||
memcpy(s + sz - 10, "1234567890", 10);
|
||||
|
||||
quicklistNode *node = __quicklistCreatePlainNode(s, sz);
|
||||
quicklistNode *node = __quicklistCreateNode(QUICKLIST_NODE_CONTAINER_PLAIN, s, sz);
|
||||
|
||||
/* Just to avoid triggering the assertion in __quicklistCompressNode(),
|
||||
* it disables the passing of quicklist head or tail node. */
|
||||
|
@ -220,6 +220,7 @@ start_server [list overrides [list save ""] ] {
|
||||
|
||||
# checking LSET in case ziplist needs to be split
|
||||
test {Test LSET with packed is split in the middle} {
|
||||
set original_config [config_get_set list-max-listpack-size 4]
|
||||
r flushdb
|
||||
r debug quicklist-packed-threshold 5b
|
||||
r RPUSH lst "aa"
|
||||
@ -227,6 +228,7 @@ start_server [list overrides [list save ""] ] {
|
||||
r RPUSH lst "cc"
|
||||
r RPUSH lst "dd"
|
||||
r RPUSH lst "ee"
|
||||
assert_encoding quicklist lst
|
||||
r lset lst 2 [string repeat e 10]
|
||||
assert_equal [r lpop lst] "aa"
|
||||
assert_equal [r lpop lst] "bb"
|
||||
@ -234,6 +236,7 @@ start_server [list overrides [list save ""] ] {
|
||||
assert_equal [r lpop lst] "dd"
|
||||
assert_equal [r lpop lst] "ee"
|
||||
r debug quicklist-packed-threshold 0
|
||||
r config set list-max-listpack-size $original_config
|
||||
} {OK} {needs:debug}
|
||||
|
||||
|
||||
@ -381,6 +384,16 @@ if {[lindex [r config get proto-max-bulk-len] 1] == 10000000000} {
|
||||
assert_equal [read_big_bulk {r rpop lst}] $str_length
|
||||
} {} {large-memory}
|
||||
|
||||
test {Test LSET on plain nodes with large elements under packed_threshold over 4GB} {
|
||||
r flushdb
|
||||
r rpush lst a b c d e
|
||||
for {set i 0} {$i < 5} {incr i} {
|
||||
r write "*4\r\n\$4\r\nlset\r\n\$3\r\nlst\r\n\$1\r\n$i\r\n"
|
||||
write_big_bulk 1000000000
|
||||
}
|
||||
r ping
|
||||
} {PONG} {large-memory}
|
||||
|
||||
test {Test LMOVE on plain nodes over 4GB} {
|
||||
r flushdb
|
||||
r RPUSH lst2{t} "aa"
|
||||
|
Loading…
x
Reference in New Issue
Block a user