Refactor of ActiveDefrag to reduce latencies (#1242)

Refer to:  https://github.com/valkey-io/valkey/issues/1141

This update refactors the defrag code to:
* Make the overall code more readable and maintainable
* Reduce latencies incurred during defrag processing

With this update, the defrag cycle time is reduced to 500us, with more
frequent cycles. This results in much more predictable latencies, with a
dramatic reduction in tail latencies.

(See https://github.com/valkey-io/valkey/issues/1141 for more complete
details.)

This update is focused mostly on the high-level processing, and does NOT
address lower level functions which aren't currently timebound (e.g.
`activeDefragSdsDict()`, and `moduleDefragGlobals()`). These are out of
scope for this update and left for a future update.

I fixed `kvstoreDictLUTDefrag` because it was using up to 7ms on a CME
single shard. See original github issue for performance details.

---------

Signed-off-by: Jim Brunner <brunnerj@amazon.com>
Signed-off-by: Madelyn Olson <madelyneolson@gmail.com>
Co-authored-by: Madelyn Olson <madelyneolson@gmail.com>
This commit is contained in:
Jim Brunner 2024-12-03 08:42:29 -08:00 committed by GitHub
parent 3df609ef06
commit 397201c48f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 741 additions and 481 deletions

View File

@ -85,7 +85,7 @@ aeEventLoop *aeCreateEventLoop(int setsize) {
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->timeEventNextId = 1;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;

View File

@ -3278,10 +3278,11 @@ standardConfig static_configs[] = {
createIntConfig("list-max-listpack-size", "list-max-ziplist-size", MODIFIABLE_CONFIG, INT_MIN, INT_MAX, server.list_max_listpack_size, -2, INTEGER_CONFIG, NULL, NULL),
createIntConfig("tcp-keepalive", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.tcpkeepalive, 300, INTEGER_CONFIG, NULL, NULL),
createIntConfig("cluster-migration-barrier", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.cluster_migration_barrier, 1, INTEGER_CONFIG, NULL, NULL),
createIntConfig("active-defrag-cycle-min", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cycle_min, 1, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 1% CPU min (at lower threshold) */
createIntConfig("active-defrag-cycle-max", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cycle_max, 25, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 25% CPU max (at upper threshold) */
createIntConfig("active-defrag-cycle-min", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cpu_min, 1, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 1% CPU min (at lower threshold) */
createIntConfig("active-defrag-cycle-max", NULL, MODIFIABLE_CONFIG, 1, 99, server.active_defrag_cpu_max, 25, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: 25% CPU max (at upper threshold) */
createIntConfig("active-defrag-threshold-lower", NULL, MODIFIABLE_CONFIG, 0, 1000, server.active_defrag_threshold_lower, 10, INTEGER_CONFIG, NULL, NULL), /* Default: don't defrag when fragmentation is below 10% */
createIntConfig("active-defrag-threshold-upper", NULL, MODIFIABLE_CONFIG, 0, 1000, server.active_defrag_threshold_upper, 100, INTEGER_CONFIG, NULL, updateDefragConfiguration), /* Default: maximum defrag force at 100% fragmentation */
createIntConfig("active-defrag-cycle-us", NULL, MODIFIABLE_CONFIG, 0, 100000, server.active_defrag_cycle_us, 500, INTEGER_CONFIG, NULL, updateDefragConfiguration),
createIntConfig("lfu-log-factor", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_log_factor, 10, INTEGER_CONFIG, NULL, NULL),
createIntConfig("lfu-decay-time", NULL, MODIFIABLE_CONFIG, 0, INT_MAX, server.lfu_decay_time, 1, INTEGER_CONFIG, NULL, NULL),
createIntConfig("replica-priority", "slave-priority", MODIFIABLE_CONFIG, 0, INT_MAX, server.replica_priority, 100, INTEGER_CONFIG, NULL, NULL),

File diff suppressed because it is too large Load Diff

View File

@ -1321,7 +1321,7 @@ end:
/* Reallocate the dictEntry, key and value allocations in a bucket using the
* provided allocation functions in order to defrag them. */
static void dictDefragBucket(dictEntry **bucketref, dictDefragFunctions *defragfns, void *privdata) {
static void dictDefragBucket(dictEntry **bucketref, const dictDefragFunctions *defragfns, void *privdata) {
dictDefragAllocFunction *defragalloc = defragfns->defragAlloc;
dictDefragAllocFunction *defragkey = defragfns->defragKey;
dictDefragAllocFunction *defragval = defragfns->defragVal;
@ -1499,7 +1499,7 @@ unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *pri
* where NULL means that no reallocation happened and the old memory is still
* valid. */
unsigned long
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata) {
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, const dictDefragFunctions *defragfns, void *privdata) {
int htidx0, htidx1;
const dictEntry *de, *next;
unsigned long m0, m1;

View File

@ -238,7 +238,7 @@ void dictSetHashFunctionSeed(uint8_t *seed);
uint8_t *dictGetHashFunctionSeed(void);
unsigned long dictScan(dict *d, unsigned long v, dictScanFunction *fn, void *privdata);
unsigned long
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, dictDefragFunctions *defragfns, void *privdata);
dictScanDefrag(dict *d, unsigned long v, dictScanFunction *fn, const dictDefragFunctions *defragfns, void *privdata);
uint64_t dictGetHash(dict *d, const void *key);
void dictRehashingInfo(dict *d, unsigned long long *from_size, unsigned long long *to_size);

View File

@ -739,7 +739,7 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs,
int didx,
unsigned long v,
dictScanFunction *fn,
dictDefragFunctions *defragfns,
const dictDefragFunctions *defragfns,
void *privdata) {
dict *d = kvstoreGetDict(kvs, didx);
if (!d) return 0;
@ -750,14 +750,27 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs,
* within dict, it only reallocates the memory used by the dict structure itself using
* the provided allocation function. This feature was added for the active defrag feature.
*
* The 'defragfn' callback is called with a reference to the dict
* that callback can reallocate. */
void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn) {
for (int didx = 0; didx < kvs->num_dicts; didx++) {
* With 16k dictionaries for cluster mode with 1 shard, this operation may require substantial time
* to execute. A "cursor" is used to perform the operation iteratively. When first called, a
* cursor value of 0 should be provided. The return value is an updated cursor which should be
* provided on the next iteration. The operation is complete when 0 is returned.
*
* The 'defragfn' callback is called with a reference to the dict that callback can reallocate. */
unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn) {
for (int didx = cursor; didx < kvs->num_dicts; didx++) {
dict **d = kvstoreGetDictRef(kvs, didx), *newd;
if (!*d) continue;
listNode *rehashing_node = NULL;
if (listLength(kvs->rehashing) > 0) {
rehashing_node = ((kvstoreDictMetadata *)dictMetadata(*d))->rehashing_node;
}
if ((newd = defragfn(*d))) *d = newd;
if (rehashing_node) listNodeValue(rehashing_node) = *d;
return (didx + 1);
}
return 0;
}
uint64_t kvstoreGetHash(kvstore *kvs, const void *key) {

View File

@ -68,10 +68,10 @@ unsigned long kvstoreDictScanDefrag(kvstore *kvs,
int didx,
unsigned long v,
dictScanFunction *fn,
dictDefragFunctions *defragfns,
const dictDefragFunctions *defragfns,
void *privdata);
typedef dict *(kvstoreDictLUTDefragFunction)(dict *d);
void kvstoreDictLUTDefrag(kvstore *kvs, kvstoreDictLUTDefragFunction *defragfn);
unsigned long kvstoreDictLUTDefrag(kvstore *kvs, unsigned long cursor, kvstoreDictLUTDefragFunction *defragfn);
void *kvstoreDictFetchValue(kvstore *kvs, int didx, const void *key);
dictEntry *kvstoreDictFind(kvstore *kvs, int didx, void *key);
dictEntry *kvstoreDictAddRaw(kvstore *kvs, int didx, void *key, dictEntry **existing);

View File

@ -1140,8 +1140,8 @@ void databasesCron(void) {
}
}
/* Defrag keys gradually. */
activeDefragCycle();
/* Start active defrag cycle or adjust defrag CPU if needed. */
monitorActiveDefrag();
/* Perform hash tables rehashing if needed, but only if there are no
* other processes saving the DB on disk. Otherwise rehashing is bad
@ -1611,24 +1611,7 @@ void whileBlockedCron(void) {
mstime_t latency;
latencyStartMonitor(latency);
/* In some cases we may be called with big intervals, so we may need to do
* extra work here. This is because some of the functions in serverCron rely
* on the fact that it is performed every 10 ms or so. For instance, if
* activeDefragCycle needs to utilize 25% cpu, it will utilize 2.5ms, so we
* need to call it multiple times. */
long hz_ms = 1000 / server.hz;
while (server.blocked_last_cron < server.mstime) {
/* Defrag keys gradually. */
activeDefragCycle();
server.blocked_last_cron += hz_ms;
/* Increment cronloop so that run_with_period works. */
server.cronloops++;
}
/* Other cron jobs do not need to be done in a loop. No need to check
* server.blocked_last_cron since we have an early exit at the top. */
defragWhileBlocked();
/* Update memory stats during loading (excluding blocked scripts) */
if (server.loading) cronUpdateMemoryStats();
@ -2120,7 +2103,7 @@ void initServerConfig(void) {
server.aof_flush_postponed_start = 0;
server.aof_last_incr_size = 0;
server.aof_last_incr_fsync_offset = 0;
server.active_defrag_running = 0;
server.active_defrag_cpu_percent = 0;
server.active_defrag_configuration_changed = 0;
server.notify_keyspace_events = 0;
server.blocked_clients = 0;
@ -2722,8 +2705,6 @@ void initServer(void) {
server.db[j].watched_keys = dictCreate(&keylistDictType);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
listSetFreeMethod(server.db[j].defrag_later, (void (*)(void *))sdsfree);
}
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
/* Note that server.pubsub_channels was chosen to be a kvstore (with only one dict, which
@ -5704,7 +5685,7 @@ sds genValkeyInfoString(dict *section_dict, int all_sections, int everything) {
"mem_aof_buffer:%zu\r\n", mh->aof_buffer,
"mem_allocator:%s\r\n", ZMALLOC_LIB,
"mem_overhead_db_hashtable_rehashing:%zu\r\n", mh->overhead_db_hashtable_rehashing,
"active_defrag_running:%d\r\n", server.active_defrag_running,
"active_defrag_running:%d\r\n", server.active_defrag_cpu_percent,
"lazyfree_pending_objects:%zu\r\n", lazyfreeGetPendingObjectsCount(),
"lazyfreed_objects:%zu\r\n", lazyfreeGetFreedObjectsCount()));
freeMemoryOverheadData(mh);

View File

@ -961,7 +961,6 @@ typedef struct serverDb {
int id; /* Database ID */
long long avg_ttl; /* Average TTL, just for stats */
unsigned long expires_cursor; /* Cursor of the active expire cycle. */
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
} serverDb;
/* forward declaration for functions ctx */
@ -1702,7 +1701,7 @@ struct valkeyServer {
int last_sig_received; /* Indicates the last SIGNAL received, if any (e.g., SIGINT or SIGTERM). */
int shutdown_flags; /* Flags passed to prepareForShutdown(). */
int activerehashing; /* Incremental rehash in serverCron() */
int active_defrag_running; /* Active defragmentation running (holds current scan aggressiveness) */
int active_defrag_cpu_percent; /* Current desired CPU percentage for active defrag */
char *pidfile; /* PID file path */
int arch_bits; /* 32 or 64 depending on sizeof(long) */
int cronloops; /* Number of times the cron function run */
@ -1899,8 +1898,9 @@ struct valkeyServer {
size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */
int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */
int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */
int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */
int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */
int active_defrag_cpu_min; /* minimal effort for defrag in CPU percentage */
int active_defrag_cpu_max; /* maximal effort for defrag in CPU percentage */
int active_defrag_cycle_us; /* standard duration of defrag cycle */
unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from
within the main dict scan */
size_t client_max_querybuf_len; /* Limit for client query buffer length */
@ -3353,7 +3353,8 @@ void bytesToHuman(char *s, size_t size, unsigned long long n);
void enterExecutionUnit(int update_cached_time, long long us);
void exitExecutionUnit(void);
void resetServerStats(void);
void activeDefragCycle(void);
void monitorActiveDefrag(void);
void defragWhileBlocked(void);
unsigned int getLRUClock(void);
unsigned int LRU_CLOCK(void);
const char *evictPolicyToString(void);

View File

@ -40,7 +40,6 @@ run_solo {defrag} {
proc test_active_defrag {type} {
if {[string match {*jemalloc*} [s mem_allocator]] && [r debug mallctl arenas.page] <= 8192} {
test "Active defrag main dictionary: $type" {
r config set hz 100
r config set activedefrag no
r config set active-defrag-threshold-lower 5
r config set active-defrag-cycle-min 65
@ -89,6 +88,8 @@ run_solo {defrag} {
r config set active-defrag-cycle-min 65
r config set active-defrag-cycle-max 75
after 1000 ;# Give defrag time to work (might be multiple cycles)
# Wait for the active defrag to stop working.
wait_for_condition 2000 100 {
[s active_defrag_running] eq 0
@ -138,12 +139,13 @@ run_solo {defrag} {
r config resetstat
r config set key-load-delay -25 ;# sleep on average 1/25 usec
r debug loadaof
after 1000 ;# give defrag a chance to work before turning it off
r config set activedefrag no
# measure hits and misses right after aof loading
set misses [s active_defrag_misses]
set hits [s active_defrag_hits]
after 120 ;# serverCron only updates the info once in 100ms
set frag [s allocator_frag_ratio]
set max_latency 0
foreach event [r latency latest] {
@ -181,7 +183,6 @@ run_solo {defrag} {
r flushdb sync
r script flush sync
r config resetstat
r config set hz 100
r config set activedefrag no
r config set active-defrag-threshold-lower 5
r config set active-defrag-cycle-min 65
@ -203,7 +204,7 @@ run_solo {defrag} {
$rd read ; # Discard script load replies
$rd read ; # Discard set replies
}
after 120 ;# serverCron only updates the info once in 100ms
after 1000 ;# give defrag some time to work
if {$::verbose} {
puts "used [s allocator_allocated]"
puts "rss [s allocator_active]"
@ -239,6 +240,8 @@ run_solo {defrag} {
fail "defrag not started."
}
after 1000 ;# Give defrag time to work (might be multiple cycles)
# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
@ -266,7 +269,6 @@ run_solo {defrag} {
test "Active defrag big keys: $type" {
r flushdb sync
r config resetstat
r config set hz 100
r config set activedefrag no
r config set active-defrag-max-scan-fields 1000
r config set active-defrag-threshold-lower 5
@ -361,6 +363,8 @@ run_solo {defrag} {
fail "defrag not started."
}
after 1000 ;# Give defrag some time to work (it may run several cycles)
# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
@ -407,7 +411,6 @@ run_solo {defrag} {
test "Active defrag pubsub: $type" {
r flushdb sync
r config resetstat
r config set hz 100
r config set activedefrag no
r config set active-defrag-threshold-lower 5
r config set active-defrag-cycle-min 65
@ -430,7 +433,6 @@ run_solo {defrag} {
$rd read ; # Discard set replies
}
after 120 ;# serverCron only updates the info once in 100ms
if {$::verbose} {
puts "used [s allocator_allocated]"
puts "rss [s allocator_active]"
@ -466,6 +468,8 @@ run_solo {defrag} {
fail "defrag not started."
}
after 1000 ;# Give defrag some time to work (it may run several cycles)
# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
@ -475,6 +479,7 @@ run_solo {defrag} {
puts [r memory malloc-stats]
fail "defrag didn't stop."
}
r config set activedefrag no ;# disable before we accidentally create more frag
# test the fragmentation is lower
after 120 ;# serverCron only updates the info once in 100ms
@ -507,7 +512,6 @@ run_solo {defrag} {
test "Active defrag big list: $type" {
r flushdb sync
r config resetstat
r config set hz 100
r config set activedefrag no
r config set active-defrag-max-scan-fields 1000
r config set active-defrag-threshold-lower 5
@ -561,6 +565,8 @@ run_solo {defrag} {
fail "defrag not started."
}
after 1000 ;# Give defrag some time to work (it may run several cycles)
# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0
@ -619,7 +625,6 @@ run_solo {defrag} {
start_server {tags {"defrag"} overrides {save ""}} {
r flushdb sync
r config resetstat
r config set hz 100
r config set activedefrag no
r config set active-defrag-max-scan-fields 1000
r config set active-defrag-threshold-lower 5
@ -685,6 +690,8 @@ run_solo {defrag} {
fail "defrag not started."
}
after 1000 ;# Give defrag some time to work (it may run several cycles)
# wait for the active defrag to stop working
wait_for_condition 500 100 {
[s active_defrag_running] eq 0

View File

@ -2,7 +2,6 @@ set testmodule [file normalize tests/modules/defragtest.so]
start_server {tags {"modules"} overrides {{save ""}}} {
r module load $testmodule 10000
r config set hz 100
r config set active-defrag-ignore-bytes 1
r config set active-defrag-threshold-lower 0
r config set active-defrag-cycle-min 99

View File

@ -2381,9 +2381,8 @@ rdb-save-incremental-fsync yes
# Fragmentation is a natural process that happens with every allocator (but
# less so with Jemalloc, fortunately) and certain workloads. Normally a server
# restart is needed in order to lower the fragmentation, or at least to flush
# away all the data and create it again. However thanks to this feature
# implemented by Oran Agra, this process can happen at runtime
# in a "hot" way, while the server is running.
# away all the data and create it again. However thanks to this feature, this
# process can happen at runtime in a "hot" way, while the server is running.
#
# Basically when the fragmentation is over a certain level (see the
# configuration options below) the server will start to create new copies of the
@ -2421,18 +2420,23 @@ rdb-save-incremental-fsync yes
# Maximum percentage of fragmentation at which we use maximum effort
# active-defrag-threshold-upper 100
# Minimal effort for defrag in CPU percentage, to be used when the lower
# threshold is reached
# Minimal effort for defrag in CPU percentage, not cycle time as the name might
# suggest, to be used when the lower threshold is reached.
# active-defrag-cycle-min 1
# Maximal effort for defrag in CPU percentage, to be used when the upper
# threshold is reached
# Maximal effort for defrag in CPU percentage, not cycle time as the name might
# suggest, to be used when the upper threshold is reached.
# active-defrag-cycle-max 25
# Maximum number of set/hash/zset/list fields that will be processed from
# the main dictionary scan
# active-defrag-max-scan-fields 1000
# The time spent (in microseconds) of the periodic active defrag process. This
# affects the latency impact of active defrag on client commands. Smaller numbers
# will result in less latency impact at the cost of increased defrag overhead.
# active-defrag-cycle-us 500
# Jemalloc background thread for purging will be enabled by default
jemalloc-bg-thread yes