RAND* commands: fix risk of OOM panic in hash and zset, use fair random in hash, and add tests for even distribution to all (#8429)

Changes to HRANDFIELD and ZRANDMEMBER:
* Fix risk of OOM panic when client query a very big negative count (avoid allocating huge temporary buffer).
* Fix uneven random distribution in HRANDFIELD with negative count (wasn't using dictGetFairRandomKey).
* Add tests to check an even random distribution (HRANDFIELD, SRANDMEMBER, ZRANDMEMBER).

Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
sundb 2021-02-05 21:56:20 +08:00 committed by GitHub
parent 81da73ff31
commit ef211f440b
7 changed files with 141 additions and 36 deletions

View File

@ -964,6 +964,11 @@ void hscanCommand(client *c) {
* implementation for more info. */ * implementation for more info. */
#define HRANDFIELD_SUB_STRATEGY_MUL 3 #define HRANDFIELD_SUB_STRATEGY_MUL 3
/* If client is trying to ask for a very large number of random elements,
* queuing may consume an unlimited amount of memory, so we want to limit
* the number of randoms per time. */
#define HRANDFIELD_RANDOM_SAMPLE_LIMIT 1000
void hrandfieldWithCountCommand(client *c, long l, int withvalues) { void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
unsigned long count, size; unsigned long count, size;
int uniq = 1; int uniq = 1;
@ -999,7 +1004,7 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
if (hash->encoding == OBJ_ENCODING_HT) { if (hash->encoding == OBJ_ENCODING_HT) {
sds key, value; sds key, value;
while (count--) { while (count--) {
dictEntry *de = dictGetRandomKey(hash->ptr); dictEntry *de = dictGetFairRandomKey(hash->ptr);
key = dictGetKey(de); key = dictGetKey(de);
value = dictGetVal(de); value = dictGetVal(de);
if (withvalues && c->resp > 2) if (withvalues && c->resp > 2)
@ -1010,22 +1015,28 @@ void hrandfieldWithCountCommand(client *c, long l, int withvalues) {
} }
} else if (hash->encoding == OBJ_ENCODING_ZIPLIST) { } else if (hash->encoding == OBJ_ENCODING_ZIPLIST) {
ziplistEntry *keys, *vals = NULL; ziplistEntry *keys, *vals = NULL;
keys = zmalloc(sizeof(ziplistEntry)*count); unsigned long limit, sample_count;
limit = count > HRANDFIELD_RANDOM_SAMPLE_LIMIT ? HRANDFIELD_RANDOM_SAMPLE_LIMIT : count;
keys = zmalloc(sizeof(ziplistEntry)*limit);
if (withvalues) if (withvalues)
vals = zmalloc(sizeof(ziplistEntry)*count); vals = zmalloc(sizeof(ziplistEntry)*limit);
ziplistRandomPairs(hash->ptr, count, keys, vals); while (count) {
for (unsigned long i = 0; i < count; i++) { sample_count = count > limit ? limit : count;
if (withvalues && c->resp > 2) count -= sample_count;
addReplyArrayLen(c,2); ziplistRandomPairs(hash->ptr, sample_count, keys, vals);
if (keys[i].sval) for (unsigned long i = 0; i < sample_count; i++) {
addReplyBulkCBuffer(c, keys[i].sval, keys[i].slen); if (withvalues && c->resp > 2)
else addReplyArrayLen(c,2);
addReplyBulkLongLong(c, keys[i].lval); if (keys[i].sval)
if (withvalues) { addReplyBulkCBuffer(c, keys[i].sval, keys[i].slen);
if (vals[i].sval)
addReplyBulkCBuffer(c, vals[i].sval, vals[i].slen);
else else
addReplyBulkLongLong(c, vals[i].lval); addReplyBulkLongLong(c, keys[i].lval);
if (withvalues) {
if (vals[i].sval)
addReplyBulkCBuffer(c, vals[i].sval, vals[i].slen);
else
addReplyBulkLongLong(c, vals[i].lval);
}
} }
} }
zfree(keys); zfree(keys);

View File

@ -3961,6 +3961,11 @@ void bzpopmaxCommand(client *c) {
* implementation for more info. */ * implementation for more info. */
#define ZRANDMEMBER_SUB_STRATEGY_MUL 3 #define ZRANDMEMBER_SUB_STRATEGY_MUL 3
/* If client is trying to ask for a very large number of random elements,
* queuing may consume an unlimited amount of memory, so we want to limit
* the number of randoms per time. */
#define ZRANDMEMBER_RANDOM_SAMPLE_LIMIT 1000
void zrandmemberWithCountCommand(client *c, long l, int withscores) { void zrandmemberWithCountCommand(client *c, long l, int withscores) {
unsigned long count, size; unsigned long count, size;
int uniq = 1; int uniq = 1;
@ -4006,22 +4011,28 @@ void zrandmemberWithCountCommand(client *c, long l, int withscores) {
} }
} else if (zsetobj->encoding == OBJ_ENCODING_ZIPLIST) { } else if (zsetobj->encoding == OBJ_ENCODING_ZIPLIST) {
ziplistEntry *keys, *vals = NULL; ziplistEntry *keys, *vals = NULL;
keys = zmalloc(sizeof(ziplistEntry)*count); unsigned long limit, sample_count;
limit = count > ZRANDMEMBER_RANDOM_SAMPLE_LIMIT ? ZRANDMEMBER_RANDOM_SAMPLE_LIMIT : count;
keys = zmalloc(sizeof(ziplistEntry)*limit);
if (withscores) if (withscores)
vals = zmalloc(sizeof(ziplistEntry)*count); vals = zmalloc(sizeof(ziplistEntry)*limit);
ziplistRandomPairs(zsetobj->ptr, count, keys, vals); while (count) {
for (unsigned long i = 0; i < count; i++) { sample_count = count > limit ? limit : count;
if (withscores && c->resp > 2) count -= sample_count;
addReplyArrayLen(c,2); ziplistRandomPairs(zsetobj->ptr, sample_count, keys, vals);
if (keys[i].sval) for (unsigned long i = 0; i < sample_count; i++) {
addReplyBulkCBuffer(c, keys[i].sval, keys[i].slen); if (withscores && c->resp > 2)
else addReplyArrayLen(c,2);
addReplyBulkLongLong(c, keys[i].lval); if (keys[i].sval)
if (withscores) { addReplyBulkCBuffer(c, keys[i].sval, keys[i].slen);
if (vals[i].sval) { else
addReplyDouble(c, zzlStrtod(vals[i].sval,vals[i].slen)); addReplyBulkLongLong(c, keys[i].lval);
} else if (withscores) {
addReplyDouble(c, vals[i].lval); if (vals[i].sval) {
addReplyDouble(c, zzlStrtod(vals[i].sval,vals[i].slen));
} else
addReplyDouble(c, vals[i].lval);
}
} }
} }
zfree(keys); zfree(keys);

View File

@ -1541,6 +1541,8 @@ void ziplistRandomPairs(unsigned char *zl, int count, ziplistEntry *keys, ziplis
unsigned char *p, *key, *value; unsigned char *p, *key, *value;
unsigned int klen, vlen; unsigned int klen, vlen;
long long klval, vlval; long long klval, vlval;
/* Notice: the index member must be first due to the use in intCompare */
typedef struct { typedef struct {
int index; int index;
int order; int order;

View File

@ -681,3 +681,21 @@ proc string2printable s {
set res "\"$res\"" set res "\"$res\""
return $res return $res
} }
# Check that probability of each element are between {min_prop} and {max_prop}.
proc check_histogram_distribution {res min_prop max_prop} {
unset -nocomplain mydict
foreach key $res {
dict incr mydict $key 1
}
foreach key [dict keys $mydict] {
set value [dict get $mydict $key]
set probability [expr {double($value) / [llength $res]}]
if {$probability < $min_prop || $probability > $max_prop} {
return false
}
}
return true
}

View File

@ -96,9 +96,17 @@ start_server {tags {"hash"}} {
# 1) Check that it returns repeated elements with and without values. # 1) Check that it returns repeated elements with and without values.
set res [r hrandfield myhash -20] set res [r hrandfield myhash -20]
assert_equal [llength $res] 20 assert_equal [llength $res] 20
set res [r hrandfield myhash -1001]
assert_equal [llength $res] 1001
# again with WITHVALUES # again with WITHVALUES
set res [r hrandfield myhash -20 withvalues] set res [r hrandfield myhash -20 withvalues]
assert_equal [llength $res] 40 assert_equal [llength $res] 40
set res [r hrandfield myhash -1001 withvalues]
assert_equal [llength $res] 2002
# Test random uniform distribution
set res [r hrandfield myhash -1000]
assert_equal [check_histogram_distribution $res 0.05 0.15] true
# 2) Check that all the elements actually belong to the original hash. # 2) Check that all the elements actually belong to the original hash.
foreach {key val} $res { foreach {key val} $res {
@ -167,26 +175,31 @@ start_server {tags {"hash"}} {
# 2) Check that eventually all the elements are returned. # 2) Check that eventually all the elements are returned.
# Use both WITHVALUES and without # Use both WITHVALUES and without
unset -nocomplain auxset unset -nocomplain auxset
set iterations 1000 unset -nocomplain allkey
set iterations [expr {1000 / $size}]
set all_ele_return false
while {$iterations != 0} { while {$iterations != 0} {
incr iterations -1 incr iterations -1
if {[expr {$iterations % 2}] == 0} { if {[expr {$iterations % 2}] == 0} {
set res [r hrandfield myhash $size withvalues] set res [r hrandfield myhash $size withvalues]
foreach {key value} $res { foreach {key value} $res {
dict append auxset $key $value dict append auxset $key $value
lappend allkey $key
} }
} else { } else {
set res [r hrandfield myhash $size] set res [r hrandfield myhash $size]
foreach key $res { foreach key $res {
dict append auxset $key dict append auxset $key
lappend allkey $key
} }
} }
if {[lsort [dict keys $mydict]] eq if {[lsort [dict keys $mydict]] eq
[lsort [dict keys $auxset]]} { [lsort [dict keys $auxset]]} {
break; set all_ele_return true
} }
} }
assert {$iterations != 0} assert_equal $all_ele_return true
assert_equal [check_histogram_distribution $allkey 0.05 0.15] true
} }
} }
r config set hash-max-ziplist-value $original_max_value r config set hash-max-ziplist-value $original_max_value

View File

@ -515,6 +515,43 @@ start_server {
} }
} }
foreach {type contents} {
hashtable {
1 5 10 50 125
MARY PATRICIA LINDA BARBARA ELIZABETH
}
intset {
0 1 2 3 4 5 6 7 8 9
}
} {
test "SRANDMEMBER histogram distribution - $type" {
create_set myset $contents
unset -nocomplain myset
array set myset {}
foreach ele [r smembers myset] {
set myset($ele) 1
}
# Use negative count (PATH 1).
set res [r srandmember myset -1000]
assert_equal [check_histogram_distribution $res 0.05 0.15] true
# Use positive count (both PATH 3 and PATH 4).
foreach size {8 2} {
unset -nocomplain allkey
set iterations [expr {1000 / $size}]
while {$iterations != 0} {
incr iterations -1
set res [r srandmember myset $size]
foreach ele $res {
lappend allkey $ele
}
}
assert_equal [check_histogram_distribution $allkey 0.05 0.15] true
}
}
}
proc setup_move {} { proc setup_move {} {
r del myset3 myset4 r del myset3 myset4
create_set myset1 {1 a b} create_set myset1 {1 a b}

View File

@ -1646,9 +1646,17 @@ start_server {tags {"zset"}} {
# 1) Check that it returns repeated elements with and without values. # 1) Check that it returns repeated elements with and without values.
set res [r zrandmember myzset -20] set res [r zrandmember myzset -20]
assert_equal [llength $res] 20 assert_equal [llength $res] 20
set res [r zrandmember myzset -1001]
assert_equal [llength $res] 1001
# again with WITHSCORES # again with WITHSCORES
set res [r zrandmember myzset -20 withscores] set res [r zrandmember myzset -20 withscores]
assert_equal [llength $res] 40 assert_equal [llength $res] 40
set res [r zrandmember myzset -1001 withscores]
assert_equal [llength $res] 2002
# Test random uniform distribution
set res [r zrandmember myzset -1000]
assert_equal [check_histogram_distribution $res 0.05 0.15] true
# 2) Check that all the elements actually belong to the original zset. # 2) Check that all the elements actually belong to the original zset.
foreach {key val} $res { foreach {key val} $res {
@ -1717,26 +1725,31 @@ start_server {tags {"zset"}} {
# 2) Check that eventually all the elements are returned. # 2) Check that eventually all the elements are returned.
# Use both WITHSCORES and without # Use both WITHSCORES and without
unset -nocomplain auxset unset -nocomplain auxset
set iterations 1000 unset -nocomplain allkey
set iterations [expr {1000 / $size}]
set all_ele_return false
while {$iterations != 0} { while {$iterations != 0} {
incr iterations -1 incr iterations -1
if {[expr {$iterations % 2}] == 0} { if {[expr {$iterations % 2}] == 0} {
set res [r zrandmember myzset $size withscores] set res [r zrandmember myzset $size withscores]
foreach {key value} $res { foreach {key value} $res {
dict append auxset $key $value dict append auxset $key $value
lappend allkey $key
} }
} else { } else {
set res [r zrandmember myzset $size] set res [r zrandmember myzset $size]
foreach key $res { foreach key $res {
dict append auxset $key dict append auxset $key
lappend allkey $key
} }
} }
if {[lsort [dict keys $mydict]] eq if {[lsort [dict keys $mydict]] eq
[lsort [dict keys $auxset]]} { [lsort [dict keys $auxset]]} {
break; set all_ele_return true
} }
} }
assert {$iterations != 0} assert_equal $all_ele_return true
assert_equal [check_histogram_distribution $allkey 0.05 0.15] true
} }
} }
r config set zset-max-ziplist-value $original_max_value r config set zset-max-ziplist-value $original_max_value