Add FUNCTION FLUSH command to flush all functions (#9936)

Added `FUNCTION FLUSH` command. The new sub-command allows delete all the functions.
An optional `[SYNC|ASYNC]` argument can be given to control whether or not to flush the
functions synchronously or asynchronously. if not given the default flush mode is chosen by
`lazyfree-lazy-user-flush` configuration values.

Add the missing `functions.tcl` test to the list of tests that are executed in test_helper.tcl,
and call FUNCTION FLUSH in between servers in external mode
This commit is contained in:
Meir Shpilraien (Spielrein) 2021-12-16 17:58:25 +02:00 committed by GitHub
parent ffbe36fc3e
commit 687210f155
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 165 additions and 1 deletions

View File

@ -3055,6 +3055,27 @@ struct redisCommandArg FUNCTION_DELETE_Args[] = {
{0}
};
/********** FUNCTION FLUSH ********************/
/* FUNCTION FLUSH history */
#define FUNCTION_FLUSH_History NULL
/* FUNCTION FLUSH hints */
#define FUNCTION_FLUSH_Hints NULL
/* FUNCTION FLUSH async argument table */
struct redisCommandArg FUNCTION_FLUSH_async_Subargs[] = {
{"async",ARG_TYPE_PURE_TOKEN,-1,"ASYNC",NULL,NULL,CMD_ARG_NONE},
{"sync",ARG_TYPE_PURE_TOKEN,-1,"SYNC",NULL,NULL,CMD_ARG_NONE},
{0}
};
/* FUNCTION FLUSH argument table */
struct redisCommandArg FUNCTION_FLUSH_Args[] = {
{"async",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=FUNCTION_FLUSH_async_Subargs},
{0}
};
/********** FUNCTION HELP ********************/
/* FUNCTION HELP history */
@ -3106,6 +3127,7 @@ struct redisCommandArg FUNCTION_INFO_Args[] = {
struct redisCommand FUNCTION_Subcommands[] = {
{"create","PATCH__TBD__15__","PATCH__TBD__14__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_CREATE_History,FUNCTION_CREATE_Hints,functionCreateCommand,-5,CMD_NOSCRIPT|CMD_MAY_REPLICATE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_CREATE_Args},
{"delete","PATCH__TBD__23__","PATCH__TBD__22__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_DELETE_History,FUNCTION_DELETE_Hints,functionDeleteCommand,3,CMD_NOSCRIPT|CMD_MAY_REPLICATE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_DELETE_Args},
{"flush","PATCH__TBD__29__","PATCH__TBD__28__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_FLUSH_History,FUNCTION_FLUSH_Hints,functionFlushCommand,-2,CMD_NOSCRIPT|CMD_MAY_REPLICATE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_FLUSH_Args},
{"help","Show helpful text about the different subcommands","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_HELP_History,FUNCTION_HELP_Hints,functionHelpCommand,2,CMD_LOADING|CMD_STALE,ACL_CATEGORY_SCRIPTING},
{"info","PATCH__TBD__11__","PATCH__TBD__10__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_INFO_History,FUNCTION_INFO_Hints,functionInfoCommand,-3,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_INFO_Args},
{"kill","PATCH__TBD__19__","PATCH__TBD__18__","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_KILL_History,FUNCTION_KILL_Hints,functionKillCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},

View File

@ -0,0 +1,37 @@
{
"FLUSH": {
"summary": "PATCH__TBD__29__",
"complexity": "PATCH__TBD__28__",
"group": "scripting",
"since": "7.0.0",
"arity": -2,
"container": "FUNCTION",
"function": "functionFlushCommand",
"command_flags": [
"NOSCRIPT",
"MAY_REPLICATE"
],
"acl_categories": [
"SCRIPTING"
],
"arguments": [
{
"name": "async",
"type": "oneof",
"optional": true,
"arguments": [
{
"name": "async",
"type": "pure-token",
"token": "ASYNC"
},
{
"name": "sync",
"type": "pure-token",
"token": "SYNC"
}
]
}
]
}
}

View File

@ -372,6 +372,36 @@ void fcallroCommand(client *c) {
fcallCommandGeneric(c, 1);
}
void functionFlushCommand(client *c) {
if (c->argc > 3) {
addReplySubcommandSyntaxError(c);
return;
}
int async = 0;
if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"sync")) {
async = 0;
} else if (c->argc == 3 && !strcasecmp(c->argv[2]->ptr,"async")) {
async = 1;
} else if (c->argc == 2) {
async = server.lazyfree_lazy_user_flush ? 1 : 0;
} else {
addReplyError(c,"FUNCTION FLUSH only supports SYNC|ASYNC option");
return;
}
if (async) {
functionsCtx *old_f_ctx = functions_ctx;
functions_ctx = functionsCtxCreate();
freeFunctionsAsync(old_f_ctx);
} else {
functionsCtxClear(functions_ctx);
}
/* Indicate that the command changed the data so it will be replicated and
* counted as a data change (for persistence configuration) */
server.dirty++;
addReply(c,shared.ok);
}
void functionHelpCommand(client *c) {
const char *help[] = {
"CREATE <ENGINE NAME> <FUNCTION NAME> [REPLACE] [DESC <FUNCTION DESCRIPTION>] <FUNCTION CODE>",
@ -398,6 +428,12 @@ void functionHelpCommand(client *c) {
" In addition, returns a list of available engines.",
"KILL",
" Kill the current running function.",
"FLUSH [ASYNC|SYNC]",
" Delete all the functions.",
" When called without the optional mode argument, the behavior is determined by the",
" lazyfree-lazy-user-flush configuration directive. Valid modes are:",
" * ASYNC: Asynchronously flush the functions.",
" * SYNC: Synchronously flush the functions.",
NULL };
addReplyHelp(c, help);
}
@ -528,6 +564,10 @@ dict* functionsGet() {
return functions_ctx->functions;
}
size_t functionsLen(functionsCtx *functions_ctx) {
return dictSize(functions_ctx->functions);
}
/* Initialize engine data structures.
* Should be called once on server initialization */
int functionsInit() {

View File

@ -105,6 +105,7 @@ unsigned long functionsMemoryOverhead();
int functionsLoad(rio *rdb, int ver);
unsigned long functionsNum();
dict* functionsGet();
size_t functionsLen(functionsCtx *functions_ctx);
functionsCtx* functionsCtxGetCurrent();
functionsCtx* functionsCtxCreate();
void functionsCtxFree(functionsCtx *functions_ctx);

View File

@ -1,6 +1,7 @@
#include "server.h"
#include "bio.h"
#include "atomicvar.h"
#include "functions.h"
static redisAtomic size_t lazyfree_objects = 0;
static redisAtomic size_t lazyfreed_objects = 0;
@ -46,6 +47,15 @@ void lazyFreeLuaScripts(void *args[]) {
atomicIncr(lazyfreed_objects,len);
}
/* Release the functions ctx. */
void lazyFreeFunctionsCtx(void *args[]) {
functionsCtx *f_ctx = args[0];
size_t len = functionsLen(f_ctx);
functionsCtxFree(f_ctx);
atomicDecr(lazyfree_objects,len);
atomicIncr(lazyfreed_objects,len);
}
/* Release replication backlog referencing memory. */
void lazyFreeReplicationBacklogRefMem(void *args[]) {
list *blocks = args[0];
@ -193,6 +203,16 @@ void freeLuaScriptsAsync(dict *lua_scripts) {
}
}
/* Free functions ctx, if the functions ctx contains enough functions, free it in async way. */
void freeFunctionsAsync(functionsCtx *f_ctx) {
if (functionsLen(f_ctx) > LAZYFREE_THRESHOLD) {
atomicIncr(lazyfree_objects,functionsLen(f_ctx));
bioCreateLazyFreeJob(lazyFreeFunctionsCtx,1,f_ctx);
} else {
functionsCtxFree(f_ctx);
}
}
/* Free replication backlog referencing buffer blocks and rax index. */
void freeReplicationBacklogRefMemAsync(list *blocks, rax *index) {
if (listLength(blocks) > LAZYFREE_THRESHOLD ||

View File

@ -2905,6 +2905,7 @@ int ldbPendingChildren(void);
sds luaCreateFunction(client *c, robj *body);
void luaLdbLineHook(lua_State *lua, lua_Debug *ar);
void freeLuaScriptsAsync(dict *lua_scripts);
void freeFunctionsAsync(functionsCtx *f_ctx);
int ldbIsEnabled();
void ldbLog(sds entry);
void ldbLogRedisReply(char *reply);
@ -3169,6 +3170,7 @@ void functionStatsCommand(client *c);
void functionInfoCommand(client *c);
void functionListCommand(client *c);
void functionHelpCommand(client *c);
void functionFlushCommand(client *c);
void timeCommand(client *c);
void bitopCommand(client *c);
void bitcountCommand(client *c);

View File

@ -343,6 +343,7 @@ proc run_external_server_test {code overrides} {
}
r flushall
r function flush
# store overrides
set saved_config {}

View File

@ -63,6 +63,7 @@ set ::all_tests {
unit/pubsub
unit/slowlog
unit/scripting
unit/functions
unit/maxmemory
unit/introspection
unit/introspection-2

View File

@ -185,9 +185,34 @@ start_server {tags {"scripting"}} {
after 200 ; # Give some time to Lua to call the hook again...
assert_equal [r ping] "PONG"
}
test {FUNCTION - test function flush} {
r function create lua test REPLACE {local a = 1 while true do a = a + 1 end}
assert_match {{name test engine LUA description {}}} [r function list]
r function flush
assert_match {} [r function list]
r function create lua test REPLACE {local a = 1 while true do a = a + 1 end}
assert_match {{name test engine LUA description {}}} [r function list]
r function flush async
assert_match {} [r function list]
r function create lua test REPLACE {local a = 1 while true do a = a + 1 end}
assert_match {{name test engine LUA description {}}} [r function list]
r function flush sync
assert_match {} [r function list]
}
test {FUNCTION - test function wrong argument} {
catch {r function flush bad_arg} e
assert_match {*only supports SYNC|ASYNC*} $e
catch {r function flush sync extra_arg} e
assert_match {*wrong number of arguments*} $e
}
}
start_server {tags {"scripting repl"}} {
start_server {tags {"scripting repl external:skip"}} {
start_server {} {
test "Connect a replica to the master instance" {
r -1 slaveof [srv 0 host] [srv 0 port]
@ -221,6 +246,21 @@ start_server {tags {"scripting repl"}} {
}
}
test {FUNCTION - flush is replicated to replica} {
r function create LUA test DESCRIPTION {some description} {return 'hello'}
wait_for_condition 50 100 {
[r -1 function list] eq {{name test engine LUA description {some description}}}
} else {
fail "Failed waiting for function to replicate to replica"
}
r function flush
wait_for_condition 50 100 {
[r -1 function list] eq {}
} else {
fail "Failed waiting for function to replicate to replica"
}
}
test "Disconnecting the replica from master instance" {
r -1 slaveof no one
# creating a function after disconnect to make sure function