From 687210f1550cf9048bed5f5539c9411fb22cd3b0 Mon Sep 17 00:00:00 2001 From: "Meir Shpilraien (Spielrein)" Date: Thu, 16 Dec 2021 17:58:25 +0200 Subject: [PATCH] Add FUNCTION FLUSH command to flush all functions (#9936) Added `FUNCTION FLUSH` command. The new sub-command allows delete all the functions. An optional `[SYNC|ASYNC]` argument can be given to control whether or not to flush the functions synchronously or asynchronously. if not given the default flush mode is chosen by `lazyfree-lazy-user-flush` configuration values. Add the missing `functions.tcl` test to the list of tests that are executed in test_helper.tcl, and call FUNCTION FLUSH in between servers in external mode --- src/commands.c | 22 +++++++++++++++++ src/commands/function-flush.json | 37 ++++++++++++++++++++++++++++ src/functions.c | 40 ++++++++++++++++++++++++++++++ src/functions.h | 1 + src/lazyfree.c | 20 +++++++++++++++ src/server.h | 2 ++ tests/support/server.tcl | 1 + tests/test_helper.tcl | 1 + tests/unit/functions.tcl | 42 +++++++++++++++++++++++++++++++- 9 files changed, 165 insertions(+), 1 deletion(-) create mode 100644 src/commands/function-flush.json diff --git a/src/commands.c b/src/commands.c index 252b009c1..c88072072 100644 --- a/src/commands.c +++ b/src/commands.c @@ -3055,6 +3055,27 @@ struct redisCommandArg FUNCTION_DELETE_Args[] = { {0} }; +/********** FUNCTION FLUSH ********************/ + +/* FUNCTION FLUSH history */ +#define FUNCTION_FLUSH_History NULL + +/* FUNCTION FLUSH hints */ +#define FUNCTION_FLUSH_Hints NULL + +/* FUNCTION FLUSH async argument table */ +struct redisCommandArg FUNCTION_FLUSH_async_Subargs[] = { +{"async",ARG_TYPE_PURE_TOKEN,-1,"ASYNC",NULL,NULL,CMD_ARG_NONE}, +{"sync",ARG_TYPE_PURE_TOKEN,-1,"SYNC",NULL,NULL,CMD_ARG_NONE}, +{0} +}; + +/* FUNCTION FLUSH argument table */ +struct redisCommandArg FUNCTION_FLUSH_Args[] = { +{"async",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=FUNCTION_FLUSH_async_Subargs}, +{0} +}; + /********** FUNCTION HELP ********************/ /* FUNCTION HELP history */ @@ -3106,6 +3127,7 @@ struct redisCommandArg FUNCTION_INFO_Args[] = { struct redisCommand FUNCTION_Subcommands[] = { {"create","PATCH__TBD__15__","PATCH__TBD__14__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_CREATE_History,FUNCTION_CREATE_Hints,functionCreateCommand,-5,CMD_NOSCRIPT|CMD_MAY_REPLICATE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_CREATE_Args}, {"delete","PATCH__TBD__23__","PATCH__TBD__22__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_DELETE_History,FUNCTION_DELETE_Hints,functionDeleteCommand,3,CMD_NOSCRIPT|CMD_MAY_REPLICATE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_DELETE_Args}, +{"flush","PATCH__TBD__29__","PATCH__TBD__28__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_FLUSH_History,FUNCTION_FLUSH_Hints,functionFlushCommand,-2,CMD_NOSCRIPT|CMD_MAY_REPLICATE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_FLUSH_Args}, {"help","Show helpful text about the different subcommands","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_HELP_History,FUNCTION_HELP_Hints,functionHelpCommand,2,CMD_LOADING|CMD_STALE,ACL_CATEGORY_SCRIPTING}, {"info","PATCH__TBD__11__","PATCH__TBD__10__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_INFO_History,FUNCTION_INFO_Hints,functionInfoCommand,-3,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_INFO_Args}, {"kill","PATCH__TBD__19__","PATCH__TBD__18__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_KILL_History,FUNCTION_KILL_Hints,functionKillCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING}, diff --git a/src/commands/function-flush.json b/src/commands/function-flush.json new file mode 100644 index 000000000..792f518e6 --- /dev/null +++ b/src/commands/function-flush.json @@ -0,0 +1,37 @@ +{ + "FLUSH": { + "summary": "PATCH__TBD__29__", + "complexity": "PATCH__TBD__28__", + "group": "scripting", + "since": "7.0.0", + "arity": -2, + "container": "FUNCTION", + "function": "functionFlushCommand", + "command_flags": [ + "NOSCRIPT", + "MAY_REPLICATE" + ], + "acl_categories": [ + "SCRIPTING" + ], + "arguments": [ + { + "name": "async", + "type": "oneof", + "optional": true, + "arguments": [ + { + "name": "async", + "type": "pure-token", + "token": "ASYNC" + }, + { + "name": "sync", + "type": "pure-token", + "token": "SYNC" + } + ] + } + ] + } +} diff --git a/src/functions.c b/src/functions.c index 156616423..23843794d 100644 --- a/src/functions.c +++ b/src/functions.c @@ -372,6 +372,36 @@ void fcallroCommand(client *c) { fcallCommandGeneric(c, 1); } +void functionFlushCommand(client *c) { + if (c->argc > 3) { + addReplySubcommandSyntaxError(c); + return; + } + int async = 0; + if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"sync")) { + async = 0; + } else if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"async")) { + async = 1; + } else if (c->argc == 2) { + async = server.lazyfree_lazy_user_flush ? 1 : 0; + } else { + addReplyError(c,"FUNCTION FLUSH only supports SYNC|ASYNC option"); + return; + } + + if (async) { + functionsCtx *old_f_ctx = functions_ctx; + functions_ctx = functionsCtxCreate(); + freeFunctionsAsync(old_f_ctx); + } else { + functionsCtxClear(functions_ctx); + } + /* Indicate that the command changed the data so it will be replicated and + * counted as a data change (for persistence configuration) */ + server.dirty++; + addReply(c,shared.ok); +} + void functionHelpCommand(client *c) { const char *help[] = { "CREATE [REPLACE] [DESC ] ", @@ -398,6 +428,12 @@ void functionHelpCommand(client *c) { " In addition, returns a list of available engines.", "KILL", " Kill the current running function.", +"FLUSH [ASYNC|SYNC]", +" Delete all the functions.", +" When called without the optional mode argument, the behavior is determined by the", +" lazyfree-lazy-user-flush configuration directive. Valid modes are:", +" * ASYNC: Asynchronously flush the functions.", +" * SYNC: Synchronously flush the functions.", NULL }; addReplyHelp(c, help); } @@ -528,6 +564,10 @@ dict* functionsGet() { return functions_ctx->functions; } +size_t functionsLen(functionsCtx *functions_ctx) { + return dictSize(functions_ctx->functions); +} + /* Initialize engine data structures. * Should be called once on server initialization */ int functionsInit() { diff --git a/src/functions.h b/src/functions.h index 4a1ec4a24..66147f97c 100644 --- a/src/functions.h +++ b/src/functions.h @@ -105,6 +105,7 @@ unsigned long functionsMemoryOverhead(); int functionsLoad(rio *rdb, int ver); unsigned long functionsNum(); dict* functionsGet(); +size_t functionsLen(functionsCtx *functions_ctx); functionsCtx* functionsCtxGetCurrent(); functionsCtx* functionsCtxCreate(); void functionsCtxFree(functionsCtx *functions_ctx); diff --git a/src/lazyfree.c b/src/lazyfree.c index 6127abe77..1cc521cbd 100644 --- a/src/lazyfree.c +++ b/src/lazyfree.c @@ -1,6 +1,7 @@ #include "server.h" #include "bio.h" #include "atomicvar.h" +#include "functions.h" static redisAtomic size_t lazyfree_objects = 0; static redisAtomic size_t lazyfreed_objects = 0; @@ -46,6 +47,15 @@ void lazyFreeLuaScripts(void *args[]) { atomicIncr(lazyfreed_objects,len); } +/* Release the functions ctx. */ +void lazyFreeFunctionsCtx(void *args[]) { + functionsCtx *f_ctx = args[0]; + size_t len = functionsLen(f_ctx); + functionsCtxFree(f_ctx); + atomicDecr(lazyfree_objects,len); + atomicIncr(lazyfreed_objects,len); +} + /* Release replication backlog referencing memory. */ void lazyFreeReplicationBacklogRefMem(void *args[]) { list *blocks = args[0]; @@ -193,6 +203,16 @@ void freeLuaScriptsAsync(dict *lua_scripts) { } } +/* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */ +void freeFunctionsAsync(functionsCtx *f_ctx) { + if (functionsLen(f_ctx) > LAZYFREE_THRESHOLD) { + atomicIncr(lazyfree_objects,functionsLen(f_ctx)); + bioCreateLazyFreeJob(lazyFreeFunctionsCtx,1,f_ctx); + } else { + functionsCtxFree(f_ctx); + } +} + /* Free replication backlog referencing buffer blocks and rax index. */ void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) { if (listLength(blocks) > LAZYFREE_THRESHOLD || diff --git a/src/server.h b/src/server.h index 10f45323c..a3600e13a 100644 --- a/src/server.h +++ b/src/server.h @@ -2905,6 +2905,7 @@ int ldbPendingChildren(void); sds luaCreateFunction(client *c, robj *body); void luaLdbLineHook(lua_State *lua, lua_Debug *ar); void freeLuaScriptsAsync(dict *lua_scripts); +void freeFunctionsAsync(functionsCtx *f_ctx); int ldbIsEnabled(); void ldbLog(sds entry); void ldbLogRedisReply(char *reply); @@ -3169,6 +3170,7 @@ void functionStatsCommand(client *c); void functionInfoCommand(client *c); void functionListCommand(client *c); void functionHelpCommand(client *c); +void functionFlushCommand(client *c); void timeCommand(client *c); void bitopCommand(client *c); void bitcountCommand(client *c); diff --git a/tests/support/server.tcl b/tests/support/server.tcl index 4c63d7b3a..ee39c8df9 100644 --- a/tests/support/server.tcl +++ b/tests/support/server.tcl @@ -343,6 +343,7 @@ proc run_external_server_test {code overrides} { } r flushall + r function flush # store overrides set saved_config {} diff --git a/tests/test_helper.tcl b/tests/test_helper.tcl index d1405e3e3..570d9e85f 100644 --- a/tests/test_helper.tcl +++ b/tests/test_helper.tcl @@ -63,6 +63,7 @@ set ::all_tests { unit/pubsub unit/slowlog unit/scripting + unit/functions unit/maxmemory unit/introspection unit/introspection-2 diff --git a/tests/unit/functions.tcl b/tests/unit/functions.tcl index 0736a44da..27bb29b72 100644 --- a/tests/unit/functions.tcl +++ b/tests/unit/functions.tcl @@ -185,9 +185,34 @@ start_server {tags {"scripting"}} { after 200 ; # Give some time to Lua to call the hook again... assert_equal [r ping] "PONG" } + + test {FUNCTION - test function flush} { + r function create lua test REPLACE {local a = 1 while true do a = a + 1 end} + assert_match {{name test engine LUA description {}}} [r function list] + r function flush + assert_match {} [r function list] + + r function create lua test REPLACE {local a = 1 while true do a = a + 1 end} + assert_match {{name test engine LUA description {}}} [r function list] + r function flush async + assert_match {} [r function list] + + r function create lua test REPLACE {local a = 1 while true do a = a + 1 end} + assert_match {{name test engine LUA description {}}} [r function list] + r function flush sync + assert_match {} [r function list] + } + + test {FUNCTION - test function wrong argument} { + catch {r function flush bad_arg} e + assert_match {*only supports SYNC|ASYNC*} $e + + catch {r function flush sync extra_arg} e + assert_match {*wrong number of arguments*} $e + } } -start_server {tags {"scripting repl"}} { +start_server {tags {"scripting repl external:skip"}} { start_server {} { test "Connect a replica to the master instance" { r -1 slaveof [srv 0 host] [srv 0 port] @@ -221,6 +246,21 @@ start_server {tags {"scripting repl"}} { } } + test {FUNCTION - flush is replicated to replica} { + r function create LUA test DESCRIPTION {some description} {return 'hello'} + wait_for_condition 50 100 { + [r -1 function list] eq {{name test engine LUA description {some description}}} + } else { + fail "Failed waiting for function to replicate to replica" + } + r function flush + wait_for_condition 50 100 { + [r -1 function list] eq {} + } else { + fail "Failed waiting for function to replicate to replica" + } + } + test "Disconnecting the replica from master instance" { r -1 slaveof no one # creating a function after disconnect to make sure function