Use Reservoir Sampling for random sampling of dict, and fix hang during fork (#12276)
## Issue: When a dict has a long chain or the length of the chain is longer than the number of samples, we will never be able to sample the elements at the end of the chain using dictGetSomeKeys(). This could mean that SRANDMEMBER can be hang in and endless loop. The most severe case, is the pathological case of when someone uses SCAN+DEL or SSCAN+SREM creating an unevenly distributed dict. This was amplified by the recent change in #11692 which prevented a down-sizing rehashing while there is a fork. ## Solution 1. Before, we will stop sampling when we reach the maximum number of samples, even if there is more data after the current chain. Now when we reach the maximum we use the Reservoir Sampling algorithm to fairly sample the end of the chain that cannot be sampled 2. Fix the rehashing code, so that the same as it allows rehashing for up-sizing during fork when the ratio is extreme, it will allow it for down-sizing as well. Issue was introduced (or became more severe) by #11692 Co-authored-by: Oran Agra <oran@redislabs.com>
This commit is contained in:
parent
439b0315c8
commit
b00a235186
32
src/dict.c
32
src/dict.c
@ -294,9 +294,12 @@ int dictTryExpand(dict *d, unsigned long size) {
|
||||
* work it does would be unbound and the function may block for a long time. */
|
||||
int dictRehash(dict *d, int n) {
|
||||
int empty_visits = n*10; /* Max number of empty buckets to visit. */
|
||||
unsigned long s0 = DICTHT_SIZE(d->ht_size_exp[0]);
|
||||
unsigned long s1 = DICTHT_SIZE(d->ht_size_exp[1]);
|
||||
if (dict_can_resize == DICT_RESIZE_FORBID || !dictIsRehashing(d)) return 0;
|
||||
if (dict_can_resize == DICT_RESIZE_AVOID &&
|
||||
(DICTHT_SIZE(d->ht_size_exp[1]) / DICTHT_SIZE(d->ht_size_exp[0]) < dict_force_resize_ratio))
|
||||
((s1 > s0 && s1 / s0 < dict_force_resize_ratio) ||
|
||||
(s1 < s0 && s0 / s1 < dict_force_resize_ratio)))
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
@ -1095,19 +1098,30 @@ unsigned int dictGetSomeKeys(dict *d, dictEntry **des, unsigned int count) {
|
||||
} else {
|
||||
emptylen = 0;
|
||||
while (he) {
|
||||
/* Collect all the elements of the buckets found non
|
||||
* empty while iterating. */
|
||||
*des = he;
|
||||
des++;
|
||||
/* Collect all the elements of the buckets found non empty while iterating.
|
||||
* To avoid the issue of being unable to sample the end of a long chain,
|
||||
* we utilize the Reservoir Sampling algorithm to optimize the sampling process.
|
||||
* This means that even when the maximum number of samples has been reached,
|
||||
* we continue sampling until we reach the end of the chain.
|
||||
* See https://en.wikipedia.org/wiki/Reservoir_sampling. */
|
||||
if (stored < count) {
|
||||
des[stored] = he;
|
||||
} else {
|
||||
unsigned long r = randomULong() % (stored + 1);
|
||||
if (r < count) des[r] = he;
|
||||
}
|
||||
|
||||
he = dictGetNext(he);
|
||||
stored++;
|
||||
if (stored == count) return stored;
|
||||
}
|
||||
if (stored >= count) goto end;
|
||||
}
|
||||
}
|
||||
i = (i+1) & maxsizemask;
|
||||
}
|
||||
return stored;
|
||||
|
||||
end:
|
||||
return stored > count ? count : stored;
|
||||
}
|
||||
|
||||
|
||||
@ -1510,7 +1524,9 @@ size_t _dictGetStatsHt(char *buf, size_t bufsize, dict *d, int htidx, int full)
|
||||
|
||||
if (d->ht_used[htidx] == 0) {
|
||||
return snprintf(buf,bufsize,
|
||||
"No stats available for empty dictionaries\n");
|
||||
"Hash table %d stats (%s):\n"
|
||||
"No stats available for empty dictionaries\n",
|
||||
htidx, (htidx == 0) ? "main hash table" : "rehashing target");
|
||||
}
|
||||
|
||||
if (!full) {
|
||||
|
@ -1002,6 +1002,129 @@ foreach type {single multiple single_multiple} {
|
||||
}
|
||||
}
|
||||
|
||||
proc is_rehashing {myset} {
|
||||
set htstats [r debug HTSTATS-KEY $myset]
|
||||
return [string match {*rehashing target*} $htstats]
|
||||
}
|
||||
|
||||
proc rem_hash_set_top_N {myset n} {
|
||||
set cursor 0
|
||||
set members {}
|
||||
set enough 0
|
||||
while 1 {
|
||||
set res [r sscan $myset $cursor]
|
||||
set cursor [lindex $res 0]
|
||||
set k [lindex $res 1]
|
||||
foreach m $k {
|
||||
lappend members $m
|
||||
if {[llength $members] >= $n} {
|
||||
set enough 1
|
||||
break
|
||||
}
|
||||
}
|
||||
if {$enough || $cursor == 0} {
|
||||
break
|
||||
}
|
||||
}
|
||||
r srem $myset {*}$members
|
||||
}
|
||||
|
||||
test "SRANDMEMBER with a dict containing long chain" {
|
||||
set origin_save [config_get_set save ""]
|
||||
set origin_max_lp [config_get_set set-max-listpack-entries 0]
|
||||
set origin_save_delay [config_get_set rdb-key-save-delay 2147483647]
|
||||
|
||||
# 1) Create a hash set with 100000 members.
|
||||
set members {}
|
||||
for {set i 0} {$i < 100000} {incr i} {
|
||||
lappend members [format "m:%d" $i]
|
||||
}
|
||||
create_set myset $members
|
||||
|
||||
# 2) Wait for the hash set rehashing to finish.
|
||||
while {[is_rehashing myset]} {
|
||||
r srandmember myset 100
|
||||
}
|
||||
|
||||
# 3) Turn off the rehashing of this set, and remove the members to 500.
|
||||
r bgsave
|
||||
rem_hash_set_top_N myset [expr {[r scard myset] - 500}]
|
||||
assert_equal [r scard myset] 500
|
||||
|
||||
# 4) Kill RDB child process to restart rehashing.
|
||||
set pid1 [get_child_pid 0]
|
||||
catch {exec kill -9 $pid1}
|
||||
waitForBgsave r
|
||||
|
||||
# 5) Let the set hash to start rehashing
|
||||
r spop myset 1
|
||||
assert [is_rehashing myset]
|
||||
|
||||
# 6) Verify that when rdb saving is in progress, rehashing will still be performed (because
|
||||
# the ratio is extreme) by waiting for it to finish during an active bgsave.
|
||||
r bgsave
|
||||
|
||||
while {[is_rehashing myset]} {
|
||||
r srandmember myset 1
|
||||
}
|
||||
if {$::verbose} {
|
||||
puts [r debug HTSTATS-KEY myset full]
|
||||
}
|
||||
|
||||
set pid1 [get_child_pid 0]
|
||||
catch {exec kill -9 $pid1}
|
||||
waitForBgsave r
|
||||
|
||||
# 7) Check that eventually, SRANDMEMBER returns all elements.
|
||||
array set allmyset {}
|
||||
foreach ele [r smembers myset] {
|
||||
set allmyset($ele) 1
|
||||
}
|
||||
unset -nocomplain auxset
|
||||
set iterations 1000
|
||||
while {$iterations != 0} {
|
||||
incr iterations -1
|
||||
set res [r srandmember myset -10]
|
||||
foreach ele $res {
|
||||
set auxset($ele) 1
|
||||
}
|
||||
if {[lsort [array names allmyset]] eq
|
||||
[lsort [array names auxset]]} {
|
||||
break;
|
||||
}
|
||||
}
|
||||
assert {$iterations != 0}
|
||||
|
||||
# 8) Remove the members to 30 in order to calculate the value of Chi-Square Distribution,
|
||||
# otherwise we would need more iterations.
|
||||
rem_hash_set_top_N myset [expr {[r scard myset] - 30}]
|
||||
assert_equal [r scard myset] 30
|
||||
assert {[is_rehashing myset]}
|
||||
|
||||
# Now that we have a hash set with only one long chain bucket.
|
||||
set htstats [r debug HTSTATS-KEY myset full]
|
||||
assert {[regexp {different slots: ([0-9]+)} $htstats - different_slots]}
|
||||
assert {[regexp {max chain length: ([0-9]+)} $htstats - max_chain_length]}
|
||||
assert {$different_slots == 1 && $max_chain_length == 30}
|
||||
|
||||
# 9) Use positive count (PATH 4) to get 10 elements (out of 30) each time.
|
||||
unset -nocomplain allkey
|
||||
set iterations 1000
|
||||
while {$iterations != 0} {
|
||||
incr iterations -1
|
||||
set res [r srandmember myset 10]
|
||||
foreach ele $res {
|
||||
lappend allkey $ele
|
||||
}
|
||||
}
|
||||
# validate even distribution of random sampling (df = 29, 73 means 0.00001 probability)
|
||||
assert_lessthan [chi_square_value $allkey] 73
|
||||
|
||||
r config set save $origin_save
|
||||
r config set set-max-listpack-entries $origin_max_lp
|
||||
r config set rdb-key-save-delay $origin_save_delay
|
||||
} {OK} {needs:debug slow}
|
||||
|
||||
proc setup_move {} {
|
||||
r del myset3{t} myset4{t}
|
||||
create_set myset1{t} {1 a b}
|
||||
|
Loading…
x
Reference in New Issue
Block a user