Merge pull request #4691 from oranagra/active_defrag_v2
Active defrag v2
This commit is contained in:
commit
da621783f0
@ -1312,8 +1312,12 @@ aof-rewrite-incremental-fsync yes
|
||||
# active-defrag-threshold-upper 100
|
||||
|
||||
# Minimal effort for defrag in CPU percentage
|
||||
# active-defrag-cycle-min 25
|
||||
# active-defrag-cycle-min 5
|
||||
|
||||
# Maximal effort for defrag in CPU percentage
|
||||
# active-defrag-cycle-max 75
|
||||
|
||||
# Maximum number of set/hash/zset/list fields that will be processed from
|
||||
# the main dictionary scan
|
||||
# active-defrag-max-scan-fields 1000
|
||||
|
||||
|
10
src/config.c
10
src/config.c
@ -537,6 +537,12 @@ void loadServerConfigFromString(char *config) {
|
||||
err = "active-defrag-cycle-max must be between 1 and 99";
|
||||
goto loaderr;
|
||||
}
|
||||
} else if (!strcasecmp(argv[0],"active-defrag-max-scan-fields") && argc == 2) {
|
||||
server.active_defrag_max_scan_fields = strtoll(argv[1],NULL,10);
|
||||
if (server.active_defrag_max_scan_fields < 1) {
|
||||
err = "active-defrag-max-scan-fields must be positive";
|
||||
goto loaderr;
|
||||
}
|
||||
} else if (!strcasecmp(argv[0],"hash-max-ziplist-entries") && argc == 2) {
|
||||
server.hash_max_ziplist_entries = memtoll(argv[1], NULL);
|
||||
} else if (!strcasecmp(argv[0],"hash-max-ziplist-value") && argc == 2) {
|
||||
@ -1068,6 +1074,8 @@ void configSetCommand(client *c) {
|
||||
"active-defrag-cycle-min",server.active_defrag_cycle_min,1,99) {
|
||||
} config_set_numerical_field(
|
||||
"active-defrag-cycle-max",server.active_defrag_cycle_max,1,99) {
|
||||
} config_set_numerical_field(
|
||||
"active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,1,LLONG_MAX) {
|
||||
} config_set_numerical_field(
|
||||
"auto-aof-rewrite-percentage",server.aof_rewrite_perc,0,LLONG_MAX){
|
||||
} config_set_numerical_field(
|
||||
@ -1249,6 +1257,7 @@ void configGetCommand(client *c) {
|
||||
config_get_numerical_field("active-defrag-ignore-bytes",server.active_defrag_ignore_bytes);
|
||||
config_get_numerical_field("active-defrag-cycle-min",server.active_defrag_cycle_min);
|
||||
config_get_numerical_field("active-defrag-cycle-max",server.active_defrag_cycle_max);
|
||||
config_get_numerical_field("active-defrag-max-scan-fields",server.active_defrag_max_scan_fields);
|
||||
config_get_numerical_field("auto-aof-rewrite-percentage",
|
||||
server.aof_rewrite_perc);
|
||||
config_get_numerical_field("auto-aof-rewrite-min-size",
|
||||
@ -2025,6 +2034,7 @@ int rewriteConfig(char *path) {
|
||||
rewriteConfigBytesOption(state,"active-defrag-ignore-bytes",server.active_defrag_ignore_bytes,CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES);
|
||||
rewriteConfigNumericalOption(state,"active-defrag-cycle-min",server.active_defrag_cycle_min,CONFIG_DEFAULT_DEFRAG_CYCLE_MIN);
|
||||
rewriteConfigNumericalOption(state,"active-defrag-cycle-max",server.active_defrag_cycle_max,CONFIG_DEFAULT_DEFRAG_CYCLE_MAX);
|
||||
rewriteConfigNumericalOption(state,"active-defrag-max-scan-fields",server.active_defrag_max_scan_fields,CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS);
|
||||
rewriteConfigYesNoOption(state,"appendonly",server.aof_state != AOF_OFF,0);
|
||||
rewriteConfigStringOption(state,"appendfilename",server.aof_filename,CONFIG_DEFAULT_AOF_FILENAME);
|
||||
rewriteConfigEnumOption(state,"appendfsync",server.aof_fsync,aof_fsync_enum,CONFIG_DEFAULT_AOF_FSYNC);
|
||||
|
674
src/defrag.c
674
src/defrag.c
@ -45,6 +45,10 @@
|
||||
* pointers are worthwhile moving and which aren't */
|
||||
int je_get_defrag_hint(void* ptr, int *bin_util, int *run_util);
|
||||
|
||||
/* forward declarations*/
|
||||
void defragDictBucketCallback(void *privdata, dictEntry **bucketref);
|
||||
dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, long *defragged);
|
||||
|
||||
/* Defrag helper for generic allocations.
|
||||
*
|
||||
* returns NULL in case the allocatoin wasn't moved.
|
||||
@ -96,7 +100,7 @@ sds activeDefragSds(sds sdsptr) {
|
||||
* returns NULL in case the allocatoin wasn't moved.
|
||||
* when it returns a non-null value, the old pointer was already released
|
||||
* and should NOT be accessed. */
|
||||
robj *activeDefragStringOb(robj* ob, int *defragged) {
|
||||
robj *activeDefragStringOb(robj* ob, long *defragged) {
|
||||
robj *ret = NULL;
|
||||
if (ob->refcount!=1)
|
||||
return NULL;
|
||||
@ -134,11 +138,11 @@ robj *activeDefragStringOb(robj* ob, int *defragged) {
|
||||
|
||||
/* Defrag helper for dictEntries to be used during dict iteration (called on
|
||||
* each step). Teturns a stat of how many pointers were moved. */
|
||||
int dictIterDefragEntry(dictIterator *iter) {
|
||||
long dictIterDefragEntry(dictIterator *iter) {
|
||||
/* This function is a little bit dirty since it messes with the internals
|
||||
* of the dict and it's iterator, but the benefit is that it is very easy
|
||||
* to use, and require no other chagnes in the dict. */
|
||||
int defragged = 0;
|
||||
long defragged = 0;
|
||||
dictht *ht;
|
||||
/* Handle the next entry (if there is one), and update the pointer in the
|
||||
* current entry. */
|
||||
@ -166,14 +170,9 @@ int dictIterDefragEntry(dictIterator *iter) {
|
||||
/* Defrag helper for dict main allocations (dict struct, and hash tables).
|
||||
* receives a pointer to the dict* and implicitly updates it when the dict
|
||||
* struct itself was moved. Returns a stat of how many pointers were moved. */
|
||||
int dictDefragTables(dict** dictRef) {
|
||||
dict *d = *dictRef;
|
||||
long dictDefragTables(dict* d) {
|
||||
dictEntry **newtable;
|
||||
int defragged = 0;
|
||||
/* handle the dict struct */
|
||||
dict *newd = activeDefragAlloc(d);
|
||||
if (newd)
|
||||
defragged++, *dictRef = d = newd;
|
||||
long defragged = 0;
|
||||
/* handle the first hash table */
|
||||
newtable = activeDefragAlloc(d->ht[0].table);
|
||||
if (newtable)
|
||||
@ -246,6 +245,146 @@ double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* Defrag helpler for sorted set.
|
||||
* Defrag a single dict entry key name, and corresponding skiplist struct */
|
||||
long activeDefragZsetEntry(zset *zs, dictEntry *de) {
|
||||
sds newsds;
|
||||
double* newscore;
|
||||
long defragged = 0;
|
||||
sds sdsele = dictGetKey(de);
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
defragged++, de->key = newsds;
|
||||
newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds);
|
||||
if (newscore) {
|
||||
dictSetVal(zs->dict, de, newscore);
|
||||
defragged++;
|
||||
}
|
||||
return defragged;
|
||||
}
|
||||
|
||||
#define DEFRAG_SDS_DICT_NO_VAL 0
|
||||
#define DEFRAG_SDS_DICT_VAL_IS_SDS 1
|
||||
#define DEFRAG_SDS_DICT_VAL_IS_STROB 2
|
||||
#define DEFRAG_SDS_DICT_VAL_VOID_PTR 3
|
||||
|
||||
/* Defrag a dict with sds key and optional value (either ptr, sds or robj string) */
|
||||
long activeDefragSdsDict(dict* d, int val_type) {
|
||||
dictIterator *di;
|
||||
dictEntry *de;
|
||||
long defragged = 0;
|
||||
di = dictGetIterator(d);
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
sds sdsele = dictGetKey(de), newsds;
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
de->key = newsds, defragged++;
|
||||
/* defrag the value */
|
||||
if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
|
||||
sdsele = dictGetVal(de);
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
de->v.val = newsds, defragged++;
|
||||
} else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
|
||||
robj *newele, *ele = dictGetVal(de);
|
||||
if ((newele = activeDefragStringOb(ele, &defragged)))
|
||||
de->v.val = newele;
|
||||
} else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
|
||||
void *newptr, *ptr = dictGetVal(de);
|
||||
if ((newptr = activeDefragAlloc(ptr)))
|
||||
de->v.val = newptr, defragged++;
|
||||
}
|
||||
defragged += dictIterDefragEntry(di);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
return defragged;
|
||||
}
|
||||
|
||||
/* Defrag a list of ptr, sds or robj string values */
|
||||
long activeDefragList(list *l, int val_type) {
|
||||
long defragged = 0;
|
||||
listNode *ln, *newln;
|
||||
for (ln = l->head; ln; ln = ln->next) {
|
||||
if ((newln = activeDefragAlloc(ln))) {
|
||||
if (newln->prev)
|
||||
newln->prev->next = newln;
|
||||
else
|
||||
l->head = newln;
|
||||
if (newln->next)
|
||||
newln->next->prev = newln;
|
||||
else
|
||||
l->tail = newln;
|
||||
ln = newln;
|
||||
defragged++;
|
||||
}
|
||||
if (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
|
||||
sds newsds, sdsele = ln->value;
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
ln->value = newsds, defragged++;
|
||||
} else if (val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
|
||||
robj *newele, *ele = ln->value;
|
||||
if ((newele = activeDefragStringOb(ele, &defragged)))
|
||||
ln->value = newele;
|
||||
} else if (val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
|
||||
void *newptr, *ptr = ln->value;
|
||||
if ((newptr = activeDefragAlloc(ptr)))
|
||||
ln->value = newptr, defragged++;
|
||||
}
|
||||
}
|
||||
return defragged;
|
||||
}
|
||||
|
||||
/* Defrag a list of sds values and a dict with the same sds keys */
|
||||
long activeDefragSdsListAndDict(list *l, dict *d, int dict_val_type) {
|
||||
long defragged = 0;
|
||||
sds newsds, sdsele;
|
||||
listNode *ln, *newln;
|
||||
dictIterator *di;
|
||||
dictEntry *de;
|
||||
/* Defrag the list and it's sds values */
|
||||
for (ln = l->head; ln; ln = ln->next) {
|
||||
if ((newln = activeDefragAlloc(ln))) {
|
||||
if (newln->prev)
|
||||
newln->prev->next = newln;
|
||||
else
|
||||
l->head = newln;
|
||||
if (newln->next)
|
||||
newln->next->prev = newln;
|
||||
else
|
||||
l->tail = newln;
|
||||
ln = newln;
|
||||
defragged++;
|
||||
}
|
||||
sdsele = ln->value;
|
||||
if ((newsds = activeDefragSds(sdsele))) {
|
||||
/* When defragging an sds value, we need to update the dict key */
|
||||
unsigned int hash = dictGetHash(d, sdsele);
|
||||
replaceSateliteDictKeyPtrAndOrDefragDictEntry(d, sdsele, newsds, hash, &defragged);
|
||||
ln->value = newsds;
|
||||
defragged++;
|
||||
}
|
||||
}
|
||||
|
||||
/* Defrag the dict values (keys were already handled) */
|
||||
di = dictGetIterator(d);
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_SDS) {
|
||||
sds newsds, sdsele = dictGetVal(de);
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
de->v.val = newsds, defragged++;
|
||||
} else if (dict_val_type == DEFRAG_SDS_DICT_VAL_IS_STROB) {
|
||||
robj *newele, *ele = dictGetVal(de);
|
||||
if ((newele = activeDefragStringOb(ele, &defragged)))
|
||||
de->v.val = newele, defragged++;
|
||||
} else if (dict_val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR) {
|
||||
void *newptr, *ptr = ln->value;
|
||||
if ((newptr = activeDefragAlloc(ptr)))
|
||||
ln->value = newptr, defragged++;
|
||||
}
|
||||
defragged += dictIterDefragEntry(di);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
|
||||
return defragged;
|
||||
}
|
||||
|
||||
/* Utility function that replaces an old key pointer in the dictionary with a
|
||||
* new pointer. Additionally, we try to defrag the dictEntry in that dict.
|
||||
* Oldkey mey be a dead pointer and should not be accessed (we get a
|
||||
@ -253,7 +392,7 @@ double *zslDefrag(zskiplist *zsl, double score, sds oldele, sds newele) {
|
||||
* moved. Return value is the the dictEntry if found, or NULL if not found.
|
||||
* NOTE: this is very ugly code, but it let's us avoid the complication of
|
||||
* doing a scan on another dict. */
|
||||
dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, int *defragged) {
|
||||
dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sds newkey, unsigned int hash, long *defragged) {
|
||||
dictEntry **deref = dictFindEntryRefByPtrAndHash(d, oldkey, hash);
|
||||
if (deref) {
|
||||
dictEntry *de = *deref;
|
||||
@ -269,16 +408,198 @@ dictEntry* replaceSateliteDictKeyPtrAndOrDefragDictEntry(dict *d, sds oldkey, sd
|
||||
return NULL;
|
||||
}
|
||||
|
||||
long activeDefragQuickListNodes(quicklist *ql) {
|
||||
quicklistNode *node = ql->head, *newnode;
|
||||
long defragged = 0;
|
||||
unsigned char *newzl;
|
||||
while (node) {
|
||||
if ((newnode = activeDefragAlloc(node))) {
|
||||
if (newnode->prev)
|
||||
newnode->prev->next = newnode;
|
||||
else
|
||||
ql->head = newnode;
|
||||
if (newnode->next)
|
||||
newnode->next->prev = newnode;
|
||||
else
|
||||
ql->tail = newnode;
|
||||
node = newnode;
|
||||
defragged++;
|
||||
}
|
||||
if ((newzl = activeDefragAlloc(node->zl)))
|
||||
defragged++, node->zl = newzl;
|
||||
node = node->next;
|
||||
}
|
||||
return defragged;
|
||||
}
|
||||
|
||||
/* when the value has lots of elements, we want to handle it later and not as
|
||||
* oart of the main dictionary scan. this is needed in order to prevent latency
|
||||
* spikes when handling large items */
|
||||
void defragLater(redisDb *db, dictEntry *kde) {
|
||||
sds key = sdsdup(dictGetKey(kde));
|
||||
listAddNodeTail(db->defrag_later, key);
|
||||
}
|
||||
|
||||
long scanLaterList(robj *ob) {
|
||||
quicklist *ql = ob->ptr;
|
||||
if (ob->type != OBJ_LIST || ob->encoding != OBJ_ENCODING_QUICKLIST)
|
||||
return 0;
|
||||
server.stat_active_defrag_scanned+=ql->len;
|
||||
return activeDefragQuickListNodes(ql);
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
zset *zs;
|
||||
long defragged;
|
||||
} scanLaterZsetData;
|
||||
|
||||
void scanLaterZsetCallback(void *privdata, const dictEntry *_de) {
|
||||
dictEntry *de = (dictEntry*)_de;
|
||||
scanLaterZsetData *data = privdata;
|
||||
data->defragged += activeDefragZsetEntry(data->zs, de);
|
||||
server.stat_active_defrag_scanned++;
|
||||
}
|
||||
|
||||
long scanLaterZset(robj *ob, unsigned long *cursor) {
|
||||
if (ob->type != OBJ_ZSET || ob->encoding != OBJ_ENCODING_SKIPLIST)
|
||||
return 0;
|
||||
zset *zs = (zset*)ob->ptr;
|
||||
dict *d = zs->dict;
|
||||
scanLaterZsetData data = {zs, 0};
|
||||
*cursor = dictScan(d, *cursor, scanLaterZsetCallback, defragDictBucketCallback, &data);
|
||||
return data.defragged;
|
||||
}
|
||||
|
||||
void scanLaterSetCallback(void *privdata, const dictEntry *_de) {
|
||||
dictEntry *de = (dictEntry*)_de;
|
||||
long *defragged = privdata;
|
||||
sds sdsele = dictGetKey(de), newsds;
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
(*defragged)++, de->key = newsds;
|
||||
server.stat_active_defrag_scanned++;
|
||||
}
|
||||
|
||||
long scanLaterSet(robj *ob, unsigned long *cursor) {
|
||||
long defragged = 0;
|
||||
if (ob->type != OBJ_SET || ob->encoding != OBJ_ENCODING_HT)
|
||||
return 0;
|
||||
dict *d = ob->ptr;
|
||||
*cursor = dictScan(d, *cursor, scanLaterSetCallback, defragDictBucketCallback, &defragged);
|
||||
return defragged;
|
||||
}
|
||||
|
||||
void scanLaterHashCallback(void *privdata, const dictEntry *_de) {
|
||||
dictEntry *de = (dictEntry*)_de;
|
||||
long *defragged = privdata;
|
||||
sds sdsele = dictGetKey(de), newsds;
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
(*defragged)++, de->key = newsds;
|
||||
sdsele = dictGetVal(de);
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
(*defragged)++, de->v.val = newsds;
|
||||
server.stat_active_defrag_scanned++;
|
||||
}
|
||||
|
||||
long scanLaterHash(robj *ob, unsigned long *cursor) {
|
||||
long defragged = 0;
|
||||
if (ob->type != OBJ_HASH || ob->encoding != OBJ_ENCODING_HT)
|
||||
return 0;
|
||||
dict *d = ob->ptr;
|
||||
*cursor = dictScan(d, *cursor, scanLaterHashCallback, defragDictBucketCallback, &defragged);
|
||||
return defragged;
|
||||
}
|
||||
|
||||
long defragQuicklist(redisDb *db, dictEntry *kde) {
|
||||
robj *ob = dictGetVal(kde);
|
||||
long defragged = 0;
|
||||
quicklist *ql = ob->ptr, *newql;
|
||||
serverAssert(ob->type == OBJ_LIST && ob->encoding == OBJ_ENCODING_QUICKLIST);
|
||||
if ((newql = activeDefragAlloc(ql)))
|
||||
defragged++, ob->ptr = ql = newql;
|
||||
if (ql->len > server.active_defrag_max_scan_fields)
|
||||
defragLater(db, kde);
|
||||
else
|
||||
defragged += activeDefragQuickListNodes(ql);
|
||||
return defragged;
|
||||
}
|
||||
|
||||
long defragZsetSkiplist(redisDb *db, dictEntry *kde) {
|
||||
robj *ob = dictGetVal(kde);
|
||||
long defragged = 0;
|
||||
zset *zs = (zset*)ob->ptr;
|
||||
zset *newzs;
|
||||
zskiplist *newzsl;
|
||||
dict *newdict;
|
||||
dictEntry *de;
|
||||
struct zskiplistNode *newheader;
|
||||
serverAssert(ob->type == OBJ_ZSET && ob->encoding == OBJ_ENCODING_SKIPLIST);
|
||||
if ((newzs = activeDefragAlloc(zs)))
|
||||
defragged++, ob->ptr = zs = newzs;
|
||||
if ((newzsl = activeDefragAlloc(zs->zsl)))
|
||||
defragged++, zs->zsl = newzsl;
|
||||
if ((newheader = activeDefragAlloc(zs->zsl->header)))
|
||||
defragged++, zs->zsl->header = newheader;
|
||||
if (dictSize(zs->dict) > server.active_defrag_max_scan_fields)
|
||||
defragLater(db, kde);
|
||||
else {
|
||||
dictIterator *di = dictGetIterator(zs->dict);
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
defragged += activeDefragZsetEntry(zs, de);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
}
|
||||
/* handle the dict struct */
|
||||
if ((newdict = activeDefragAlloc(zs->dict)))
|
||||
defragged++, zs->dict = newdict;
|
||||
/* defrag the dict tables */
|
||||
defragged += dictDefragTables(zs->dict);
|
||||
return defragged;
|
||||
}
|
||||
|
||||
long defragHash(redisDb *db, dictEntry *kde) {
|
||||
long defragged = 0;
|
||||
robj *ob = dictGetVal(kde);
|
||||
dict *d, *newd;
|
||||
serverAssert(ob->type == OBJ_HASH && ob->encoding == OBJ_ENCODING_HT);
|
||||
d = ob->ptr;
|
||||
if (dictSize(d) > server.active_defrag_max_scan_fields)
|
||||
defragLater(db, kde);
|
||||
else
|
||||
defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_VAL_IS_SDS);
|
||||
/* handle the dict struct */
|
||||
if ((newd = activeDefragAlloc(ob->ptr)))
|
||||
defragged++, ob->ptr = newd;
|
||||
/* defrag the dict tables */
|
||||
defragged += dictDefragTables(ob->ptr);
|
||||
return defragged;
|
||||
}
|
||||
|
||||
long defragSet(redisDb *db, dictEntry *kde) {
|
||||
long defragged = 0;
|
||||
robj *ob = dictGetVal(kde);
|
||||
dict *d, *newd;
|
||||
serverAssert(ob->type == OBJ_SET && ob->encoding == OBJ_ENCODING_HT);
|
||||
d = ob->ptr;
|
||||
if (dictSize(d) > server.active_defrag_max_scan_fields)
|
||||
defragLater(db, kde);
|
||||
else
|
||||
defragged += activeDefragSdsDict(d, DEFRAG_SDS_DICT_NO_VAL);
|
||||
/* handle the dict struct */
|
||||
if ((newd = activeDefragAlloc(ob->ptr)))
|
||||
defragged++, ob->ptr = newd;
|
||||
/* defrag the dict tables */
|
||||
defragged += dictDefragTables(ob->ptr);
|
||||
return defragged;
|
||||
}
|
||||
|
||||
/* for each key we scan in the main dict, this function will attempt to defrag
|
||||
* all the various pointers it has. Returns a stat of how many pointers were
|
||||
* moved. */
|
||||
int defragKey(redisDb *db, dictEntry *de) {
|
||||
long defragKey(redisDb *db, dictEntry *de) {
|
||||
sds keysds = dictGetKey(de);
|
||||
robj *newob, *ob;
|
||||
unsigned char *newzl;
|
||||
dict *d;
|
||||
dictIterator *di;
|
||||
int defragged = 0;
|
||||
long defragged = 0;
|
||||
sds newsds;
|
||||
|
||||
/* Try to defrag the key name. */
|
||||
@ -304,27 +625,7 @@ int defragKey(redisDb *db, dictEntry *de) {
|
||||
/* Already handled in activeDefragStringOb. */
|
||||
} else if (ob->type == OBJ_LIST) {
|
||||
if (ob->encoding == OBJ_ENCODING_QUICKLIST) {
|
||||
quicklist *ql = ob->ptr, *newql;
|
||||
quicklistNode *node = ql->head, *newnode;
|
||||
if ((newql = activeDefragAlloc(ql)))
|
||||
defragged++, ob->ptr = ql = newql;
|
||||
while (node) {
|
||||
if ((newnode = activeDefragAlloc(node))) {
|
||||
if (newnode->prev)
|
||||
newnode->prev->next = newnode;
|
||||
else
|
||||
ql->head = newnode;
|
||||
if (newnode->next)
|
||||
newnode->next->prev = newnode;
|
||||
else
|
||||
ql->tail = newnode;
|
||||
node = newnode;
|
||||
defragged++;
|
||||
}
|
||||
if ((newzl = activeDefragAlloc(node->zl)))
|
||||
defragged++, node->zl = newzl;
|
||||
node = node->next;
|
||||
}
|
||||
defragged += defragQuicklist(db, de);
|
||||
} else if (ob->encoding == OBJ_ENCODING_ZIPLIST) {
|
||||
if ((newzl = activeDefragAlloc(ob->ptr)))
|
||||
defragged++, ob->ptr = newzl;
|
||||
@ -333,20 +634,10 @@ int defragKey(redisDb *db, dictEntry *de) {
|
||||
}
|
||||
} else if (ob->type == OBJ_SET) {
|
||||
if (ob->encoding == OBJ_ENCODING_HT) {
|
||||
d = ob->ptr;
|
||||
di = dictGetIterator(d);
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
sds sdsele = dictGetKey(de);
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
defragged++, de->key = newsds;
|
||||
defragged += dictIterDefragEntry(di);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
dictDefragTables((dict**)&ob->ptr);
|
||||
defragged += defragSet(db, de);
|
||||
} else if (ob->encoding == OBJ_ENCODING_INTSET) {
|
||||
intset *is = ob->ptr;
|
||||
intset *newis = activeDefragAlloc(is);
|
||||
if (newis)
|
||||
intset *newis, *is = ob->ptr;
|
||||
if ((newis = activeDefragAlloc(is)))
|
||||
defragged++, ob->ptr = newis;
|
||||
} else {
|
||||
serverPanic("Unknown set encoding");
|
||||
@ -356,32 +647,7 @@ int defragKey(redisDb *db, dictEntry *de) {
|
||||
if ((newzl = activeDefragAlloc(ob->ptr)))
|
||||
defragged++, ob->ptr = newzl;
|
||||
} else if (ob->encoding == OBJ_ENCODING_SKIPLIST) {
|
||||
zset *zs = (zset*)ob->ptr;
|
||||
zset *newzs;
|
||||
zskiplist *newzsl;
|
||||
struct zskiplistNode *newheader;
|
||||
if ((newzs = activeDefragAlloc(zs)))
|
||||
defragged++, ob->ptr = zs = newzs;
|
||||
if ((newzsl = activeDefragAlloc(zs->zsl)))
|
||||
defragged++, zs->zsl = newzsl;
|
||||
if ((newheader = activeDefragAlloc(zs->zsl->header)))
|
||||
defragged++, zs->zsl->header = newheader;
|
||||
d = zs->dict;
|
||||
di = dictGetIterator(d);
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
double* newscore;
|
||||
sds sdsele = dictGetKey(de);
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
defragged++, de->key = newsds;
|
||||
newscore = zslDefrag(zs->zsl, *(double*)dictGetVal(de), sdsele, newsds);
|
||||
if (newscore) {
|
||||
dictSetVal(d, de, newscore);
|
||||
defragged++;
|
||||
}
|
||||
defragged += dictIterDefragEntry(di);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
dictDefragTables(&zs->dict);
|
||||
defragged += defragZsetSkiplist(db, de);
|
||||
} else {
|
||||
serverPanic("Unknown sorted set encoding");
|
||||
}
|
||||
@ -390,19 +656,7 @@ int defragKey(redisDb *db, dictEntry *de) {
|
||||
if ((newzl = activeDefragAlloc(ob->ptr)))
|
||||
defragged++, ob->ptr = newzl;
|
||||
} else if (ob->encoding == OBJ_ENCODING_HT) {
|
||||
d = ob->ptr;
|
||||
di = dictGetIterator(d);
|
||||
while((de = dictNext(di)) != NULL) {
|
||||
sds sdsele = dictGetKey(de);
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
defragged++, de->key = newsds;
|
||||
sdsele = dictGetVal(de);
|
||||
if ((newsds = activeDefragSds(sdsele)))
|
||||
defragged++, de->v.val = newsds;
|
||||
defragged += dictIterDefragEntry(di);
|
||||
}
|
||||
dictReleaseIterator(di);
|
||||
dictDefragTables((dict**)&ob->ptr);
|
||||
defragged += defragHash(db, de);
|
||||
} else {
|
||||
serverPanic("Unknown hash encoding");
|
||||
}
|
||||
@ -417,18 +671,19 @@ int defragKey(redisDb *db, dictEntry *de) {
|
||||
|
||||
/* Defrag scan callback for the main db dictionary. */
|
||||
void defragScanCallback(void *privdata, const dictEntry *de) {
|
||||
int defragged = defragKey((redisDb*)privdata, (dictEntry*)de);
|
||||
long defragged = defragKey((redisDb*)privdata, (dictEntry*)de);
|
||||
server.stat_active_defrag_hits += defragged;
|
||||
if(defragged)
|
||||
server.stat_active_defrag_key_hits++;
|
||||
else
|
||||
server.stat_active_defrag_key_misses++;
|
||||
server.stat_active_defrag_scanned++;
|
||||
}
|
||||
|
||||
/* Defrag scan callback for for each hash table bicket,
|
||||
* used in order to defrag the dictEntry allocations. */
|
||||
void defragDictBucketCallback(void *privdata, dictEntry **bucketref) {
|
||||
UNUSED(privdata);
|
||||
UNUSED(privdata); /* NOTE: this function is also used by both activeDefragCycle and scanLaterHash, etc. don't use privdata */
|
||||
while(*bucketref) {
|
||||
dictEntry *de = *bucketref, *newde;
|
||||
if ((newde = activeDefragAlloc(de))) {
|
||||
@ -439,24 +694,15 @@ void defragDictBucketCallback(void *privdata, dictEntry **bucketref) {
|
||||
}
|
||||
|
||||
/* Utility function to get the fragmentation ratio from jemalloc.
|
||||
* It is critical to do that by comparing only heap maps that belown to
|
||||
* It is critical to do that by comparing only heap maps that belong to
|
||||
* jemalloc, and skip ones the jemalloc keeps as spare. Since we use this
|
||||
* fragmentation ratio in order to decide if a defrag action should be taken
|
||||
* or not, a false detection can cause the defragmenter to waste a lot of CPU
|
||||
* without the possibility of getting any results. */
|
||||
float getAllocatorFragmentation(size_t *out_frag_bytes) {
|
||||
size_t epoch = 1, allocated = 0, resident = 0, active = 0, sz = sizeof(size_t);
|
||||
/* Update the statistics cached by mallctl. */
|
||||
je_mallctl("epoch", &epoch, &sz, &epoch, sz);
|
||||
/* Unlike RSS, this does not include RSS from shared libraries and other non
|
||||
* heap mappings. */
|
||||
je_mallctl("stats.resident", &resident, &sz, NULL, 0);
|
||||
/* Unlike resident, this doesn't not include the pages jemalloc reserves
|
||||
* for re-use (purge will clean that). */
|
||||
je_mallctl("stats.active", &active, &sz, NULL, 0);
|
||||
/* Unlike zmalloc_used_memory, this matches the stats.resident by taking
|
||||
* into account all allocations done by this process (not only zmalloc). */
|
||||
je_mallctl("stats.allocated", &allocated, &sz, NULL, 0);
|
||||
size_t resident = server.cron_malloc_stats.allocator_resident;
|
||||
size_t active = server.cron_malloc_stats.allocator_active;
|
||||
size_t allocated = server.cron_malloc_stats.allocator_allocated;
|
||||
float frag_pct = ((float)active / allocated)*100 - 100;
|
||||
size_t frag_bytes = active - allocated;
|
||||
float rss_pct = ((float)resident / allocated)*100 - 100;
|
||||
@ -464,14 +710,147 @@ float getAllocatorFragmentation(size_t *out_frag_bytes) {
|
||||
if(out_frag_bytes)
|
||||
*out_frag_bytes = frag_bytes;
|
||||
serverLog(LL_DEBUG,
|
||||
"allocated=%zu, active=%zu, resident=%zu, frag=%.0f%% (%.0f%% rss), frag_bytes=%zu (%zu%% rss)",
|
||||
"allocated=%zu, active=%zu, resident=%zu, frag=%.0f%% (%.0f%% rss), frag_bytes=%zu (%zu rss)",
|
||||
allocated, active, resident, frag_pct, rss_pct, frag_bytes, rss_bytes);
|
||||
return frag_pct;
|
||||
}
|
||||
|
||||
/* We may need to defrag other globals, one small allcation can hold a full allocator run.
|
||||
* so although small, it is still important to defrag these */
|
||||
long defragOtherGlobals() {
|
||||
long defragged = 0;
|
||||
|
||||
/* there are many more pointers to defrag (e.g. client argv, output / aof buffers, etc.
|
||||
* but we assume most of these are short lived, we only need to defrag allocations
|
||||
* that remain static for a long time */
|
||||
defragged += activeDefragSdsDict(server.lua_scripts, DEFRAG_SDS_DICT_VAL_IS_STROB);
|
||||
defragged += activeDefragSdsListAndDict(server.repl_scriptcache_fifo, server.repl_scriptcache_dict, DEFRAG_SDS_DICT_NO_VAL);
|
||||
return defragged;
|
||||
}
|
||||
|
||||
unsigned long defragLaterItem(dictEntry *de, unsigned long cursor) {
|
||||
long defragged = 0;
|
||||
if (de) {
|
||||
robj *ob = dictGetVal(de);
|
||||
if (ob->type == OBJ_LIST) {
|
||||
defragged += scanLaterList(ob);
|
||||
cursor = 0; /* list has no scan, we must finish it in one go */
|
||||
} else if (ob->type == OBJ_SET) {
|
||||
defragged += scanLaterSet(ob, &cursor);
|
||||
} else if (ob->type == OBJ_ZSET) {
|
||||
defragged += scanLaterZset(ob, &cursor);
|
||||
} else if (ob->type == OBJ_HASH) {
|
||||
defragged += scanLaterHash(ob, &cursor);
|
||||
} else {
|
||||
cursor = 0; /* object type may have changed since we schedule it for later */
|
||||
}
|
||||
} else {
|
||||
cursor = 0; /* object may have been deleted already */
|
||||
}
|
||||
server.stat_active_defrag_hits += defragged;
|
||||
return cursor;
|
||||
}
|
||||
|
||||
/* returns 0 if no more work needs to be been done, and 1 if time is up and more work is needed. */
|
||||
int defragLaterStep(redisDb *db, long long endtime) {
|
||||
static sds current_key = NULL;
|
||||
static unsigned long cursor = 0;
|
||||
unsigned int iterations = 0;
|
||||
unsigned long long prev_defragged = server.stat_active_defrag_hits;
|
||||
unsigned long long prev_scanned = server.stat_active_defrag_scanned;
|
||||
long long key_defragged;
|
||||
|
||||
do {
|
||||
/* if we're not continuing a scan from the last call or loop, start a new one */
|
||||
if (!cursor) {
|
||||
listNode *head = listFirst(db->defrag_later);
|
||||
|
||||
/* Move on to next key */
|
||||
if (current_key) {
|
||||
serverAssert(current_key == head->value);
|
||||
sdsfree(head->value);
|
||||
listDelNode(db->defrag_later, head);
|
||||
cursor = 0;
|
||||
current_key = NULL;
|
||||
}
|
||||
|
||||
/* stop if we reached the last one. */
|
||||
head = listFirst(db->defrag_later);
|
||||
if (!head)
|
||||
return 0;
|
||||
|
||||
/* start a new key */
|
||||
current_key = head->value;
|
||||
cursor = 0;
|
||||
}
|
||||
|
||||
/* each time we enter this function we need to fetch the key from the dict again (if it still exists) */
|
||||
dictEntry *de = dictFind(db->dict, current_key);
|
||||
key_defragged = server.stat_active_defrag_hits;
|
||||
do {
|
||||
cursor = defragLaterItem(de, cursor);
|
||||
|
||||
/* Once in 16 scan iterations, 512 pointer reallocations, or 64 fields
|
||||
* (if we have a lot of pointers in one hash bucket, or rehashing),
|
||||
* check if we reached the time limit.
|
||||
* But regardless, don't start a new BIG key in this loop, this is because the
|
||||
* next key can be a list, and scanLaterList must be done in once cycle */
|
||||
if (!cursor || (++iterations > 16 ||
|
||||
server.stat_active_defrag_hits - prev_defragged > 512 ||
|
||||
server.stat_active_defrag_scanned - prev_scanned > 64)) {
|
||||
if (!cursor || ustime() > endtime) {
|
||||
if(key_defragged != server.stat_active_defrag_hits)
|
||||
server.stat_active_defrag_key_hits++;
|
||||
else
|
||||
server.stat_active_defrag_key_misses++;
|
||||
return 1;
|
||||
}
|
||||
iterations = 0;
|
||||
prev_defragged = server.stat_active_defrag_hits;
|
||||
prev_scanned = server.stat_active_defrag_scanned;
|
||||
}
|
||||
} while(cursor);
|
||||
if(key_defragged != server.stat_active_defrag_hits)
|
||||
server.stat_active_defrag_key_hits++;
|
||||
else
|
||||
server.stat_active_defrag_key_misses++;
|
||||
} while(1);
|
||||
}
|
||||
|
||||
#define INTERPOLATE(x, x1, x2, y1, y2) ( (y1) + ((x)-(x1)) * ((y2)-(y1)) / ((x2)-(x1)) )
|
||||
#define LIMIT(y, min, max) ((y)<(min)? min: ((y)>(max)? max: (y)))
|
||||
|
||||
/* decide if defrag is needed, and at what CPU effort to invest in it */
|
||||
void computeDefragCycles() {
|
||||
size_t frag_bytes;
|
||||
float frag_pct = getAllocatorFragmentation(&frag_bytes);
|
||||
/* If we're not already running, and below the threshold, exit. */
|
||||
if (!server.active_defrag_running) {
|
||||
if(frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes)
|
||||
return;
|
||||
}
|
||||
|
||||
/* Calculate the adaptive aggressiveness of the defrag */
|
||||
int cpu_pct = INTERPOLATE(frag_pct,
|
||||
server.active_defrag_threshold_lower,
|
||||
server.active_defrag_threshold_upper,
|
||||
server.active_defrag_cycle_min,
|
||||
server.active_defrag_cycle_max);
|
||||
cpu_pct = LIMIT(cpu_pct,
|
||||
server.active_defrag_cycle_min,
|
||||
server.active_defrag_cycle_max);
|
||||
/* We allow increasing the aggressiveness during a scan, but don't
|
||||
* reduce it. */
|
||||
if (!server.active_defrag_running ||
|
||||
cpu_pct > server.active_defrag_running)
|
||||
{
|
||||
server.active_defrag_running = cpu_pct;
|
||||
serverLog(LL_VERBOSE,
|
||||
"Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%",
|
||||
frag_pct, frag_bytes, cpu_pct);
|
||||
}
|
||||
}
|
||||
|
||||
/* Perform incremental defragmentation work from the serverCron.
|
||||
* This works in a similar way to activeExpireCycle, in the sense that
|
||||
* we do incremental work across calls. */
|
||||
@ -481,8 +860,11 @@ void activeDefragCycle(void) {
|
||||
static redisDb *db = NULL;
|
||||
static long long start_scan, start_stat;
|
||||
unsigned int iterations = 0;
|
||||
unsigned long long defragged = server.stat_active_defrag_hits;
|
||||
long long start, timelimit;
|
||||
unsigned long long prev_defragged = server.stat_active_defrag_hits;
|
||||
unsigned long long prev_scanned = server.stat_active_defrag_scanned;
|
||||
long long start, timelimit, endtime;
|
||||
mstime_t latency;
|
||||
int quit = 0;
|
||||
|
||||
if (server.aof_child_pid!=-1 || server.rdb_child_pid!=-1)
|
||||
return; /* Defragging memory while there's a fork will just do damage. */
|
||||
@ -490,33 +872,7 @@ void activeDefragCycle(void) {
|
||||
/* Once a second, check if we the fragmentation justfies starting a scan
|
||||
* or making it more aggressive. */
|
||||
run_with_period(1000) {
|
||||
size_t frag_bytes;
|
||||
float frag_pct = getAllocatorFragmentation(&frag_bytes);
|
||||
/* If we're not already running, and below the threshold, exit. */
|
||||
if (!server.active_defrag_running) {
|
||||
if(frag_pct < server.active_defrag_threshold_lower || frag_bytes < server.active_defrag_ignore_bytes)
|
||||
return;
|
||||
}
|
||||
|
||||
/* Calculate the adaptive aggressiveness of the defrag */
|
||||
int cpu_pct = INTERPOLATE(frag_pct,
|
||||
server.active_defrag_threshold_lower,
|
||||
server.active_defrag_threshold_upper,
|
||||
server.active_defrag_cycle_min,
|
||||
server.active_defrag_cycle_max);
|
||||
cpu_pct = LIMIT(cpu_pct,
|
||||
server.active_defrag_cycle_min,
|
||||
server.active_defrag_cycle_max);
|
||||
/* We allow increasing the aggressiveness during a scan, but don't
|
||||
* reduce it. */
|
||||
if (!server.active_defrag_running ||
|
||||
cpu_pct > server.active_defrag_running)
|
||||
{
|
||||
server.active_defrag_running = cpu_pct;
|
||||
serverLog(LL_VERBOSE,
|
||||
"Starting active defrag, frag=%.0f%%, frag_bytes=%zu, cpu=%d%%",
|
||||
frag_pct, frag_bytes, cpu_pct);
|
||||
}
|
||||
computeDefragCycles();
|
||||
}
|
||||
if (!server.active_defrag_running)
|
||||
return;
|
||||
@ -525,11 +881,23 @@ void activeDefragCycle(void) {
|
||||
start = ustime();
|
||||
timelimit = 1000000*server.active_defrag_running/server.hz/100;
|
||||
if (timelimit <= 0) timelimit = 1;
|
||||
endtime = start + timelimit;
|
||||
latencyStartMonitor(latency);
|
||||
|
||||
do {
|
||||
/* if we're not continuing a scan from the last call or loop, start a new one */
|
||||
if (!cursor) {
|
||||
/* finish any leftovers from previous db before moving to the next one */
|
||||
if (db && defragLaterStep(db, endtime)) {
|
||||
quit = 1; /* time is up, we didn't finish all the work */
|
||||
break; /* this will exit the function and we'll continue on the next cycle */
|
||||
}
|
||||
|
||||
/* Move on to next database, and stop if we reached the last one. */
|
||||
if (++current_db >= server.dbnum) {
|
||||
/* defrag other items not part of the db / keys */
|
||||
defragOtherGlobals();
|
||||
|
||||
long long now = ustime();
|
||||
size_t frag_bytes;
|
||||
float frag_pct = getAllocatorFragmentation(&frag_bytes);
|
||||
@ -542,7 +910,11 @@ void activeDefragCycle(void) {
|
||||
cursor = 0;
|
||||
db = NULL;
|
||||
server.active_defrag_running = 0;
|
||||
return;
|
||||
|
||||
computeDefragCycles(); /* if another scan is needed, start it right away */
|
||||
if (server.active_defrag_running != 0 && ustime() < endtime)
|
||||
continue;
|
||||
break;
|
||||
}
|
||||
else if (current_db==0) {
|
||||
/* Start a scan from the first database. */
|
||||
@ -555,19 +927,35 @@ void activeDefragCycle(void) {
|
||||
}
|
||||
|
||||
do {
|
||||
/* before scanning the next bucket, see if we have big keys left from the previous bucket to scan */
|
||||
if (defragLaterStep(db, endtime)) {
|
||||
quit = 1; /* time is up, we didn't finish all the work */
|
||||
break; /* this will exit the function and we'll continue on the next cycle */
|
||||
}
|
||||
|
||||
cursor = dictScan(db->dict, cursor, defragScanCallback, defragDictBucketCallback, db);
|
||||
/* Once in 16 scan iterations, or 1000 pointer reallocations
|
||||
* (if we have a lot of pointers in one hash bucket), check if we
|
||||
* reached the tiem limit. */
|
||||
if (cursor && (++iterations > 16 || server.stat_active_defrag_hits - defragged > 1000)) {
|
||||
if ((ustime() - start) > timelimit) {
|
||||
return;
|
||||
|
||||
/* Once in 16 scan iterations, 512 pointer reallocations. or 64 keys
|
||||
* (if we have a lot of pointers in one hash bucket or rehasing),
|
||||
* check if we reached the time limit.
|
||||
* But regardless, don't start a new db in this loop, this is because after
|
||||
* the last db we call defragOtherGlobals, which must be done in once cycle */
|
||||
if (!cursor || (++iterations > 16 ||
|
||||
server.stat_active_defrag_hits - prev_defragged > 512 ||
|
||||
server.stat_active_defrag_scanned - prev_scanned > 64)) {
|
||||
if (!cursor || ustime() > endtime) {
|
||||
quit = 1;
|
||||
break;
|
||||
}
|
||||
iterations = 0;
|
||||
defragged = server.stat_active_defrag_hits;
|
||||
prev_defragged = server.stat_active_defrag_hits;
|
||||
prev_scanned = server.stat_active_defrag_scanned;
|
||||
}
|
||||
} while(cursor);
|
||||
} while(1);
|
||||
} while(cursor && !quit);
|
||||
} while(!quit);
|
||||
|
||||
latencyEndMonitor(latency);
|
||||
latencyAddSampleIfNeeded("active-defrag-cycle",latency);
|
||||
}
|
||||
|
||||
#else /* HAVE_DEFRAG */
|
||||
|
91
src/object.c
91
src/object.c
@ -919,8 +919,23 @@ struct redisMemOverhead *getMemoryOverheadData(void) {
|
||||
mh->total_allocated = zmalloc_used;
|
||||
mh->startup_allocated = server.initial_memory_usage;
|
||||
mh->peak_allocated = server.stat_peak_memory;
|
||||
mh->fragmentation =
|
||||
zmalloc_get_fragmentation_ratio(server.resident_set_size);
|
||||
mh->total_frag =
|
||||
(float)server.cron_malloc_stats.process_rss / server.cron_malloc_stats.zmalloc_used;
|
||||
mh->total_frag_bytes =
|
||||
server.cron_malloc_stats.process_rss - server.cron_malloc_stats.zmalloc_used;
|
||||
mh->allocator_frag =
|
||||
(float)server.cron_malloc_stats.allocator_active / server.cron_malloc_stats.allocator_allocated;
|
||||
mh->allocator_frag_bytes =
|
||||
server.cron_malloc_stats.allocator_active - server.cron_malloc_stats.allocator_allocated;
|
||||
mh->allocator_rss =
|
||||
(float)server.cron_malloc_stats.allocator_resident / server.cron_malloc_stats.allocator_active;
|
||||
mh->allocator_rss_bytes =
|
||||
server.cron_malloc_stats.allocator_resident - server.cron_malloc_stats.allocator_active;
|
||||
mh->rss_extra =
|
||||
(float)server.cron_malloc_stats.process_rss / server.cron_malloc_stats.allocator_resident;
|
||||
mh->rss_extra_bytes =
|
||||
server.cron_malloc_stats.process_rss - server.cron_malloc_stats.allocator_resident;
|
||||
|
||||
mem_total += server.initial_memory_usage;
|
||||
|
||||
mem = 0;
|
||||
@ -1023,6 +1038,9 @@ sds getMemoryDoctorReport(void) {
|
||||
int empty = 0; /* Instance is empty or almost empty. */
|
||||
int big_peak = 0; /* Memory peak is much larger than used mem. */
|
||||
int high_frag = 0; /* High fragmentation. */
|
||||
int high_alloc_frag = 0;/* High allocator fragmentation. */
|
||||
int high_proc_rss = 0; /* High process rss overhead. */
|
||||
int high_alloc_rss = 0; /* High rss overhead. */
|
||||
int big_slave_buf = 0; /* Slave buffers are too big. */
|
||||
int big_client_buf = 0; /* Client buffers are too big. */
|
||||
int num_reports = 0;
|
||||
@ -1038,12 +1056,30 @@ sds getMemoryDoctorReport(void) {
|
||||
num_reports++;
|
||||
}
|
||||
|
||||
/* Fragmentation is higher than 1.4? */
|
||||
if (mh->fragmentation > 1.4) {
|
||||
/* Fragmentation is higher than 1.4 and 10MB ?*/
|
||||
if (mh->total_frag > 1.4 && mh->total_frag_bytes > 10<<20) {
|
||||
high_frag = 1;
|
||||
num_reports++;
|
||||
}
|
||||
|
||||
/* External fragmentation is higher than 1.1 and 10MB? */
|
||||
if (mh->allocator_frag > 1.1 && mh->allocator_frag_bytes > 10<<20) {
|
||||
high_alloc_frag = 1;
|
||||
num_reports++;
|
||||
}
|
||||
|
||||
/* Allocator fss is higher than 1.1 and 10MB ? */
|
||||
if (mh->allocator_rss > 1.1 && mh->allocator_rss_bytes > 10<<20) {
|
||||
high_alloc_rss = 1;
|
||||
num_reports++;
|
||||
}
|
||||
|
||||
/* Non-Allocator fss is higher than 1.1 and 10MB ? */
|
||||
if (mh->rss_extra > 1.1 && mh->rss_extra_bytes > 10<<20) {
|
||||
high_proc_rss = 1;
|
||||
num_reports++;
|
||||
}
|
||||
|
||||
/* Clients using more than 200k each average? */
|
||||
long numslaves = listLength(server.slaves);
|
||||
long numclients = listLength(server.clients)-numslaves;
|
||||
@ -1077,7 +1113,16 @@ sds getMemoryDoctorReport(void) {
|
||||
s = sdscat(s," * Peak memory: In the past this instance used more than 150% the memory that is currently using. The allocator is normally not able to release memory after a peak, so you can expect to see a big fragmentation ratio, however this is actually harmless and is only due to the memory peak, and if the Redis instance Resident Set Size (RSS) is currently bigger than expected, the memory will be used as soon as you fill the Redis instance with more data. If the memory peak was only occasional and you want to try to reclaim memory, please try the MEMORY PURGE command, otherwise the only other option is to shutdown and restart the instance.\n\n");
|
||||
}
|
||||
if (high_frag) {
|
||||
s = sdscatprintf(s," * High fragmentation: This instance has a memory fragmentation greater than 1.4 (this means that the Resident Set Size of the Redis process is much larger than the sum of the logical allocations Redis performed). This problem is usually due either to a large peak memory (check if there is a peak memory entry above in the report) or may result from a workload that causes the allocator to fragment memory a lot. If the problem is a large peak memory, then there is no issue. Otherwise, make sure you are using the Jemalloc allocator and not the default libc malloc. Note: The currently used allocator is \"%s\".\n\n", ZMALLOC_LIB);
|
||||
s = sdscatprintf(s," * High total RSS: This instance has a memory fragmentation and RSS overhead greater than 1.4 (this means that the Resident Set Size of the Redis process is much larger than the sum of the logical allocations Redis performed). This problem is usually due either to a large peak memory (check if there is a peak memory entry above in the report) or may result from a workload that causes the allocator to fragment memory a lot. If the problem is a large peak memory, then there is no issue. Otherwise, make sure you are using the Jemalloc allocator and not the default libc malloc. Note: The currently used allocator is \"%s\".\n\n", ZMALLOC_LIB);
|
||||
}
|
||||
if (high_alloc_frag) {
|
||||
s = sdscatprintf(s," * High allocator fragmentation: This instance has an allocator external fragmentation greater than 1.1. This problem is usually due either to a large peak memory (check if there is a peak memory entry above in the report) or may result from a workload that causes the allocator to fragment memory a lot. You can try enabling 'activedefrag' config option.\n\n");
|
||||
}
|
||||
if (high_alloc_rss) {
|
||||
s = sdscatprintf(s," * High allocator RSS overhead: This instance has an RSS memory overhead is greater than 1.1 (this means that the Resident Set Size of the allocator is much larger than the sum what the allocator actually holds). This problem is usually due to a large peak memory (check if there is a peak memory entry above in the report), you can try the MEMORY PURGE command to reclaim it.\n\n");
|
||||
}
|
||||
if (high_proc_rss) {
|
||||
s = sdscatprintf(s," * High process RSS overhead: This instance has non-allocator RSS memory overhead is greater than 1.1 (this means that the Resident Set Size of the Redis process is much larger than the RSS the allocator holds). This problem may be due to LUA scripts or Modules.\n\n");
|
||||
}
|
||||
if (big_slave_buf) {
|
||||
s = sdscat(s," * Big slave buffers: The slave output buffers in this instance are greater than 10MB for each slave (on average). This likely means that there is some slave instance that is struggling receiving data, either because it is too slow or because of networking issues. As a result, data piles on the master output buffers. Please try to identify what slave is not receiving data correctly and why. You can use the INFO output in order to check the slaves delays and the CLIENT LIST command to check the output buffers of each slave.\n\n");
|
||||
@ -1191,7 +1236,7 @@ void memoryCommand(client *c) {
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"stats") && c->argc == 2) {
|
||||
struct redisMemOverhead *mh = getMemoryOverheadData();
|
||||
|
||||
addReplyMultiBulkLen(c,(14+mh->num_dbs)*2);
|
||||
addReplyMultiBulkLen(c,(24+mh->num_dbs)*2);
|
||||
|
||||
addReplyBulkCString(c,"peak.allocated");
|
||||
addReplyLongLong(c,mh->peak_allocated);
|
||||
@ -1245,8 +1290,38 @@ void memoryCommand(client *c) {
|
||||
addReplyBulkCString(c,"peak.percentage");
|
||||
addReplyDouble(c,mh->peak_perc);
|
||||
|
||||
addReplyBulkCString(c,"fragmentation");
|
||||
addReplyDouble(c,mh->fragmentation);
|
||||
addReplyBulkCString(c,"allocator.allocated");
|
||||
addReplyLongLong(c,server.cron_malloc_stats.allocator_allocated);
|
||||
|
||||
addReplyBulkCString(c,"allocator.active");
|
||||
addReplyLongLong(c,server.cron_malloc_stats.allocator_active);
|
||||
|
||||
addReplyBulkCString(c,"allocator.resident");
|
||||
addReplyLongLong(c,server.cron_malloc_stats.allocator_resident);
|
||||
|
||||
addReplyBulkCString(c,"allocator-fragmentation.ratio");
|
||||
addReplyDouble(c,mh->allocator_frag);
|
||||
|
||||
addReplyBulkCString(c,"allocator-fragmentation.bytes");
|
||||
addReplyLongLong(c,mh->allocator_frag_bytes);
|
||||
|
||||
addReplyBulkCString(c,"allocator-rss.ratio");
|
||||
addReplyDouble(c,mh->allocator_rss);
|
||||
|
||||
addReplyBulkCString(c,"allocator-rss.bytes");
|
||||
addReplyLongLong(c,mh->allocator_rss_bytes);
|
||||
|
||||
addReplyBulkCString(c,"rss-overhead.ratio");
|
||||
addReplyDouble(c,mh->rss_extra);
|
||||
|
||||
addReplyBulkCString(c,"rss-overhead.bytes");
|
||||
addReplyLongLong(c,mh->rss_extra_bytes);
|
||||
|
||||
addReplyBulkCString(c,"fragmentation"); /* this is the total RSS overhead, including fragmentation */
|
||||
addReplyDouble(c,mh->total_frag); /* it is kept here for backwards compatibility */
|
||||
|
||||
addReplyBulkCString(c,"fragmentation.bytes");
|
||||
addReplyLongLong(c,mh->total_frag_bytes);
|
||||
|
||||
freeMemoryOverheadData(mh);
|
||||
} else if (!strcasecmp(c->argv[1]->ptr,"malloc-stats") && c->argc == 2) {
|
||||
|
64
src/server.c
64
src/server.c
@ -1013,8 +1013,33 @@ int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
|
||||
if (zmalloc_used_memory() > server.stat_peak_memory)
|
||||
server.stat_peak_memory = zmalloc_used_memory();
|
||||
|
||||
/* Sample the RSS here since this is a relatively slow call. */
|
||||
server.resident_set_size = zmalloc_get_rss();
|
||||
run_with_period(10) {
|
||||
/* Sample the RSS and other metrics here since this is a relatively slow call.
|
||||
* We must sample the zmalloc_used at the same time we take the rss, otherwise
|
||||
* the frag ratio calculate may be off (ratio of two samples at different times) */
|
||||
server.cron_malloc_stats.process_rss = zmalloc_get_rss();
|
||||
server.cron_malloc_stats.zmalloc_used = zmalloc_used_memory();
|
||||
/* Sampling the allcator info can be slow too.
|
||||
* The fragmentation ratio it'll show is potentically more accurate
|
||||
* it excludes other RSS pages such as: shared libraries, LUA and other non-zmalloc
|
||||
* allocations, and allocator reserved pages that can be pursed (all not actual frag) */
|
||||
zmalloc_get_allocator_info(&server.cron_malloc_stats.allocator_allocated,
|
||||
&server.cron_malloc_stats.allocator_active,
|
||||
&server.cron_malloc_stats.allocator_resident);
|
||||
/* in case the allocator isn't providing these stats, fake them so that
|
||||
* fragmention info still shows some (inaccurate metrics) */
|
||||
if (!server.cron_malloc_stats.allocator_resident) {
|
||||
/* LUA memory isn't part of zmalloc_used, but it is part of the process RSS,
|
||||
* so we must desuct it in order to be able to calculate correct
|
||||
* "allocator fragmentation" ratio */
|
||||
size_t lua_memory = lua_gc(server.lua,LUA_GCCOUNT,0)*1024LL;
|
||||
server.cron_malloc_stats.allocator_resident = server.cron_malloc_stats.process_rss - lua_memory;
|
||||
}
|
||||
if (!server.cron_malloc_stats.allocator_active)
|
||||
server.cron_malloc_stats.allocator_active = server.cron_malloc_stats.allocator_resident;
|
||||
if (!server.cron_malloc_stats.allocator_allocated)
|
||||
server.cron_malloc_stats.allocator_allocated = server.cron_malloc_stats.zmalloc_used;
|
||||
}
|
||||
|
||||
/* We received a SIGTERM, shutting down here in a safe way, as it is
|
||||
* not ok doing so inside the signal handler. */
|
||||
@ -1395,6 +1420,7 @@ void initServerConfig(void) {
|
||||
server.active_defrag_threshold_upper = CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER;
|
||||
server.active_defrag_cycle_min = CONFIG_DEFAULT_DEFRAG_CYCLE_MIN;
|
||||
server.active_defrag_cycle_max = CONFIG_DEFAULT_DEFRAG_CYCLE_MAX;
|
||||
server.active_defrag_max_scan_fields = CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS;
|
||||
server.proto_max_bulk_len = CONFIG_DEFAULT_PROTO_MAX_BULK_LEN;
|
||||
server.client_max_querybuf_len = PROTO_MAX_QUERYBUF_LEN;
|
||||
server.saveparams = NULL;
|
||||
@ -1816,6 +1842,7 @@ void resetServerStats(void) {
|
||||
server.stat_active_defrag_misses = 0;
|
||||
server.stat_active_defrag_key_hits = 0;
|
||||
server.stat_active_defrag_key_misses = 0;
|
||||
server.stat_active_defrag_scanned = 0;
|
||||
server.stat_fork_time = 0;
|
||||
server.stat_fork_rate = 0;
|
||||
server.stat_rejected_conn = 0;
|
||||
@ -1904,6 +1931,7 @@ void initServer(void) {
|
||||
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
|
||||
server.db[j].id = j;
|
||||
server.db[j].avg_ttl = 0;
|
||||
server.db[j].defrag_later = listCreate();
|
||||
}
|
||||
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
|
||||
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
|
||||
@ -1931,7 +1959,11 @@ void initServer(void) {
|
||||
server.stat_peak_memory = 0;
|
||||
server.stat_rdb_cow_bytes = 0;
|
||||
server.stat_aof_cow_bytes = 0;
|
||||
server.resident_set_size = 0;
|
||||
server.cron_malloc_stats.zmalloc_used = 0;
|
||||
server.cron_malloc_stats.process_rss = 0;
|
||||
server.cron_malloc_stats.allocator_allocated = 0;
|
||||
server.cron_malloc_stats.allocator_active = 0;
|
||||
server.cron_malloc_stats.allocator_resident = 0;
|
||||
server.lastbgsave_status = C_OK;
|
||||
server.aof_last_write_status = C_OK;
|
||||
server.aof_last_write_errno = 0;
|
||||
@ -2981,7 +3013,7 @@ sds genRedisInfoString(char *section) {
|
||||
bytesToHuman(peak_hmem,server.stat_peak_memory);
|
||||
bytesToHuman(total_system_hmem,total_system_mem);
|
||||
bytesToHuman(used_memory_lua_hmem,memory_lua);
|
||||
bytesToHuman(used_memory_rss_hmem,server.resident_set_size);
|
||||
bytesToHuman(used_memory_rss_hmem,server.cron_malloc_stats.process_rss);
|
||||
bytesToHuman(maxmemory_hmem,server.maxmemory);
|
||||
|
||||
if (sections++) info = sdscat(info,"\r\n");
|
||||
@ -2998,6 +3030,9 @@ sds genRedisInfoString(char *section) {
|
||||
"used_memory_startup:%zu\r\n"
|
||||
"used_memory_dataset:%zu\r\n"
|
||||
"used_memory_dataset_perc:%.2f%%\r\n"
|
||||
"allocator_allocated:%zu\r\n"
|
||||
"allocator_active:%zu\r\n"
|
||||
"allocator_resident:%zu\r\n"
|
||||
"total_system_memory:%lu\r\n"
|
||||
"total_system_memory_human:%s\r\n"
|
||||
"used_memory_lua:%lld\r\n"
|
||||
@ -3005,13 +3040,20 @@ sds genRedisInfoString(char *section) {
|
||||
"maxmemory:%lld\r\n"
|
||||
"maxmemory_human:%s\r\n"
|
||||
"maxmemory_policy:%s\r\n"
|
||||
"allocator_frag_ratio:%.2f\r\n"
|
||||
"allocator_frag_bytes:%zu\r\n"
|
||||
"allocator_rss_ratio:%.2f\r\n"
|
||||
"allocator_rss_bytes:%zu\r\n"
|
||||
"rss_overhead_ratio:%.2f\r\n"
|
||||
"rss_overhead_bytes:%zu\r\n"
|
||||
"mem_fragmentation_ratio:%.2f\r\n"
|
||||
"mem_fragmentation_bytes:%zu\r\n"
|
||||
"mem_allocator:%s\r\n"
|
||||
"active_defrag_running:%d\r\n"
|
||||
"lazyfree_pending_objects:%zu\r\n",
|
||||
zmalloc_used,
|
||||
hmem,
|
||||
server.resident_set_size,
|
||||
server.cron_malloc_stats.process_rss,
|
||||
used_memory_rss_hmem,
|
||||
server.stat_peak_memory,
|
||||
peak_hmem,
|
||||
@ -3020,6 +3062,9 @@ sds genRedisInfoString(char *section) {
|
||||
mh->startup_allocated,
|
||||
mh->dataset,
|
||||
mh->dataset_perc,
|
||||
server.cron_malloc_stats.allocator_allocated,
|
||||
server.cron_malloc_stats.allocator_active,
|
||||
server.cron_malloc_stats.allocator_resident,
|
||||
(unsigned long)total_system_mem,
|
||||
total_system_hmem,
|
||||
memory_lua,
|
||||
@ -3027,7 +3072,14 @@ sds genRedisInfoString(char *section) {
|
||||
server.maxmemory,
|
||||
maxmemory_hmem,
|
||||
evict_policy,
|
||||
mh->fragmentation,
|
||||
mh->allocator_frag,
|
||||
mh->allocator_frag_bytes,
|
||||
mh->allocator_rss,
|
||||
mh->allocator_rss_bytes,
|
||||
mh->rss_extra,
|
||||
mh->rss_extra_bytes,
|
||||
mh->total_frag, /* this is the total RSS overhead, including fragmentation, */
|
||||
mh->total_frag_bytes, /* named so for backwards compatibility */
|
||||
ZMALLOC_LIB,
|
||||
server.active_defrag_running,
|
||||
lazyfreeGetPendingObjectsCount()
|
||||
|
25
src/server.h
25
src/server.h
@ -158,8 +158,9 @@ typedef long long mstime_t; /* millisecond time type. */
|
||||
#define CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER 10 /* don't defrag when fragmentation is below 10% */
|
||||
#define CONFIG_DEFAULT_DEFRAG_THRESHOLD_UPPER 100 /* maximum defrag force at 100% fragmentation */
|
||||
#define CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES (100<<20) /* don't defrag if frag overhead is below 100mb */
|
||||
#define CONFIG_DEFAULT_DEFRAG_CYCLE_MIN 25 /* 25% CPU min (at lower threshold) */
|
||||
#define CONFIG_DEFAULT_DEFRAG_CYCLE_MIN 5 /* 5% CPU min (at lower threshold) */
|
||||
#define CONFIG_DEFAULT_DEFRAG_CYCLE_MAX 75 /* 75% CPU max (at upper threshold) */
|
||||
#define CONFIG_DEFAULT_DEFRAG_MAX_SCAN_FIELDS 1000 /* keys with more than 1000 fields will be processed separately */
|
||||
#define CONFIG_DEFAULT_PROTO_MAX_BULK_LEN (512ll*1024*1024) /* Bulk request max size */
|
||||
|
||||
#define ACTIVE_EXPIRE_CYCLE_LOOKUPS_PER_LOOP 20 /* Loopkups per loop. */
|
||||
@ -621,6 +622,7 @@ typedef struct redisDb {
|
||||
dict *watched_keys; /* WATCHED keys for MULTI/EXEC CAS */
|
||||
int id; /* Database ID */
|
||||
long long avg_ttl; /* Average TTL, just for stats */
|
||||
list *defrag_later; /* List of key names to attempt to defrag one by one, gradually. */
|
||||
} redisDb;
|
||||
|
||||
/* Client MULTI/EXEC state */
|
||||
@ -838,7 +840,14 @@ struct redisMemOverhead {
|
||||
size_t bytes_per_key;
|
||||
float dataset_perc;
|
||||
float peak_perc;
|
||||
float fragmentation;
|
||||
float total_frag;
|
||||
size_t total_frag_bytes;
|
||||
float allocator_frag;
|
||||
size_t allocator_frag_bytes;
|
||||
float allocator_rss;
|
||||
size_t allocator_rss_bytes;
|
||||
float rss_extra;
|
||||
size_t rss_extra_bytes;
|
||||
size_t num_dbs;
|
||||
struct {
|
||||
size_t dbid;
|
||||
@ -867,6 +876,14 @@ typedef struct rdbSaveInfo {
|
||||
|
||||
#define RDB_SAVE_INFO_INIT {-1,0,"000000000000000000000000000000",-1}
|
||||
|
||||
typedef struct malloc_stats {
|
||||
size_t zmalloc_used;
|
||||
size_t process_rss;
|
||||
size_t allocator_allocated;
|
||||
size_t allocator_active;
|
||||
size_t allocator_resident;
|
||||
} malloc_stats;
|
||||
|
||||
/*-----------------------------------------------------------------------------
|
||||
* Global server state
|
||||
*----------------------------------------------------------------------------*/
|
||||
@ -959,6 +976,7 @@ struct redisServer {
|
||||
long long stat_active_defrag_misses; /* number of allocations scanned but not moved */
|
||||
long long stat_active_defrag_key_hits; /* number of keys with moved allocations */
|
||||
long long stat_active_defrag_key_misses;/* number of keys scanned and not moved */
|
||||
long long stat_active_defrag_scanned; /* number of dictEntries scanned */
|
||||
size_t stat_peak_memory; /* Max used memory record */
|
||||
long long stat_fork_time; /* Time needed to perform latest fork() */
|
||||
double stat_fork_rate; /* Fork rate in GB/sec. */
|
||||
@ -970,7 +988,7 @@ struct redisServer {
|
||||
long long slowlog_entry_id; /* SLOWLOG current entry ID */
|
||||
long long slowlog_log_slower_than; /* SLOWLOG time limit (to get logged) */
|
||||
unsigned long slowlog_max_len; /* SLOWLOG max number of items logged */
|
||||
size_t resident_set_size; /* RSS sampled in serverCron(). */
|
||||
malloc_stats cron_malloc_stats; /* sampled in serverCron(). */
|
||||
long long stat_net_input_bytes; /* Bytes read from network. */
|
||||
long long stat_net_output_bytes; /* Bytes written to network. */
|
||||
size_t stat_rdb_cow_bytes; /* Copy on write bytes during RDB saving. */
|
||||
@ -994,6 +1012,7 @@ struct redisServer {
|
||||
int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */
|
||||
int active_defrag_cycle_min; /* minimal effort for defrag in CPU percentage */
|
||||
int active_defrag_cycle_max; /* maximal effort for defrag in CPU percentage */
|
||||
unsigned long active_defrag_max_scan_fields; /* maximum number of fields of set/hash/zset/list to process from within the main dict scan */
|
||||
size_t client_max_querybuf_len; /* Limit for client query buffer length */
|
||||
int dbnum; /* Total number of configured DBs */
|
||||
int supervised; /* 1 if supervised, 0 otherwise. */
|
||||
|
@ -297,10 +297,33 @@ size_t zmalloc_get_rss(void) {
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Fragmentation = RSS / allocated-bytes */
|
||||
float zmalloc_get_fragmentation_ratio(size_t rss) {
|
||||
return (float)rss/zmalloc_used_memory();
|
||||
#if defined(USE_JEMALLOC)
|
||||
int zmalloc_get_allocator_info(size_t *allocated,
|
||||
size_t *active,
|
||||
size_t *resident) {
|
||||
size_t epoch = 1, sz = sizeof(size_t);
|
||||
*allocated = *resident = *active = 0;
|
||||
/* Update the statistics cached by mallctl. */
|
||||
je_mallctl("epoch", &epoch, &sz, &epoch, sz);
|
||||
/* Unlike RSS, this does not include RSS from shared libraries and other non
|
||||
* heap mappings. */
|
||||
je_mallctl("stats.resident", resident, &sz, NULL, 0);
|
||||
/* Unlike resident, this doesn't not include the pages jemalloc reserves
|
||||
* for re-use (purge will clean that). */
|
||||
je_mallctl("stats.active", active, &sz, NULL, 0);
|
||||
/* Unlike zmalloc_used_memory, this matches the stats.resident by taking
|
||||
* into account all allocations done by this process (not only zmalloc). */
|
||||
je_mallctl("stats.allocated", allocated, &sz, NULL, 0);
|
||||
return 1;
|
||||
}
|
||||
#else
|
||||
int zmalloc_get_allocator_info(size_t *allocated,
|
||||
size_t *active,
|
||||
size_t *resident) {
|
||||
*allocated = *resident = *active = 0;
|
||||
return 1;
|
||||
}
|
||||
#endif
|
||||
|
||||
/* Get the sum of the specified field (converted form kb to bytes) in
|
||||
* /proc/self/smaps. The field must be specified with trailing ":" as it
|
||||
|
@ -79,8 +79,8 @@ void zfree(void *ptr);
|
||||
char *zstrdup(const char *s);
|
||||
size_t zmalloc_used_memory(void);
|
||||
void zmalloc_set_oom_handler(void (*oom_handler)(size_t));
|
||||
float zmalloc_get_fragmentation_ratio(size_t rss);
|
||||
size_t zmalloc_get_rss(void);
|
||||
int zmalloc_get_allocator_info(size_t *allocated, size_t *active, size_t *resident);
|
||||
size_t zmalloc_get_private_dirty(long pid);
|
||||
size_t zmalloc_get_smap_bytes_by_field(char *field, long pid);
|
||||
size_t zmalloc_get_memory_size(void);
|
||||
|
@ -453,6 +453,8 @@ for {set j 0} {$j < [llength $argv]} {incr j} {
|
||||
puts $t
|
||||
}
|
||||
exit 0
|
||||
} elseif {$opt eq {--verbose}} {
|
||||
set ::verbose 1
|
||||
} elseif {$opt eq {--client}} {
|
||||
set ::client 1
|
||||
set ::test_server_port $arg
|
||||
|
@ -36,50 +36,156 @@ start_server {tags {"memefficiency"}} {
|
||||
}
|
||||
}
|
||||
|
||||
if 0 {
|
||||
start_server {tags {"defrag"}} {
|
||||
if {[string match {*jemalloc*} [s mem_allocator]]} {
|
||||
test "Active defrag" {
|
||||
r config set activedefrag no
|
||||
r config set active-defrag-threshold-lower 5
|
||||
r config set active-defrag-ignore-bytes 2mb
|
||||
r config set maxmemory 100mb
|
||||
r config set maxmemory-policy allkeys-lru
|
||||
r debug populate 700000 asdf 150
|
||||
r debug populate 170000 asdf 300
|
||||
set frag [s mem_fragmentation_ratio]
|
||||
assert {$frag >= 1.7}
|
||||
r config set activedefrag yes
|
||||
after 1500 ;# active defrag tests the status once a second.
|
||||
set hits [s active_defrag_hits]
|
||||
|
||||
# wait for the active defrag to stop working
|
||||
set tries 0
|
||||
while { True } {
|
||||
incr tries
|
||||
after 500
|
||||
set prev_hits $hits
|
||||
set hits [s active_defrag_hits]
|
||||
if {$hits == $prev_hits} {
|
||||
break
|
||||
}
|
||||
assert {$tries < 100}
|
||||
}
|
||||
|
||||
# TODO: we need to expose more accurate fragmentation info
|
||||
# i.e. the allocator used and active pages
|
||||
# instead we currently look at RSS so we need to ask for purge
|
||||
r memory purge
|
||||
|
||||
# Test the the fragmentation is lower and that the defragger
|
||||
# stopped working
|
||||
set frag [s mem_fragmentation_ratio]
|
||||
assert {$frag < 1.55}
|
||||
set misses [s active_defrag_misses]
|
||||
after 500
|
||||
set misses2 [s active_defrag_misses]
|
||||
assert {$misses2 == $misses}
|
||||
start_server {tags {"defrag"}} {
|
||||
if {[string match {*jemalloc*} [s mem_allocator]]} {
|
||||
test "Active defrag" {
|
||||
r config set activedefrag no
|
||||
r config set active-defrag-threshold-lower 5
|
||||
r config set active-defrag-cycle-min 25
|
||||
r config set active-defrag-cycle-max 75
|
||||
r config set active-defrag-ignore-bytes 2mb
|
||||
r config set maxmemory 100mb
|
||||
r config set maxmemory-policy allkeys-lru
|
||||
r debug populate 700000 asdf 150
|
||||
r debug populate 170000 asdf 300
|
||||
after 20 ;# serverCron only updates the info once in 10ms
|
||||
set frag [s allocator_frag_ratio]
|
||||
if {$::verbose} {
|
||||
puts "frag $frag"
|
||||
}
|
||||
}
|
||||
assert {$frag >= 1.4}
|
||||
r config set activedefrag yes
|
||||
|
||||
# wait for the active defrag to start working (decision once a second)
|
||||
wait_for_condition 50 100 {
|
||||
[s active_defrag_running] ne 0
|
||||
} else {
|
||||
fail "defrag not started."
|
||||
}
|
||||
|
||||
# wait for the active defrag to stop working
|
||||
wait_for_condition 100 100 {
|
||||
[s active_defrag_running] eq 0
|
||||
} else {
|
||||
puts [r info memory]
|
||||
fail "defrag didn't stop."
|
||||
}
|
||||
|
||||
# test the the fragmentation is lower
|
||||
after 20 ;# serverCron only updates the info once in 10ms
|
||||
set frag [s allocator_frag_ratio]
|
||||
if {$::verbose} {
|
||||
puts "frag $frag"
|
||||
}
|
||||
assert {$frag < 1.1}
|
||||
} {}
|
||||
|
||||
test "Active defrag big keys" {
|
||||
r flushdb
|
||||
r config resetstat
|
||||
r config set activedefrag no
|
||||
r config set active-defrag-max-scan-fields 1000
|
||||
r config set active-defrag-threshold-lower 5
|
||||
r config set active-defrag-cycle-min 65
|
||||
r config set active-defrag-cycle-max 75
|
||||
r config set active-defrag-ignore-bytes 2mb
|
||||
r config set maxmemory 0
|
||||
r config set list-max-ziplist-size 5 ;# list of 10k items will have 2000 quicklist nodes
|
||||
r hmset hash h1 v1 h2 v2 h3 v3
|
||||
r lpush list a b c d
|
||||
r zadd zset 0 a 1 b 2 c 3 d
|
||||
r sadd set a b c d
|
||||
|
||||
# create big keys with 10k items
|
||||
set rd [redis_deferring_client]
|
||||
for {set j 0} {$j < 10000} {incr j} {
|
||||
$rd hset bighash $j [concat "asdfasdfasdf" $j]
|
||||
$rd lpush biglist [concat "asdfasdfasdf" $j]
|
||||
$rd zadd bigzset $j [concat "asdfasdfasdf" $j]
|
||||
$rd sadd bigset [concat "asdfasdfasdf" $j]
|
||||
}
|
||||
for {set j 0} {$j < 40000} {incr j} {
|
||||
$rd read ; # Discard replies
|
||||
}
|
||||
|
||||
set expected_frag 1.7
|
||||
if {$::accurate} {
|
||||
# scale the hash to 1m fields in order to have a measurable the latency
|
||||
for {set j 10000} {$j < 1000000} {incr j} {
|
||||
$rd hset bighash $j [concat "asdfasdfasdf" $j]
|
||||
}
|
||||
for {set j 10000} {$j < 1000000} {incr j} {
|
||||
$rd read ; # Discard replies
|
||||
}
|
||||
# creating that big hash, increased used_memory, so the relative frag goes down
|
||||
set expected_frag 1.3
|
||||
}
|
||||
|
||||
# add a mass of string keys
|
||||
for {set j 0} {$j < 500000} {incr j} {
|
||||
$rd setrange $j 150 a
|
||||
}
|
||||
for {set j 0} {$j < 500000} {incr j} {
|
||||
$rd read ; # Discard replies
|
||||
}
|
||||
assert {[r dbsize] == 500008}
|
||||
|
||||
# create some fragmentation
|
||||
for {set j 0} {$j < 500000} {incr j 2} {
|
||||
$rd del $j
|
||||
}
|
||||
for {set j 0} {$j < 500000} {incr j 2} {
|
||||
$rd read ; # Discard replies
|
||||
}
|
||||
assert {[r dbsize] == 250008}
|
||||
|
||||
# start defrag
|
||||
after 20 ;# serverCron only updates the info once in 10ms
|
||||
set frag [s allocator_frag_ratio]
|
||||
if {$::verbose} {
|
||||
puts "frag $frag"
|
||||
}
|
||||
assert {$frag >= $expected_frag}
|
||||
r config set latency-monitor-threshold 5
|
||||
r latency reset
|
||||
r config set activedefrag yes
|
||||
|
||||
# wait for the active defrag to start working (decision once a second)
|
||||
wait_for_condition 50 100 {
|
||||
[s active_defrag_running] ne 0
|
||||
} else {
|
||||
fail "defrag not started."
|
||||
}
|
||||
|
||||
# wait for the active defrag to stop working
|
||||
wait_for_condition 500 100 {
|
||||
[s active_defrag_running] eq 0
|
||||
} else {
|
||||
puts [r info memory]
|
||||
puts [r memory malloc-stats]
|
||||
fail "defrag didn't stop."
|
||||
}
|
||||
|
||||
# test the the fragmentation is lower
|
||||
after 20 ;# serverCron only updates the info once in 10ms
|
||||
set frag [s allocator_frag_ratio]
|
||||
set max_latency 0
|
||||
foreach event [r latency latest] {
|
||||
lassign $event eventname time latency max
|
||||
if {$eventname == "active-defrag-cycle"} {
|
||||
set max_latency $max
|
||||
}
|
||||
}
|
||||
if {$::verbose} {
|
||||
puts "frag $frag"
|
||||
puts "max latency $max_latency"
|
||||
puts [r latency latest]
|
||||
puts [r latency history active-defrag-cycle]
|
||||
}
|
||||
assert {$frag < 1.1}
|
||||
# due to high fragmentation, 10hz, and active-defrag-cycle-max set to 75,
|
||||
# we expect max latency to be not much higher than 75ms
|
||||
assert {$max_latency <= 80}
|
||||
} {}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user