futriix/src/defrag.c
Jim Brunner 349bc7547b
defrag: use monotime in module interface (#1388)
The recent PR (https://github.com/valkey-io/valkey/pull/1242) converted
Active Defrag to use `monotime`. In that change, a conversion was
performed to continue to use `ustime()` as part of the module interface.
Since this time is only used internally, and never actually exposed to
the module, we can convert this to use `monotime` directly.

Signed-off-by: Jim Brunner <brunnerj@amazon.com>
2024-12-03 11:19:53 -08:00

1403 lines
55 KiB
C

/*
* Active memory defragmentation
* Try to find key / value allocations that need to be re-allocated in order
* to reduce external fragmentation.
* We do that by scanning the keyspace and for each pointer we have, we can try to
* ask the allocator if moving it to a new address will help reduce fragmentation.
*
* Copyright (c) 2020, Redis Ltd.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* * Neither the name of Redis nor the names of its contributors may be used
* to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
#include "server.h"
#include <stddef.h>
#ifdef HAVE_DEFRAG
typedef enum { DEFRAG_NOT_DONE = 0,
DEFRAG_DONE = 1 } doneStatus;
/*
* Defragmentation is performed in stages. Each stage is serviced by a stage function
* (defragStageFn). The stage function is passed a target (void*) to defrag. The contents of that
* target are unique to the particular stage - and may even be NULL for some stage functions. The
* same stage function can be used multiple times (for different stages) each having a different
* target.
*
* The stage function is required to maintain an internal static state. This allows the stage
* function to continue when invoked in an iterative manner. When invoked with a 0 endtime, the
* stage function is required to clear it's internal state and prepare to begin a new stage. It
* should return false (more work to do) as it should NOT perform any real "work" during init.
*
* Parameters:
* endtime - This is the monotonic time that the function should end and return. This ensures
* a bounded latency due to defrag. When endtime is 0, the internal state should be
* cleared, preparing to begin the stage with a new target.
* target - This is the "thing" that should be defragged. It's type is dependent on the
* type of the stage function. This might be a dict, a kvstore, a DB, or other.
* privdata - A pointer to arbitrary private data which is unique to the stage function.
*
* Returns:
* - DEFRAG_DONE if the stage is complete
* - DEFRAG_NOT_DONE if there is more work to do
*/
typedef doneStatus (*defragStageFn)(monotime endtime, void *target, void *privdata);
typedef struct {
defragStageFn stage_fn; // The function to be invoked for the stage
void *target; // The target that the function will defrag
void *privdata; // Private data, unique to the stage function
} StageDescriptor;
/* Globals needed for the main defrag processing logic.
* Doesn't include variables specific to a stage or type of data. */
struct DefragContext {
monotime start_cycle; // Time of beginning of defrag cycle
long long start_defrag_hits; // server.stat_active_defrag_hits captured at beginning of cycle
list *remaining_stages; // List of stages which remain to be processed
StageDescriptor *current_stage; // The stage that's currently being processed
long long timeproc_id; // Eventloop ID of the timerproc (or AE_DELETED_EVENT_ID)
monotime timeproc_end_time; // Ending time of previous timerproc execution
long timeproc_overage_us; // A correction value if over/under target CPU percent
};
static struct DefragContext defrag;
/* There are a number of stages which process a kvstore. To simplify this, a stage helper function
* `defragStageKvstoreHelper()` is defined. This function aids in iterating over the kvstore. It
* uses these definitions.
*/
/* State of the kvstore helper. The private data (privdata) passed to the kvstore helper MUST BEGIN
* with a kvstoreIterState (or be passed as NULL). */
#define KVS_SLOT_DEFRAG_LUT -2
#define KVS_SLOT_UNASSIGNED -1
typedef struct {
kvstore *kvs;
int slot;
unsigned long cursor;
} kvstoreIterState;
/* The kvstore helper uses this function to perform tasks before continuing the iteration. For the
* main dictionary, large items are set aside and processed by this function before continuing with
* iteration over the kvstore.
* endtime - This is the monotonic time that the function should end and return.
* privdata - Private data for functions invoked by the helper. If provided in the call to
* `defragStageKvstoreHelper()`, the `kvstoreIterState` portion (at the beginning)
* will be updated with the current kvstore iteration status.
*
* Returns:
* - DEFRAG_DONE if the pre-continue work is complete
* - DEFRAG_NOT_DONE if there is more work to do
*/
typedef doneStatus (*kvstoreHelperPreContinueFn)(monotime endtime, void *privdata);
// Private data for main dictionary keys
typedef struct {
kvstoreIterState kvstate;
serverDb *db;
dictEntry *saved_expire_de;
} defragKeysCtx;
static_assert(offsetof(defragKeysCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");
// Private data for pubsub kvstores
typedef dict *(*getClientChannelsFn)(client *);
typedef struct {
getClientChannelsFn fn;
} getClientChannelsFnWrapper;
typedef struct {
kvstoreIterState kvstate;
getClientChannelsFn getPubSubChannels;
} defragPubSubCtx;
static_assert(offsetof(defragPubSubCtx, kvstate) == 0, "defragStageKvstoreHelper requires this");
/* When scanning a main kvstore, large elements are queued for later handling rather than
* causing a large latency spike while processing a hash table bucket. This list is only used
* for stage: "defragStageDbKeys". It will only contain values for the current kvstore being
* defragged.
* Note that this is a list of key names. It's possible that the key may be deleted or modified
* before "later" and we will search by key name to find the entry when we defrag the item later.
*/
static list *defrag_later;
static unsigned long defrag_later_cursor;
/* this method was added to jemalloc in order to help us understand which
* pointers are worthwhile moving and which aren't */
int je_get_defrag_hint(void *ptr);
/* Defrag function which allocates and copies memory if needed, but DOESN'T free the old block.
* It is the responsibility of the caller to free the old block if a non-NULL value (new block)
* is returned. (Returns NULL if no relocation was needed.)
*/
static void *activeDefragAllocWithoutFree(void *ptr, size_t *allocation_size) {
size_t size;
void *newptr;
if (!allocatorShouldDefrag(ptr)) {
server.stat_active_defrag_misses++;
return NULL;
}
/* move this allocation to a new allocation.
* make sure not to use the thread cache. so that we don't get back the same
* pointers we try to free */
size = zmalloc_size(ptr);
newptr = allocatorDefragAlloc(size);
memcpy(newptr, ptr, size);
if (allocation_size) *allocation_size = size;
server.stat_active_defrag_hits++;
return newptr;
}
/* Defrag helper for generic allocations.
*
* Returns NULL in case the allocation wasn't moved.
* When it returns a non-null value, the old pointer was already released
* and should NOT be accessed. */
void *activeDefragAlloc(void *ptr) {
size_t allocation_size;
void *newptr = activeDefragAllocWithoutFree(ptr, &allocation_size);
if (newptr) allocatorDefragFree(ptr, allocation_size);
return newptr;
}
/* This method captures the expiry db dict entry which refers to data stored in keys db dict entry. */
static void defragEntryStartCbForKeys(void *ctx, void *oldptr) {
defragKeysCtx *defragctx = (defragKeysCtx *)ctx;
serverDb *db = defragctx->db;
sds oldsds = (sds)dictGetKey((dictEntry *)oldptr);
int slot = defragctx->kvstate.slot;
if (kvstoreDictSize(db->expires, slot)) {
dictEntry *expire_de = kvstoreDictFind(db->expires, slot, oldsds);
defragctx->saved_expire_de = expire_de;
} else {
defragctx->saved_expire_de = NULL;
}
}
/* This method updates the key of expiry db dict entry. The key might be no longer valid
* as it could have been cleaned up during the defrag-realloc of the main dictionary. */
static void defragEntryFinishCbForKeys(void *ctx, void *newptr) {
defragKeysCtx *defragctx = (defragKeysCtx *)ctx;
dictEntry *expire_de = defragctx->saved_expire_de;
/* Item doesn't have TTL associated to it. */
if (!expire_de) return;
/* No reallocation happened. */
if (!newptr) {
expire_de = NULL;
return;
}
serverDb *db = defragctx->db;
sds newsds = (sds)dictGetKey((dictEntry *)newptr);
int slot = defragctx->kvstate.slot;
kvstoreDictSetKey(db->expires, slot, expire_de, newsds);
}
/* Defrag helper for sds strings
*
* Returns NULL in case the allocation wasn't moved.
* When it returns a non-null value, the old pointer was already released
* and should NOT be accessed. */
static sds activeDefragSds(sds sdsptr) {
void *ptr = sdsAllocPtr(sdsptr);
void *newptr = activeDefragAlloc(ptr);
if (newptr) {
size_t offset = sdsptr - (char *)ptr;
sdsptr = (char *)newptr + offset;
return sdsptr;
}
return NULL;
}
/* Performs defrag on a string-type (or generic) robj, but does not free the old robj. This is the
* caller's responsibility. This is necessary for string objects with multiple references. In this
* case the caller can fix the references before freeing the original object.
*/
static robj *activeDefragStringObWithoutFree(robj *ob, size_t *allocation_size) {
if (ob->type == OBJ_STRING && ob->encoding == OBJ_ENCODING_RAW) {
// Try to defrag the linked sds, regardless of if robj will be moved
sds newsds = activeDefragSds((sds)ob->ptr);
if (newsds) ob->ptr = newsds;
}
robj *new_robj = activeDefragAllocWithoutFree(ob, allocation_size);
if (new_robj && ob->type == OBJ_STRING && ob->encoding == OBJ_ENCODING_EMBSTR) {
// If the robj is moved, correct the internal pointer
long embstr_offset = (intptr_t)ob->ptr - (intptr_t)ob;
new_robj->ptr = (void *)((intptr_t)new_robj + embstr_offset);
}
return new_robj;
}
/* Defrag helper for robj and/or string objects
*
* Returns NULL in case the allocation wasn't moved.
* When it returns a non-null value, the old pointer was already released
* and should NOT be accessed. */
robj *activeDefragStringOb(robj *ob) {
size_t allocation_size;
if (ob->refcount != 1) return NULL; // Unsafe to defrag if multiple refs
robj *new_robj = activeDefragStringObWithoutFree(ob, &allocation_size);
if (new_robj) allocatorDefragFree(ob, allocation_size);
return new_robj;
}
/* Defrag helper for lua scripts
*
* Returns NULL in case the allocation wasn't moved.
* When it returns a non-null value, the old pointer was already released
* and should NOT be accessed. */
static luaScript *activeDefragLuaScript(luaScript *script) {
luaScript *ret = NULL;
/* try to defrag script struct */
if ((ret = activeDefragAlloc(script))) {
script = ret;
}
/* try to defrag actual script object */
robj *ob = activeDefragStringOb(script->body);
if (ob) script->body = ob;
return ret;
}
/* Defrag helper for dict main allocations (dict struct, and hash tables).
* Receives a pointer to the dict* and return a new dict* when the dict
* struct itself was moved.
*
* Returns NULL in case the allocation wasn't moved.
* When it returns a non-null value, the old pointer was already released
* and should NOT be accessed. */
static dict *dictDefragTables(dict *d) {
dict *ret = NULL;
dictEntry **newtable;
/* handle the dict struct */
if ((ret = activeDefragAlloc(d))) d = ret;
/* handle the first hash table */
if (!d->ht_table[0]) return ret; /* created but unused */
newtable = activeDefragAlloc(d->ht_table[0]);
if (newtable) d->ht_table[0] = newtable;
/* handle the second hash table */
if (d->ht_table[1]) {
newtable = activeDefragAlloc(d->ht_table[1]);
if (newtable) d->ht_table[1] = newtable;
}
return ret;
}
/* Internal function used by zslDefrag */
static void zslUpdateNode(zskiplist *zsl, zskiplistNode *oldnode, zskiplistNode *newnode, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == oldnode) update[i]->level[i].forward = newnode;
}
serverAssert(zsl->header != oldnode);
if (newnode->level[0].forward) {
serverAssert(newnode->level[0].forward->backward == oldnode);
newnode->level[0].forward->backward = newnode;
} else {
serverAssert(zsl->tail == oldnode);
zsl->tail = newnode;
}
}
/* Defrag helper for sorted set.
* Update the robj pointer, defrag the skiplist struct and return the new score
* reference. We may not access oldele pointer (not even the pointer stored in
* the skiplist), as it was already freed. Newele may be null, in which case we
* only need to defrag the skiplist, but not update the obj pointer.
* When return value is non-NULL, it is the score reference that must be updated
* in the dict record. */
static double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x, *newx;
int i;
sds ele = newele ? newele : oldele;
/* find the skiplist node referring to the object that was moved,
* and all pointers that need to be updated if we'll end up moving the skiplist node. */
x = zsl->header;
for (i = zsl->level - 1; i >= 0; i--) {
while (x->level[i].forward && x->level[i].forward->ele != oldele && /* make sure not to access the
->obj pointer if it matches
oldele */
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score && sdscmp(x->level[i].forward->ele, ele) < 0)))
x = x->level[i].forward;
update[i] = x;
}
/* update the robj pointer inside the skip list record. */
x = x->level[0].forward;
serverAssert(x && score == x->score && x->ele == oldele);
if (newele) x->ele = newele;
/* try to defrag the skiplist record itself */
newx = activeDefragAlloc(x);
if (newx) {
zslUpdateNode(zsl, x, newx, update);
return &newx->score;
}
return NULL;
}
/* Defrag helper for sorted set.
* Defrag a single dict entry key name, and corresponding skiplist struct */
static void activeDefragZsetEntry(zset *zs, dictEntry *de) {
sds newsds;
double *newscore;
sds sdsele = dictGetKey(de);
if ((newsds = activeDefragSds(sdsele))) dictSetKey(zs->dict, de, newsds);
newscore = zslDefrag(zs->zsl, *(double *)dictGetVal(de), sdsele, newsds);
if (newscore) {
dictSetVal(zs->dict, de, newscore);
}
}
#define DEFRAG_SDS_DICT_NO_VAL 0
#define DEFRAG_SDS_DICT_VAL_IS_SDS 1
#define DEFRAG_SDS_DICT_VAL_IS_STROB 2
#define DEFRAG_SDS_DICT_VAL_VOID_PTR 3
#define DEFRAG_SDS_DICT_VAL_LUA_SCRIPT 4
static void activeDefragSdsDictCallback(void *privdata, const dictEntry *de) {
UNUSED(privdata);
UNUSED(de);
}
/* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */
static void activeDefragSdsDict(dict *d, int val_type) {
unsigned long cursor = 0;
dictDefragFunctions defragfns = {
.defragAlloc = activeDefragAlloc,
.defragKey = (dictDefragAllocFunction *)activeDefragSds,
.defragVal = (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS ? (dictDefragAllocFunction *)activeDefragSds
: val_type == DEFRAG_SDS_DICT_VAL_IS_STROB ? (dictDefragAllocFunction *)activeDefragStringOb
: val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR ? (dictDefragAllocFunction *)activeDefragAlloc
: val_type == DEFRAG_SDS_DICT_VAL_LUA_SCRIPT ? (dictDefragAllocFunction *)activeDefragLuaScript
: NULL)};
do {
cursor = dictScanDefrag(d, cursor, activeDefragSdsDictCallback, &defragfns, NULL);
} while (cursor != 0);
}
/* Defrag a list of ptr, sds or robj string values */
static void activeDefragQuickListNode(quicklist *ql, quicklistNode **node_ref) {
quicklistNode *newnode, *node = *node_ref;
unsigned char *newzl;
if ((newnode = activeDefragAlloc(node))) {
if (newnode->prev)
newnode->prev->next = newnode;
else
ql->head = newnode;
if (newnode->next)
newnode->next->prev = newnode;
else
ql->tail = newnode;
*node_ref = node = newnode;
}
if ((newzl = activeDefragAlloc(node->entry))) node->entry = newzl;
}
static void activeDefragQuickListNodes(quicklist *ql) {
quicklistNode *node = ql->head;
while (node) {
activeDefragQuickListNode(ql, &node);
node = node->next;
}
}
/* when the value has lots of elements, we want to handle it later and not as
* part of the main dictionary scan. this is needed in order to prevent latency
* spikes when handling large items */
static void defragLater(dictEntry *kde) {
if (!defrag_later) {
defrag_later = listCreate();
listSetFreeMethod(defrag_later, (void (*)(void *))sdsfree);
defrag_later_cursor = 0;
}
sds key = sdsdup(dictGetKey(kde));
listAddNodeTail(defrag_later, key);
}
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
static long scanLaterList(robj *ob, unsigned long *cursor, monotime endtime) {
quicklist *ql = ob->ptr;
quicklistNode *node;
long iterations = 0;
int bookmark_failed = 0;
if (ob->type != OBJ_LIST || ob->encoding != OBJ_ENCODING_QUICKLIST) return 0;
if (*cursor == 0) {
/* if cursor is 0, we start new iteration */
node = ql->head;
} else {
node = quicklistBookmarkFind(ql, "_AD");
if (!node) {
/* if the bookmark was deleted, it means we reached the end. */
*cursor = 0;
return 0;
}
node = node->next;
}
(*cursor)++;
while (node) {
activeDefragQuickListNode(ql, &node);
server.stat_active_defrag_scanned++;
if (++iterations > 128 && !bookmark_failed) {
if (getMonotonicUs() > endtime) {
if (!quicklistBookmarkCreate(&ql, "_AD", node)) {
bookmark_failed = 1;
} else {
ob->ptr = ql; /* bookmark creation may have re-allocated the quicklist */
return 1;
}
}
iterations = 0;
}
node = node->next;
}
quicklistBookmarkDelete(ql, "_AD");
*cursor = 0;
return bookmark_failed ? 1 : 0;
}
typedef struct {
zset *zs;
} scanLaterZsetData;
static void scanLaterZsetCallback(void *privdata, const dictEntry *_de) {
dictEntry *de = (dictEntry *)_de;
scanLaterZsetData *data = privdata;
activeDefragZsetEntry(data->zs, de);
server.stat_active_defrag_scanned++;
}
static void scanLaterZset(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST) return;
zset *zs = (zset *)ob->ptr;
dict *d = zs->dict;
scanLaterZsetData data = {zs};
dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc};
*cursor = dictScanDefrag(d, *cursor, scanLaterZsetCallback, &defragfns, &data);
}
/* Used as scan callback when all the work is done in the dictDefragFunctions. */
static void scanCallbackCountScanned(void *privdata, const dictEntry *de) {
UNUSED(privdata);
UNUSED(de);
server.stat_active_defrag_scanned++;
}
static void scanLaterSet(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HT) return;
dict *d = ob->ptr;
dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc,
.defragKey = (dictDefragAllocFunction *)activeDefragSds};
*cursor = dictScanDefrag(d, *cursor, scanCallbackCountScanned, &defragfns, NULL);
}
static void scanLaterHash(robj *ob, unsigned long *cursor) {
if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT) return;
dict *d = ob->ptr;
dictDefragFunctions defragfns = {.defragAlloc = activeDefragAlloc,
.defragKey = (dictDefragAllocFunction *)activeDefragSds,
.defragVal = (dictDefragAllocFunction *)activeDefragSds};
*cursor = dictScanDefrag(d, *cursor, scanCallbackCountScanned, &defragfns, NULL);
}
static void defragQuicklist(dictEntry *kde) {
robj *ob = dictGetVal(kde);
quicklist *ql = ob->ptr, *newql;
serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST);
if ((newql = activeDefragAlloc(ql))) ob->ptr = ql = newql;
if (ql->len > server.active_defrag_max_scan_fields)
defragLater(kde);
else
activeDefragQuickListNodes(ql);
}
static void defragZsetSkiplist(dictEntry *kde) {
robj *ob = dictGetVal(kde);
zset *zs = (zset *)ob->ptr;
zset *newzs;
zskiplist *newzsl;
dict *newdict;
dictEntry *de;
struct zskiplistNode *newheader;
serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
if ((newzs = activeDefragAlloc(zs))) ob->ptr = zs = newzs;
if ((newzsl = activeDefragAlloc(zs->zsl))) zs->zsl = newzsl;
if ((newheader = activeDefragAlloc(zs->zsl->header))) zs->zsl->header = newheader;
if (dictSize(zs->dict) > server.active_defrag_max_scan_fields)
defragLater(kde);
else {
dictIterator *di = dictGetIterator(zs->dict);
while ((de = dictNext(di)) != NULL) {
activeDefragZsetEntry(zs, de);
}
dictReleaseIterator(di);
}
/* defrag the dict struct and tables */
if ((newdict = dictDefragTables(zs->dict))) zs->dict = newdict;
}
static void defragHash(dictEntry *kde) {
robj *ob = dictGetVal(kde);
dict *d, *newd;
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
d = ob->ptr;
if (dictSize(d) > server.active_defrag_max_scan_fields)
defragLater(kde);
else
activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS);
/* defrag the dict struct and tables */
if ((newd = dictDefragTables(ob->ptr))) ob->ptr = newd;
}
static void defragSet(dictEntry *kde) {
robj *ob = dictGetVal(kde);
dict *d, *newd;
serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT);
d = ob->ptr;
if (dictSize(d) > server.active_defrag_max_scan_fields)
defragLater(kde);
else
activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL);
/* defrag the dict struct and tables */
if ((newd = dictDefragTables(ob->ptr))) ob->ptr = newd;
}
/* Defrag callback for radix tree iterator, called for each node,
* used in order to defrag the nodes allocations. */
static int defragRaxNode(raxNode **noderef) {
raxNode *newnode = activeDefragAlloc(*noderef);
if (newnode) {
*noderef = newnode;
return 1;
}
return 0;
}
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
static int scanLaterStreamListpacks(robj *ob, unsigned long *cursor, monotime endtime) {
static unsigned char last[sizeof(streamID)];
raxIterator ri;
long iterations = 0;
if (ob->type != OBJ_STREAM || ob->encoding != OBJ_ENCODING_STREAM) {
*cursor = 0;
return 0;
}
stream *s = ob->ptr;
raxStart(&ri, s->rax);
if (*cursor == 0) {
/* if cursor is 0, we start new iteration */
defragRaxNode(&s->rax->head);
/* assign the iterator node callback before the seek, so that the
* initial nodes that are processed till the first item are covered */
ri.node_cb = defragRaxNode;
raxSeek(&ri, "^", NULL, 0);
} else {
/* if cursor is non-zero, we seek to the static 'last' */
if (!raxSeek(&ri, ">", last, sizeof(last))) {
*cursor = 0;
raxStop(&ri);
return 0;
}
/* assign the iterator node callback after the seek, so that the
* initial nodes that are processed till now aren't covered */
ri.node_cb = defragRaxNode;
}
(*cursor)++;
while (raxNext(&ri)) {
void *newdata = activeDefragAlloc(ri.data);
if (newdata) raxSetData(ri.node, ri.data = newdata);
server.stat_active_defrag_scanned++;
if (++iterations > 128) {
if (getMonotonicUs() > endtime) {
serverAssert(ri.key_len == sizeof(last));
memcpy(last, ri.key, ri.key_len);
raxStop(&ri);
return 1;
}
iterations = 0;
}
}
raxStop(&ri);
*cursor = 0;
return 0;
}
/* optional callback used defrag each rax element (not including the element pointer itself) */
typedef void *(raxDefragFunction)(raxIterator *ri, void *privdata);
/* defrag radix tree including:
* 1) rax struct
* 2) rax nodes
* 3) rax entry data (only if defrag_data is specified)
* 4) call a callback per element, and allow the callback to return a new pointer for the element */
static void defragRadixTree(rax **raxref, int defrag_data, raxDefragFunction *element_cb, void *element_cb_data) {
raxIterator ri;
rax *rax;
if ((rax = activeDefragAlloc(*raxref))) *raxref = rax;
rax = *raxref;
raxStart(&ri, rax);
ri.node_cb = defragRaxNode;
defragRaxNode(&rax->head);
raxSeek(&ri, "^", NULL, 0);
while (raxNext(&ri)) {
void *newdata = NULL;
if (element_cb) newdata = element_cb(&ri, element_cb_data);
if (defrag_data && !newdata) newdata = activeDefragAlloc(ri.data);
if (newdata) raxSetData(ri.node, ri.data = newdata);
}
raxStop(&ri);
}
typedef struct {
streamCG *cg;
streamConsumer *c;
} PendingEntryContext;
static void *defragStreamConsumerPendingEntry(raxIterator *ri, void *privdata) {
PendingEntryContext *ctx = privdata;
streamNACK *nack = ri->data, *newnack;
nack->consumer = ctx->c; /* update nack pointer to consumer */
newnack = activeDefragAlloc(nack);
if (newnack) {
/* update consumer group pointer to the nack */
void *prev;
raxInsert(ctx->cg->pel, ri->key, ri->key_len, newnack, &prev);
serverAssert(prev == nack);
}
return newnack;
}
static void *defragStreamConsumer(raxIterator *ri, void *privdata) {
streamConsumer *c = ri->data;
streamCG *cg = privdata;
void *newc = activeDefragAlloc(c);
if (newc) {
c = newc;
}
sds newsds = activeDefragSds(c->name);
if (newsds) c->name = newsds;
if (c->pel) {
PendingEntryContext pel_ctx = {cg, c};
defragRadixTree(&c->pel, 0, defragStreamConsumerPendingEntry, &pel_ctx);
}
return newc; /* returns NULL if c was not defragged */
}
static void *defragStreamConsumerGroup(raxIterator *ri, void *privdata) {
streamCG *cg = ri->data;
UNUSED(privdata);
if (cg->consumers) defragRadixTree(&cg->consumers, 0, defragStreamConsumer, cg);
if (cg->pel) defragRadixTree(&cg->pel, 0, NULL, NULL);
return NULL;
}
static void defragStream(dictEntry *kde) {
robj *ob = dictGetVal(kde);
serverAssert(ob->type == OBJ_STREAM && ob->encoding == OBJ_ENCODING_STREAM);
stream *s = ob->ptr, *news;
/* handle the main struct */
if ((news = activeDefragAlloc(s))) ob->ptr = s = news;
if (raxSize(s->rax) > server.active_defrag_max_scan_fields) {
rax *newrax = activeDefragAlloc(s->rax);
if (newrax) s->rax = newrax;
defragLater(kde);
} else
defragRadixTree(&s->rax, 1, NULL, NULL);
if (s->cgroups) defragRadixTree(&s->cgroups, 1, defragStreamConsumerGroup, NULL);
}
/* Defrag a module key. This is either done immediately or scheduled
* for later. Returns then number of pointers defragged.
*/
static void defragModule(serverDb *db, dictEntry *kde) {
robj *obj = dictGetVal(kde);
serverAssert(obj->type == OBJ_MODULE);
if (!moduleDefragValue(dictGetKey(kde), obj, db->id)) defragLater(kde);
}
/* for each key we scan in the main dict, this function will attempt to defrag
* all the various pointers it has. */
static void defragKey(defragKeysCtx *ctx, dictEntry *de) {
serverDb *db = ctx->db;
int slot = ctx->kvstate.slot;
robj *newob, *ob;
unsigned char *newzl;
/* Try to defrag robj and / or string value. */
ob = dictGetVal(de);
if ((newob = activeDefragStringOb(ob))) {
kvstoreDictSetVal(ctx->kvstate.kvs, slot, de, newob);
ob = newob;
}
if (ob->type == OBJ_STRING) {
/* Already handled in activeDefragStringOb. */
} else if (ob->type == OBJ_LIST) {
if (ob->encoding == OBJ_ENCODING_QUICKLIST) {
defragQuicklist(de);
} else if (ob->encoding == OBJ_ENCODING_LISTPACK) {
if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl;
} else {
serverPanic("Unknown list encoding");
}
} else if (ob->type == OBJ_SET) {
if (ob->encoding == OBJ_ENCODING_HT) {
defragSet(de);
} else if (ob->encoding == OBJ_ENCODING_INTSET || ob->encoding == OBJ_ENCODING_LISTPACK) {
void *newptr, *ptr = ob->ptr;
if ((newptr = activeDefragAlloc(ptr))) ob->ptr = newptr;
} else {
serverPanic("Unknown set encoding");
}
} else if (ob->type == OBJ_ZSET) {
if (ob->encoding == OBJ_ENCODING_LISTPACK) {
if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl;
} else if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
defragZsetSkiplist(de);
} else {
serverPanic("Unknown sorted set encoding");
}
} else if (ob->type == OBJ_HASH) {
if (ob->encoding == OBJ_ENCODING_LISTPACK) {
if ((newzl = activeDefragAlloc(ob->ptr))) ob->ptr = newzl;
} else if (ob->encoding == OBJ_ENCODING_HT) {
defragHash(de);
} else {
serverPanic("Unknown hash encoding");
}
} else if (ob->type == OBJ_STREAM) {
defragStream(de);
} else if (ob->type == OBJ_MODULE) {
defragModule(db, de);
} else {
serverPanic("Unknown object type");
}
}
/* Defrag scan callback for the main db dictionary. */
static void dbKeysScanCallback(void *privdata, const dictEntry *de) {
long long hits_before = server.stat_active_defrag_hits;
defragKey((defragKeysCtx *)privdata, (dictEntry *)de);
if (server.stat_active_defrag_hits != hits_before)
server.stat_active_defrag_key_hits++;
else
server.stat_active_defrag_key_misses++;
server.stat_active_defrag_scanned++;
}
/* Utility function to get the fragmentation ratio from jemalloc.
* It is critical to do that by comparing only heap maps that belong to
* jemalloc, and skip ones the jemalloc keeps as spare. Since we use this
* fragmentation ratio in order to decide if a defrag action should be taken
* or not, a false detection can cause the defragmenter to waste a lot of CPU
* without the possibility of getting any results. */
static float getAllocatorFragmentation(size_t *out_frag_bytes) {
size_t resident, active, allocated, frag_smallbins_bytes;
zmalloc_get_allocator_info(&allocated, &active, &resident, NULL, NULL);
frag_smallbins_bytes = allocatorDefragGetFragSmallbins();
/* Calculate the fragmentation ratio as the proportion of wasted memory in small
* bins (which are defraggable) relative to the total allocated memory (including large bins).
* This is because otherwise, if most of the memory usage is large bins, we may show high percentage,
* despite the fact it's not a lot of memory for the user. */
float frag_pct = (float)frag_smallbins_bytes / allocated * 100;
float rss_pct = ((float)resident / allocated) * 100 - 100;
size_t rss_bytes = resident - allocated;
if (out_frag_bytes) *out_frag_bytes = frag_smallbins_bytes;
serverLog(LL_DEBUG, "allocated=%zu, active=%zu, resident=%zu, frag=%.2f%% (%.2f%% rss), frag_bytes=%zu (%zu rss)",
allocated, active, resident, frag_pct, rss_pct, frag_smallbins_bytes, rss_bytes);
return frag_pct;
}
/* Defrag scan callback for the pubsub dictionary. */
static void defragPubsubScanCallback(void *privdata, const dictEntry *de) {
defragPubSubCtx *ctx = privdata;
kvstore *pubsub_channels = ctx->kvstate.kvs;
robj *newchannel, *channel = dictGetKey(de);
dict *newclients, *clients = dictGetVal(de);
size_t allocation_size;
/* Try to defrag the channel name. */
serverAssert(channel->refcount == (int)dictSize(clients) + 1);
newchannel = activeDefragStringObWithoutFree(channel, &allocation_size);
if (newchannel) {
kvstoreDictSetKey(pubsub_channels, ctx->kvstate.slot, (dictEntry *)de, newchannel);
/* The channel name is shared by the client's pubsub(shard) and server's
* pubsub(shard), after defraging the channel name, we need to update
* the reference in the clients' dictionary. */
dictIterator *di = dictGetIterator(clients);
dictEntry *clientde;
while ((clientde = dictNext(di)) != NULL) {
client *c = dictGetKey(clientde);
dict *client_channels = ctx->getPubSubChannels(c);
dictEntry *pubsub_channel = dictFind(client_channels, newchannel);
serverAssert(pubsub_channel);
dictSetKey(ctx->getPubSubChannels(c), pubsub_channel, newchannel);
}
dictReleaseIterator(di);
// Now that we're done correcting the references, we can safely free the old channel robj
allocatorDefragFree(channel, allocation_size);
}
/* Try to defrag the dictionary of clients that is stored as the value part. */
if ((newclients = dictDefragTables(clients)))
kvstoreDictSetVal(pubsub_channels, ctx->kvstate.slot, (dictEntry *)de, newclients);
server.stat_active_defrag_scanned++;
}
/* returns 0 more work may or may not be needed (see non-zero cursor),
* and 1 if time is up and more work is needed. */
static int defragLaterItem(dictEntry *de, unsigned long *cursor, monotime endtime, int dbid) {
if (de) {
robj *ob = dictGetVal(de);
if (ob->type == OBJ_LIST) {
return scanLaterList(ob, cursor, endtime);
} else if (ob->type == OBJ_SET) {
scanLaterSet(ob, cursor);
} else if (ob->type == OBJ_ZSET) {
scanLaterZset(ob, cursor);
} else if (ob->type == OBJ_HASH) {
scanLaterHash(ob, cursor);
} else if (ob->type == OBJ_STREAM) {
return scanLaterStreamListpacks(ob, cursor, endtime);
} else if (ob->type == OBJ_MODULE) {
return moduleLateDefrag(dictGetKey(de), ob, cursor, endtime, dbid);
} else {
*cursor = 0; /* object type may have changed since we schedule it for later */
}
} else {
*cursor = 0; /* object may have been deleted already */
}
return 0;
}
// A kvstoreHelperPreContinueFn
static doneStatus defragLaterStep(monotime endtime, void *privdata) {
defragKeysCtx *ctx = privdata;
unsigned int iterations = 0;
unsigned long long prev_defragged = server.stat_active_defrag_hits;
unsigned long long prev_scanned = server.stat_active_defrag_scanned;
while (defrag_later && listLength(defrag_later) > 0) {
listNode *head = listFirst(defrag_later);
sds key = head->value;
dictEntry *de = kvstoreDictFind(ctx->kvstate.kvs, ctx->kvstate.slot, key);
long long key_defragged = server.stat_active_defrag_hits;
bool timeout = (defragLaterItem(de, &defrag_later_cursor, endtime, ctx->db->id) == 1);
if (key_defragged != server.stat_active_defrag_hits) {
server.stat_active_defrag_key_hits++;
} else {
server.stat_active_defrag_key_misses++;
}
if (timeout) break;
if (defrag_later_cursor == 0) {
// the item is finished, move on
listDelNode(defrag_later, head);
}
if (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 ||
server.stat_active_defrag_scanned - prev_scanned > 64) {
if (getMonotonicUs() > endtime) break;
iterations = 0;
prev_defragged = server.stat_active_defrag_hits;
prev_scanned = server.stat_active_defrag_scanned;
}
}
return (!defrag_later || listLength(defrag_later) == 0) ? DEFRAG_DONE : DEFRAG_NOT_DONE;
}
/* This helper function handles most of the work for iterating over a kvstore. 'privdata', if
* provided, MUST begin with 'kvstoreIterState' and this part is automatically updated by this
* function during the iteration. */
static doneStatus defragStageKvstoreHelper(monotime endtime,
kvstore *kvs,
dictScanFunction scan_fn,
kvstoreHelperPreContinueFn precontinue_fn,
const dictDefragFunctions *defragfns,
void *privdata) {
static kvstoreIterState state; // STATIC - this persists
if (endtime == 0) {
// Starting the stage, set up the state information for this stage
state.kvs = kvs;
state.slot = KVS_SLOT_DEFRAG_LUT;
state.cursor = 0;
return DEFRAG_NOT_DONE;
}
serverAssert(kvs == state.kvs); // Shouldn't change during the stage
unsigned int iterations = 0;
unsigned long long prev_defragged = server.stat_active_defrag_hits;
unsigned long long prev_scanned = server.stat_active_defrag_scanned;
if (state.slot == KVS_SLOT_DEFRAG_LUT) {
// Before we start scanning the kvstore, handle the main structures
do {
state.cursor = kvstoreDictLUTDefrag(kvs, state.cursor, dictDefragTables);
if (getMonotonicUs() >= endtime) return DEFRAG_NOT_DONE;
} while (state.cursor != 0);
state.slot = KVS_SLOT_UNASSIGNED;
}
while (true) {
if (++iterations > 16 || server.stat_active_defrag_hits - prev_defragged > 512 || server.stat_active_defrag_scanned - prev_scanned > 64) {
if (getMonotonicUs() >= endtime) break;
iterations = 0;
prev_defragged = server.stat_active_defrag_hits;
prev_scanned = server.stat_active_defrag_scanned;
}
if (precontinue_fn) {
if (privdata) *(kvstoreIterState *)privdata = state;
if (precontinue_fn(endtime, privdata) == DEFRAG_NOT_DONE) return DEFRAG_NOT_DONE;
}
if (!state.cursor) {
// If there's no cursor, we're ready to begin a new kvstore slot.
if (state.slot == KVS_SLOT_UNASSIGNED) {
state.slot = kvstoreGetFirstNonEmptyDictIndex(kvs);
} else {
state.slot = kvstoreGetNextNonEmptyDictIndex(kvs, state.slot);
}
if (state.slot == KVS_SLOT_UNASSIGNED) return DEFRAG_DONE;
}
// Whatever privdata's actual type, this function requires that it begins with kvstoreIterState.
if (privdata) *(kvstoreIterState *)privdata = state;
state.cursor = kvstoreDictScanDefrag(kvs, state.slot, state.cursor,
scan_fn, defragfns, privdata);
}
return DEFRAG_NOT_DONE;
}
// Note: target is a DB, (not a KVS like most stages)
static doneStatus defragStageDbKeys(monotime endtime, void *target, void *privdata) {
UNUSED(privdata);
serverDb *db = (serverDb *)target;
static defragKeysCtx ctx; // STATIC - this persists
if (endtime == 0) {
ctx.db = db;
// Don't return yet. Call the helper with endtime==0 below.
}
serverAssert(ctx.db == db);
/* Note: for DB keys, we use the start/finish callback to fix an expires table entry if
* the main DB entry has been moved. */
static const dictDefragFunctions defragfns = {
.defragAlloc = activeDefragAlloc,
.defragKey = NULL, // Handled by dbKeysScanCallback
.defragVal = NULL, // Handled by dbKeysScanCallback
.defragEntryStartCb = defragEntryStartCbForKeys,
.defragEntryFinishCb = defragEntryFinishCbForKeys};
return defragStageKvstoreHelper(endtime, db->keys,
dbKeysScanCallback, defragLaterStep, &defragfns, &ctx);
}
static doneStatus defragStageExpiresKvstore(monotime endtime, void *target, void *privdata) {
UNUSED(privdata);
static const dictDefragFunctions defragfns = {
.defragAlloc = activeDefragAlloc,
.defragKey = NULL, // Not needed for expires (just a ref)
.defragVal = NULL, // Not needed for expires (no value)
};
return defragStageKvstoreHelper(endtime, (kvstore *)target,
scanCallbackCountScanned, NULL, &defragfns, NULL);
}
static doneStatus defragStagePubsubKvstore(monotime endtime, void *target, void *privdata) {
// target is server.pubsub_channels or server.pubsubshard_channels
getClientChannelsFnWrapper *fnWrapper = privdata;
static const dictDefragFunctions defragfns = {
.defragAlloc = activeDefragAlloc,
.defragKey = NULL, // Handled by defragPubsubScanCallback
.defragVal = NULL, // Not needed for expires (no value)
};
defragPubSubCtx ctx;
ctx.getPubSubChannels = fnWrapper->fn;
return defragStageKvstoreHelper(endtime, (kvstore *)target,
defragPubsubScanCallback, NULL, &defragfns, &ctx);
}
static doneStatus defragLuaScripts(monotime endtime, void *target, void *privdata) {
UNUSED(target);
UNUSED(privdata);
if (endtime == 0) return DEFRAG_NOT_DONE; // required initialization
activeDefragSdsDict(evalScriptsDict(), DEFRAG_SDS_DICT_VAL_LUA_SCRIPT);
return DEFRAG_DONE;
}
static doneStatus defragModuleGlobals(monotime endtime, void *target, void *privdata) {
UNUSED(target);
UNUSED(privdata);
if (endtime == 0) return DEFRAG_NOT_DONE; // required initialization
moduleDefragGlobals();
return DEFRAG_DONE;
}
static bool defragIsRunning(void) {
return (defrag.timeproc_id > 0);
}
static void addDefragStage(defragStageFn stage_fn, void *target, void *privdata) {
StageDescriptor *stage = zmalloc(sizeof(StageDescriptor));
stage->stage_fn = stage_fn;
stage->target = target;
stage->privdata = privdata;
listAddNodeTail(defrag.remaining_stages, stage);
}
// Called at the end of a complete defrag cycle, or when defrag is terminated
static void endDefragCycle(bool normal_termination) {
if (normal_termination) {
// For normal termination, we expect...
serverAssert(!defrag.current_stage);
serverAssert(listLength(defrag.remaining_stages) == 0);
serverAssert(!defrag_later || listLength(defrag_later) == 0);
} else {
// Defrag is being terminated abnormally
aeDeleteTimeEvent(server.el, defrag.timeproc_id);
if (defrag.current_stage) {
zfree(defrag.current_stage);
defrag.current_stage = NULL;
}
listSetFreeMethod(defrag.remaining_stages, zfree);
}
defrag.timeproc_id = AE_DELETED_EVENT_ID;
listRelease(defrag.remaining_stages);
defrag.remaining_stages = NULL;
if (defrag_later) {
listRelease(defrag_later);
defrag_later = NULL;
}
defrag_later_cursor = 0;
size_t frag_bytes;
float frag_pct = getAllocatorFragmentation(&frag_bytes);
serverLog(LL_VERBOSE, "Active defrag done in %dms, reallocated=%d, frag=%.0f%%, frag_bytes=%zu",
(int)elapsedMs(defrag.start_cycle), (int)(server.stat_active_defrag_hits - defrag.start_defrag_hits),
frag_pct, frag_bytes);
server.stat_total_active_defrag_time += elapsedUs(server.stat_last_active_defrag_time);
server.stat_last_active_defrag_time = 0;
server.active_defrag_cpu_percent = 0;
}
/* Must be called at the start of the timeProc as it measures the delay from the end of the previous
* timeProc invocation when performing the computation. */
static int computeDefragCycleUs(void) {
long dutyCycleUs;
int targetCpuPercent = server.active_defrag_cpu_percent;
serverAssert(targetCpuPercent > 0 && targetCpuPercent < 100);
static int prevCpuPercent = 0; // STATIC - this persists
if (targetCpuPercent != prevCpuPercent) {
/* If the targetCpuPercent changes, the value might be different from when the last wait
* time was computed. In this case, don't consider wait time. (This is really only an
* issue in crazy tests that dramatically increase CPU while defrag is running.) */
defrag.timeproc_end_time = 0;
prevCpuPercent = targetCpuPercent;
}
// Given when the last duty cycle ended, compute time needed to achieve the desired percentage.
if (defrag.timeproc_end_time == 0) {
// Either the first call to the timeProc, or we were paused for some reason.
defrag.timeproc_overage_us = 0;
dutyCycleUs = server.active_defrag_cycle_us;
} else {
long waitedUs = getMonotonicUs() - defrag.timeproc_end_time;
/* Given the elapsed wait time between calls, compute the necessary duty time needed to
* achieve the desired CPU percentage.
* With: D = duty time, W = wait time, P = percent
* Solve: D P
* ----- = -----
* D + W 100
* Solving for D:
* D = P * W / (100 - P)
*
* Note that dutyCycleUs addresses starvation. If the wait time was long, we will compensate
* with a proportionately long duty-cycle. This won't significantly affect perceived
* latency, because clients are already being impacted by the long cycle time which caused
* the starvation of the timer. */
dutyCycleUs = targetCpuPercent * waitedUs / (100 - targetCpuPercent);
// Also adjust for any accumulated overage(underage).
dutyCycleUs -= defrag.timeproc_overage_us;
defrag.timeproc_overage_us = 0;
if (dutyCycleUs < server.active_defrag_cycle_us) {
/* We never reduce our cycle time, that would increase overhead. Instead, we track this
* as part of the overage, and increase wait time between cycles. */
defrag.timeproc_overage_us = server.active_defrag_cycle_us - dutyCycleUs;
dutyCycleUs = server.active_defrag_cycle_us;
}
}
return dutyCycleUs;
}
/* Must be called at the end of the timeProc as it records the timeproc_end_time for use in the next
* computeDefragCycleUs computation. */
static int computeDelayMs(monotime intendedEndtime) {
defrag.timeproc_end_time = getMonotonicUs();
int overage = defrag.timeproc_end_time - intendedEndtime;
defrag.timeproc_overage_us += overage; // track over/under desired CPU
int targetCpuPercent = server.active_defrag_cpu_percent;
serverAssert(targetCpuPercent > 0 && targetCpuPercent < 100);
// Given the desired duty cycle, what inter-cycle delay do we need to achieve that?
// We want to achieve a specific CPU percent. To do that, we can't use a skewed computation.
// Example, if we run for 1ms and delay 10ms, that's NOT 10%, because the total cycle time is 11ms.
// Instead, if we rum for 1ms, our total time should be 10ms. So the delay is only 9ms.
long totalCycleTimeUs = server.active_defrag_cycle_us * 100 / targetCpuPercent;
long delayUs = totalCycleTimeUs - server.active_defrag_cycle_us;
// Only increase delay by the fraction of the overage that would be non-duty-cycle
delayUs += defrag.timeproc_overage_us * (100 - targetCpuPercent) / 100; // "overage" might be negative
if (delayUs < 0) delayUs = 0;
long delayMs = delayUs / 1000; // round down
return delayMs;
}
/* An independent time proc for defrag. While defrag is running, this is called much more often
* than the server cron. Frequent short calls provides low latency impact. */
static long long activeDefragTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData) {
UNUSED(eventLoop);
UNUSED(id);
UNUSED(clientData);
// This timer shouldn't be registered unless there's work to do.
serverAssert(defrag.current_stage || listLength(defrag.remaining_stages) > 0);
if (!server.active_defrag_enabled) {
// Defrag has been disabled while running
endDefragCycle(false);
return AE_NOMORE;
}
if (hasActiveChildProcess()) {
// If there's a child process, pause the defrag, polling until the child completes.
defrag.timeproc_end_time = 0; // prevent starvation recovery
return 100;
}
monotime starttime = getMonotonicUs();
monotime endtime = starttime + computeDefragCycleUs();
mstime_t latency;
latencyStartMonitor(latency);
if (!defrag.current_stage) {
defrag.current_stage = listNodeValue(listFirst(defrag.remaining_stages));
listDelNode(defrag.remaining_stages, listFirst(defrag.remaining_stages));
// Initialize the stage with endtime==0
doneStatus status = defrag.current_stage->stage_fn(0, defrag.current_stage->target, defrag.current_stage->privdata);
serverAssert(status == DEFRAG_NOT_DONE); // Initialization should always return DEFRAG_NOT_DONE
}
doneStatus status = defrag.current_stage->stage_fn(endtime, defrag.current_stage->target, defrag.current_stage->privdata);
if (status == DEFRAG_DONE) {
zfree(defrag.current_stage);
defrag.current_stage = NULL;
}
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("active-defrag-cycle", latency);
if (defrag.current_stage || listLength(defrag.remaining_stages) > 0) {
return computeDelayMs(endtime);
} else {
endDefragCycle(true);
return AE_NOMORE; // Ends the timer proc
}
}
/* During long running scripts, or while loading, there is a periodic function for handling other
* actions. This interface allows defrag to continue running, avoiding a single long defrag step
* after the long operation completes. */
void defragWhileBlocked(void) {
if (!defragIsRunning()) return;
// Save off the timeproc_id. If we have a normal termination, it will be cleared.
long long timeproc_id = defrag.timeproc_id;
// Simulate a single call of the timer proc
long long reschedule_delay = activeDefragTimeProc(NULL, 0, NULL);
if (reschedule_delay == AE_NOMORE) {
// If it's done, deregister the timer
aeDeleteTimeEvent(server.el, timeproc_id);
}
/* Otherwise, just ignore the reschedule_delay, the timer will pop the next time that the
* event loop can process timers again. */
}
static void beginDefragCycle(void) {
serverAssert(!defragIsRunning());
serverAssert(defrag.remaining_stages == NULL);
defrag.remaining_stages = listCreate();
for (int dbid = 0; dbid < server.dbnum; dbid++) {
serverDb *db = &server.db[dbid];
addDefragStage(defragStageDbKeys, db, NULL);
addDefragStage(defragStageExpiresKvstore, db->expires, NULL);
}
static getClientChannelsFnWrapper getClientPubSubChannelsFn = {getClientPubSubChannels};
static getClientChannelsFnWrapper getClientPubSubShardChannelsFn = {getClientPubSubShardChannels};
addDefragStage(defragStagePubsubKvstore, server.pubsub_channels, &getClientPubSubChannelsFn);
addDefragStage(defragStagePubsubKvstore, server.pubsubshard_channels, &getClientPubSubShardChannelsFn);
addDefragStage(defragLuaScripts, NULL, NULL);
addDefragStage(defragModuleGlobals, NULL, NULL);
defrag.current_stage = NULL;
defrag.start_cycle = getMonotonicUs();
defrag.start_defrag_hits = server.stat_active_defrag_hits;
defrag.timeproc_end_time = 0;
defrag.timeproc_overage_us = 0;
defrag.timeproc_id = aeCreateTimeEvent(server.el, 0, activeDefragTimeProc, NULL, NULL);
elapsedStart(&server.stat_last_active_defrag_time);
}
#define INTERPOLATE(x, x1, x2, y1, y2) ((y1) + ((x) - (x1)) * ((y2) - (y1)) / ((x2) - (x1)))
#define LIMIT(y, min, max) ((y) < (min) ? min : ((y) > (max) ? max : (y)))
/* decide if defrag is needed, and at what CPU effort to invest in it */
static void updateDefragCpuPercent(void) {
size_t frag_bytes;
float frag_pct = getAllocatorFragmentation(&frag_bytes);
if (server.active_defrag_cpu_percent == 0) {
if (frag_pct < server.active_defrag_threshold_lower ||
frag_bytes < server.active_defrag_ignore_bytes) return;
}
/* Calculate the adaptive aggressiveness of the defrag based on the current
* fragmentation and configurations. */
int cpu_pct = INTERPOLATE(frag_pct, server.active_defrag_threshold_lower, server.active_defrag_threshold_upper,
server.active_defrag_cpu_min, server.active_defrag_cpu_max);
cpu_pct = LIMIT(cpu_pct, server.active_defrag_cpu_min, server.active_defrag_cpu_max);
/* Normally we allow increasing the aggressiveness during a scan, but don't
* reduce it, since we should not lower the aggressiveness when fragmentation
* drops. But when a configuration is made, we should reconsider it. */
if (cpu_pct > server.active_defrag_cpu_percent || server.active_defrag_configuration_changed) {
server.active_defrag_configuration_changed = 0;
if (defragIsRunning()) {
serverLog(LL_VERBOSE, "Changing active defrag CPU, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%",
frag_pct, frag_bytes, cpu_pct);
} else {
serverLog(LL_VERBOSE, "Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%",
frag_pct, frag_bytes, cpu_pct);
}
server.active_defrag_cpu_percent = cpu_pct;
}
}
void monitorActiveDefrag(void) {
if (!server.active_defrag_enabled) return;
/* Defrag gets paused while a child process is active. So there's no point in starting a new
* cycle or adjusting the CPU percentage for an existing cycle. */
if (hasActiveChildProcess()) return;
updateDefragCpuPercent();
if (server.active_defrag_cpu_percent > 0 && !defragIsRunning()) beginDefragCycle();
}
#else /* HAVE_DEFRAG */
void monitorActiveDefrag(void) {
/* Not implemented yet. */
}
void *activeDefragAlloc(void *ptr) {
UNUSED(ptr);
return NULL;
}
robj *activeDefragStringOb(robj *ob) {
UNUSED(ob);
return NULL;
}
void defragWhileBlocked(void) {
}
#endif