From 365cbf46a717d660bbe9c832b6c9d7fc15982029 Mon Sep 17 00:00:00 2001 From: "Meir Shpilraien (Spielrein)" Date: Sun, 26 Dec 2021 09:03:37 +0200 Subject: [PATCH] Add FUNCTION DUMP and RESTORE. (#9938) Follow the conclusions to support Functions in redis cluster (#9899) Added 2 new FUNCTION sub-commands: 1. `FUNCTION DUMP` - dump a binary payload representation of all the functions. 2. `FUNCTION RESTORE [FLUSH|APPEND|REPLACE]` - give the binary payload extracted using `FUNCTION DUMP`, restore all the functions on the given payload. Restore policy can be given to control how to handle existing functions (default is APPEND): * FLUSH: delete all existing functions. * APPEND: appends the restored functions to the existing functions. On collision, abort. * REPLACE: appends the restored functions to the existing functions. On collision, replace the old function with the new function. Modify `redis-cli --cluster add-node` to use `FUNCTION DUMP` to get existing functions from one of the nodes in the cluster, and `FUNCTION RESTORE` to load the same set of functions to the new node. `redis-cli` will execute this step before sending the `CLUSTER MEET` command to the new node. If `FUNCTION DUMP` returns an error, assume the current Redis version do not support functions and skip `FUNCTION RESTORE`. If `FUNCTION RESTORE` fails, abort and do not send the `CLUSTER MEET` command. If the new node already contains functions (before the `FUNCTION RESTORE` is sent), abort and do not add the node to the cluster. Test was added to verify `redis-cli --cluster add-node` works as expected. --- src/cluster.c | 10 +- src/commands.c | 33 ++++++ src/commands/function-dump.json | 17 +++ src/commands/function-restore.json | 46 ++++++++ src/functions.c | 166 +++++++++++++++++++++++++++++ src/rdb.c | 82 ++++++++------ src/rdb.h | 2 + src/redis-cli.c | 41 +++++++ src/server.h | 3 + tests/unit/cluster.tcl | 68 ++++++++++++ tests/unit/functions.tcl | 76 +++++++++++++ 11 files changed, 511 insertions(+), 33 deletions(-) create mode 100644 src/commands/function-dump.json create mode 100644 src/commands/function-restore.json diff --git a/src/cluster.c b/src/cluster.c index d44f41ad8..15d2fc38f 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -5398,8 +5398,9 @@ void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) { /* Verify that the RDB version of the dump payload matches the one of this Redis * instance and that the checksum is ok. * If the DUMP payload looks valid C_OK is returned, otherwise C_ERR - * is returned. */ -int verifyDumpPayload(unsigned char *p, size_t len) { + * is returned. If rdbver_ptr is not NULL, its populated with the value read + * from the input buffer. */ +int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr) { unsigned char *footer; uint16_t rdbver; uint64_t crc; @@ -5410,6 +5411,9 @@ int verifyDumpPayload(unsigned char *p, size_t len) { /* Verify RDB version */ rdbver = (footer[1] << 8) | footer[0]; + if (rdbver_ptr) { + *rdbver_ptr = rdbver; + } if (rdbver > RDB_VERSION) return C_ERR; if (server.skip_checksum_validation) @@ -5499,7 +5503,7 @@ void restoreCommand(client *c) { } /* Verify RDB version and data checksum. */ - if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR) + if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr),NULL) == C_ERR) { addReplyError(c,"DUMP payload version or checksum are wrong"); return; diff --git a/src/commands.c b/src/commands.c index 0a07fb25f..2514c51b6 100644 --- a/src/commands.c +++ b/src/commands.c @@ -3064,6 +3064,14 @@ struct redisCommandArg FUNCTION_DELETE_Args[] = { {0} }; +/********** FUNCTION DUMP ********************/ + +/* FUNCTION DUMP history */ +#define FUNCTION_DUMP_History NULL + +/* FUNCTION DUMP hints */ +#define FUNCTION_DUMP_Hints NULL + /********** FUNCTION FLUSH ********************/ /* FUNCTION FLUSH history */ @@ -3124,6 +3132,29 @@ struct redisCommandArg FUNCTION_INFO_Args[] = { /* FUNCTION LIST hints */ #define FUNCTION_LIST_Hints NULL +/********** FUNCTION RESTORE ********************/ + +/* FUNCTION RESTORE history */ +#define FUNCTION_RESTORE_History NULL + +/* FUNCTION RESTORE hints */ +#define FUNCTION_RESTORE_Hints NULL + +/* FUNCTION RESTORE policy argument table */ +struct redisCommandArg FUNCTION_RESTORE_policy_Subargs[] = { +{"flush",ARG_TYPE_PURE_TOKEN,-1,"FLUSH",NULL,NULL,CMD_ARG_NONE}, +{"append",ARG_TYPE_PURE_TOKEN,-1,"APPEND",NULL,NULL,CMD_ARG_NONE}, +{"replace",ARG_TYPE_PURE_TOKEN,-1,"REPLACE",NULL,NULL,CMD_ARG_NONE}, +{0} +}; + +/* FUNCTION RESTORE argument table */ +struct redisCommandArg FUNCTION_RESTORE_Args[] = { +{"serialized-value",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE}, +{"policy",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=FUNCTION_RESTORE_policy_Subargs}, +{0} +}; + /********** FUNCTION STATS ********************/ /* FUNCTION STATS history */ @@ -3136,11 +3167,13 @@ struct redisCommandArg FUNCTION_INFO_Args[] = { struct redisCommand FUNCTION_Subcommands[] = { {"create","Create a function with the given arguments (name, code, description)","O(1) (considering compilation time is redundant)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_CREATE_History,FUNCTION_CREATE_Hints,functionCreateCommand,-5,CMD_NOSCRIPT|CMD_WRITE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_CREATE_Args}, {"delete","Delete a function by name","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_DELETE_History,FUNCTION_DELETE_Hints,functionDeleteCommand,3,CMD_NOSCRIPT|CMD_WRITE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_DELETE_Args}, +{"dump","Dump all functions into a serialized binary payload","O(N) where N is the number of functions","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_DUMP_History,FUNCTION_DUMP_Hints,functionDumpCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING}, {"flush","Deleting all functions","O(N) where N is the number of functions deleted","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_FLUSH_History,FUNCTION_FLUSH_Hints,functionFlushCommand,-2,CMD_NOSCRIPT|CMD_WRITE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_FLUSH_Args}, {"help","Show helpful text about the different subcommands","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_HELP_History,FUNCTION_HELP_Hints,functionHelpCommand,2,CMD_LOADING|CMD_STALE,ACL_CATEGORY_SCRIPTING}, {"info","Return information about a function by function name","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_INFO_History,FUNCTION_INFO_Hints,functionInfoCommand,-3,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_INFO_Args}, {"kill","Kill the function currently in execution.","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_KILL_History,FUNCTION_KILL_Hints,functionKillCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING}, {"list","List information about all the functions","O(N) where N is the number of functions","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_LIST_History,FUNCTION_LIST_Hints,functionListCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING}, +{"restore","Restore all the functions on the given payload","O(N) where N is the number of functions on the payload","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_RESTORE_History,FUNCTION_RESTORE_Hints,functionRestoreCommand,-3,CMD_NOSCRIPT|CMD_WRITE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_RESTORE_Args}, {"stats","Return information about the function currently running (name, description, duration)","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_STATS_History,FUNCTION_STATS_Hints,functionStatsCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING}, {0} }; diff --git a/src/commands/function-dump.json b/src/commands/function-dump.json new file mode 100644 index 000000000..de402f589 --- /dev/null +++ b/src/commands/function-dump.json @@ -0,0 +1,17 @@ +{ + "DUMP": { + "summary": "Dump all functions into a serialized binary payload", + "complexity": "O(N) where N is the number of functions", + "group": "scripting", + "since": "7.0.0", + "arity": 2, + "container": "FUNCTION", + "function": "functionDumpCommand", + "command_flags": [ + "NOSCRIPT" + ], + "acl_categories": [ + "SCRIPTING" + ] + } +} diff --git a/src/commands/function-restore.json b/src/commands/function-restore.json new file mode 100644 index 000000000..bc9b32be4 --- /dev/null +++ b/src/commands/function-restore.json @@ -0,0 +1,46 @@ +{ + "RESTORE": { + "summary": "Restore all the functions on the given payload", + "complexity": "O(N) where N is the number of functions on the payload", + "group": "scripting", + "since": "7.0.0", + "arity": -3, + "container": "FUNCTION", + "function": "functionRestoreCommand", + "command_flags": [ + "NOSCRIPT", + "WRITE" + ], + "acl_categories": [ + "SCRIPTING" + ], + "arguments": [ + { + "name": "serialized-value", + "type": "string" + }, + { + "name": "policy", + "type": "oneof", + "optional": true, + "arguments": [ + { + "name": "flush", + "type": "pure-token", + "token": "FLUSH" + }, + { + "name": "append", + "type": "pure-token", + "token": "APPEND" + }, + { + "name": "replace", + "type": "pure-token", + "token": "REPLACE" + } + ] + } + ] + } +} diff --git a/src/functions.c b/src/functions.c index 8da845250..2da142ecb 100644 --- a/src/functions.c +++ b/src/functions.c @@ -33,6 +33,10 @@ #include "adlist.h" #include "atomicvar.h" +typedef enum { + restorePolicy_Flush, restorePolicy_Append, restorePolicy_Replace +} restorePolicy; + static size_t engine_cache_memory = 0; /* Forward declaration */ @@ -81,6 +85,9 @@ static size_t functionMallocSize(functionInfo *fi) { /* Dispose function memory */ static void engineFunctionDispose(dict *d, void *obj) { UNUSED(d); + if (!obj) { + return; + } functionInfo *fi = obj; sdsfree(fi->code); sdsfree(fi->name); @@ -377,6 +384,156 @@ void fcallroCommand(client *c) { fcallCommandGeneric(c, 1); } +/* + * FUNCTION DUMP + * + * Returns a binary payload representing all the functions. + * Can be loaded using FUNCTION RESTORE + * + * The payload structure is the same as on RDB. Each function + * is saved separately with the following information: + * * Function name + * * Engine name + * * Function description + * * Function code + * RDB_OPCODE_FUNCTION is saved before each function to present + * that the payload is a function. + * RDB version and crc64 is saved at the end of the payload. + * The RDB version is saved for backward compatibility. + * crc64 is saved so we can verify the payload content. + */ +void functionDumpCommand(client *c) { + unsigned char buf[2]; + uint64_t crc; + rio payload; + rioInitWithBuffer(&payload, sdsempty()); + + functionsSaveRio(&payload); + + /* RDB version */ + buf[0] = RDB_VERSION & 0xff; + buf[1] = (RDB_VERSION >> 8) & 0xff; + payload.io.buffer.ptr = sdscatlen(payload.io.buffer.ptr, buf, 2); + + /* CRC64 */ + crc = crc64(0, (unsigned char*) payload.io.buffer.ptr, + sdslen(payload.io.buffer.ptr)); + memrev64ifbe(&crc); + payload.io.buffer.ptr = sdscatlen(payload.io.buffer.ptr, &crc, 8); + + addReplyBulkSds(c, payload.io.buffer.ptr); +} + +/* + * FUNCTION RESTORE [FLUSH|APPEND|REPLACE] + * + * Restore the functions represented by the give payload. + * Restore policy to can be given to control how to handle existing functions (default APPEND): + * * FLUSH: delete all existing functions. + * * APPEND: appends the restored functions to the existing functions. On collision, abort. + * * REPLACE: appends the restored functions to the existing functions. + * On collision, replace the old function with the new function. + */ +void functionRestoreCommand(client *c) { + if (c->argc > 4) { + addReplySubcommandSyntaxError(c); + return; + } + + restorePolicy restore_replicy = restorePolicy_Append; /* default policy: APPEND */ + sds data = c->argv[2]->ptr; + size_t data_len = sdslen(data); + rio payload; + dictIterator *iter = NULL; + sds err = NULL; + + if (c->argc == 4) { + const char *restore_policy_str = c->argv[3]->ptr; + if (!strcasecmp(restore_policy_str, "append")) { + restore_replicy = restorePolicy_Append; + } else if (!strcasecmp(restore_policy_str, "replace")) { + restore_replicy = restorePolicy_Replace; + } else if (!strcasecmp(restore_policy_str, "flush")) { + restore_replicy = restorePolicy_Flush; + } else { + addReplyError(c, "Wrong restore policy given, value should be either FLUSH, APPEND or REPLACE."); + return; + } + } + + uint16_t rdbver; + if (verifyDumpPayload((unsigned char*)data, data_len, &rdbver) != C_OK) { + addReplyError(c, "DUMP payload version or checksum are wrong"); + return; + } + + functionsCtx *f_ctx = functionsCtxCreate(); + rioInitWithBuffer(&payload, data); + + /* Read until reaching last 10 bytes that should contain RDB version and checksum. */ + while (data_len - payload.io.buffer.pos > 10) { + int type; + if ((type = rdbLoadType(&payload)) == -1) { + err = sdsnew("can not read data type"); + goto load_error; + } + if (type != RDB_OPCODE_FUNCTION) { + err = sdsnew("given type is not a function"); + goto load_error; + } + if (rdbFunctionLoad(&payload, rdbver, f_ctx, RDBFLAGS_NONE, &err) != C_OK) { + if (!err) { + err = sdsnew("failed loading the given functions payload"); + } + goto load_error; + } + } + + if (restore_replicy == restorePolicy_Flush) { + functionsCtxSwapWithCurrent(f_ctx); + f_ctx = NULL; /* avoid releasing the f_ctx in the end */ + } else { + if (restore_replicy == restorePolicy_Append) { + /* First make sure there is only new functions */ + iter = dictGetIterator(f_ctx->functions); + dictEntry *entry = NULL; + while ((entry = dictNext(iter))) { + functionInfo *fi = dictGetVal(entry); + if (dictFetchValue(functions_ctx->functions, fi->name)) { + /* function already exists, failed the restore. */ + err = sdscatfmt(sdsempty(), "Function %s already exists", fi->name); + goto load_error; + } + } + dictReleaseIterator(iter); + } + iter = dictGetIterator(f_ctx->functions); + dictEntry *entry = NULL; + while ((entry = dictNext(iter))) { + functionInfo *fi = dictGetVal(entry); + dictReplace(functions_ctx->functions, fi->name, fi); + dictSetVal(f_ctx->functions, entry, NULL); /* make sure value will not be disposed */ + } + } + + /* Indicate that the command changed the data so it will be replicated and + * counted as a data change (for persistence configuration) */ + server.dirty++; + +load_error: + if (err) { + addReplyErrorSds(c, err); + } else { + addReply(c, shared.ok); + } + if (iter) { + dictReleaseIterator(iter); + } + if (f_ctx) { + functionsCtxFree(f_ctx); + } +} + void functionFlushCommand(client *c) { if (c->argc > 3) { addReplySubcommandSyntaxError(c); @@ -434,6 +591,15 @@ void functionHelpCommand(client *c) { " lazyfree-lazy-user-flush configuration directive. Valid modes are:", " * ASYNC: Asynchronously flush the functions.", " * SYNC: Synchronously flush the functions.", +"DUMP", +" Returns a serialized payload representing the current functions, can be restored using FUNCTION RESTORE command", +"RESTORE [FLUSH|APPEND|REPLACE]", +" Restore the functions represented by the given payload, it is possible to give a restore policy to", +" control how to handle existing functions (default APPEND):", +" * FLUSH: delete all existing functions.", +" * APPEND: appends the restored functions to the existing functions. On collision, abort.", +" * REPLACE: appends the restored functions to the existing functions, On collision, replace the old", +" function with the new function.", NULL }; addReplyHelp(c, help); } diff --git a/src/rdb.c b/src/rdb.c index 108804741..2126d6e6d 100644 --- a/src/rdb.c +++ b/src/rdb.c @@ -1214,6 +1214,30 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) { return io.bytes; } +int functionsSaveRio(rio *rdb) { + int ret = C_ERR; + dict *functions = functionsGet(); + dictIterator *iter = dictGetIterator(functions); + dictEntry *entry = NULL; + while ((entry = dictNext(iter))) { + rdbSaveType(rdb, RDB_OPCODE_FUNCTION); + functionInfo *fi = dictGetVal(entry); + if (rdbSaveRawString(rdb, (unsigned char *) fi->name, sdslen(fi->name)) == -1) goto done; + if (rdbSaveRawString(rdb, (unsigned char *) fi->ei->name, sdslen(fi->ei->name)) == -1) goto done; + if (fi->desc) { + if (rdbSaveLen(rdb, 1) == -1) goto done; /* desc exists */ + if (rdbSaveRawString(rdb, (unsigned char *) fi->desc, sdslen(fi->desc)) == -1) goto done; + } else { + if (rdbSaveLen(rdb, 0) == -1) goto done; /* desc not exists */ + } + if (rdbSaveRawString(rdb, (unsigned char *) fi->code, sdslen(fi->code)) == -1) goto done; + } + ret = C_OK; +done: + dictReleaseIterator(iter); + return ret; +} + /* Produces a dump of the database in RDB format sending it to the specified * Redis I/O channel. On success C_OK is returned, otherwise C_ERR * is returned and part of the output, or all the output, can be @@ -1240,24 +1264,7 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr; if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr; - /* save functions */ - dict *functions = functionsGet(); - dictIterator *iter = dictGetIterator(functions); - dictEntry *entry = NULL; - while ((entry = dictNext(iter))) { - rdbSaveType(rdb, RDB_OPCODE_FUNCTION); - functionInfo* fi = dictGetVal(entry); - if (rdbSaveRawString(rdb, (unsigned char *) fi->name, sdslen(fi->name)) == -1) goto werr; - if (rdbSaveRawString(rdb, (unsigned char *) fi->ei->name, sdslen(fi->ei->name)) == -1) goto werr; - if (fi->desc) { - if (rdbSaveLen(rdb, 1) == -1) goto werr; /* desc exists */ - if (rdbSaveRawString(rdb, (unsigned char *) fi->desc, sdslen(fi->desc)) == -1) goto werr; - } else { - if (rdbSaveLen(rdb, 0) == -1) goto werr; /* desc not exists */ - } - if (rdbSaveRawString(rdb, (unsigned char *) fi->code, sdslen(fi->code)) == -1) goto werr; - } - dictReleaseIterator(iter); + if (functionsSaveRio(rdb) != C_OK) goto werr; for (j = 0; j < server.dbnum; j++) { redisDb *db = server.db+j; @@ -2697,42 +2704,48 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) { } } -static int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx, int rdbflags) { +/* Save the given functions_ctx to the rdb. + * The err output parameter is optional and will be set with relevant error + * message on failure, it is the caller responsibility to free the error + * message on failure. */ +int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx, int rdbflags, sds *err) { UNUSED(ver); sds name = NULL; sds engine_name = NULL; sds desc = NULL; sds blob = NULL; - sds err = NULL; uint64_t has_desc; + sds error = NULL; int res = C_ERR; if (!(name = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) { - serverLog(LL_WARNING, "Failed loading function name"); + error = sdsnew("Failed loading function name"); goto error; } if (!(engine_name = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) { - serverLog(LL_WARNING, "Failed loading engine name"); + error = sdsnew("Failed loading engine name"); goto error; } if ((has_desc = rdbLoadLen(rdb, NULL)) == RDB_LENERR) { - serverLog(LL_WARNING, "Failed loading function desc indicator"); + error = sdsnew("Failed loading function description indicator"); goto error; } if (has_desc && !(desc = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) { - serverLog(LL_WARNING, "Failed loading function desc"); + error = sdsnew("Failed loading function description"); goto error; } if (!(blob = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) { - serverLog(LL_WARNING, "Failed loading function blob"); + error = sdsnew("Failed loading function blob"); goto error; } - if (functionsCreateWithFunctionCtx(name, engine_name, desc, blob, rdbflags & RDBFLAGS_ALLOW_DUP, &err, functions_ctx) != C_OK) { - serverLog(LL_WARNING, "Failed compiling and saving the function %s", err); + if (functionsCreateWithFunctionCtx(name, engine_name, desc, blob, rdbflags & RDBFLAGS_ALLOW_DUP, &error, functions_ctx) != C_OK) { + if (!error) { + error = sdsnew("Failed creating the function"); + } goto error; } @@ -2743,7 +2756,14 @@ error: if (engine_name) sdsfree(engine_name); if (desc) sdsfree(desc); if (blob) sdsfree(blob); - if (err) sdsfree(err); + if (error) { + if (err) { + *err = error; + } else { + serverLog(LL_WARNING, "Failed creating function, %s", error); + sdsfree(error); + } + } return res; } @@ -2964,8 +2984,10 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin continue; /* Read next opcode. */ } } else if (type == RDB_OPCODE_FUNCTION) { - if (rdbFunctionLoad(rdb, rdbver, rdb_loading_ctx->functions_ctx, rdbflags) != C_OK) { - serverLog(LL_WARNING,"Failed loading function"); + sds err = NULL; + if (rdbFunctionLoad(rdb, rdbver, rdb_loading_ctx->functions_ctx, rdbflags, &err) != C_OK) { + serverLog(LL_WARNING,"Failed loading function, %s", err); + sdsfree(err); goto eoferr; } continue; diff --git a/src/rdb.h b/src/rdb.h index 66496bcec..5942c4333 100644 --- a/src/rdb.h +++ b/src/rdb.h @@ -169,7 +169,9 @@ int rdbSaveBinaryFloatValue(rio *rdb, float val); int rdbLoadBinaryFloatValue(rio *rdb, float *val); int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi); int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx); +int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx, int rdbflags, sds *err); int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi); +int functionsSaveRio(rio *rdb); rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi); #endif diff --git a/src/redis-cli.c b/src/redis-cli.c index 81e4f4883..24e8fe562 100644 --- a/src/redis-cli.c +++ b/src/redis-cli.c @@ -5880,6 +5880,8 @@ cleanup: static int clusterManagerCommandAddNode(int argc, char **argv) { int success = 1; redisReply *reply = NULL; + redisReply *function_restore_reply = NULL; + redisReply *function_list_reply = NULL; char *ref_ip = NULL, *ip = NULL; int ref_port = 0, port = 0; if (!getClusterHostFromCmdArgs(argc - 1, argv + 1, &ref_ip, &ref_port)) @@ -5944,6 +5946,43 @@ static int clusterManagerCommandAddNode(int argc, char **argv) { listAddNodeTail(cluster_manager.nodes, new_node); added = 1; + if (!master_node) { + /* Send functions to the new node, if new node is a replica it will get the functions from its primary. */ + clusterManagerLogInfo(">>> Getting functions from cluster\n"); + reply = CLUSTER_MANAGER_COMMAND(refnode, "FUNCTION DUMP"); + if (!clusterManagerCheckRedisReply(refnode, reply, &err)) { + clusterManagerLogInfo(">>> Failed retrieving Functions from the cluster, " + "skip this step as Redis version do not support function command (error = '%s')\n", err? err : "NULL reply"); + if (err) zfree(err); + } else { + assert(reply->type == REDIS_REPLY_STRING); + clusterManagerLogInfo(">>> Send FUNCTION LIST to %s:%d to verify there is no functions in it\n", ip, port); + function_list_reply = CLUSTER_MANAGER_COMMAND(new_node, "FUNCTION LIST"); + if (!clusterManagerCheckRedisReply(new_node, function_list_reply, &err)) { + clusterManagerLogErr(">>> Failed on CLUSTER LIST (error = '%s')\r\n", err? err : "NULL reply"); + if (err) zfree(err); + success = 0; + goto cleanup; + } + assert(function_list_reply->type == REDIS_REPLY_ARRAY); + if (function_list_reply->elements > 0) { + clusterManagerLogErr(">>> New node already contains functions and can not be added to the cluster. Use FUNCTION FLUSH and try again.\r\n"); + success = 0; + goto cleanup; + } + clusterManagerLogInfo(">>> Send FUNCTION RESTORE to %s:%d\n", ip, port); + function_restore_reply = CLUSTER_MANAGER_COMMAND(new_node, "FUNCTION RESTORE %b", reply->str, reply->len); + if (!clusterManagerCheckRedisReply(new_node, function_restore_reply, &err)) { + clusterManagerLogErr(">>> Failed loading functions to the new node (error = '%s')\r\n", err? err : "NULL reply"); + if (err) zfree(err); + success = 0; + goto cleanup; + } + } + } + + if (reply) freeReplyObject(reply); + // Send CLUSTER MEET command to the new node clusterManagerLogInfo(">>> Send CLUSTER MEET to node %s:%d to make it " "join the cluster.\n", ip, port); @@ -5968,6 +6007,8 @@ static int clusterManagerCommandAddNode(int argc, char **argv) { cleanup: if (!added && new_node) freeClusterManagerNode(new_node); if (reply) freeReplyObject(reply); + if (function_restore_reply) freeReplyObject(function_restore_reply); + if (function_list_reply) freeReplyObject(function_list_reply); return success; invalid_args: fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG); diff --git a/src/server.h b/src/server.h index 8e4d8d902..471e8c72a 100644 --- a/src/server.h +++ b/src/server.h @@ -3162,6 +3162,7 @@ void migrateCommand(client *c); void askingCommand(client *c); void readonlyCommand(client *c); void readwriteCommand(client *c); +int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr); void dumpCommand(client *c); void objectCommand(client *c); void memoryCommand(client *c); @@ -3182,6 +3183,8 @@ void functionInfoCommand(client *c); void functionListCommand(client *c); void functionHelpCommand(client *c); void functionFlushCommand(client *c); +void functionRestoreCommand(client *c); +void functionDumpCommand(client *c); void timeCommand(client *c); void bitopCommand(client *c); void bitcountCommand(client *c); diff --git a/tests/unit/cluster.tcl b/tests/unit/cluster.tcl index 885bf76a2..2507d5208 100644 --- a/tests/unit/cluster.tcl +++ b/tests/unit/cluster.tcl @@ -154,4 +154,72 @@ start_server [list overrides $base_conf] { } } +# Test redis-cli -- cluster create, add-node, call. +# Test that functions are propagated on add-node +start_server [list overrides $base_conf] { +start_server [list overrides $base_conf] { +start_server [list overrides $base_conf] { +start_server [list overrides $base_conf] { +start_server [list overrides $base_conf] { + + set node4_rd [redis_client -3] + set node5_rd [redis_client -4] + + test {Functions are added to new node on redis-cli cluster add-node} { + exec src/redis-cli --cluster-yes --cluster create \ + 127.0.0.1:[srv 0 port] \ + 127.0.0.1:[srv -1 port] \ + 127.0.0.1:[srv -2 port] + + + wait_for_condition 1000 50 { + [csi 0 cluster_state] eq {ok} && + [csi -1 cluster_state] eq {ok} && + [csi -2 cluster_state] eq {ok} + } else { + fail "Cluster doesn't stabilize" + } + + # upload a function to all the cluster + exec src/redis-cli --cluster-yes --cluster call 127.0.0.1:[srv 0 port] \ + FUNCTION CREATE LUA TEST {return 'hello'} + + # adding node to the cluster + exec src/redis-cli --cluster-yes --cluster add-node \ + 127.0.0.1:[srv -3 port] \ + 127.0.0.1:[srv 0 port] + + wait_for_condition 1000 50 { + [csi 0 cluster_state] eq {ok} && + [csi -1 cluster_state] eq {ok} && + [csi -2 cluster_state] eq {ok} && + [csi -3 cluster_state] eq {ok} + } else { + fail "Cluster doesn't stabilize" + } + + # make sure 'test' function was added to the new node + assert_equal {{name TEST engine LUA description {}}} [$node4_rd FUNCTION LIST] + + # add function to node 5 + assert_equal {OK} [$node5_rd FUNCTION CREATE LUA TEST {return 'hello1'}] + + # make sure functions was added to node 5 + assert_equal {{name TEST engine LUA description {}}} [$node5_rd FUNCTION LIST] + + # adding node 5 to the cluster should failed because it already contains the 'test' function + catch { + exec src/redis-cli --cluster-yes --cluster add-node \ + 127.0.0.1:[srv -4 port] \ + 127.0.0.1:[srv 0 port] + } e + assert_match {*node already contains functions*} $e + } +# stop 5 servers +} +} +} +} +} + } ;# tags \ No newline at end of file diff --git a/tests/unit/functions.tcl b/tests/unit/functions.tcl index 2ba8ae70e..8b38688b4 100644 --- a/tests/unit/functions.tcl +++ b/tests/unit/functions.tcl @@ -161,6 +161,63 @@ start_server {tags {"scripting"}} { r function list } {{name test engine LUA description {}}} + test {FUNCTION - test function dump and restore} { + r function flush + r function create lua test description {some description} {return 'hello'} + set e [r function dump] + r function delete test + assert_match {} [r function list] + r function restore $e + r function list + } {{name test engine LUA description {some description}}} + + test {FUNCTION - test function dump and restore with flush argument} { + set e [r function dump] + r function flush + assert_match {} [r function list] + r function restore $e FLUSH + r function list + } {{name test engine LUA description {some description}}} + + test {FUNCTION - test function dump and restore with append argument} { + set e [r function dump] + r function flush + assert_match {} [r function list] + r function create lua test {return 'hello1'} + catch {r function restore $e APPEND} err + assert_match {*already exists*} $err + r function flush + r function create lua test1 {return 'hello1'} + r function restore $e APPEND + assert_match {hello} [r fcall test 0] + assert_match {hello1} [r fcall test1 0] + } + + test {FUNCTION - test function dump and restore with replace argument} { + r function flush + r function create LUA test DESCRIPTION {some description} {return 'hello'} + set e [r function dump] + r function flush + assert_match {} [r function list] + r function create lua test {return 'hello1'} + assert_match {hello1} [r fcall test 0] + r function restore $e REPLACE + assert_match {hello} [r fcall test 0] + } + + test {FUNCTION - test function restore with bad payload do not drop existing functions} { + r function flush + r function create LUA test DESCRIPTION {some description} {return 'hello'} + catch {r function restore bad_payload} e + assert_match {*payload version or checksum are wrong*} $e + r function list + } {{name test engine LUA description {some description}}} + + test {FUNCTION - test function restore with wrong number of arguments} { + catch {r function restore arg1 args2 arg3} e + set _ $e + } {*wrong number of arguments*} + test {FUNCTION - test fcall_ro with write command} { r function create lua test REPLACE {return redis.call('set', 'x', '1')} catch { r fcall_ro test 0 } e @@ -281,6 +338,25 @@ start_server {tags {"scripting repl external:skip"}} { r -1 fcall test 0 } {hello} + test {FUNCTION - restore is replicated to replica} { + set e [r function dump] + + r function delete test + wait_for_condition 50 100 { + [r -1 function list] eq {} + } else { + fail "Failed waiting for function to replicate to replica" + } + + r function restore $e + + wait_for_condition 50 100 { + [r -1 function list] eq {{name test engine LUA description {some description}}} + } else { + fail "Failed waiting for function to replicate to replica" + } + } + test {FUNCTION - delete is replicated to replica} { r function delete test wait_for_condition 50 100 {