Merge pull request #6145 from oranagra/jemalloc_purge_bg

purge jemalloc after flush, and enable background purging thread
This commit is contained in:
Salvatore Sanfilippo 2019-10-10 14:48:59 +02:00 committed by GitHub
commit 0e40efe007
9 changed files with 135 additions and 17 deletions

View File

@ -787,7 +787,13 @@ background_thread_stats_read(tsdn_t *tsdn, background_thread_stats_t *stats) {
nstime_init(&stats->run_interval, 0); nstime_init(&stats->run_interval, 0);
for (unsigned i = 0; i < max_background_threads; i++) { for (unsigned i = 0; i < max_background_threads; i++) {
background_thread_info_t *info = &background_thread_info[i]; background_thread_info_t *info = &background_thread_info[i];
malloc_mutex_lock(tsdn, &info->mtx); if (malloc_mutex_trylock(tsdn, &info->mtx)) {
/*
* Each background thread run may take a long time;
* avoid waiting on the stats if the thread is active.
*/
continue;
}
if (info->state != background_thread_stopped) { if (info->state != background_thread_stopped) {
num_runs += info->tot_n_runs; num_runs += info->tot_n_runs;
nstime_add(&stats->run_interval, &info->tot_sleep_time); nstime_add(&stats->run_interval, &info->tot_sleep_time);

View File

@ -144,6 +144,7 @@ configYesNo configs_yesno[] = {
{"replica-serve-stale-data","slave-serve-stale-data",&server.repl_serve_stale_data,1,CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA}, {"replica-serve-stale-data","slave-serve-stale-data",&server.repl_serve_stale_data,1,CONFIG_DEFAULT_SLAVE_SERVE_STALE_DATA},
{"replica-read-only","slave-read-only",&server.repl_slave_ro,1,CONFIG_DEFAULT_SLAVE_READ_ONLY}, {"replica-read-only","slave-read-only",&server.repl_slave_ro,1,CONFIG_DEFAULT_SLAVE_READ_ONLY},
{"replica-ignore-maxmemory","slave-ignore-maxmemory",&server.repl_slave_ignore_maxmemory,1,CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY}, {"replica-ignore-maxmemory","slave-ignore-maxmemory",&server.repl_slave_ignore_maxmemory,1,CONFIG_DEFAULT_SLAVE_IGNORE_MAXMEMORY},
{"jemalloc-bg-thread",NULL,&server.jemalloc_bg_thread,1,1},
{NULL, NULL, 0, 0} {NULL, NULL, 0, 0}
}; };

View File

@ -457,6 +457,13 @@ void flushdbCommand(client *c) {
if (getFlushCommandFlags(c,&flags) == C_ERR) return; if (getFlushCommandFlags(c,&flags) == C_ERR) return;
server.dirty += emptyDb(c->db->id,flags,NULL); server.dirty += emptyDb(c->db->id,flags,NULL);
addReply(c,shared.ok); addReply(c,shared.ok);
#if defined(USE_JEMALLOC)
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
* For large databases, flushdb blocks for a long time anyway, so a bit more
* won't harm, and this way the flush and purge will be synchronous. */
if (!(flags & EMPTYDB_ASYNC))
jemalloc_purge();
#endif
} }
/* FLUSHALL [ASYNC] /* FLUSHALL [ASYNC]
@ -479,6 +486,13 @@ void flushallCommand(client *c) {
server.dirty = saved_dirty; server.dirty = saved_dirty;
} }
server.dirty++; server.dirty++;
#if defined(USE_JEMALLOC)
/* jemalloc 5 doesn't release pages back to the OS when there's no traffic.
* For large databases, flushall blocks for a long time anyway, so a bit more
* won't harm, and this way the flush and purge will be synchronous. */
if (!(flags & EMPTYDB_ASYNC))
jemalloc_purge();
#endif
} }
/* This command implements DEL and LAZYDEL. */ /* This command implements DEL and LAZYDEL. */

View File

@ -297,6 +297,56 @@ void computeDatasetDigest(unsigned char *final) {
} }
} }
#ifdef USE_JEMALLOC
/* Read, and optionally write, an integer jemalloc control via je_mallctl().
 *
 * argv[0] holds the control name. When argc > 1, argv[1] holds the new value,
 * parsed with getLongLongFromObjectOrReply(); if parsing fails the function
 * returns without doing anything else.
 *
 * The width of the control is not known up front, so the call is first tried
 * with an 8 byte buffer and, every time je_mallctl() returns EINVAL, retried
 * with half the size (4, 2, 1). On success the value read into `old` is sent
 * as an integer reply. Any other error code is reported through strerror().
 * If every size is rejected, strerror(EINVAL) is sent. */
void mallctl_int(client *c, robj **argv, int argc) {
    int ret;
    /* Start with the biggest size (int64), and if that fails, try smaller
     * sizes (int32, bool). `val` is zero-initialized so that the big-endian
     * adjustment below never reads an indeterminate value when no new value
     * was supplied. */
    int64_t old = 0, val = 0;
    if (argc > 1) {
        long long ll;
        if (getLongLongFromObjectOrReply(c, argv[1], &ll, NULL) != C_OK)
            return;
        val = ll;
    }
    size_t sz = sizeof(old);
    while (sz > 0) {
        if ((ret=je_mallctl(argv[0]->ptr, &old, &sz, argc > 1? &val: NULL, argc > 1?sz: 0))) {
            if (ret==EINVAL) {
                /* size might be wrong, try a smaller one */
                sz /= 2;
#if BYTE_ORDER == BIG_ENDIAN
                /* Move the significant bytes to the start of the buffer so
                 * the first `sz` bytes carry the value. The shift is done on
                 * the unsigned representation because left-shifting a
                 * negative signed value is undefined behavior. */
                val = (int64_t)((uint64_t)val << (8*sz));
#endif
                continue;
            }
            addReplyErrorFormat(c,"%s", strerror(ret));
            return;
        } else {
#if BYTE_ORDER == BIG_ENDIAN
            /* Only the first `sz` bytes of `old` were written; bring them
             * down to the low-order end before replying. */
            old >>= 64 - 8*sz;
#endif
            addReplyLongLong(c, old);
            return;
        }
    }
    addReplyErrorFormat(c,"%s", strerror(EINVAL));
}
/* Read, and optionally overwrite, a string jemalloc control via je_mallctl().
 *
 * argv[0] holds the control name; when argc > 1, argv[1] holds the new
 * string. The current value is always fetched first and sent as a bulk
 * reply. A failure of that read is reported through strerror() and nothing
 * is written. */
void mallctl_string(client *c, robj **argv, int argc) {
    char *current;
    size_t len = sizeof(current);

    /* For strings, it seems the old value has to be fetched before it is
     * overridden, so the read and the write are two separate calls. */
    int err = je_mallctl(argv[0]->ptr, &current, &len, NULL, 0);
    if (err != 0) {
        addReplyErrorFormat(c,"%s", strerror(err));
        return;
    }
    addReplyBulkCString(c, current);

    if (argc <= 1)
        return;

    /* The outcome of the write is not inspected: the reply carrying the
     * previous value has already been added above. */
    je_mallctl(argv[0]->ptr, NULL, 0, &argv[1]->ptr, sizeof(char*));
}
#endif
void debugCommand(client *c) { void debugCommand(client *c) {
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) { if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"help")) {
const char *help[] = { const char *help[] = {
@ -323,6 +373,10 @@ void debugCommand(client *c) {
"STRUCTSIZE -- Return the size of different Redis core C structures.", "STRUCTSIZE -- Return the size of different Redis core C structures.",
"ZIPLIST <key> -- Show low level info about the ziplist encoding.", "ZIPLIST <key> -- Show low level info about the ziplist encoding.",
"STRINGMATCH-TEST -- Run a fuzz tester against the stringmatchlen() function.", "STRINGMATCH-TEST -- Run a fuzz tester against the stringmatchlen() function.",
#ifdef USE_JEMALLOC
"MALLCTL <key> [<val>] -- Get or set a malloc tunning integer.",
"MALLCTL-STR <key> [<val>] -- Get or set a malloc tunning string.",
#endif
NULL NULL
}; };
addReplyHelp(c, help); addReplyHelp(c, help);
@ -677,6 +731,14 @@ NULL
{ {
stringmatchlen_fuzz_test(); stringmatchlen_fuzz_test();
addReplyStatus(c,"Apparently Redis did not crash: test passed"); addReplyStatus(c,"Apparently Redis did not crash: test passed");
#ifdef USE_JEMALLOC
} else if(!strcasecmp(c->argv[1]->ptr,"mallctl") && c->argc >= 3) {
mallctl_int(c, c->argv+2, c->argc-2);
return;
} else if(!strcasecmp(c->argv[1]->ptr,"mallctl-str") && c->argc >= 3) {
mallctl_string(c, c->argv+2, c->argc-2);
return;
#endif
} else { } else {
addReplySubcommandSyntaxError(c); addReplySubcommandSyntaxError(c);
return; return;

View File

@ -1450,22 +1450,10 @@ NULL
addReplyVerbatim(c,report,sdslen(report),"txt"); addReplyVerbatim(c,report,sdslen(report),"txt");
sdsfree(report); sdsfree(report);
} else if (!strcasecmp(c->argv[1]->ptr,"purge") && c->argc == 2) { } else if (!strcasecmp(c->argv[1]->ptr,"purge") && c->argc == 2) {
#if defined(USE_JEMALLOC) if (jemalloc_purge() == 0)
char tmp[32]; addReply(c, shared.ok);
unsigned narenas = 0; else
size_t sz = sizeof(unsigned); addReplyError(c, "Error purging dirty pages");
if (!je_mallctl("arenas.narenas", &narenas, &sz, NULL, 0)) {
sprintf(tmp, "arena.%d.purge", narenas);
if (!je_mallctl(tmp, NULL, 0, NULL, 0)) {
addReply(c, shared.ok);
return;
}
}
addReplyError(c, "Error purging dirty pages");
#else
addReply(c, shared.ok);
/* Nothing to do for other allocators. */
#endif
} else { } else {
addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try MEMORY HELP", (char*)c->argv[1]->ptr); addReplyErrorFormat(c, "Unknown subcommand or wrong number of arguments for '%s'. Try MEMORY HELP", (char*)c->argv[1]->ptr);
} }

View File

@ -2260,6 +2260,7 @@ void initServerConfig(void) {
server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT; server.maxidletime = CONFIG_DEFAULT_CLIENT_TIMEOUT;
server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE; server.tcpkeepalive = CONFIG_DEFAULT_TCP_KEEPALIVE;
server.active_expire_enabled = 1; server.active_expire_enabled = 1;
server.jemalloc_bg_thread = 1;
server.active_defrag_enabled = CONFIG_DEFAULT_ACTIVE_DEFRAG; server.active_defrag_enabled = CONFIG_DEFAULT_ACTIVE_DEFRAG;
server.active_defrag_ignore_bytes = CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES; server.active_defrag_ignore_bytes = CONFIG_DEFAULT_DEFRAG_IGNORE_BYTES;
server.active_defrag_threshold_lower = CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER; server.active_defrag_threshold_lower = CONFIG_DEFAULT_DEFRAG_THRESHOLD_LOWER;
@ -2904,8 +2905,17 @@ void initServer(void) {
scriptingInit(1); scriptingInit(1);
slowlogInit(); slowlogInit();
latencyMonitorInit(); latencyMonitorInit();
}
/* Some steps in server initialization need to be done last (after modules
* are loaded).
* Specifically, creation of threads due to a race bug in ld.so, in which
* Thread Local Storage initialization collides with dlopen call.
* see: https://sourceware.org/bugzilla/show_bug.cgi?id=19329 */
void InitServerLast() {
bioInit(); bioInit();
initThreadedIO(); initThreadedIO();
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory(); server.initial_memory_usage = zmalloc_used_memory();
} }
@ -5033,6 +5043,7 @@ int main(int argc, char **argv) {
#endif #endif
moduleLoadFromQueue(); moduleLoadFromQueue();
ACLLoadUsersAtStartup(); ACLLoadUsersAtStartup();
InitServerLast();
loadDataFromDisk(); loadDataFromDisk();
if (server.cluster_enabled) { if (server.cluster_enabled) {
if (verifyClusterConfigWithData() == C_ERR) { if (verifyClusterConfigWithData() == C_ERR) {
@ -5047,6 +5058,7 @@ int main(int argc, char **argv) {
if (server.sofd > 0) if (server.sofd > 0)
serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket); serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
} else { } else {
InitServerLast();
sentinelIsRunning(); sentinelIsRunning();
} }

View File

@ -1174,6 +1174,7 @@ struct redisServer {
int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */ int tcpkeepalive; /* Set SO_KEEPALIVE if non-zero. */
int active_expire_enabled; /* Can be disabled for testing purposes. */ int active_expire_enabled; /* Can be disabled for testing purposes. */
int active_defrag_enabled; int active_defrag_enabled;
int jemalloc_bg_thread; /* Enable jemalloc background thread */
size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */ size_t active_defrag_ignore_bytes; /* minimum amount of fragmentation waste to start active defrag */
int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */ int active_defrag_threshold_lower; /* minimum percentage of fragmentation to start active defrag */
int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */ int active_defrag_threshold_upper; /* maximum percentage of fragmentation at which we use maximum effort */

View File

@ -326,6 +326,7 @@ size_t zmalloc_get_rss(void) {
#endif #endif
#if defined(USE_JEMALLOC) #if defined(USE_JEMALLOC)
int zmalloc_get_allocator_info(size_t *allocated, int zmalloc_get_allocator_info(size_t *allocated,
size_t *active, size_t *active,
size_t *resident) { size_t *resident) {
@ -347,13 +348,44 @@ int zmalloc_get_allocator_info(size_t *allocated,
je_mallctl("stats.allocated", allocated, &sz, NULL, 0); je_mallctl("stats.allocated", allocated, &sz, NULL, 0);
return 1; return 1;
} }
/* Write the jemalloc "background_thread" control: a one byte flag that is 1
 * when `enable` is non-zero and 0 otherwise. The return value of je_mallctl()
 * is not checked.
 *
 * Rationale: let jemalloc do purging asynchronously, which is required when
 * there's no traffic after flushdb. */
void set_jemalloc_bg_thread(int enable) {
    char flag = enable ? 1 : 0;
    je_mallctl("background_thread", NULL, 0, &flag, sizeof(flag));
}
/* Return all unused (reserved) pages to the OS.
 *
 * Reads "arenas.narenas" and then invokes the "arena.<narenas>.purge"
 * control.
 * NOTE(review): jemalloc's manual describes an arena index equal to
 * arenas.narenas as addressing all arenas (the legacy spelling of
 * MALLCTL_ARENAS_ALL) - confirm against the bundled jemalloc version.
 *
 * Returns 0 when both je_mallctl() calls succeed, -1 otherwise. */
int jemalloc_purge() {
    char ctl_name[32];
    unsigned narenas = 0;
    size_t sz = sizeof(narenas);

    if (je_mallctl("arenas.narenas", &narenas, &sz, NULL, 0) != 0)
        return -1;

    /* narenas is unsigned, so it is formatted with %u; snprintf bounds the
     * write to the size of the local buffer. */
    snprintf(ctl_name, sizeof(ctl_name), "arena.%u.purge", narenas);
    if (je_mallctl(ctl_name, NULL, 0, NULL, 0) != 0)
        return -1;

    return 0;
}
#else #else
int zmalloc_get_allocator_info(size_t *allocated, int zmalloc_get_allocator_info(size_t *allocated,
size_t *active, size_t *active,
size_t *resident) { size_t *resident) {
*allocated = *resident = *active = 0; *allocated = *resident = *active = 0;
return 1; return 1;
} }
/* Stand-in compiled when USE_JEMALLOC is not defined: there is nothing to
 * configure, so the argument is deliberately ignored. */
void set_jemalloc_bg_thread(int enable) {
    (void)enable;
}
/* Stand-in compiled when USE_JEMALLOC is not defined: no purge is performed
 * and success (0) is always reported. */
int jemalloc_purge() {
    const int purge_ok = 0;
    return purge_ok;
}
#endif #endif
/* Get the sum of the specified field (converted form kb to bytes) in /* Get the sum of the specified field (converted form kb to bytes) in

View File

@ -86,6 +86,8 @@ size_t zmalloc_used_memory(void);
void zmalloc_set_oom_handler(void (*oom_handler)(size_t)); void zmalloc_set_oom_handler(void (*oom_handler)(size_t));
size_t zmalloc_get_rss(void); size_t zmalloc_get_rss(void);
int zmalloc_get_allocator_info(size_t *allocated, size_t *active, size_t *resident); int zmalloc_get_allocator_info(size_t *allocated, size_t *active, size_t *resident);
void set_jemalloc_bg_thread(int enable);
int jemalloc_purge();
size_t zmalloc_get_private_dirty(long pid); size_t zmalloc_get_private_dirty(long pid);
size_t zmalloc_get_smap_bytes_by_field(char *field, long pid); size_t zmalloc_get_smap_bytes_by_field(char *field, long pid);
size_t zmalloc_get_memory_size(void); size_t zmalloc_get_memory_size(void);