Add lazyfree-lazy-user-flush config to control default behavior of FLUSH[ALL|DB], SCRIPT FLUSH (#8258)
* Adds ASYNC and SYNC arguments to SCRIPT FLUSH * Adds SYNC argument to FLUSHDB and FLUSHALL * Adds new config to control the default behavior of FLUSHDB, FLUSHALL and SCRIPT FLUASH. the new behavior is as follows: * FLUSH[ALL|DB],SCRIPT FLUSH: Determine sync or async according to the value of lazyfree-lazy-user-flush. * FLUSH[ALL|DB],SCRIPT FLUSH ASYNC: Always flushes the database in an async manner. * FLUSH[ALL|DB],SCRIPT FLUSH SYNC: Always flushes the database in a sync manner.
This commit is contained in:
parent
fcb3dfe56d
commit
294f93af97
@ -1089,6 +1089,13 @@ replica-lazy-flush no
|
|||||||
|
|
||||||
lazyfree-lazy-user-del no
|
lazyfree-lazy-user-del no
|
||||||
|
|
||||||
|
# FLUSHDB, FLUSHALL, and SCRIPT FLUSH support both asynchronous and synchronous
|
||||||
|
# deletion, which can be controlled by passing the [SYNC|ASYNC] flags into the
|
||||||
|
# commands. When neither flag is passed, this directive will be used to determine
|
||||||
|
# if the data should be deleted asynchronously.
|
||||||
|
|
||||||
|
lazyfree-lazy-user-flush no
|
||||||
|
|
||||||
################################ THREADED I/O #################################
|
################################ THREADED I/O #################################
|
||||||
|
|
||||||
# Redis is mostly single threaded, however there are certain threaded
|
# Redis is mostly single threaded, however there are certain threaded
|
||||||
|
@ -2385,6 +2385,7 @@ standardConfig configs[] = {
|
|||||||
createBoolConfig("lazyfree-lazy-expire", NULL, MODIFIABLE_CONFIG, server.lazyfree_lazy_expire, 0, NULL, NULL),
|
createBoolConfig("lazyfree-lazy-expire", NULL, MODIFIABLE_CONFIG, server.lazyfree_lazy_expire, 0, NULL, NULL),
|
||||||
createBoolConfig("lazyfree-lazy-server-del", NULL, MODIFIABLE_CONFIG, server.lazyfree_lazy_server_del, 0, NULL, NULL),
|
createBoolConfig("lazyfree-lazy-server-del", NULL, MODIFIABLE_CONFIG, server.lazyfree_lazy_server_del, 0, NULL, NULL),
|
||||||
createBoolConfig("lazyfree-lazy-user-del", NULL, MODIFIABLE_CONFIG, server.lazyfree_lazy_user_del , 0, NULL, NULL),
|
createBoolConfig("lazyfree-lazy-user-del", NULL, MODIFIABLE_CONFIG, server.lazyfree_lazy_user_del , 0, NULL, NULL),
|
||||||
|
createBoolConfig("lazyfree-lazy-user-flush", NULL, MODIFIABLE_CONFIG, server.lazyfree_lazy_user_flush , 0, NULL, NULL),
|
||||||
createBoolConfig("repl-disable-tcp-nodelay", NULL, MODIFIABLE_CONFIG, server.repl_disable_tcp_nodelay, 0, NULL, NULL),
|
createBoolConfig("repl-disable-tcp-nodelay", NULL, MODIFIABLE_CONFIG, server.repl_disable_tcp_nodelay, 0, NULL, NULL),
|
||||||
createBoolConfig("repl-diskless-sync", NULL, MODIFIABLE_CONFIG, server.repl_diskless_sync, 0, NULL, NULL),
|
createBoolConfig("repl-diskless-sync", NULL, MODIFIABLE_CONFIG, server.repl_diskless_sync, 0, NULL, NULL),
|
||||||
createBoolConfig("gopher-enabled", NULL, MODIFIABLE_CONFIG, server.gopher_enabled, 0, NULL, NULL),
|
createBoolConfig("gopher-enabled", NULL, MODIFIABLE_CONFIG, server.gopher_enabled, 0, NULL, NULL),
|
||||||
|
20
src/db.c
20
src/db.c
@ -595,21 +595,23 @@ void signalFlushedDb(int dbid, int async) {
|
|||||||
/* Return the set of flags to use for the emptyDb() call for FLUSHALL
|
/* Return the set of flags to use for the emptyDb() call for FLUSHALL
|
||||||
* and FLUSHDB commands.
|
* and FLUSHDB commands.
|
||||||
*
|
*
|
||||||
* Currently the command just attempts to parse the "ASYNC" option. It
|
* sync: flushes the database in an sync manner.
|
||||||
* also checks if the command arity is wrong.
|
* async: flushes the database in an async manner.
|
||||||
|
* no option: determine sync or async according to the value of lazyfree-lazy-user-flush.
|
||||||
*
|
*
|
||||||
* On success C_OK is returned and the flags are stored in *flags, otherwise
|
* On success C_OK is returned and the flags are stored in *flags, otherwise
|
||||||
* C_ERR is returned and the function sends an error to the client. */
|
* C_ERR is returned and the function sends an error to the client. */
|
||||||
int getFlushCommandFlags(client *c, int *flags) {
|
int getFlushCommandFlags(client *c, int *flags) {
|
||||||
/* Parse the optional ASYNC option. */
|
/* Parse the optional ASYNC option. */
|
||||||
if (c->argc > 1) {
|
if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"sync")) {
|
||||||
if (c->argc > 2 || strcasecmp(c->argv[1]->ptr,"async")) {
|
|
||||||
addReplyErrorObject(c,shared.syntaxerr);
|
|
||||||
return C_ERR;
|
|
||||||
}
|
|
||||||
*flags = EMPTYDB_ASYNC;
|
|
||||||
} else {
|
|
||||||
*flags = EMPTYDB_NO_FLAGS;
|
*flags = EMPTYDB_NO_FLAGS;
|
||||||
|
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"async")) {
|
||||||
|
*flags = EMPTYDB_ASYNC;
|
||||||
|
} else if (c->argc == 1) {
|
||||||
|
*flags = server.lazyfree_lazy_user_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS;
|
||||||
|
} else {
|
||||||
|
addReplyErrorObject(c,shared.syntaxerr);
|
||||||
|
return C_ERR;
|
||||||
}
|
}
|
||||||
return C_OK;
|
return C_OK;
|
||||||
}
|
}
|
||||||
|
@ -49,6 +49,14 @@ void lazyFreeTrackingTable(void *args[]) {
|
|||||||
atomicIncr(lazyfreed_objects,len);
|
atomicIncr(lazyfreed_objects,len);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void lazyFreeLuaScripts(void *args[]) {
|
||||||
|
dict *lua_scripts = args[0];
|
||||||
|
long long len = dictSize(lua_scripts);
|
||||||
|
dictRelease(lua_scripts);
|
||||||
|
atomicDecr(lazyfree_objects,len);
|
||||||
|
atomicIncr(lazyfreed_objects,len);
|
||||||
|
}
|
||||||
|
|
||||||
/* Return the number of currently pending objects to free. */
|
/* Return the number of currently pending objects to free. */
|
||||||
size_t lazyfreeGetPendingObjectsCount(void) {
|
size_t lazyfreeGetPendingObjectsCount(void) {
|
||||||
size_t aux;
|
size_t aux;
|
||||||
@ -212,3 +220,13 @@ void freeTrackingRadixTreeAsync(rax *tracking) {
|
|||||||
atomicIncr(lazyfree_objects,tracking->numele);
|
atomicIncr(lazyfree_objects,tracking->numele);
|
||||||
bioCreateLazyFreeJob(lazyFreeTrackingTable,1,tracking);
|
bioCreateLazyFreeJob(lazyFreeTrackingTable,1,tracking);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* Free lua_scripts dict, if the dict is huge enough, free it in async way. */
|
||||||
|
void freeLuaScriptsAsync(dict *lua_scripts) {
|
||||||
|
if (dictSize(lua_scripts) > LAZYFREE_THRESHOLD) {
|
||||||
|
atomicIncr(lazyfree_objects,dictSize(lua_scripts));
|
||||||
|
bioCreateLazyFreeJob(lazyFreeLuaScripts,1,lua_scripts);
|
||||||
|
} else {
|
||||||
|
dictRelease(lua_scripts);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -1282,14 +1282,17 @@ void scriptingInit(int setup) {
|
|||||||
|
|
||||||
/* Release resources related to Lua scripting.
|
/* Release resources related to Lua scripting.
|
||||||
* This function is used in order to reset the scripting environment. */
|
* This function is used in order to reset the scripting environment. */
|
||||||
void scriptingRelease(void) {
|
void scriptingRelease(int async) {
|
||||||
dictRelease(server.lua_scripts);
|
if (async)
|
||||||
|
freeLuaScriptsAsync(server.lua_scripts);
|
||||||
|
else
|
||||||
|
dictRelease(server.lua_scripts);
|
||||||
server.lua_scripts_mem = 0;
|
server.lua_scripts_mem = 0;
|
||||||
lua_close(server.lua);
|
lua_close(server.lua);
|
||||||
}
|
}
|
||||||
|
|
||||||
void scriptingReset(void) {
|
void scriptingReset(int async) {
|
||||||
scriptingRelease();
|
scriptingRelease(async);
|
||||||
scriptingInit(0);
|
scriptingInit(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1711,8 +1714,12 @@ void scriptCommand(client *c) {
|
|||||||
" Set the debug mode for subsequent scripts executed.",
|
" Set the debug mode for subsequent scripts executed.",
|
||||||
"EXISTS <sha1> [<sha1> ...]",
|
"EXISTS <sha1> [<sha1> ...]",
|
||||||
" Return information about the existence of the scripts in the script cache.",
|
" Return information about the existence of the scripts in the script cache.",
|
||||||
"FLUSH",
|
"FLUSH [ASYNC|SYNC]",
|
||||||
" Flush the Lua scripts cache. Very dangerous on replicas.",
|
" Flush the Lua scripts cache. Very dangerous on replicas.",
|
||||||
|
" When called without the optional mode argument, the behavior is determined by the",
|
||||||
|
" lazyfree-lazy-user-flush configuration directive. Valid modes are:",
|
||||||
|
" * ASYNC: Asynchronously flush the scripts cache.",
|
||||||
|
" * SYNC: Synchronously flush the scripts cache.",
|
||||||
"KILL",
|
"KILL",
|
||||||
" Kill the currently executing Lua script.",
|
" Kill the currently executing Lua script.",
|
||||||
"LOAD <script>",
|
"LOAD <script>",
|
||||||
@ -1720,8 +1727,19 @@ void scriptCommand(client *c) {
|
|||||||
NULL
|
NULL
|
||||||
};
|
};
|
||||||
addReplyHelp(c, help);
|
addReplyHelp(c, help);
|
||||||
} else if (c->argc == 2 && !strcasecmp(c->argv[1]->ptr,"flush")) {
|
} else if (c->argc >= 2 && !strcasecmp(c->argv[1]->ptr,"flush")) {
|
||||||
scriptingReset();
|
int async = 0;
|
||||||
|
if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"sync")) {
|
||||||
|
async = 0;
|
||||||
|
} else if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"async")) {
|
||||||
|
async = 1;
|
||||||
|
} else if (c->argc == 2) {
|
||||||
|
async = server.lazyfree_lazy_user_flush ? 1 : 0;
|
||||||
|
} else {
|
||||||
|
addReplyError(c,"SCRIPT FLUSH only support SYNC|ASYNC option");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
scriptingReset(async);
|
||||||
addReply(c,shared.ok);
|
addReply(c,shared.ok);
|
||||||
replicationScriptCacheFlush();
|
replicationScriptCacheFlush();
|
||||||
server.dirty++; /* Propagating this command is a good idea. */
|
server.dirty++; /* Propagating this command is a good idea. */
|
||||||
|
@ -1530,6 +1530,7 @@ struct redisServer {
|
|||||||
int lazyfree_lazy_expire;
|
int lazyfree_lazy_expire;
|
||||||
int lazyfree_lazy_server_del;
|
int lazyfree_lazy_server_del;
|
||||||
int lazyfree_lazy_user_del;
|
int lazyfree_lazy_user_del;
|
||||||
|
int lazyfree_lazy_user_flush;
|
||||||
/* Latency monitor */
|
/* Latency monitor */
|
||||||
long long latency_monitor_threshold;
|
long long latency_monitor_threshold;
|
||||||
dict *latency_events;
|
dict *latency_events;
|
||||||
@ -2344,6 +2345,7 @@ int ldbRemoveChild(pid_t pid);
|
|||||||
void ldbKillForkedSessions(void);
|
void ldbKillForkedSessions(void);
|
||||||
int ldbPendingChildren(void);
|
int ldbPendingChildren(void);
|
||||||
sds luaCreateFunction(client *c, lua_State *lua, robj *body);
|
sds luaCreateFunction(client *c, lua_State *lua, robj *body);
|
||||||
|
void freeLuaScriptsAsync(dict *lua_scripts);
|
||||||
|
|
||||||
/* Blocked clients */
|
/* Blocked clients */
|
||||||
void processUnblockedClients(void);
|
void processUnblockedClients(void);
|
||||||
|
@ -330,6 +330,15 @@ start_server {tags {"scripting"}} {
|
|||||||
set e
|
set e
|
||||||
} {NOSCRIPT*}
|
} {NOSCRIPT*}
|
||||||
|
|
||||||
|
test {SCRIPTING FLUSH ASYNC} {
|
||||||
|
for {set j 0} {$j < 100} {incr j} {
|
||||||
|
r script load "return $j"
|
||||||
|
}
|
||||||
|
assert { [string match "*number_of_cached_scripts:100*" [r info Memory]] }
|
||||||
|
r script flush async
|
||||||
|
assert { [string match "*number_of_cached_scripts:0*" [r info Memory]] }
|
||||||
|
}
|
||||||
|
|
||||||
test {SCRIPT EXISTS - can detect already defined scripts?} {
|
test {SCRIPT EXISTS - can detect already defined scripts?} {
|
||||||
r eval "return 1+1" 0
|
r eval "return 1+1" 0
|
||||||
r script exists a27e7e8a43702b7046d4f6a7ccf5b60cef6b9bd9 a27e7e8a43702b7046d4f6a7ccf5b60cef6b9bda
|
r script exists a27e7e8a43702b7046d4f6a7ccf5b60cef6b9bd9 a27e7e8a43702b7046d4f6a7ccf5b60cef6b9bda
|
||||||
|
Loading…
x
Reference in New Issue
Block a user