Add FUNCTION DUMP and RESTORE. (#9938)

Follow the conclusions to support Functions in redis cluster (#9899)

Added 2 new FUNCTION sub-commands:
1. `FUNCTION DUMP` - dump a binary payload representation of all the functions.
2. `FUNCTION RESTORE <PAYLOAD> [FLUSH|APPEND|REPLACE]` - give the binary payload extracted
   using `FUNCTION DUMP`, restore all the functions on the given payload. Restore policy can be given to
   control how to handle existing functions (default is APPEND):
   * FLUSH: delete all existing functions.
   * APPEND: appends the restored functions to the existing functions. On collision, abort.
   * REPLACE: appends the restored functions to the existing functions. On collision,
     replace the old function with the new function.

Modify `redis-cli --cluster add-node` to use `FUNCTION DUMP` to get existing functions from
one of the nodes in the cluster, and `FUNCTION RESTORE` to load the same set of functions
to the new node. `redis-cli` will execute this step before sending the `CLUSTER MEET` command
to the new node. If `FUNCTION DUMP` returns an error, assume the current Redis version do not
support functions and skip `FUNCTION RESTORE`. If `FUNCTION RESTORE` fails, abort and do not send
the `CLUSTER MEET` command. If the new node already contains functions (before the `FUNCTION RESTORE`
is sent), abort and do not add the node to the cluster. Test was added to verify
`redis-cli --cluster add-node` works as expected.
This commit is contained in:
Meir Shpilraien (Spielrein) 2021-12-26 09:03:37 +02:00 committed by GitHub
parent 08ff606b0b
commit 365cbf46a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 511 additions and 33 deletions

View File

@ -5398,8 +5398,9 @@ void createDumpPayload(rio *payload, robj *o, robj *key, int dbid) {
/* Verify that the RDB version of the dump payload matches the one of this Redis
* instance and that the checksum is ok.
* If the DUMP payload looks valid C_OK is returned, otherwise C_ERR
* is returned. */
int verifyDumpPayload(unsigned char *p, size_t len) {
* is returned. If rdbver_ptr is not NULL, its populated with the value read
* from the input buffer. */
int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr) {
unsigned char *footer;
uint16_t rdbver;
uint64_t crc;
@ -5410,6 +5411,9 @@ int verifyDumpPayload(unsigned char *p, size_t len) {
/* Verify RDB version */
rdbver = (footer[1] << 8) | footer[0];
if (rdbver_ptr) {
*rdbver_ptr = rdbver;
}
if (rdbver > RDB_VERSION) return C_ERR;
if (server.skip_checksum_validation)
@ -5499,7 +5503,7 @@ void restoreCommand(client *c) {
}
/* Verify RDB version and data checksum. */
if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr)) == C_ERR)
if (verifyDumpPayload(c->argv[3]->ptr,sdslen(c->argv[3]->ptr),NULL) == C_ERR)
{
addReplyError(c,"DUMP payload version or checksum are wrong");
return;

View File

@ -3064,6 +3064,14 @@ struct redisCommandArg FUNCTION_DELETE_Args[] = {
{0}
};
/********** FUNCTION DUMP ********************/
/* FUNCTION DUMP history */
#define FUNCTION_DUMP_History NULL
/* FUNCTION DUMP hints */
#define FUNCTION_DUMP_Hints NULL
/********** FUNCTION FLUSH ********************/
/* FUNCTION FLUSH history */
@ -3124,6 +3132,29 @@ struct redisCommandArg FUNCTION_INFO_Args[] = {
/* FUNCTION LIST hints */
#define FUNCTION_LIST_Hints NULL
/********** FUNCTION RESTORE ********************/
/* FUNCTION RESTORE history */
#define FUNCTION_RESTORE_History NULL
/* FUNCTION RESTORE hints */
#define FUNCTION_RESTORE_Hints NULL
/* FUNCTION RESTORE policy argument table */
struct redisCommandArg FUNCTION_RESTORE_policy_Subargs[] = {
{"flush",ARG_TYPE_PURE_TOKEN,-1,"FLUSH",NULL,NULL,CMD_ARG_NONE},
{"append",ARG_TYPE_PURE_TOKEN,-1,"APPEND",NULL,NULL,CMD_ARG_NONE},
{"replace",ARG_TYPE_PURE_TOKEN,-1,"REPLACE",NULL,NULL,CMD_ARG_NONE},
{0}
};
/* FUNCTION RESTORE argument table */
struct redisCommandArg FUNCTION_RESTORE_Args[] = {
{"serialized-value",ARG_TYPE_STRING,-1,NULL,NULL,NULL,CMD_ARG_NONE},
{"policy",ARG_TYPE_ONEOF,-1,NULL,NULL,NULL,CMD_ARG_OPTIONAL,.subargs=FUNCTION_RESTORE_policy_Subargs},
{0}
};
/********** FUNCTION STATS ********************/
/* FUNCTION STATS history */
@ -3136,11 +3167,13 @@ struct redisCommandArg FUNCTION_INFO_Args[] = {
struct redisCommand FUNCTION_Subcommands[] = {
{"create","Create a function with the given arguments (name, code, description)","O(1) (considering compilation time is redundant)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_CREATE_History,FUNCTION_CREATE_Hints,functionCreateCommand,-5,CMD_NOSCRIPT|CMD_WRITE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_CREATE_Args},
{"delete","Delete a function by name","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_DELETE_History,FUNCTION_DELETE_Hints,functionDeleteCommand,3,CMD_NOSCRIPT|CMD_WRITE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_DELETE_Args},
{"dump","Dump all functions into a serialized binary payload","O(N) where N is the number of functions","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_DUMP_History,FUNCTION_DUMP_Hints,functionDumpCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
{"flush","Deleting all functions","O(N) where N is the number of functions deleted","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_FLUSH_History,FUNCTION_FLUSH_Hints,functionFlushCommand,-2,CMD_NOSCRIPT|CMD_WRITE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_FLUSH_Args},
{"help","Show helpful text about the different subcommands","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_HELP_History,FUNCTION_HELP_Hints,functionHelpCommand,2,CMD_LOADING|CMD_STALE,ACL_CATEGORY_SCRIPTING},
{"info","Return information about a function by function name","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_INFO_History,FUNCTION_INFO_Hints,functionInfoCommand,-3,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_INFO_Args},
{"kill","Kill the function currently in execution.","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_KILL_History,FUNCTION_KILL_Hints,functionKillCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
{"list","List information about all the functions","O(N) where N is the number of functions","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_LIST_History,FUNCTION_LIST_Hints,functionListCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
{"restore","Restore all the functions on the given payload","O(N) where N is the number of functions on the payload","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_RESTORE_History,FUNCTION_RESTORE_Hints,functionRestoreCommand,-3,CMD_NOSCRIPT|CMD_WRITE,ACL_CATEGORY_SCRIPTING,.args=FUNCTION_RESTORE_Args},
{"stats","Return information about the function currently running (name, description, duration)","O(1)","7.0.0",CMD_DOC_NONE,NULL,NULL,COMMAND_GROUP_SCRIPTING,FUNCTION_STATS_History,FUNCTION_STATS_Hints,functionStatsCommand,2,CMD_NOSCRIPT,ACL_CATEGORY_SCRIPTING},
{0}
};

View File

@ -0,0 +1,17 @@
{
"DUMP": {
"summary": "Dump all functions into a serialized binary payload",
"complexity": "O(N) where N is the number of functions",
"group": "scripting",
"since": "7.0.0",
"arity": 2,
"container": "FUNCTION",
"function": "functionDumpCommand",
"command_flags": [
"NOSCRIPT"
],
"acl_categories": [
"SCRIPTING"
]
}
}

View File

@ -0,0 +1,46 @@
{
"RESTORE": {
"summary": "Restore all the functions on the given payload",
"complexity": "O(N) where N is the number of functions on the payload",
"group": "scripting",
"since": "7.0.0",
"arity": -3,
"container": "FUNCTION",
"function": "functionRestoreCommand",
"command_flags": [
"NOSCRIPT",
"WRITE"
],
"acl_categories": [
"SCRIPTING"
],
"arguments": [
{
"name": "serialized-value",
"type": "string"
},
{
"name": "policy",
"type": "oneof",
"optional": true,
"arguments": [
{
"name": "flush",
"type": "pure-token",
"token": "FLUSH"
},
{
"name": "append",
"type": "pure-token",
"token": "APPEND"
},
{
"name": "replace",
"type": "pure-token",
"token": "REPLACE"
}
]
}
]
}
}

View File

@ -33,6 +33,10 @@
#include "adlist.h"
#include "atomicvar.h"
typedef enum {
restorePolicy_Flush, restorePolicy_Append, restorePolicy_Replace
} restorePolicy;
static size_t engine_cache_memory = 0;
/* Forward declaration */
@ -81,6 +85,9 @@ static size_t functionMallocSize(functionInfo *fi) {
/* Dispose function memory */
static void engineFunctionDispose(dict *d, void *obj) {
UNUSED(d);
if (!obj) {
return;
}
functionInfo *fi = obj;
sdsfree(fi->code);
sdsfree(fi->name);
@ -377,6 +384,156 @@ void fcallroCommand(client *c) {
fcallCommandGeneric(c, 1);
}
/*
* FUNCTION DUMP
*
* Returns a binary payload representing all the functions.
* Can be loaded using FUNCTION RESTORE
*
* The payload structure is the same as on RDB. Each function
* is saved separately with the following information:
* * Function name
* * Engine name
* * Function description
* * Function code
* RDB_OPCODE_FUNCTION is saved before each function to present
* that the payload is a function.
* RDB version and crc64 is saved at the end of the payload.
* The RDB version is saved for backward compatibility.
* crc64 is saved so we can verify the payload content.
*/
void functionDumpCommand(client *c) {
unsigned char buf[2];
uint64_t crc;
rio payload;
rioInitWithBuffer(&payload, sdsempty());
functionsSaveRio(&payload);
/* RDB version */
buf[0] = RDB_VERSION & 0xff;
buf[1] = (RDB_VERSION >> 8) & 0xff;
payload.io.buffer.ptr = sdscatlen(payload.io.buffer.ptr, buf, 2);
/* CRC64 */
crc = crc64(0, (unsigned char*) payload.io.buffer.ptr,
sdslen(payload.io.buffer.ptr));
memrev64ifbe(&crc);
payload.io.buffer.ptr = sdscatlen(payload.io.buffer.ptr, &crc, 8);
addReplyBulkSds(c, payload.io.buffer.ptr);
}
/*
* FUNCTION RESTORE <payload> [FLUSH|APPEND|REPLACE]
*
* Restore the functions represented by the give payload.
* Restore policy to can be given to control how to handle existing functions (default APPEND):
* * FLUSH: delete all existing functions.
* * APPEND: appends the restored functions to the existing functions. On collision, abort.
* * REPLACE: appends the restored functions to the existing functions.
* On collision, replace the old function with the new function.
*/
void functionRestoreCommand(client *c) {
if (c->argc > 4) {
addReplySubcommandSyntaxError(c);
return;
}
restorePolicy restore_replicy = restorePolicy_Append; /* default policy: APPEND */
sds data = c->argv[2]->ptr;
size_t data_len = sdslen(data);
rio payload;
dictIterator *iter = NULL;
sds err = NULL;
if (c->argc == 4) {
const char *restore_policy_str = c->argv[3]->ptr;
if (!strcasecmp(restore_policy_str, "append")) {
restore_replicy = restorePolicy_Append;
} else if (!strcasecmp(restore_policy_str, "replace")) {
restore_replicy = restorePolicy_Replace;
} else if (!strcasecmp(restore_policy_str, "flush")) {
restore_replicy = restorePolicy_Flush;
} else {
addReplyError(c, "Wrong restore policy given, value should be either FLUSH, APPEND or REPLACE.");
return;
}
}
uint16_t rdbver;
if (verifyDumpPayload((unsigned char*)data, data_len, &rdbver) != C_OK) {
addReplyError(c, "DUMP payload version or checksum are wrong");
return;
}
functionsCtx *f_ctx = functionsCtxCreate();
rioInitWithBuffer(&payload, data);
/* Read until reaching last 10 bytes that should contain RDB version and checksum. */
while (data_len - payload.io.buffer.pos > 10) {
int type;
if ((type = rdbLoadType(&payload)) == -1) {
err = sdsnew("can not read data type");
goto load_error;
}
if (type != RDB_OPCODE_FUNCTION) {
err = sdsnew("given type is not a function");
goto load_error;
}
if (rdbFunctionLoad(&payload, rdbver, f_ctx, RDBFLAGS_NONE, &err) != C_OK) {
if (!err) {
err = sdsnew("failed loading the given functions payload");
}
goto load_error;
}
}
if (restore_replicy == restorePolicy_Flush) {
functionsCtxSwapWithCurrent(f_ctx);
f_ctx = NULL; /* avoid releasing the f_ctx in the end */
} else {
if (restore_replicy == restorePolicy_Append) {
/* First make sure there is only new functions */
iter = dictGetIterator(f_ctx->functions);
dictEntry *entry = NULL;
while ((entry = dictNext(iter))) {
functionInfo *fi = dictGetVal(entry);
if (dictFetchValue(functions_ctx->functions, fi->name)) {
/* function already exists, failed the restore. */
err = sdscatfmt(sdsempty(), "Function %s already exists", fi->name);
goto load_error;
}
}
dictReleaseIterator(iter);
}
iter = dictGetIterator(f_ctx->functions);
dictEntry *entry = NULL;
while ((entry = dictNext(iter))) {
functionInfo *fi = dictGetVal(entry);
dictReplace(functions_ctx->functions, fi->name, fi);
dictSetVal(f_ctx->functions, entry, NULL); /* make sure value will not be disposed */
}
}
/* Indicate that the command changed the data so it will be replicated and
* counted as a data change (for persistence configuration) */
server.dirty++;
load_error:
if (err) {
addReplyErrorSds(c, err);
} else {
addReply(c, shared.ok);
}
if (iter) {
dictReleaseIterator(iter);
}
if (f_ctx) {
functionsCtxFree(f_ctx);
}
}
void functionFlushCommand(client *c) {
if (c->argc > 3) {
addReplySubcommandSyntaxError(c);
@ -434,6 +591,15 @@ void functionHelpCommand(client *c) {
" lazyfree-lazy-user-flush configuration directive. Valid modes are:",
" * ASYNC: Asynchronously flush the functions.",
" * SYNC: Synchronously flush the functions.",
"DUMP",
" Returns a serialized payload representing the current functions, can be restored using FUNCTION RESTORE command",
"RESTORE <PAYLOAD> [FLUSH|APPEND|REPLACE]",
" Restore the functions represented by the given payload, it is possible to give a restore policy to",
" control how to handle existing functions (default APPEND):",
" * FLUSH: delete all existing functions.",
" * APPEND: appends the restored functions to the existing functions. On collision, abort.",
" * REPLACE: appends the restored functions to the existing functions, On collision, replace the old",
" function with the new function.",
NULL };
addReplyHelp(c, help);
}

View File

@ -1214,6 +1214,30 @@ ssize_t rdbSaveSingleModuleAux(rio *rdb, int when, moduleType *mt) {
return io.bytes;
}
int functionsSaveRio(rio *rdb) {
int ret = C_ERR;
dict *functions = functionsGet();
dictIterator *iter = dictGetIterator(functions);
dictEntry *entry = NULL;
while ((entry = dictNext(iter))) {
rdbSaveType(rdb, RDB_OPCODE_FUNCTION);
functionInfo *fi = dictGetVal(entry);
if (rdbSaveRawString(rdb, (unsigned char *) fi->name, sdslen(fi->name)) == -1) goto done;
if (rdbSaveRawString(rdb, (unsigned char *) fi->ei->name, sdslen(fi->ei->name)) == -1) goto done;
if (fi->desc) {
if (rdbSaveLen(rdb, 1) == -1) goto done; /* desc exists */
if (rdbSaveRawString(rdb, (unsigned char *) fi->desc, sdslen(fi->desc)) == -1) goto done;
} else {
if (rdbSaveLen(rdb, 0) == -1) goto done; /* desc not exists */
}
if (rdbSaveRawString(rdb, (unsigned char *) fi->code, sdslen(fi->code)) == -1) goto done;
}
ret = C_OK;
done:
dictReleaseIterator(iter);
return ret;
}
/* Produces a dump of the database in RDB format sending it to the specified
* Redis I/O channel. On success C_OK is returned, otherwise C_ERR
* is returned and part of the output, or all the output, can be
@ -1240,24 +1264,7 @@ int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
/* save functions */
dict *functions = functionsGet();
dictIterator *iter = dictGetIterator(functions);
dictEntry *entry = NULL;
while ((entry = dictNext(iter))) {
rdbSaveType(rdb, RDB_OPCODE_FUNCTION);
functionInfo* fi = dictGetVal(entry);
if (rdbSaveRawString(rdb, (unsigned char *) fi->name, sdslen(fi->name)) == -1) goto werr;
if (rdbSaveRawString(rdb, (unsigned char *) fi->ei->name, sdslen(fi->ei->name)) == -1) goto werr;
if (fi->desc) {
if (rdbSaveLen(rdb, 1) == -1) goto werr; /* desc exists */
if (rdbSaveRawString(rdb, (unsigned char *) fi->desc, sdslen(fi->desc)) == -1) goto werr;
} else {
if (rdbSaveLen(rdb, 0) == -1) goto werr; /* desc not exists */
}
if (rdbSaveRawString(rdb, (unsigned char *) fi->code, sdslen(fi->code)) == -1) goto werr;
}
dictReleaseIterator(iter);
if (functionsSaveRio(rdb) != C_OK) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
@ -2697,42 +2704,48 @@ void rdbLoadProgressCallback(rio *r, const void *buf, size_t len) {
}
}
static int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx, int rdbflags) {
/* Save the given functions_ctx to the rdb.
* The err output parameter is optional and will be set with relevant error
* message on failure, it is the caller responsibility to free the error
* message on failure. */
int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx, int rdbflags, sds *err) {
UNUSED(ver);
sds name = NULL;
sds engine_name = NULL;
sds desc = NULL;
sds blob = NULL;
sds err = NULL;
uint64_t has_desc;
sds error = NULL;
int res = C_ERR;
if (!(name = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) {
serverLog(LL_WARNING, "Failed loading function name");
error = sdsnew("Failed loading function name");
goto error;
}
if (!(engine_name = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) {
serverLog(LL_WARNING, "Failed loading engine name");
error = sdsnew("Failed loading engine name");
goto error;
}
if ((has_desc = rdbLoadLen(rdb, NULL)) == RDB_LENERR) {
serverLog(LL_WARNING, "Failed loading function desc indicator");
error = sdsnew("Failed loading function description indicator");
goto error;
}
if (has_desc && !(desc = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) {
serverLog(LL_WARNING, "Failed loading function desc");
error = sdsnew("Failed loading function description");
goto error;
}
if (!(blob = rdbGenericLoadStringObject(rdb, RDB_LOAD_SDS, NULL))) {
serverLog(LL_WARNING, "Failed loading function blob");
error = sdsnew("Failed loading function blob");
goto error;
}
if (functionsCreateWithFunctionCtx(name, engine_name, desc, blob, rdbflags & RDBFLAGS_ALLOW_DUP, &err, functions_ctx) != C_OK) {
serverLog(LL_WARNING, "Failed compiling and saving the function %s", err);
if (functionsCreateWithFunctionCtx(name, engine_name, desc, blob, rdbflags & RDBFLAGS_ALLOW_DUP, &error, functions_ctx) != C_OK) {
if (!error) {
error = sdsnew("Failed creating the function");
}
goto error;
}
@ -2743,7 +2756,14 @@ error:
if (engine_name) sdsfree(engine_name);
if (desc) sdsfree(desc);
if (blob) sdsfree(blob);
if (err) sdsfree(err);
if (error) {
if (err) {
*err = error;
} else {
serverLog(LL_WARNING, "Failed creating function, %s", error);
sdsfree(error);
}
}
return res;
}
@ -2964,8 +2984,10 @@ int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadin
continue; /* Read next opcode. */
}
} else if (type == RDB_OPCODE_FUNCTION) {
if (rdbFunctionLoad(rdb, rdbver, rdb_loading_ctx->functions_ctx, rdbflags) != C_OK) {
serverLog(LL_WARNING,"Failed loading function");
sds err = NULL;
if (rdbFunctionLoad(rdb, rdbver, rdb_loading_ctx->functions_ctx, rdbflags, &err) != C_OK) {
serverLog(LL_WARNING,"Failed loading function, %s", err);
sdsfree(err);
goto eoferr;
}
continue;

View File

@ -169,7 +169,9 @@ int rdbSaveBinaryFloatValue(rio *rdb, float val);
int rdbLoadBinaryFloatValue(rio *rdb, float *val);
int rdbLoadRio(rio *rdb, int rdbflags, rdbSaveInfo *rsi);
int rdbLoadRioWithLoadingCtx(rio *rdb, int rdbflags, rdbSaveInfo *rsi, rdbLoadingCtx *rdb_loading_ctx);
int rdbFunctionLoad(rio *rdb, int ver, functionsCtx* functions_ctx, int rdbflags, sds *err);
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi);
int functionsSaveRio(rio *rdb);
rdbSaveInfo *rdbPopulateSaveInfo(rdbSaveInfo *rsi);
#endif

View File

@ -5880,6 +5880,8 @@ cleanup:
static int clusterManagerCommandAddNode(int argc, char **argv) {
int success = 1;
redisReply *reply = NULL;
redisReply *function_restore_reply = NULL;
redisReply *function_list_reply = NULL;
char *ref_ip = NULL, *ip = NULL;
int ref_port = 0, port = 0;
if (!getClusterHostFromCmdArgs(argc - 1, argv + 1, &ref_ip, &ref_port))
@ -5944,6 +5946,43 @@ static int clusterManagerCommandAddNode(int argc, char **argv) {
listAddNodeTail(cluster_manager.nodes, new_node);
added = 1;
if (!master_node) {
/* Send functions to the new node, if new node is a replica it will get the functions from its primary. */
clusterManagerLogInfo(">>> Getting functions from cluster\n");
reply = CLUSTER_MANAGER_COMMAND(refnode, "FUNCTION DUMP");
if (!clusterManagerCheckRedisReply(refnode, reply, &err)) {
clusterManagerLogInfo(">>> Failed retrieving Functions from the cluster, "
"skip this step as Redis version do not support function command (error = '%s')\n", err? err : "NULL reply");
if (err) zfree(err);
} else {
assert(reply->type == REDIS_REPLY_STRING);
clusterManagerLogInfo(">>> Send FUNCTION LIST to %s:%d to verify there is no functions in it\n", ip, port);
function_list_reply = CLUSTER_MANAGER_COMMAND(new_node, "FUNCTION LIST");
if (!clusterManagerCheckRedisReply(new_node, function_list_reply, &err)) {
clusterManagerLogErr(">>> Failed on CLUSTER LIST (error = '%s')\r\n", err? err : "NULL reply");
if (err) zfree(err);
success = 0;
goto cleanup;
}
assert(function_list_reply->type == REDIS_REPLY_ARRAY);
if (function_list_reply->elements > 0) {
clusterManagerLogErr(">>> New node already contains functions and can not be added to the cluster. Use FUNCTION FLUSH and try again.\r\n");
success = 0;
goto cleanup;
}
clusterManagerLogInfo(">>> Send FUNCTION RESTORE to %s:%d\n", ip, port);
function_restore_reply = CLUSTER_MANAGER_COMMAND(new_node, "FUNCTION RESTORE %b", reply->str, reply->len);
if (!clusterManagerCheckRedisReply(new_node, function_restore_reply, &err)) {
clusterManagerLogErr(">>> Failed loading functions to the new node (error = '%s')\r\n", err? err : "NULL reply");
if (err) zfree(err);
success = 0;
goto cleanup;
}
}
}
if (reply) freeReplyObject(reply);
// Send CLUSTER MEET command to the new node
clusterManagerLogInfo(">>> Send CLUSTER MEET to node %s:%d to make it "
"join the cluster.\n", ip, port);
@ -5968,6 +6007,8 @@ static int clusterManagerCommandAddNode(int argc, char **argv) {
cleanup:
if (!added && new_node) freeClusterManagerNode(new_node);
if (reply) freeReplyObject(reply);
if (function_restore_reply) freeReplyObject(function_restore_reply);
if (function_list_reply) freeReplyObject(function_list_reply);
return success;
invalid_args:
fprintf(stderr, CLUSTER_MANAGER_INVALID_HOST_ARG);

View File

@ -3162,6 +3162,7 @@ void migrateCommand(client *c);
void askingCommand(client *c);
void readonlyCommand(client *c);
void readwriteCommand(client *c);
int verifyDumpPayload(unsigned char *p, size_t len, uint16_t *rdbver_ptr);
void dumpCommand(client *c);
void objectCommand(client *c);
void memoryCommand(client *c);
@ -3182,6 +3183,8 @@ void functionInfoCommand(client *c);
void functionListCommand(client *c);
void functionHelpCommand(client *c);
void functionFlushCommand(client *c);
void functionRestoreCommand(client *c);
void functionDumpCommand(client *c);
void timeCommand(client *c);
void bitopCommand(client *c);
void bitcountCommand(client *c);

View File

@ -154,4 +154,72 @@ start_server [list overrides $base_conf] {
}
}
# Test redis-cli -- cluster create, add-node, call.
# Test that functions are propagated on add-node
start_server [list overrides $base_conf] {
start_server [list overrides $base_conf] {
start_server [list overrides $base_conf] {
start_server [list overrides $base_conf] {
start_server [list overrides $base_conf] {
set node4_rd [redis_client -3]
set node5_rd [redis_client -4]
test {Functions are added to new node on redis-cli cluster add-node} {
exec src/redis-cli --cluster-yes --cluster create \
127.0.0.1:[srv 0 port] \
127.0.0.1:[srv -1 port] \
127.0.0.1:[srv -2 port]
wait_for_condition 1000 50 {
[csi 0 cluster_state] eq {ok} &&
[csi -1 cluster_state] eq {ok} &&
[csi -2 cluster_state] eq {ok}
} else {
fail "Cluster doesn't stabilize"
}
# upload a function to all the cluster
exec src/redis-cli --cluster-yes --cluster call 127.0.0.1:[srv 0 port] \
FUNCTION CREATE LUA TEST {return 'hello'}
# adding node to the cluster
exec src/redis-cli --cluster-yes --cluster add-node \
127.0.0.1:[srv -3 port] \
127.0.0.1:[srv 0 port]
wait_for_condition 1000 50 {
[csi 0 cluster_state] eq {ok} &&
[csi -1 cluster_state] eq {ok} &&
[csi -2 cluster_state] eq {ok} &&
[csi -3 cluster_state] eq {ok}
} else {
fail "Cluster doesn't stabilize"
}
# make sure 'test' function was added to the new node
assert_equal {{name TEST engine LUA description {}}} [$node4_rd FUNCTION LIST]
# add function to node 5
assert_equal {OK} [$node5_rd FUNCTION CREATE LUA TEST {return 'hello1'}]
# make sure functions was added to node 5
assert_equal {{name TEST engine LUA description {}}} [$node5_rd FUNCTION LIST]
# adding node 5 to the cluster should failed because it already contains the 'test' function
catch {
exec src/redis-cli --cluster-yes --cluster add-node \
127.0.0.1:[srv -4 port] \
127.0.0.1:[srv 0 port]
} e
assert_match {*node already contains functions*} $e
}
# stop 5 servers
}
}
}
}
}
} ;# tags

View File

@ -161,6 +161,63 @@ start_server {tags {"scripting"}} {
r function list
} {{name test engine LUA description {}}}
test {FUNCTION - test function dump and restore} {
r function flush
r function create lua test description {some description} {return 'hello'}
set e [r function dump]
r function delete test
assert_match {} [r function list]
r function restore $e
r function list
} {{name test engine LUA description {some description}}}
test {FUNCTION - test function dump and restore with flush argument} {
set e [r function dump]
r function flush
assert_match {} [r function list]
r function restore $e FLUSH
r function list
} {{name test engine LUA description {some description}}}
test {FUNCTION - test function dump and restore with append argument} {
set e [r function dump]
r function flush
assert_match {} [r function list]
r function create lua test {return 'hello1'}
catch {r function restore $e APPEND} err
assert_match {*already exists*} $err
r function flush
r function create lua test1 {return 'hello1'}
r function restore $e APPEND
assert_match {hello} [r fcall test 0]
assert_match {hello1} [r fcall test1 0]
}
test {FUNCTION - test function dump and restore with replace argument} {
r function flush
r function create LUA test DESCRIPTION {some description} {return 'hello'}
set e [r function dump]
r function flush
assert_match {} [r function list]
r function create lua test {return 'hello1'}
assert_match {hello1} [r fcall test 0]
r function restore $e REPLACE
assert_match {hello} [r fcall test 0]
}
test {FUNCTION - test function restore with bad payload do not drop existing functions} {
r function flush
r function create LUA test DESCRIPTION {some description} {return 'hello'}
catch {r function restore bad_payload} e
assert_match {*payload version or checksum are wrong*} $e
r function list
} {{name test engine LUA description {some description}}}
test {FUNCTION - test function restore with wrong number of arguments} {
catch {r function restore arg1 args2 arg3} e
set _ $e
} {*wrong number of arguments*}
test {FUNCTION - test fcall_ro with write command} {
r function create lua test REPLACE {return redis.call('set', 'x', '1')}
catch { r fcall_ro test 0 } e
@ -281,6 +338,25 @@ start_server {tags {"scripting repl external:skip"}} {
r -1 fcall test 0
} {hello}
test {FUNCTION - restore is replicated to replica} {
set e [r function dump]
r function delete test
wait_for_condition 50 100 {
[r -1 function list] eq {}
} else {
fail "Failed waiting for function to replicate to replica"
}
r function restore $e
wait_for_condition 50 100 {
[r -1 function list] eq {{name test engine LUA description {some description}}}
} else {
fail "Failed waiting for function to replicate to replica"
}
}
test {FUNCTION - delete is replicated to replica} {
r function delete test
wait_for_condition 50 100 {