Merge 806893f0b4feaad67c66ef725ad60bb6704ccf20 into 26c6f1af9b29d525831c7fa9840ab3e47ed7b700

This commit is contained in:
Ricardo Dias 2025-02-02 04:52:27 +01:00 committed by GitHub
commit cde9e89312
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
26 changed files with 2396 additions and 1822 deletions

View File

@ -97,11 +97,13 @@ set(VALKEY_SERVER_SRCS
${CMAKE_SOURCE_DIR}/src/mt19937-64.c
${CMAKE_SOURCE_DIR}/src/resp_parser.c
${CMAKE_SOURCE_DIR}/src/call_reply.c
${CMAKE_SOURCE_DIR}/src/script_lua.c
${CMAKE_SOURCE_DIR}/src/lua/script_lua.c
${CMAKE_SOURCE_DIR}/src/script.c
${CMAKE_SOURCE_DIR}/src/functions.c
${CMAKE_SOURCE_DIR}/src/scripting_engine.c
${CMAKE_SOURCE_DIR}/src/function_lua.c
${CMAKE_SOURCE_DIR}/src/lua/function_lua.c
${CMAKE_SOURCE_DIR}/src/lua/engine_lua.c
${CMAKE_SOURCE_DIR}/src/lua/debug_lua.c
${CMAKE_SOURCE_DIR}/src/commands.c
${CMAKE_SOURCE_DIR}/src/strl.c
${CMAKE_SOURCE_DIR}/src/connection.c

View File

@ -416,7 +416,7 @@ endif
ENGINE_NAME=valkey
SERVER_NAME=$(ENGINE_NAME)-server$(PROG_SUFFIX)
ENGINE_SENTINEL_NAME=$(ENGINE_NAME)-sentinel$(PROG_SUFFIX)
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script_lua.o script.o functions.o function_lua.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o
ENGINE_SERVER_OBJ=threads_mngr.o adlist.o quicklist.o ae.o anet.o dict.o hashtable.o kvstore.o server.o sds.o zmalloc.o lzf_c.o lzf_d.o pqsort.o zipmap.o sha1.o ziplist.o release.o memory_prefetch.o io_threads.o networking.o util.o object.o db.o replication.o rdb.o t_string.o t_list.o t_set.o t_zset.o t_hash.o config.o aof.o pubsub.o multi.o debug.o sort.o intset.o syncio.o cluster.o cluster_legacy.o cluster_slot_stats.o crc16.o endianconv.o commandlog.o eval.o bio.o rio.o rand.o memtest.o syscheck.o crcspeed.o crccombine.o crc64.o bitops.o sentinel.o notify.o setproctitle.o blocked.o hyperloglog.o latency.o sparkline.o valkey-check-rdb.o valkey-check-aof.o geo.o lazyfree.o module.o evict.o expire.o geohash.o geohash_helper.o childinfo.o allocator_defrag.o defrag.o siphash.o rax.o t_stream.o listpack.o localtime.o lolwut.o lolwut5.o lolwut6.o acl.o tracking.o socket.o tls.o sha256.o timeout.o setcpuaffinity.o monotonic.o mt19937-64.o resp_parser.o call_reply.o script.o functions.o commands.o strl.o connection.o unix.o logreqres.o rdma.o scripting_engine.o lua/script_lua.o lua/function_lua.o lua/engine_lua.o lua/debug_lua.o
ENGINE_CLI_NAME=$(ENGINE_NAME)-cli$(PROG_SUFFIX)
ENGINE_CLI_OBJ=anet.o adlist.o dict.o valkey-cli.o zmalloc.o release.o ae.o serverassert.o crcspeed.o crccombine.o crc64.o siphash.o crc16.o monotonic.o cli_common.o mt19937-64.o strl.o cli_commands.o
ENGINE_BENCHMARK_NAME=$(ENGINE_NAME)-benchmark$(PROG_SUFFIX)
@ -537,6 +537,9 @@ DEP = $(ENGINE_SERVER_OBJ:%.o=%.d) $(ENGINE_CLI_OBJ:%.o=%.d) $(ENGINE_BENCHMARK_
%.o: %.c .make-prerequisites
$(SERVER_CC) -MMD -o $@ -c $<
lua/%.o: lua/%.c .make-prerequisites
$(SERVER_CC) -MMD -o $@ -c $<
unit/%.o: unit/%.c .make-prerequisites
$(SERVER_CC) -MMD -o $@ -c $<

View File

@ -35,6 +35,7 @@
#include "server.h"
#include "hashtable.h"
#include "eval.h"
#include "script.h"
#include "module.h"
#include <stddef.h>
@ -236,27 +237,6 @@ robj *activeDefragStringOb(robj *ob) {
return new_robj;
}
/* Defrag helper for lua scripts
*
* Returns NULL in case the allocation wasn't moved.
* When it returns a non-null value, the old pointer was already released
* and should NOT be accessed. */
static luaScript *activeDefragLuaScript(luaScript *script) {
luaScript *ret = NULL;
/* try to defrag script struct */
if ((ret = activeDefragAlloc(script))) {
script = ret;
}
/* try to defrag actual script object */
robj *ob = activeDefragStringOb(script->body);
if (ob) script->body = ob;
return ret;
}
/* Defrag helper for dict main allocations (dict struct, and hash tables).
* Receives a pointer to the dict* and return a new dict* when the dict
* struct itself was moved.
@ -359,7 +339,7 @@ static void activeDefragSdsDict(dict *d, int val_type) {
.defragVal = (val_type == DEFRAG_SDS_DICT_VAL_IS_SDS ? (dictDefragAllocFunction *)activeDefragSds
: val_type == DEFRAG_SDS_DICT_VAL_IS_STROB ? (dictDefragAllocFunction *)activeDefragStringOb
: val_type == DEFRAG_SDS_DICT_VAL_VOID_PTR ? (dictDefragAllocFunction *)activeDefragAlloc
: val_type == DEFRAG_SDS_DICT_VAL_LUA_SCRIPT ? (dictDefragAllocFunction *)activeDefragLuaScript
: val_type == DEFRAG_SDS_DICT_VAL_LUA_SCRIPT ? (dictDefragAllocFunction *)evalActiveDefragScript
: NULL)};
do {
cursor = dictScanDefrag(d, cursor, activeDefragSdsDictCallback, &defragfns, NULL);

1696
src/eval.c

File diff suppressed because it is too large Load Diff

8
src/eval.h Normal file
View File

@ -0,0 +1,8 @@
#ifndef _EVAL_H_
#define _EVAL_H_
void evalInit(void);
void *evalActiveDefragScript(void *ptr);
#endif /* _EVAL_H_ */

View File

@ -64,9 +64,13 @@ typedef struct functionsLibMetaData {
sds code;
} functionsLibMetaData;
static uint64_t dictStrCaseHash(const void *key) {
return dictGenCaseHashFunction((unsigned char *)key, strlen((char *)key));
}
dictType functionDictType = {
dictSdsCaseHash, /* hash function */
dictSdsDup, /* key dup */
dictStrCaseHash, /* hash function */
NULL, /* key dup */
dictSdsKeyCaseCompare, /* key compare */
dictSdsDestructor, /* key destructor */
NULL, /* val destructor */
@ -84,7 +88,7 @@ dictType engineStatsDictType = {
dictType libraryFunctionDictType = {
dictSdsHash, /* hash function */
dictSdsDup, /* key dup */
NULL, /* key dup */
dictSdsKeyCompare, /* key compare */
dictSdsDestructor, /* key destructor */
engineFunctionDispose, /* val destructor */
@ -105,9 +109,7 @@ static functionsLibCtx *curr_functions_lib_ctx = NULL;
static size_t functionMallocSize(functionInfo *fi) {
return zmalloc_size(fi) +
sdsAllocSize(fi->name) +
(fi->desc ? sdsAllocSize(fi->desc) : 0) +
scriptingEngineCallGetFunctionMemoryOverhead(fi->li->engine, fi->function);
scriptingEngineCallGetFunctionMemoryOverhead(fi->li->engine, fi->compiled_function);
}
static size_t libraryMallocSize(functionLibInfo *li) {
@ -125,12 +127,7 @@ static void engineFunctionDispose(void *obj) {
return;
}
functionInfo *fi = obj;
sdsfree(fi->name);
if (fi->desc) {
sdsfree(fi->desc);
}
scriptingEngineCallFreeFunction(fi->li->engine, fi->function);
scriptingEngineCallFreeFunction(fi->li->engine, VMSE_FUNCTION, fi->compiled_function);
zfree(fi);
}
@ -239,22 +236,19 @@ void functionsAddEngineStats(sds engine_name) {
* the function will verify that the given name is following the naming format
* and return an error if its not.
*/
static int functionLibCreateFunction(robj *name,
void *function,
static int functionLibCreateFunction(compiledFunction *function,
functionLibInfo *li,
robj *desc,
uint64_t f_flags,
sds *err) {
serverAssert(name->type == OBJ_STRING);
serverAssert(desc == NULL || desc->type == OBJ_STRING);
serverAssert(function->name->type == OBJ_STRING);
serverAssert(function->desc == NULL || function->desc->type == OBJ_STRING);
if (functionsVerifyName(name->ptr) != C_OK) {
if (functionsVerifyName(function->name->ptr) != C_OK) {
*err = sdsnew("Function names can only contain letters, numbers, or "
"underscores(_) and must be at least one character long");
return C_ERR;
}
sds name_sds = sdsdup(name->ptr);
sds name_sds = sdsdup(function->name->ptr);
if (dictFetchValue(li->functions, name_sds)) {
*err = sdsnew("Function already exists in the library");
sdsfree(name_sds);
@ -263,14 +257,11 @@ static int functionLibCreateFunction(robj *name,
functionInfo *fi = zmalloc(sizeof(*fi));
*fi = (functionInfo){
.name = name_sds,
.function = function,
.compiled_function = function,
.li = li,
.desc = desc ? sdsdup(desc->ptr) : NULL,
.f_flags = f_flags,
};
int res = dictAdd(li->functions, fi->name, fi);
int res = dictAdd(li->functions, name_sds, fi);
serverAssert(res == DICT_OK);
return C_OK;
@ -292,7 +283,8 @@ static void libraryUnlink(functionsLibCtx *lib_ctx, functionLibInfo *li) {
dictEntry *entry = NULL;
while ((entry = dictNext(iter))) {
functionInfo *fi = dictGetVal(entry);
int ret = dictDelete(lib_ctx->functions, fi->name);
int ret = dictDelete(lib_ctx->functions,
fi->compiled_function->name->ptr);
serverAssert(ret == DICT_OK);
lib_ctx->cache_memory -= functionMallocSize(fi);
}
@ -303,7 +295,8 @@ static void libraryUnlink(functionsLibCtx *lib_ctx, functionLibInfo *li) {
lib_ctx->cache_memory -= libraryMallocSize(li);
/* update stats */
functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats, scriptingEngineGetName(li->engine));
functionsLibEngineStats *stats = dictFetchValue(lib_ctx->engines_stats,
scriptingEngineGetName(li->engine));
serverAssert(stats);
stats->n_lib--;
stats->n_functions -= dictSize(li->functions);
@ -314,7 +307,9 @@ static void libraryLink(functionsLibCtx *lib_ctx, functionLibInfo *li) {
dictEntry *entry = NULL;
while ((entry = dictNext(iter))) {
functionInfo *fi = dictGetVal(entry);
dictAdd(lib_ctx->functions, fi->name, fi);
dictAdd(lib_ctx->functions,
sdsnew(fi->compiled_function->name->ptr),
fi);
lib_ctx->cache_memory += functionMallocSize(fi);
}
dictReleaseIterator(iter);
@ -369,8 +364,11 @@ libraryJoin(functionsLibCtx *functions_lib_ctx_dst, functionsLibCtx *functions_l
iter = dictGetIterator(functions_lib_ctx_src->functions);
while ((entry = dictNext(iter))) {
functionInfo *fi = dictGetVal(entry);
if (dictFetchValue(functions_lib_ctx_dst->functions, fi->name)) {
*err = sdscatfmt(sdsempty(), "Function %s already exists", fi->name);
if (dictFetchValue(functions_lib_ctx_dst->functions,
fi->compiled_function->name->ptr)) {
*err = sdscatfmt(sdsempty(),
"Function %s already exists",
fi->compiled_function->name->ptr);
goto done;
}
}
@ -472,7 +470,7 @@ static void functionListReplyFlags(client *c, functionInfo *fi) {
/* First count the number of flags we have */
int flagcount = 0;
for (scriptFlag *flag = scripts_flags_def; flag->str; ++flag) {
if (fi->f_flags & flag->flag) {
if (fi->compiled_function->f_flags & flag->flag) {
++flagcount;
}
}
@ -480,7 +478,7 @@ static void functionListReplyFlags(client *c, functionInfo *fi) {
addReplySetLen(c, flagcount);
for (scriptFlag *flag = scripts_flags_def; flag->str; ++flag) {
if (fi->f_flags & flag->flag) {
if (fi->compiled_function->f_flags & flag->flag) {
addReplyStatus(c, flag->str);
}
}
@ -552,10 +550,10 @@ void functionListCommand(client *c) {
functionInfo *fi = dictGetVal(function_entry);
addReplyMapLen(c, 3);
addReplyBulkCString(c, "name");
addReplyBulkCBuffer(c, fi->name, sdslen(fi->name));
addReplyBulkCString(c, fi->compiled_function->name->ptr);
addReplyBulkCString(c, "description");
if (fi->desc) {
addReplyBulkCBuffer(c, fi->desc, sdslen(fi->desc));
if (fi->compiled_function->desc) {
addReplyBulkCString(c, fi->compiled_function->desc->ptr);
} else {
addReplyNull(c);
}
@ -606,7 +604,7 @@ uint64_t fcallGetCommandFlags(client *c, uint64_t cmd_flags) {
c->cur_script = dictFind(curr_functions_lib_ctx->functions, function_name->ptr);
if (!c->cur_script) return cmd_flags;
functionInfo *fi = dictGetVal(c->cur_script);
uint64_t script_flags = fi->f_flags;
uint64_t script_flags = fi->compiled_function->f_flags;
return scriptFlagsToCmdFlags(cmd_flags, script_flags);
}
@ -639,12 +637,18 @@ static void fcallCommandGeneric(client *c, int ro) {
}
scriptRunCtx run_ctx;
if (scriptPrepareForRun(&run_ctx, scriptingEngineGetClient(engine), c, fi->name, fi->f_flags, ro) != C_OK) return;
if (scriptPrepareForRun(&run_ctx,
scriptingEngineGetClient(engine),
c,
fi->compiled_function->name->ptr,
fi->compiled_function->f_flags,
ro) != C_OK) return;
scriptingEngineCallFunction(engine,
&run_ctx,
run_ctx.original_client,
fi->function,
fi->compiled_function,
VMSE_FUNCTION,
c->argv + 3,
numkeys,
c->argv + 3 + numkeys,
@ -952,18 +956,9 @@ static void freeCompiledFunctions(scriptingEngine *engine,
compiledFunction **compiled_functions,
size_t num_compiled_functions,
size_t free_function_from_idx) {
for (size_t i = 0; i < num_compiled_functions; i++) {
compiledFunction *func = compiled_functions[i];
decrRefCount(func->name);
if (func->desc) {
decrRefCount(func->desc);
}
if (i >= free_function_from_idx) {
scriptingEngineCallFreeFunction(engine, func->function);
}
zfree(func);
for (size_t i = free_function_from_idx; i < num_compiled_functions; i++) {
scriptingEngineCallFreeFunction(engine, VMSE_FUNCTION, compiled_functions[i]);
}
zfree(compiled_functions);
}
@ -1009,11 +1004,12 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC
size_t num_compiled_functions = 0;
robj *compile_error = NULL;
compiledFunction **compiled_functions =
scriptingEngineCallCreateFunctionsLibrary(engine,
md.code,
timeout,
&num_compiled_functions,
&compile_error);
scriptingEngineCallCompileCode(engine,
VMSE_FUNCTION,
md.code,
timeout,
&num_compiled_functions,
&compile_error);
if (compiled_functions == NULL) {
serverAssert(num_compiled_functions == 0);
serverAssert(compile_error != NULL);
@ -1025,13 +1021,7 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC
serverAssert(compile_error == NULL);
for (size_t i = 0; i < num_compiled_functions; i++) {
compiledFunction *func = compiled_functions[i];
int ret = functionLibCreateFunction(func->name,
func->function,
new_li,
func->desc,
func->f_flags,
err);
int ret = functionLibCreateFunction(compiled_functions[i], new_li, err);
if (ret == C_ERR) {
freeCompiledFunctions(engine,
compiled_functions,
@ -1040,11 +1030,7 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC
goto error;
}
}
freeCompiledFunctions(engine,
compiled_functions,
num_compiled_functions,
num_compiled_functions);
zfree(compiled_functions);
if (dictSize(new_li->functions) == 0) {
*err = sdsnew("No functions registered");
@ -1055,9 +1041,11 @@ sds functionsCreateWithLibraryCtx(sds code, int replace, sds *err, functionsLibC
iter = dictGetIterator(new_li->functions);
while ((entry = dictNext(iter))) {
functionInfo *fi = dictGetVal(entry);
if (dictFetchValue(lib_ctx->functions, fi->name)) {
if (dictFetchValue(lib_ctx->functions,
fi->compiled_function->name->ptr)) {
/* functions name collision, abort. */
*err = sdscatfmt(sdsempty(), "Function %s already exists", fi->name);
*err = sdscatfmt(sdsempty(), "Function %s already exists",
fi->compiled_function->name->ptr);
goto error;
}
}
@ -1127,7 +1115,7 @@ void functionLoadCommand(client *c) {
static void getEngineUsedMemory(scriptingEngine *engine, void *context) {
size_t *engines_memory = (size_t *)context;
engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(engine);
engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(engine, VMSE_FUNCTION);
*engines_memory += mem_info.used_memory;
}
@ -1171,9 +1159,5 @@ size_t functionsLibCtxFunctionsLen(functionsLibCtx *functions_ctx) {
int functionsInit(void) {
curr_functions_lib_ctx = functionsLibCtxCreate();
if (luaEngineInitEngine() != C_OK) {
return C_ERR;
}
return C_OK;
}

View File

@ -68,12 +68,8 @@ typedef struct engineInfo {
/* Hold information about the specific function.
* Used on rdb.c so it must be declared here. */
typedef struct functionInfo {
sds name; /* Function name */
void *function; /* Opaque object that set by the function's engine and allow it
to run the function, usually it's the function compiled code. */
functionLibInfo *li; /* Pointer to the library created the function */
sds desc; /* Function description */
uint64_t f_flags; /* Function flags */
compiledFunction *compiled_function; /* Compiled function structure */
functionLibInfo *li; /* Pointer to the library created the function */
} functionInfo;
/* Hold information about the specific library.
@ -101,7 +97,6 @@ void functionsLibCtxSwapWithCurrent(functionsLibCtx *new_lib_ctx, int async);
void functionsRemoveLibFromEngine(scriptingEngine *engine);
int luaEngineInitEngine(void);
int functionsInit(void);
#endif /* __FUNCTIONS_H_ */

View File

@ -50,13 +50,13 @@ void lazyFreeErrors(void *args[]) {
atomic_fetch_add_explicit(&lazyfreed_objects, len, memory_order_relaxed);
}
/* Release the lua_scripts dict. */
void lazyFreeLuaScripts(void *args[]) {
dict *lua_scripts = args[0];
list *lua_scripts_lru_list = args[1];
lua_State *lua = args[2];
long long len = dictSize(lua_scripts);
freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua);
/* Release the eval scripts data structures. */
void lazyFreeEvalScripts(void *args[]) {
dict *scripts = args[0];
list *scripts_lru_list = args[1];
list *engine_callbacks = args[2];
long long len = dictSize(scripts);
freeEvalScripts(scripts, scripts_lru_list, engine_callbacks);
atomic_fetch_sub_explicit(&lazyfree_objects, len, memory_order_relaxed);
atomic_fetch_add_explicit(&lazyfreed_objects, len, memory_order_relaxed);
}
@ -223,14 +223,15 @@ void freeErrorsRadixTreeAsync(rax *errors) {
}
}
/* Free lua_scripts dict and lru list, if the dict is huge enough, free them in async way.
/* Free scripts dict, and lru list, if the dict is huge enough, free them in
* async way.
* Close lua interpreter, if there are a lot of lua scripts, close it in async way. */
void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_State *lua) {
if (dictSize(lua_scripts) > LAZYFREE_THRESHOLD) {
atomic_fetch_add_explicit(&lazyfree_objects, dictSize(lua_scripts), memory_order_relaxed);
bioCreateLazyFreeJob(lazyFreeLuaScripts, 3, lua_scripts, lua_scripts_lru_list, lua);
void freeEvalScriptsAsync(dict *scripts, list *scripts_lru_list, list *engine_callbacks) {
if (dictSize(scripts) > LAZYFREE_THRESHOLD) {
atomic_fetch_add_explicit(&lazyfree_objects, dictSize(scripts), memory_order_relaxed);
bioCreateLazyFreeJob(lazyFreeEvalScripts, 3, scripts, scripts_lru_list, engine_callbacks);
} else {
freeLuaScriptsSync(lua_scripts, lua_scripts_lru_list, lua);
freeEvalScripts(scripts, scripts_lru_list, engine_callbacks);
}
}

998
src/lua/debug_lua.c Normal file
View File

@ -0,0 +1,998 @@
/*
* Copyright (c) 2009-2012, Redis Ltd.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
*/
#include "debug_lua.h"
#include "script_lua.h"
#include "../connection.h"
#include "../adlist.h"
#include "../server.h"
#include <lua.h>
#include <lauxlib.h>
#include <lualib.h>
#include <signal.h>
/* ---------------------------------------------------------------------------
* LDB: Lua debugging facilities
* ------------------------------------------------------------------------- */
/* Debugger shared state is stored inside this global structure. */
#define LDB_BREAKPOINTS_MAX 64 /* Max number of breakpoints. */
#define LDB_MAX_LEN_DEFAULT 256 /* Default len limit for replies / var dumps. */
struct ldbState {
connection *conn; /* Connection of the debugging client. */
int active; /* Are we debugging EVAL right now? */
int forked; /* Is this a fork()ed debugging session? */
list *logs; /* List of messages to send to the client. */
list *traces; /* Messages about commands executed since last stop.*/
list *children; /* All forked debugging sessions pids. */
int bp[LDB_BREAKPOINTS_MAX]; /* An array of breakpoints line numbers. */
int bpcount; /* Number of valid entries inside bp. */
int step; /* Stop at next line regardless of breakpoints. */
int luabp; /* Stop at next line because server.breakpoint() was called. */
sds *src; /* Lua script source code split by line. */
int lines; /* Number of lines in 'src'. */
int currentline; /* Current line number. */
sds cbuf; /* Debugger client command buffer. */
size_t maxlen; /* Max var dump / reply length. */
int maxlen_hint_sent; /* Did we already hint about "set maxlen"? */
} ldb;
/* Initialize Lua debugger data structures. */
void ldbInit(void) {
ldb.conn = NULL;
ldb.active = 0;
ldb.logs = listCreate();
listSetFreeMethod(ldb.logs, (void (*)(void *))sdsfree);
ldb.children = listCreate();
ldb.src = NULL;
ldb.lines = 0;
ldb.cbuf = sdsempty();
}
/* Remove all the pending messages in the specified list. */
void ldbFlushLog(list *log) {
listNode *ln;
while ((ln = listFirst(log)) != NULL) listDelNode(log, ln);
}
int ldbIsEnabled(void) {
return ldb.active && ldb.step;
}
/* Enable debug mode of Lua scripts for this client. */
void ldbEnable(client *c) {
c->flag.lua_debug = 1;
ldbFlushLog(ldb.logs);
ldb.conn = c->conn;
ldb.step = 1;
ldb.bpcount = 0;
ldb.luabp = 0;
sdsfree(ldb.cbuf);
ldb.cbuf = sdsempty();
ldb.maxlen = LDB_MAX_LEN_DEFAULT;
ldb.maxlen_hint_sent = 0;
}
/* Exit debugging mode from the POV of client. This function is not enough
* to properly shut down a client debugging session, see ldbEndSession()
* for more information. */
void ldbDisable(client *c) {
c->flag.lua_debug = 0;
c->flag.lua_debug_sync = 0;
}
/* Append a log entry to the specified LDB log. */
void ldbLog(sds entry) {
listAddNodeTail(ldb.logs, entry);
}
/* A version of ldbLog() which prevents producing logs greater than
* ldb.maxlen. The first time the limit is reached a hint is generated
* to inform the user that reply trimming can be disabled using the
* debugger "maxlen" command. */
void ldbLogWithMaxLen(sds entry) {
int trimmed = 0;
if (ldb.maxlen && sdslen(entry) > ldb.maxlen) {
sdsrange(entry, 0, ldb.maxlen - 1);
entry = sdscatlen(entry, " ...", 4);
trimmed = 1;
}
ldbLog(entry);
if (trimmed && ldb.maxlen_hint_sent == 0) {
ldb.maxlen_hint_sent = 1;
ldbLog(sdsnew("<hint> The above reply was trimmed. Use 'maxlen 0' to disable trimming."));
}
}
/* Send ldb.logs to the debugging client as a multi-bulk reply
* consisting of simple strings. Log entries which include newlines have them
* replaced with spaces. The entries sent are also consumed. */
void ldbSendLogs(void) {
sds proto = sdsempty();
proto = sdscatfmt(proto, "*%i\r\n", (int)listLength(ldb.logs));
while (listLength(ldb.logs)) {
listNode *ln = listFirst(ldb.logs);
proto = sdscatlen(proto, "+", 1);
sdsmapchars(ln->value, "\r\n", " ", 2);
proto = sdscatsds(proto, ln->value);
proto = sdscatlen(proto, "\r\n", 2);
listDelNode(ldb.logs, ln);
}
if (connWrite(ldb.conn, proto, sdslen(proto)) == -1) {
/* Avoid warning. We don't check the return value of write()
* since the next read() will catch the I/O error and will
* close the debugging session. */
}
sdsfree(proto);
}
/* Start a debugging session before calling EVAL implementation.
* The technique we use is to capture the client socket file descriptor,
* in order to perform direct I/O with it from within Lua hooks. This
* way we don't have to re-enter the server in order to handle I/O.
*
* The function returns 1 if the caller should proceed to call EVAL,
* and 0 if instead the caller should abort the operation (this happens
* for the parent in a forked session, since it's up to the children
* to continue, or when fork returned an error).
*
* The caller should call ldbEndSession() only if ldbStartSession()
* returned 1. */
int ldbStartSession(client *c) {
ldb.forked = !c->flag.lua_debug_sync;
if (ldb.forked) {
pid_t cp = serverFork(CHILD_TYPE_LDB);
if (cp == -1) {
addReplyErrorFormat(c, "Fork() failed: can't run EVAL in debugging mode: %s", strerror(errno));
return 0;
} else if (cp == 0) {
/* Child. Let's ignore important signals handled by the parent. */
struct sigaction act;
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
act.sa_handler = SIG_IGN;
sigaction(SIGTERM, &act, NULL);
sigaction(SIGINT, &act, NULL);
/* Log the creation of the child and close the listening
* socket to make sure if the parent crashes a reset is sent
* to the clients. */
serverLog(LL_NOTICE, "%s forked for debugging eval", SERVER_TITLE);
} else {
/* Parent */
listAddNodeTail(ldb.children, (void *)(unsigned long)cp);
freeClientAsync(c); /* Close the client in the parent side. */
return 0;
}
} else {
serverLog(LL_NOTICE, "%s synchronous debugging eval session started", SERVER_TITLE);
}
/* Setup our debugging session. */
connBlock(ldb.conn);
connSendTimeout(ldb.conn, 5000);
ldb.active = 1;
/* First argument of EVAL is the script itself. We split it into different
* lines since this is the way the debugger accesses the source code. */
sds srcstring = sdsdup(c->argv[1]->ptr);
size_t srclen = sdslen(srcstring);
while (srclen && (srcstring[srclen - 1] == '\n' || srcstring[srclen - 1] == '\r')) {
srcstring[--srclen] = '\0';
}
sdssetlen(srcstring, srclen);
ldb.src = sdssplitlen(srcstring, sdslen(srcstring), "\n", 1, &ldb.lines);
sdsfree(srcstring);
return 1;
}
/* End a debugging session after the EVAL call with debugging enabled
* returned. */
void ldbEndSession(client *c) {
/* Emit the remaining logs and an <endsession> mark. */
ldbLog(sdsnew("<endsession>"));
ldbSendLogs();
/* If it's a fork()ed session, we just exit. */
if (ldb.forked) {
writeToClient(c);
serverLog(LL_NOTICE, "Lua debugging session child exiting");
exitFromChild(0);
} else {
serverLog(LL_NOTICE, "%s synchronous debugging eval session ended", SERVER_TITLE);
}
/* Otherwise let's restore client's state. */
connNonBlock(ldb.conn);
connSendTimeout(ldb.conn, 0);
/* Close the client connection after sending the final EVAL reply
* in order to signal the end of the debugging session. */
c->flag.close_after_reply = 1;
/* Cleanup. */
sdsfreesplitres(ldb.src, ldb.lines);
ldb.lines = 0;
ldb.active = 0;
}
/* If the specified pid is among the list of children spawned for
* forked debugging sessions, it is removed from the children list.
* If the pid was found non-zero is returned. */
int ldbRemoveChild(int pid) {
listNode *ln = listSearchKey(ldb.children, (void *)(unsigned long)pid);
if (ln) {
listDelNode(ldb.children, ln);
return 1;
}
return 0;
}
/* Return the number of children we still did not receive termination
* acknowledge via wait() in the parent process. */
int ldbPendingChildren(void) {
return listLength(ldb.children);
}
/* Kill all the forked sessions. */
void ldbKillForkedSessions(void) {
listIter li;
listNode *ln;
listRewind(ldb.children, &li);
while ((ln = listNext(&li))) {
pid_t pid = (unsigned long)ln->value;
serverLog(LL_NOTICE, "Killing debugging session %ld", (long)pid);
kill(pid, SIGKILL);
}
listRelease(ldb.children);
ldb.children = listCreate();
}
/* Return a pointer to ldb.src source code line, considering line to be
* one-based, and returning a special string for out of range lines. */
char *ldbGetSourceLine(int line) {
int idx = line - 1;
if (idx < 0 || idx >= ldb.lines) return "<out of range source code line>";
return ldb.src[idx];
}
/* Return true if there is a breakpoint in the specified line. */
int ldbIsBreakpoint(int line) {
int j;
for (j = 0; j < ldb.bpcount; j++)
if (ldb.bp[j] == line) return 1;
return 0;
}
/* Add the specified breakpoint. Ignore it if we already reached the max.
* Returns 1 if the breakpoint was added (or was already set). 0 if there is
* no space for the breakpoint or if the line is invalid. */
int ldbAddBreakpoint(int line) {
if (line <= 0 || line > ldb.lines) return 0;
if (!ldbIsBreakpoint(line) && ldb.bpcount != LDB_BREAKPOINTS_MAX) {
ldb.bp[ldb.bpcount++] = line;
return 1;
}
return 0;
}
/* Remove the specified breakpoint, returning 1 if the operation was
* performed or 0 if there was no such breakpoint. */
int ldbDelBreakpoint(int line) {
int j;
for (j = 0; j < ldb.bpcount; j++) {
if (ldb.bp[j] == line) {
ldb.bpcount--;
memmove(ldb.bp + j, ldb.bp + j + 1, ldb.bpcount - j);
return 1;
}
}
return 0;
}
/* Expect a valid multi-bulk command in the debugging client query buffer.
* On success the command is parsed and returned as an array of SDS strings,
* otherwise NULL is returned and there is to read more buffer. */
sds *ldbReplParseCommand(int *argcp, char **err) {
static char *protocol_error = "protocol error";
sds *argv = NULL;
int argc = 0;
if (sdslen(ldb.cbuf) == 0) return NULL;
/* Working on a copy is simpler in this case. We can modify it freely
* for the sake of simpler parsing. */
sds copy = sdsdup(ldb.cbuf);
char *p = copy;
/* This RESP parser is a joke... just the simplest thing that
* works in this context. It is also very forgiving regarding broken
* protocol. */
/* Seek and parse *<count>\r\n. */
p = strchr(p, '*');
if (!p) goto protoerr;
char *plen = p + 1; /* Multi bulk len pointer. */
p = strstr(p, "\r\n");
if (!p) goto keep_reading;
*p = '\0';
p += 2;
*argcp = atoi(plen);
if (*argcp <= 0 || *argcp > 1024) goto protoerr;
/* Parse each argument. */
argv = zmalloc(sizeof(sds) * (*argcp));
argc = 0;
while (argc < *argcp) {
/* reached the end but there should be more data to read */
if (*p == '\0') goto keep_reading;
if (*p != '$') goto protoerr;
plen = p + 1; /* Bulk string len pointer. */
p = strstr(p, "\r\n");
if (!p) goto keep_reading;
*p = '\0';
p += 2;
int slen = atoi(plen); /* Length of this arg. */
if (slen <= 0 || slen > 1024) goto protoerr;
if ((size_t)(p + slen + 2 - copy) > sdslen(copy)) goto keep_reading;
argv[argc++] = sdsnewlen(p, slen);
p += slen; /* Skip the already parsed argument. */
if (p[0] != '\r' || p[1] != '\n') goto protoerr;
p += 2; /* Skip \r\n. */
}
sdsfree(copy);
return argv;
protoerr:
*err = protocol_error;
keep_reading:
sdsfreesplitres(argv, argc);
sdsfree(copy);
return NULL;
}
/* Log the specified line in the Lua debugger output. */
void ldbLogSourceLine(int lnum) {
char *line = ldbGetSourceLine(lnum);
char *prefix;
int bp = ldbIsBreakpoint(lnum);
int current = ldb.currentline == lnum;
if (current && bp)
prefix = "->#";
else if (current)
prefix = "-> ";
else if (bp)
prefix = " #";
else
prefix = " ";
sds thisline = sdscatprintf(sdsempty(), "%s%-3d %s", prefix, lnum, line);
ldbLog(thisline);
}
/* Implement the "list" command of the Lua debugger. If around is 0
* the whole file is listed, otherwise only a small portion of the file
* around the specified line is shown. When a line number is specified
* the amount of context (lines before/after) is specified via the
* 'context' argument. */
void ldbList(int around, int context) {
int j;
for (j = 1; j <= ldb.lines; j++) {
if (around != 0 && abs(around - j) > context) continue;
ldbLogSourceLine(j);
}
}
/* Append a human readable representation of the Lua value at position 'idx'
* on the stack of the 'lua' state, to the SDS string passed as argument.
* The new SDS string with the represented value attached is returned.
* Used in order to implement ldbLogStackValue().
*
* The element is not automatically removed from the stack, nor it is
* converted to a different type. */
#define LDB_MAX_VALUES_DEPTH (LUA_MINSTACK / 2)
static sds ldbCatStackValueRec(sds s, lua_State *lua, int idx, int level) {
int t = lua_type(lua, idx);
if (level++ == LDB_MAX_VALUES_DEPTH) return sdscat(s, "<max recursion level reached! Nested table?>");
switch (t) {
case LUA_TSTRING: {
size_t strl;
char *strp = (char *)lua_tolstring(lua, idx, &strl);
s = sdscatrepr(s, strp, strl);
} break;
case LUA_TBOOLEAN: s = sdscat(s, lua_toboolean(lua, idx) ? "true" : "false"); break;
case LUA_TNUMBER: s = sdscatprintf(s, "%g", (double)lua_tonumber(lua, idx)); break;
case LUA_TNIL: s = sdscatlen(s, "nil", 3); break;
case LUA_TTABLE: {
int expected_index = 1; /* First index we expect in an array. */
int is_array = 1; /* Will be set to null if check fails. */
/* Note: we create two representations at the same time, one
* assuming the table is an array, one assuming it is not. At the
* end we know what is true and select the right one. */
sds repr1 = sdsempty();
sds repr2 = sdsempty();
lua_pushnil(lua); /* The first key to start the iteration is nil. */
while (lua_next(lua, idx - 1)) {
/* Test if so far the table looks like an array. */
if (is_array && (lua_type(lua, -2) != LUA_TNUMBER || lua_tonumber(lua, -2) != expected_index)) is_array = 0;
/* Stack now: table, key, value */
/* Array repr. */
repr1 = ldbCatStackValueRec(repr1, lua, -1, level);
repr1 = sdscatlen(repr1, "; ", 2);
/* Full repr. */
repr2 = sdscatlen(repr2, "[", 1);
repr2 = ldbCatStackValueRec(repr2, lua, -2, level);
repr2 = sdscatlen(repr2, "]=", 2);
repr2 = ldbCatStackValueRec(repr2, lua, -1, level);
repr2 = sdscatlen(repr2, "; ", 2);
lua_pop(lua, 1); /* Stack: table, key. Ready for next iteration. */
expected_index++;
}
/* Strip the last " ;" from both the representations. */
if (sdslen(repr1)) sdsrange(repr1, 0, -3);
if (sdslen(repr2)) sdsrange(repr2, 0, -3);
/* Select the right one and discard the other. */
s = sdscatlen(s, "{", 1);
s = sdscatsds(s, is_array ? repr1 : repr2);
s = sdscatlen(s, "}", 1);
sdsfree(repr1);
sdsfree(repr2);
} break;
case LUA_TFUNCTION:
case LUA_TUSERDATA:
case LUA_TTHREAD:
case LUA_TLIGHTUSERDATA: {
const void *p = lua_topointer(lua, idx);
char *typename = "unknown";
if (t == LUA_TFUNCTION)
typename = "function";
else if (t == LUA_TUSERDATA)
typename = "userdata";
else if (t == LUA_TTHREAD)
typename = "thread";
else if (t == LUA_TLIGHTUSERDATA)
typename = "light-userdata";
s = sdscatprintf(s, "\"%s@%p\"", typename, p);
} break;
default: s = sdscat(s, "\"<unknown-lua-type>\""); break;
}
return s;
}
/* Higher level wrapper for ldbCatStackValueRec() that just uses an initial
* recursion level of '0'. */
sds ldbCatStackValue(sds s, lua_State *lua, int idx) {
return ldbCatStackValueRec(s, lua, idx, 0);
}
/* Produce a debugger log entry representing the value of the Lua object
* currently on the top of the stack. The element is not popped nor modified.
* Check ldbCatStackValue() for the actual implementation. */
void ldbLogStackValue(lua_State *lua, char *prefix) {
sds s = sdsnew(prefix);
s = ldbCatStackValue(s, lua, -1);
ldbLogWithMaxLen(s);
}
char *ldbRespToHuman_Int(sds *o, char *reply);
char *ldbRespToHuman_Bulk(sds *o, char *reply);
char *ldbRespToHuman_Status(sds *o, char *reply);
char *ldbRespToHuman_MultiBulk(sds *o, char *reply);
char *ldbRespToHuman_Set(sds *o, char *reply);
char *ldbRespToHuman_Map(sds *o, char *reply);
char *ldbRespToHuman_Null(sds *o, char *reply);
char *ldbRespToHuman_Bool(sds *o, char *reply);
char *ldbRespToHuman_Double(sds *o, char *reply);
/* Get RESP from 'reply' and appends it in human readable form to
* the passed SDS string 'o'.
*
* Note that the SDS string is passed by reference (pointer of pointer to
* char*) so that we can return a modified pointer, as for SDS semantics. */
char *ldbRespToHuman(sds *o, char *reply) {
char *p = reply;
switch (*p) {
case ':': p = ldbRespToHuman_Int(o, reply); break;
case '$': p = ldbRespToHuman_Bulk(o, reply); break;
case '+': p = ldbRespToHuman_Status(o, reply); break;
case '-': p = ldbRespToHuman_Status(o, reply); break;
case '*': p = ldbRespToHuman_MultiBulk(o, reply); break;
case '~': p = ldbRespToHuman_Set(o, reply); break;
case '%': p = ldbRespToHuman_Map(o, reply); break;
case '_': p = ldbRespToHuman_Null(o, reply); break;
case '#': p = ldbRespToHuman_Bool(o, reply); break;
case ',': p = ldbRespToHuman_Double(o, reply); break;
}
return p;
}
/* The following functions are helpers for ldbRespToHuman(), each
* take care of a given RESP return type. */
char *ldbRespToHuman_Int(sds *o, char *reply) {
char *p = strchr(reply + 1, '\r');
*o = sdscatlen(*o, reply + 1, p - reply - 1);
return p + 2;
}
char *ldbRespToHuman_Bulk(sds *o, char *reply) {
char *p = strchr(reply + 1, '\r');
long long bulklen;
string2ll(reply + 1, p - reply - 1, &bulklen);
if (bulklen == -1) {
*o = sdscatlen(*o, "NULL", 4);
return p + 2;
} else {
*o = sdscatrepr(*o, p + 2, bulklen);
return p + 2 + bulklen + 2;
}
}
char *ldbRespToHuman_Status(sds *o, char *reply) {
char *p = strchr(reply + 1, '\r');
*o = sdscatrepr(*o, reply, p - reply);
return p + 2;
}
char *ldbRespToHuman_MultiBulk(sds *o, char *reply) {
char *p = strchr(reply + 1, '\r');
long long mbulklen;
int j = 0;
string2ll(reply + 1, p - reply - 1, &mbulklen);
p += 2;
if (mbulklen == -1) {
*o = sdscatlen(*o, "NULL", 4);
return p;
}
*o = sdscatlen(*o, "[", 1);
for (j = 0; j < mbulklen; j++) {
p = ldbRespToHuman(o, p);
if (j != mbulklen - 1) *o = sdscatlen(*o, ",", 1);
}
*o = sdscatlen(*o, "]", 1);
return p;
}
char *ldbRespToHuman_Set(sds *o, char *reply) {
char *p = strchr(reply + 1, '\r');
long long mbulklen;
int j = 0;
string2ll(reply + 1, p - reply - 1, &mbulklen);
p += 2;
*o = sdscatlen(*o, "~(", 2);
for (j = 0; j < mbulklen; j++) {
p = ldbRespToHuman(o, p);
if (j != mbulklen - 1) *o = sdscatlen(*o, ",", 1);
}
*o = sdscatlen(*o, ")", 1);
return p;
}
char *ldbRespToHuman_Map(sds *o, char *reply) {
char *p = strchr(reply + 1, '\r');
long long mbulklen;
int j = 0;
string2ll(reply + 1, p - reply - 1, &mbulklen);
p += 2;
*o = sdscatlen(*o, "{", 1);
for (j = 0; j < mbulklen; j++) {
p = ldbRespToHuman(o, p);
*o = sdscatlen(*o, " => ", 4);
p = ldbRespToHuman(o, p);
if (j != mbulklen - 1) *o = sdscatlen(*o, ",", 1);
}
*o = sdscatlen(*o, "}", 1);
return p;
}
char *ldbRespToHuman_Null(sds *o, char *reply) {
char *p = strchr(reply + 1, '\r');
*o = sdscatlen(*o, "(null)", 6);
return p + 2;
}
char *ldbRespToHuman_Bool(sds *o, char *reply) {
char *p = strchr(reply + 1, '\r');
if (reply[1] == 't')
*o = sdscatlen(*o, "#true", 5);
else
*o = sdscatlen(*o, "#false", 6);
return p + 2;
}
char *ldbRespToHuman_Double(sds *o, char *reply) {
char *p = strchr(reply + 1, '\r');
*o = sdscatlen(*o, "(double) ", 9);
*o = sdscatlen(*o, reply + 1, p - reply - 1);
return p + 2;
}
/* Log a RESP reply as debugger output, in a human readable format.
* If the resulting string is longer than 'len' plus a few more chars
* used as prefix, it gets truncated. */
void ldbLogRespReply(char *reply) {
sds log = sdsnew("<reply> ");
ldbRespToHuman(&log, reply);
ldbLogWithMaxLen(log);
}
/* Implements the "print <var>" command of the Lua debugger. It scans for Lua
* var "varname" starting from the current stack frame up to the top stack
* frame. The first matching variable is printed. */
void ldbPrint(lua_State *lua, char *varname) {
lua_Debug ar;
int l = 0; /* Stack level. */
while (lua_getstack(lua, l, &ar) != 0) {
l++;
const char *name;
int i = 1; /* Variable index. */
while ((name = lua_getlocal(lua, &ar, i)) != NULL) {
i++;
if (strcmp(varname, name) == 0) {
ldbLogStackValue(lua, "<value> ");
lua_pop(lua, 1);
return;
} else {
lua_pop(lua, 1); /* Discard the var name on the stack. */
}
}
}
/* Let's try with global vars in two selected cases */
if (!strcmp(varname, "ARGV") || !strcmp(varname, "KEYS")) {
lua_getglobal(lua, varname);
ldbLogStackValue(lua, "<value> ");
lua_pop(lua, 1);
} else {
ldbLog(sdsnew("No such variable."));
}
}
/* Implements the "print" command (without arguments) of the Lua debugger.
* Prints all the variables in the current stack frame. */
void ldbPrintAll(lua_State *lua) {
lua_Debug ar;
int vars = 0;
if (lua_getstack(lua, 0, &ar) != 0) {
const char *name;
int i = 1; /* Variable index. */
while ((name = lua_getlocal(lua, &ar, i)) != NULL) {
i++;
if (!strstr(name, "(*temporary)")) {
sds prefix = sdscatprintf(sdsempty(), "<value> %s = ", name);
ldbLogStackValue(lua, prefix);
sdsfree(prefix);
vars++;
}
lua_pop(lua, 1);
}
}
if (vars == 0) {
ldbLog(sdsnew("No local variables in the current context."));
}
}
/* Implements the break command to list, add and remove breakpoints. */
void ldbBreak(sds *argv, int argc) {
if (argc == 1) {
if (ldb.bpcount == 0) {
ldbLog(sdsnew("No breakpoints set. Use 'b <line>' to add one."));
return;
} else {
ldbLog(sdscatfmt(sdsempty(), "%i breakpoints set:", ldb.bpcount));
int j;
for (j = 0; j < ldb.bpcount; j++) ldbLogSourceLine(ldb.bp[j]);
}
} else {
int j;
for (j = 1; j < argc; j++) {
char *arg = argv[j];
long line;
if (!string2l(arg, sdslen(arg), &line)) {
ldbLog(sdscatfmt(sdsempty(), "Invalid argument:'%s'", arg));
} else {
if (line == 0) {
ldb.bpcount = 0;
ldbLog(sdsnew("All breakpoints removed."));
} else if (line > 0) {
if (ldb.bpcount == LDB_BREAKPOINTS_MAX) {
ldbLog(sdsnew("Too many breakpoints set."));
} else if (ldbAddBreakpoint(line)) {
ldbList(line, 1);
} else {
ldbLog(sdsnew("Wrong line number."));
}
} else if (line < 0) {
if (ldbDelBreakpoint(-line))
ldbLog(sdsnew("Breakpoint removed."));
else
ldbLog(sdsnew("No breakpoint in the specified line."));
}
}
}
}
}
/* Implements the Lua debugger "eval" command. It just compiles the user
* passed fragment of code and executes it, showing the result left on
* the stack. */
void ldbEval(lua_State *lua, sds *argv, int argc) {
/* Glue the script together if it is composed of multiple arguments. */
sds code = sdsjoinsds(argv + 1, argc - 1, " ", 1);
sds expr = sdscatsds(sdsnew("return "), code);
/* Try to compile it as an expression, prepending "return ". */
if (luaL_loadbuffer(lua, expr, sdslen(expr), "@ldb_eval")) {
lua_pop(lua, 1);
/* Failed? Try as a statement. */
if (luaL_loadbuffer(lua, code, sdslen(code), "@ldb_eval")) {
ldbLog(sdscatfmt(sdsempty(), "<error> %s", lua_tostring(lua, -1)));
lua_pop(lua, 1);
sdsfree(code);
sdsfree(expr);
return;
}
}
/* Call it. */
sdsfree(code);
sdsfree(expr);
if (lua_pcall(lua, 0, 1, 0)) {
ldbLog(sdscatfmt(sdsempty(), "<error> %s", lua_tostring(lua, -1)));
lua_pop(lua, 1);
return;
}
ldbLogStackValue(lua, "<retval> ");
lua_pop(lua, 1);
}
/* Implement the debugger "server" command. We use a trick in order to make
* the implementation very simple: we just call the Lua server.call() command
* implementation, with ldb.step enabled, so as a side effect the command
* and its reply are logged. */
void ldbServer(lua_State *lua, sds *argv, int argc) {
int j;
if (!lua_checkstack(lua, argc + 1)) {
/* Increase the Lua stack if needed to make sure there is enough room
* to push 'argc + 1' elements to the stack. On failure, return error.
* Notice that we need, in worst case, 'argc + 1' elements because we push all the arguments
* given by the user (without the first argument) and we also push the 'server' global table and
* 'server.call' function so:
* (1 (server table)) + (1 (server.call function)) + (argc - 1 (all arguments without the first)) = argc + 1*/
ldbLogRespReply("max lua stack reached");
return;
}
lua_getglobal(lua, "server");
lua_pushstring(lua, "call");
lua_gettable(lua, -2); /* Stack: server, server.call */
for (j = 1; j < argc; j++)
lua_pushlstring(lua, argv[j], sdslen(argv[j]));
ldb.step = 1; /* Force server.call() to log. */
lua_pcall(lua, argc - 1, 1, 0); /* Stack: server, result */
ldb.step = 0; /* Disable logging. */
lua_pop(lua, 2); /* Discard the result and clean the stack. */
}
/* Implements "trace" command of the Lua debugger. It just prints a backtrace
* querying Lua starting from the current callframe back to the outer one. */
void ldbTrace(lua_State *lua) {
lua_Debug ar;
int level = 0;
while (lua_getstack(lua, level, &ar)) {
lua_getinfo(lua, "Snl", &ar);
if (strstr(ar.short_src, "user_script") != NULL) {
ldbLog(sdscatprintf(sdsempty(), "%s %s:", (level == 0) ? "In" : "From", ar.name ? ar.name : "top level"));
ldbLogSourceLine(ar.currentline);
}
level++;
}
if (level == 0) {
ldbLog(sdsnew("<error> Can't retrieve Lua stack."));
}
}
/* Implements the debugger "maxlen" command. It just queries or sets the
* ldb.maxlen variable. */
void ldbMaxlen(sds *argv, int argc) {
if (argc == 2) {
int newval = atoi(argv[1]);
ldb.maxlen_hint_sent = 1; /* User knows about this command. */
if (newval != 0 && newval <= 60) newval = 60;
ldb.maxlen = newval;
}
if (ldb.maxlen) {
ldbLog(sdscatprintf(sdsempty(), "<value> replies are truncated at %d bytes.", (int)ldb.maxlen));
} else {
ldbLog(sdscatprintf(sdsempty(), "<value> replies are unlimited."));
}
}
/* Read debugging commands from client.
* Return C_OK if the debugging session is continuing, otherwise
* C_ERR if the client closed the connection or is timing out. */
int ldbRepl(lua_State *lua) {
sds *argv;
int argc;
char *err = NULL;
/* We continue processing commands until a command that should return
* to the Lua interpreter is found. */
while (1) {
while ((argv = ldbReplParseCommand(&argc, &err)) == NULL) {
char buf[1024];
if (err) {
luaPushError(lua, err);
luaError(lua);
}
int nread = connRead(ldb.conn, buf, sizeof(buf));
if (nread <= 0) {
/* Make sure the script runs without user input since the
* client is no longer connected. */
ldb.step = 0;
ldb.bpcount = 0;
return C_ERR;
}
ldb.cbuf = sdscatlen(ldb.cbuf, buf, nread);
/* after 1M we will exit with an error
* so that the client will not blow the memory
*/
if (sdslen(ldb.cbuf) > 1 << 20) {
sdsfree(ldb.cbuf);
ldb.cbuf = sdsempty();
luaPushError(lua, "max client buffer reached");
luaError(lua);
}
}
/* Flush the old buffer. */
sdsfree(ldb.cbuf);
ldb.cbuf = sdsempty();
/* Execute the command. */
if (!strcasecmp(argv[0], "h") || !strcasecmp(argv[0], "help")) {
ldbLog(sdsnew("Lua debugger help:"));
ldbLog(sdsnew("[h]elp Show this help."));
ldbLog(sdsnew("[s]tep Run current line and stop again."));
ldbLog(sdsnew("[n]ext Alias for step."));
ldbLog(sdsnew("[c]ontinue Run till next breakpoint."));
ldbLog(sdsnew("[l]ist List source code around current line."));
ldbLog(sdsnew("[l]ist [line] List source code around [line]."));
ldbLog(sdsnew(" line = 0 means: current position."));
ldbLog(sdsnew("[l]ist [line] [ctx] In this form [ctx] specifies how many lines"));
ldbLog(sdsnew(" to show before/after [line]."));
ldbLog(sdsnew("[w]hole List all source code. Alias for 'list 1 1000000'."));
ldbLog(sdsnew("[p]rint Show all the local variables."));
ldbLog(sdsnew("[p]rint <var> Show the value of the specified variable."));
ldbLog(sdsnew(" Can also show global vars KEYS and ARGV."));
ldbLog(sdsnew("[b]reak Show all breakpoints."));
ldbLog(sdsnew("[b]reak <line> Add a breakpoint to the specified line."));
ldbLog(sdsnew("[b]reak -<line> Remove breakpoint from the specified line."));
ldbLog(sdsnew("[b]reak 0 Remove all breakpoints."));
ldbLog(sdsnew("[t]race Show a backtrace."));
ldbLog(sdsnew("[e]val <code> Execute some Lua code (in a different callframe)."));
ldbLog(sdsnew("[v]alkey <cmd> Execute a command."));
ldbLog(sdsnew("[m]axlen [len] Trim logged replies and Lua var dumps to len."));
ldbLog(sdsnew(" Specifying zero as <len> means unlimited."));
ldbLog(sdsnew("[a]bort Stop the execution of the script. In sync"));
ldbLog(sdsnew(" mode dataset changes will be retained."));
ldbLog(sdsnew(""));
ldbLog(sdsnew("Debugger functions you can call from Lua scripts:"));
ldbLog(sdsnew("server.debug() Produce logs in the debugger console."));
ldbLog(sdsnew("server.breakpoint() Stop execution like if there was a breakpoint in the"));
ldbLog(sdsnew(" next line of code."));
ldbSendLogs();
} else if (!strcasecmp(argv[0], "s") || !strcasecmp(argv[0], "step") || !strcasecmp(argv[0], "n") ||
!strcasecmp(argv[0], "next")) {
ldb.step = 1;
break;
} else if (!strcasecmp(argv[0], "c") || !strcasecmp(argv[0], "continue")) {
break;
} else if (!strcasecmp(argv[0], "t") || !strcasecmp(argv[0], "trace")) {
ldbTrace(lua);
ldbSendLogs();
} else if (!strcasecmp(argv[0], "m") || !strcasecmp(argv[0], "maxlen")) {
ldbMaxlen(argv, argc);
ldbSendLogs();
} else if (!strcasecmp(argv[0], "b") || !strcasecmp(argv[0], "break")) {
ldbBreak(argv, argc);
ldbSendLogs();
} else if (!strcasecmp(argv[0], "e") || !strcasecmp(argv[0], "eval")) {
ldbEval(lua, argv, argc);
ldbSendLogs();
} else if (!strcasecmp(argv[0], "a") || !strcasecmp(argv[0], "abort")) {
luaPushError(lua, "script aborted for user request");
luaError(lua);
} else if (argc > 1 && ((!strcasecmp(argv[0], "r") || !strcasecmp(argv[0], "redis")) ||
(!strcasecmp(argv[0], "v") || !strcasecmp(argv[0], "valkey")) ||
!strcasecmp(argv[0], SERVER_API_NAME))) {
/* [r]redis or [v]alkey calls a command. We accept "server" too, but
* not "s" because that's "step". Neither can we use [c]all because
* "c" is continue. */
ldbServer(lua, argv, argc);
ldbSendLogs();
} else if ((!strcasecmp(argv[0], "p") || !strcasecmp(argv[0], "print"))) {
if (argc == 2)
ldbPrint(lua, argv[1]);
else
ldbPrintAll(lua);
ldbSendLogs();
} else if (!strcasecmp(argv[0], "l") || !strcasecmp(argv[0], "list")) {
int around = ldb.currentline, ctx = 5;
if (argc > 1) {
int num = atoi(argv[1]);
if (num > 0) around = num;
}
if (argc > 2) ctx = atoi(argv[2]);
ldbList(around, ctx);
ldbSendLogs();
} else if (!strcasecmp(argv[0], "w") || !strcasecmp(argv[0], "whole")) {
ldbList(1, 1000000);
ldbSendLogs();
} else {
ldbLog(sdsnew("<error> Unknown Lua debugger command or "
"wrong number of arguments."));
ldbSendLogs();
}
/* Free the command vector. */
sdsfreesplitres(argv, argc);
}
/* Free the current command argv if we break inside the while loop. */
sdsfreesplitres(argv, argc);
return C_OK;
}
int ldbIsActive(void) {
return ldb.active;
}
int ldbGetCurrentLine(void) {
return ldb.currentline;
}
void ldbSetCurrentLine(int line) {
ldb.currentline = line;
}
void ldbSetBreakpointOnNextLine(int enable) {
ldb.luabp = enable;
}
int ldbIsBreakpointOnNextLineEnabled(void) {
return ldb.luabp;
}
int ldbShouldBreak(void) {
return ldbIsBreakpoint(ldb.currentline) || ldb.luabp;
}
int ldbIsStepEnabled(void) {
return ldb.step;
}
void ldbSetStepMode(int enable) {
ldb.step = enable;
}

32
src/lua/debug_lua.h Normal file
View File

@ -0,0 +1,32 @@
#ifndef _LUA_DEBUG_H_
#define _LUA_DEBUG_H_
typedef char *sds;
typedef struct lua_State lua_State;
typedef struct client client;
void ldbInit(void);
int ldbIsEnabled(void);
void ldbDisable(client *c);
void ldbEnable(client *c);
int ldbStartSession(client *c);
void ldbEndSession(client *c);
int ldbIsActive(void);
int ldbGetCurrentLine(void);
void ldbSetCurrentLine(int line);
void ldbSetBreakpointOnNextLine(int enable);
int ldbIsBreakpointOnNextLineEnabled(void);
int ldbShouldBreak(void);
int ldbIsStepEnabled(void);
void ldbSetStepMode(int enable);
void ldbLogSourceLine(int lnum);
void ldbLog(sds entry);
void ldbLogRespReply(char *reply);
int ldbRepl(lua_State *lua);
void ldbSendLogs(void);
sds ldbCatStackValue(sds s, lua_State *lua, int idx);
int ldbRemoveChild(int pid);
int ldbPendingChildren(void);
void ldbKillForkedSessions(void);
#endif /* _LUA_DEBUG_H_ */

392
src/lua/engine_lua.c Normal file
View File

@ -0,0 +1,392 @@
/*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
*/
#include "engine_lua.h"
#include "function_lua.h"
#include "script_lua.h"
#include "debug_lua.h"
#include "../dict.h"
#include "../adlist.h"
#define LUA_ENGINE_NAME "LUA"
#define REGISTRY_ERROR_HANDLER_NAME "__ERROR_HANDLER__"
typedef struct luaFunction {
lua_State *lua; /* Pointer to the lua context where this function was created. Only used in EVAL context. */
int function_ref; /* Special ID that allows getting the Lua function object from the Lua registry */
} luaFunction;
typedef struct luaEngineCtx {
lua_State *eval_lua; /* The Lua interpreter for EVAL commands. We use just one for all EVAL calls */
lua_State *function_lua; /* The Lua interpreter for FCALL commands. We use just one for all FCALL calls */
} luaEngineCtx;
/* Adds server.debug() function used by lua debugger
*
* Log a string message into the output console.
* Can take multiple arguments that will be separated by commas.
* Nothing is returned to the caller. */
static int luaServerDebugCommand(lua_State *lua) {
if (!ldbIsActive()) return 0;
int argc = lua_gettop(lua);
sds log = sdscatprintf(sdsempty(), "<debug> line %d: ", ldbGetCurrentLine());
while (argc--) {
log = ldbCatStackValue(log, lua, -1 - argc);
if (argc != 0) log = sdscatlen(log, ", ", 2);
}
ldbLog(log);
return 0;
}
/* Adds server.breakpoint() function used by lua debugger.
*
* Allows to stop execution during a debugging session from within
* the Lua code implementation, like if a breakpoint was set in the code
* immediately after the function. */
static int luaServerBreakpointCommand(lua_State *lua) {
if (ldbIsActive()) {
ldbSetBreakpointOnNextLine(1);
lua_pushboolean(lua, 1);
} else {
lua_pushboolean(lua, 0);
}
return 1;
}
/* Adds server.replicate_commands()
*
* DEPRECATED: Now do nothing and always return true.
* Turn on single commands replication if the script never called
* a write command so far, and returns true. Otherwise if the script
* already started to write, returns false and stick to whole scripts
* replication, which is our default. */
int luaServerReplicateCommandsCommand(lua_State *lua) {
lua_pushboolean(lua, 1);
return 1;
}
static void luaStateInstallErrorHandler(lua_State *lua) {
/* Add a helper function we use for pcall error reporting.
* Note that when the error is in the C function we want to report the
* information about the caller, that's what makes sense from the point
* of view of the user debugging a script. */
lua_pushstring(lua, REGISTRY_ERROR_HANDLER_NAME);
char *errh_func = "local dbg = debug\n"
"debug = nil\n"
"local error_handler = function (err)\n"
" local i = dbg.getinfo(2,'nSl')\n"
" if i and i.what == 'C' then\n"
" i = dbg.getinfo(3,'nSl')\n"
" end\n"
" if type(err) ~= 'table' then\n"
" err = {err='ERR ' .. tostring(err)}"
" end"
" if i then\n"
" err['source'] = i.source\n"
" err['line'] = i.currentline\n"
" end"
" return err\n"
"end\n"
"return error_handler";
luaL_loadbuffer(lua, errh_func, strlen(errh_func), "@err_handler_def");
lua_pcall(lua, 0, 1, 0);
lua_settable(lua, LUA_REGISTRYINDEX);
}
static void luaStateLockGlobalTable(lua_State *lua) {
/* Lock the global table from any changes */
lua_pushvalue(lua, LUA_GLOBALSINDEX);
luaSetErrorMetatable(lua);
/* Recursively lock all tables that can be reached from the global table */
luaSetTableProtectionRecursively(lua);
lua_pop(lua, 1);
}
static void initializeEvalLuaState(lua_State *lua) {
/* register debug commands. we only need to add it under 'server' as 'redis'
* is effectively aliased to 'server' table at this point. */
lua_getglobal(lua, "server");
/* server.breakpoint */
lua_pushstring(lua, "breakpoint");
lua_pushcfunction(lua, luaServerBreakpointCommand);
lua_settable(lua, -3);
/* server.debug */
lua_pushstring(lua, "debug");
lua_pushcfunction(lua, luaServerDebugCommand);
lua_settable(lua, -3);
/* server.replicate_commands */
lua_pushstring(lua, "replicate_commands");
lua_pushcfunction(lua, luaServerReplicateCommandsCommand);
lua_settable(lua, -3);
lua_setglobal(lua, "server");
/* Duplicate the function with __server__err__hanler and
* __redis__err_handler name for backwards compatibility. */
lua_pushstring(lua, REGISTRY_ERROR_HANDLER_NAME);
lua_gettable(lua, LUA_REGISTRYINDEX);
lua_setglobal(lua, "__server__err__handler");
lua_getglobal(lua, "__server__err__handler");
lua_setglobal(lua, "__redis__err__handler");
}
static void initializeLuaState(luaEngineCtx *lua_engine_ctx,
subsystemType type) {
lua_State *lua = lua_open();
if (type == VMSE_EVAL) {
lua_engine_ctx->eval_lua = lua;
} else {
serverAssert(type == VMSE_FUNCTION);
lua_engine_ctx->function_lua = lua;
}
luaRegisterServerAPI(lua);
luaStateInstallErrorHandler(lua);
if (type == VMSE_EVAL) {
initializeEvalLuaState(lua);
luaStateLockGlobalTable(lua);
} else {
luaStateLockGlobalTable(lua);
luaFunctionInitializeLuaState(lua);
}
}
static struct luaEngineCtx *createEngineContext(void) {
luaEngineCtx *lua_engine_ctx = zmalloc(sizeof(*lua_engine_ctx));
initializeLuaState(lua_engine_ctx, VMSE_EVAL);
initializeLuaState(lua_engine_ctx, VMSE_FUNCTION);
return lua_engine_ctx;
}
static engineMemoryInfo luaEngineGetMemoryInfo(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx,
subsystemType type) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);
luaEngineCtx *lua_engine_ctx = engine_ctx;
engineMemoryInfo mem_info = {0};
if (type == VMSE_EVAL || type == VMSE_ALL) {
mem_info.used_memory += luaMemory(lua_engine_ctx->eval_lua);
}
if (type == VMSE_FUNCTION || type == VMSE_ALL) {
mem_info.used_memory += luaMemory(lua_engine_ctx->function_lua);
}
mem_info.engine_memory_overhead = zmalloc_size(engine_ctx);
return mem_info;
}
static compiledFunction **luaEngineCompileCode(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx,
subsystemType type,
const char *code,
size_t timeout,
size_t *out_num_compiled_functions,
robj **err) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);
luaEngineCtx *lua_engine_ctx = (luaEngineCtx *)engine_ctx;
compiledFunction **functions = NULL;
if (type == VMSE_EVAL) {
lua_State *lua = lua_engine_ctx->eval_lua;
if (luaL_loadbuffer(
lua, code, strlen(code), "@user_script")) {
sds error = sdscatfmt(sdsempty(), "Error compiling script (new function): %s", lua_tostring(lua, -1));
*err = createObject(OBJ_STRING, error);
lua_pop(lua, 1);
return functions;
}
serverAssert(lua_isfunction(lua, -1));
int function_ref = luaL_ref(lua, LUA_REGISTRYINDEX);
luaFunction *script = zcalloc(sizeof(luaFunction));
*script = (luaFunction){
.lua = lua,
.function_ref = function_ref,
};
compiledFunction *func = zcalloc(sizeof(*func));
*func = (compiledFunction){
.name = NULL,
.function = script,
.desc = NULL,
.f_flags = 0};
*out_num_compiled_functions = 1;
functions = zcalloc(sizeof(compiledFunction *));
*functions = func;
} else {
functions = luaFunctionLibraryCreate(lua_engine_ctx->function_lua,
code,
timeout,
out_num_compiled_functions,
err);
}
return functions;
}
static void luaEngineFunctionCall(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx,
serverRuntimeCtx *server_ctx,
compiledFunction *compiled_function,
subsystemType type,
robj **keys,
size_t nkeys,
robj **args,
size_t nargs) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);
luaEngineCtx *lua_engine_ctx = (luaEngineCtx *)engine_ctx;
lua_State *lua = NULL;
int lua_function_ref = -1;
if (type == VMSE_EVAL) {
lua = lua_engine_ctx->eval_lua;
luaFunction *script = compiled_function->function;
lua_function_ref = script->function_ref;
} else {
lua = lua_engine_ctx->function_lua;
lua_function_ref = luaFunctionGetLuaFunctionRef(compiled_function);
}
/* Push the pcall error handler function on the stack. */
lua_pushstring(lua, REGISTRY_ERROR_HANDLER_NAME);
lua_gettable(lua, LUA_REGISTRYINDEX);
lua_rawgeti(lua, LUA_REGISTRYINDEX, lua_function_ref);
serverAssert(!lua_isnil(lua, -1));
luaCallFunction(server_ctx,
lua,
keys,
nkeys,
args,
nargs,
type == VMSE_EVAL ? ldbIsActive() : 0);
lua_pop(lua, 1); /* Remove the error handler. */
}
static void resetEvalContext(void *context) {
lua_State *eval_lua = context;
lua_gc(eval_lua, LUA_GCCOLLECT, 0);
lua_close(eval_lua);
#if !defined(USE_LIBC)
/* The lua interpreter may hold a lot of memory internally, and lua is
* using libc. libc may take a bit longer to return the memory to the OS,
* so after lua_close, we call malloc_trim try to purge it earlier.
*
* We do that only when the server itself does not use libc. When Lua and the server
* use different allocators, one won't use the fragmentation holes of the
* other, and released memory can take a long time until it is returned to
* the OS. */
zlibc_trim();
#endif
}
static callableLazyEvalReset *luaEngineResetEvalEnv(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx,
int async) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);
luaEngineCtx *lua_engine_ctx = (luaEngineCtx *)engine_ctx;
serverAssert(lua_engine_ctx->eval_lua);
callableLazyEvalReset *callback = NULL;
if (async) {
callback = zcalloc(sizeof(*callback));
*callback = (callableLazyEvalReset){
.context = lua_engine_ctx->eval_lua,
.engineLazyEvalResetCallback = resetEvalContext,
};
} else {
resetEvalContext(lua_engine_ctx->eval_lua);
}
initializeLuaState(lua_engine_ctx, VMSE_EVAL);
return callback;
}
static size_t luaEngineFunctionMemoryOverhead(ValkeyModuleCtx *module_ctx,
compiledFunction *compiled_function) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);
return zmalloc_size(compiled_function->function) +
(compiled_function->name ? zmalloc_size(compiled_function->name) : 0) +
(compiled_function->desc ? zmalloc_size(compiled_function->desc) : 0) +
zmalloc_size(compiled_function);
}
static void luaEngineFreeFunction(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx,
subsystemType type,
compiledFunction *compiled_function) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);
luaEngineCtx *lua_engine_ctx = engine_ctx;
if (type == VMSE_EVAL) {
luaFunction *script = (luaFunction *)compiled_function->function;
if (lua_engine_ctx->eval_lua == script->lua) {
/* The lua context is still the same, which means that we're not
* resetting the whole eval context, and therefore, we need to
* delete the function from the lua context.
*/
lua_unref(lua_engine_ctx->eval_lua, script->function_ref);
}
zfree(script);
} else {
luaFunctionFreeFunction(lua_engine_ctx->function_lua, compiled_function->function);
}
if (compiled_function->name) {
decrRefCount(compiled_function->name);
}
if (compiled_function->desc) {
decrRefCount(compiled_function->desc);
}
zfree(compiled_function);
}
int luaEngineInitEngine(void) {
ldbInit();
engineMethods methods = {
.compile_code = luaEngineCompileCode,
.free_function = luaEngineFreeFunction,
.call_function = luaEngineFunctionCall,
.get_function_memory_overhead = luaEngineFunctionMemoryOverhead,
.reset_eval_env = luaEngineResetEvalEnv,
.get_memory_info = luaEngineGetMemoryInfo,
};
return scriptingEngineManagerRegister(LUA_ENGINE_NAME,
NULL,
createEngineContext(),
&methods);
}

9
src/lua/engine_lua.h Normal file
View File

@ -0,0 +1,9 @@
#ifndef _ENGINE_LUA_
#define _ENGINE_LUA_
#include "../scripting_engine.h"
#include <lua.h>
int luaEngineInitEngine(void);
#endif /* _ENGINE_LUA_ */

View File

@ -39,25 +39,21 @@
* Uses script_lua.c to run the Lua code.
*/
#include "scripting_engine.h"
#include "functions.h"
#include "function_lua.h"
#include "script_lua.h"
#include <lua.h>
#include "../script.h"
#include "../adlist.h"
#include "../monotonic.h"
#include "../server.h"
#include <lauxlib.h>
#include <lualib.h>
#define LUA_ENGINE_NAME "LUA"
#define REGISTRY_ENGINE_CTX_NAME "__ENGINE_CTX__"
#define REGISTRY_ERROR_HANDLER_NAME "__ERROR_HANDLER__"
#define REGISTRY_LOAD_CTX_NAME "__LIBRARY_CTX__"
#define LIBRARY_API_NAME "__LIBRARY_API__"
#define GLOBALS_API_NAME "__GLOBALS_API__"
/* Lua engine ctx */
typedef struct luaEngineCtx {
lua_State *lua;
} luaEngineCtx;
/* Lua function ctx */
typedef struct luaFunctionCtx {
/* Special ID that allows getting the Lua function object from the Lua registry */
@ -70,10 +66,6 @@ typedef struct loadCtx {
size_t timeout;
} loadCtx;
static void luaEngineFreeFunction(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx,
void *compiled_function);
/* Hook for FUNCTION LOAD execution.
* Used to cancel the execution in case of a timeout (500ms).
* This execution should be fast and should only register
@ -91,19 +83,14 @@ static void luaEngineLoadHook(lua_State *lua, lua_Debug *ar) {
}
}
static void freeCompiledFunc(ValkeyModuleCtx *module_ctx,
luaEngineCtx *lua_engine_ctx,
void *compiled_func) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);
compiledFunction *func = compiled_func;
decrRefCount(func->name);
if (func->desc) {
decrRefCount(func->desc);
static void freeCompiledFunc(lua_State *lua,
compiledFunction *compiled_func) {
decrRefCount(compiled_func->name);
if (compiled_func->desc) {
decrRefCount(compiled_func->desc);
}
luaEngineFreeFunction(module_ctx, lua_engine_ctx, func->function);
zfree(func);
luaFunctionFreeFunction(lua, compiled_func->function);
zfree(compiled_func);
}
/*
@ -117,18 +104,12 @@ static void freeCompiledFunc(ValkeyModuleCtx *module_ctx,
*
* Return NULL on compilation error and set the error to the err variable
*/
static compiledFunction **luaEngineCreate(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx,
const char *code,
size_t timeout,
size_t *out_num_compiled_functions,
robj **err) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);
compiledFunction **luaFunctionLibraryCreate(lua_State *lua,
const char *code,
size_t timeout,
size_t *out_num_compiled_functions,
robj **err) {
compiledFunction **compiled_functions = NULL;
luaEngineCtx *lua_engine_ctx = engine_ctx;
lua_State *lua = lua_engine_ctx->lua;
/* set load library globals */
lua_getmetatable(lua, LUA_GLOBALSINDEX);
@ -163,13 +144,15 @@ static compiledFunction **luaEngineCreate(ValkeyModuleCtx *module_ctx,
*err = createObject(OBJ_STRING, error);
lua_pop(lua, 1); /* pops the error */
luaErrorInformationDiscard(&err_info);
listIter *iter = listGetIterator(load_ctx.functions, AL_START_HEAD);
listNode *node = NULL;
while ((node = listNext(iter)) != NULL) {
freeCompiledFunc(module_ctx, lua_engine_ctx, listNodeValue(node));
freeCompiledFunc(lua, listNodeValue(node));
}
listReleaseIterator(iter);
listRelease(load_ctx.functions);
goto done;
}
@ -200,69 +183,9 @@ done:
return compiled_functions;
}
/*
* Invole the give function with the given keys and args
*/
static void luaEngineCall(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx,
functionCtx *func_ctx,
void *compiled_function,
robj **keys,
size_t nkeys,
robj **args,
size_t nargs) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);
luaEngineCtx *lua_engine_ctx = engine_ctx;
lua_State *lua = lua_engine_ctx->lua;
luaFunctionCtx *f_ctx = compiled_function;
/* Push error handler */
lua_pushstring(lua, REGISTRY_ERROR_HANDLER_NAME);
lua_gettable(lua, LUA_REGISTRYINDEX);
lua_rawgeti(lua, LUA_REGISTRYINDEX, f_ctx->lua_function_ref);
serverAssert(lua_isfunction(lua, -1));
scriptRunCtx *run_ctx = (scriptRunCtx *)func_ctx;
luaCallFunction(run_ctx, lua, keys, nkeys, args, nargs, 0);
lua_pop(lua, 1); /* Pop error handler */
}
static engineMemoryInfo luaEngineGetMemoryInfo(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);
luaEngineCtx *lua_engine_ctx = engine_ctx;
return (engineMemoryInfo){
.used_memory = luaMemory(lua_engine_ctx->lua),
.engine_memory_overhead = zmalloc_size(lua_engine_ctx),
};
}
static size_t luaEngineFunctionMemoryOverhead(ValkeyModuleCtx *module_ctx,
void *compiled_function) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);
return zmalloc_size(compiled_function);
}
static void luaEngineFreeFunction(ValkeyModuleCtx *module_ctx,
engineCtx *engine_ctx,
void *compiled_function) {
/* The lua engine is implemented in the core, and not in a Valkey Module */
serverAssert(module_ctx == NULL);
luaEngineCtx *lua_engine_ctx = engine_ctx;
lua_State *lua = lua_engine_ctx->lua;
luaFunctionCtx *f_ctx = compiled_function;
lua_unref(lua, f_ctx->lua_function_ref);
zfree(f_ctx);
int luaFunctionGetLuaFunctionRef(compiledFunction *compiled_function) {
luaFunctionCtx *f = compiled_function->function;
return f->lua_function_ref;
}
static void luaRegisterFunctionArgsInitialize(compiledFunction *func,
@ -455,16 +378,15 @@ static int luaRegisterFunctionReadArgs(lua_State *lua, compiledFunction *func) {
}
}
static int luaRegisterFunction(lua_State *lua) {
compiledFunction *func = zcalloc(sizeof(*func));
static int luaFunctionRegisterFunction(lua_State *lua) {
loadCtx *load_ctx = luaGetFromRegistry(lua, REGISTRY_LOAD_CTX_NAME);
if (!load_ctx) {
zfree(func);
luaPushError(lua, "server.register_function can only be called on FUNCTION LOAD command");
return luaError(lua);
}
compiledFunction *func = zcalloc(sizeof(*func));
if (luaRegisterFunctionReadArgs(lua, func) != C_OK) {
zfree(func);
return luaError(lua);
@ -475,93 +397,48 @@ static int luaRegisterFunction(lua_State *lua) {
return 0;
}
/* Initialize Lua engine, should be called once on start. */
int luaEngineInitEngine(void) {
luaEngineCtx *lua_engine_ctx = zmalloc(sizeof(*lua_engine_ctx));
lua_engine_ctx->lua = lua_open();
luaRegisterServerAPI(lua_engine_ctx->lua);
void luaFunctionInitializeLuaState(lua_State *lua) {
/* Register the library commands table and fields and store it to registry */
lua_newtable(lua_engine_ctx->lua); /* load library globals */
lua_newtable(lua_engine_ctx->lua); /* load library `server` table */
lua_newtable(lua); /* load library globals */
lua_newtable(lua); /* load library `server` table */
lua_pushstring(lua_engine_ctx->lua, "register_function");
lua_pushcfunction(lua_engine_ctx->lua, luaRegisterFunction);
lua_settable(lua_engine_ctx->lua, -3);
lua_pushstring(lua, "register_function");
lua_pushcfunction(lua, luaFunctionRegisterFunction);
lua_settable(lua, -3);
luaRegisterLogFunction(lua_engine_ctx->lua);
luaRegisterVersion(lua_engine_ctx->lua);
luaRegisterLogFunction(lua);
luaRegisterVersion(lua);
luaSetErrorMetatable(lua_engine_ctx->lua);
lua_setfield(lua_engine_ctx->lua, -2, SERVER_API_NAME);
luaSetErrorMetatable(lua);
lua_setfield(lua, -2, SERVER_API_NAME);
/* Get the server object and also set it to the Redis API
* compatibility namespace. */
lua_getfield(lua_engine_ctx->lua, -1, SERVER_API_NAME);
lua_setfield(lua_engine_ctx->lua, -2, REDIS_API_NAME);
lua_getfield(lua, -1, SERVER_API_NAME);
lua_setfield(lua, -2, REDIS_API_NAME);
luaSetErrorMetatable(lua_engine_ctx->lua);
luaSetTableProtectionRecursively(lua_engine_ctx->lua); /* protect load library globals */
lua_setfield(lua_engine_ctx->lua, LUA_REGISTRYINDEX, LIBRARY_API_NAME);
/* Save error handler to registry */
lua_pushstring(lua_engine_ctx->lua, REGISTRY_ERROR_HANDLER_NAME);
char *errh_func = "local dbg = debug\n"
"debug = nil\n"
"local error_handler = function (err)\n"
" local i = dbg.getinfo(2,'nSl')\n"
" if i and i.what == 'C' then\n"
" i = dbg.getinfo(3,'nSl')\n"
" end\n"
" if type(err) ~= 'table' then\n"
" err = {err='ERR ' .. tostring(err)}"
" end"
" if i then\n"
" err['source'] = i.source\n"
" err['line'] = i.currentline\n"
" end"
" return err\n"
"end\n"
"return error_handler";
luaL_loadbuffer(lua_engine_ctx->lua, errh_func, strlen(errh_func), "@err_handler_def");
lua_pcall(lua_engine_ctx->lua, 0, 1, 0);
lua_settable(lua_engine_ctx->lua, LUA_REGISTRYINDEX);
lua_pushvalue(lua_engine_ctx->lua, LUA_GLOBALSINDEX);
luaSetErrorMetatable(lua_engine_ctx->lua);
luaSetTableProtectionRecursively(lua_engine_ctx->lua); /* protect globals */
lua_pop(lua_engine_ctx->lua, 1);
luaSetErrorMetatable(lua);
luaSetTableProtectionRecursively(lua); /* protect load library globals */
lua_setfield(lua, LUA_REGISTRYINDEX, LIBRARY_API_NAME);
/* Save default globals to registry */
lua_pushvalue(lua_engine_ctx->lua, LUA_GLOBALSINDEX);
lua_setfield(lua_engine_ctx->lua, LUA_REGISTRYINDEX, GLOBALS_API_NAME);
/* save the engine_ctx on the registry so we can get it from the Lua interpreter */
luaSaveOnRegistry(lua_engine_ctx->lua, REGISTRY_ENGINE_CTX_NAME, lua_engine_ctx);
lua_pushvalue(lua, LUA_GLOBALSINDEX);
lua_setfield(lua, LUA_REGISTRYINDEX, GLOBALS_API_NAME);
/* Create new empty table to be the new globals, we will be able to control the real globals
* using metatable */
lua_newtable(lua_engine_ctx->lua); /* new globals */
lua_newtable(lua_engine_ctx->lua); /* new globals metatable */
lua_pushvalue(lua_engine_ctx->lua, LUA_GLOBALSINDEX);
lua_setfield(lua_engine_ctx->lua, -2, "__index");
lua_enablereadonlytable(lua_engine_ctx->lua, -1, 1); /* protect the metatable */
lua_setmetatable(lua_engine_ctx->lua, -2);
lua_enablereadonlytable(lua_engine_ctx->lua, -1, 1); /* protect the new global table */
lua_replace(lua_engine_ctx->lua, LUA_GLOBALSINDEX); /* set new global table as the new globals */
engineMethods lua_engine_methods = {
.version = VALKEYMODULE_SCRIPTING_ENGINE_ABI_VERSION,
.create_functions_library = luaEngineCreate,
.call_function = luaEngineCall,
.get_function_memory_overhead = luaEngineFunctionMemoryOverhead,
.free_function = luaEngineFreeFunction,
.get_memory_info = luaEngineGetMemoryInfo,
};
return scriptingEngineManagerRegister(LUA_ENGINE_NAME,
NULL,
lua_engine_ctx,
&lua_engine_methods);
lua_newtable(lua); /* new globals */
lua_newtable(lua); /* new globals metatable */
lua_pushvalue(lua, LUA_GLOBALSINDEX);
lua_setfield(lua, -2, "__index");
lua_enablereadonlytable(lua, -1, 1); /* protect the metatable */
lua_setmetatable(lua, -2);
lua_enablereadonlytable(lua, -1, 1); /* protect the new global table */
lua_replace(lua, LUA_GLOBALSINDEX); /* set new global table as the new globals */
}
void luaFunctionFreeFunction(lua_State *lua, void *function) {
luaFunctionCtx *funcCtx = function;
lua_unref(lua, funcCtx->lua_function_ref);
zfree(function);
}

18
src/lua/function_lua.h Normal file
View File

@ -0,0 +1,18 @@
#ifndef _FUNCTION_LUA_H_
#define _FUNCTION_LUA_H_
#include "engine_lua.h"
void luaFunctionInitializeLuaState(lua_State *lua);
compiledFunction **luaFunctionLibraryCreate(lua_State *lua,
const char *code,
size_t timeout,
size_t *out_num_compiled_functions,
robj **err);
int luaFunctionGetLuaFunctionRef(compiledFunction *compiled_function);
void luaFunctionFreeFunction(lua_State *lua, void *function);
#endif /* _FUNCTION_LUA_H_ */

View File

@ -28,15 +28,16 @@
*/
#include "script_lua.h"
#include "fpconv_dtoa.h"
#include "debug_lua.h"
#include "server.h"
#include "sha1.h"
#include "rand.h"
#include "cluster.h"
#include "monotonic.h"
#include "resp_parser.h"
#include "version.h"
#include "../sha1.h"
#include "../rand.h"
#include "../cluster.h"
#include "../monotonic.h"
#include "../resp_parser.h"
#include "../version.h"
#include <fpconv_dtoa.h>
#include <lauxlib.h>
#include <lualib.h>
#include <ctype.h>
@ -1669,6 +1670,55 @@ void luaExtractErrorInformation(lua_State *lua, errorInfo *err_info) {
lua_pop(lua, 1);
}
/* This is the core of our Lua debugger, called each time Lua is about
* to start executing a new line. */
void luaLdbLineHook(lua_State *lua, lua_Debug *ar) {
scriptRunCtx *rctx = luaGetFromRegistry(lua, REGISTRY_RUN_CTX_NAME);
serverAssert(rctx); /* Only supported inside script invocation */
lua_getstack(lua, 0, ar);
lua_getinfo(lua, "Sl", ar);
ldbSetCurrentLine(ar->currentline);
int bp = ldbShouldBreak();
int timeout = 0;
/* Events outside our script are not interesting. */
if (strstr(ar->short_src, "user_script") == NULL) return;
/* Check if a timeout occurred. */
if (ar->event == LUA_HOOKCOUNT && !ldbIsStepEnabled() && bp == 0) {
mstime_t elapsed = elapsedMs(rctx->start_time);
mstime_t timelimit = server.busy_reply_threshold ? server.busy_reply_threshold : 5000;
if (elapsed >= timelimit) {
timeout = 1;
ldbSetStepMode(1);
} else {
return; /* No timeout, ignore the COUNT event. */
}
}
if (ldbIsStepEnabled() || bp) {
char *reason = "step over";
if (bp)
reason = ldbIsBreakpointOnNextLineEnabled() ? "server.breakpoint() called" : "break point";
else if (timeout)
reason = "timeout reached, infinite loop?";
ldbSetStepMode(0);
ldbSetBreakpointOnNextLine(0);
ldbLog(sdscatprintf(sdsempty(), "* Stopped at %d, stop reason = %s", ldbGetCurrentLine(), reason));
ldbLogSourceLine(ldbGetCurrentLine());
ldbSendLogs();
if (ldbRepl(lua) == C_ERR && timeout) {
/* If the client closed the connection and we have a timeout
* connection, let's kill the script otherwise the process
* will remain blocked indefinitely. */
luaPushError(lua, "timeout during Lua debugging with client closing connection");
luaError(lua);
}
rctx->start_time = getMonotonicUs();
}
}
void luaCallFunction(scriptRunCtx *run_ctx,
lua_State *lua,
robj **keys,

View File

@ -48,14 +48,13 @@
* Uses script.c for interaction back with Redis.
*/
#include "server.h"
#include "script.h"
#include "../server.h"
#include "../script.h"
#include <lua.h>
#include <lauxlib.h>
#include <lualib.h>
#define REGISTRY_RUN_CTX_NAME "__RUN_CTX__"
#define REGISTRY_SET_GLOBALS_PROTECTION_NAME "__GLOBAL_PROTECTION__"
#define REDIS_API_NAME "redis"
#define SERVER_API_NAME "server"

View File

@ -61,8 +61,8 @@
#include "hdr_histogram.h"
#include "crc16_slottable.h"
#include "valkeymodule.h"
#include "io_threads.h"
#include "module.h"
#include "io_threads.h"
#include "scripting_engine.h"
#include <dlfcn.h>
#include <sys/stat.h>
@ -13192,6 +13192,21 @@ int VM_UnregisterScriptingEngine(ValkeyModuleCtx *ctx, const char *engine_name)
return VALKEYMODULE_OK;
}
/* Returns the state of the current function being executed by the scripting
* engine.
*
* `server_ctx` is the server runtime context.
*
* It will return VMSE_STATE_KILLED if the function was already killed either by
* a `SCRIPT KILL`, or `FUNCTION KILL`.
*/
ValkeyModuleScriptingEngineExecutionState VM_GetFunctionExecutionState(
ValkeyModuleScriptingEngineServerRuntimeCtx *server_ctx) {
int ret = scriptInterrupt(server_ctx);
serverAssert(ret == SCRIPT_CONTINUE || ret == SCRIPT_KILL);
return ret == SCRIPT_CONTINUE ? VMSE_STATE_EXECUTING : VMSE_STATE_KILLED;
}
/* MODULE command.
*
* MODULE LIST
@ -14064,4 +14079,5 @@ void moduleRegisterCoreAPI(void) {
REGISTER_API(RdbSave);
REGISTER_API(RegisterScriptingEngine);
REGISTER_API(UnregisterScriptingEngine);
REGISTER_API(GetFunctionExecutionState);
}

View File

@ -1,3 +1,9 @@
/*
* Copyright Valkey Contributors.
* All rights reserved.
* SPDX-License-Identifier: BSD-3-Clause
*/
#include "scripting_engine.h"
#include "dict.h"
#include "functions.h"
@ -15,7 +21,7 @@ typedef struct scriptingEngine {
sds name; /* Name of the engine */
ValkeyModule *module; /* the module that implements the scripting engine */
scriptingEngineImpl impl; /* engine context and callbacks to interact with the engine */
client *c; /* Client that is used to run commands */
client *client; /* Client that is used to run commands */
ValkeyModuleCtx *module_ctx; /* Cache of the module context object */
} scriptingEngine;
@ -105,20 +111,21 @@ int scriptingEngineManagerRegister(const char *engine_name,
.impl = {
.ctx = engine_ctx,
.methods = {
.create_functions_library = engine_methods->create_functions_library,
.compile_code = engine_methods->compile_code,
.call_function = engine_methods->call_function,
.free_function = engine_methods->free_function,
.get_function_memory_overhead = engine_methods->get_function_memory_overhead,
.reset_eval_env = engine_methods->reset_eval_env,
.get_memory_info = engine_methods->get_memory_info,
},
},
.c = c,
.client = c,
.module_ctx = engine_module ? moduleAllocateContext() : NULL,
};
dictAdd(engineMgr.engines, engine_name_sds, e);
engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(e);
engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(e, VMSE_ALL);
engineMgr.total_memory_overhead += zmalloc_size(e) +
sdsAllocSize(e->name) +
mem_info.engine_memory_overhead;
@ -141,13 +148,13 @@ int scriptingEngineManagerUnregister(const char *engine_name) {
functionsRemoveLibFromEngine(e);
engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(e);
engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(e, VMSE_ALL);
engineMgr.total_memory_overhead -= zmalloc_size(e) +
sdsAllocSize(e->name) +
mem_info.engine_memory_overhead;
sdsfree(e->name);
freeClient(e->c);
freeClient(e->client);
if (e->module_ctx) {
serverAssert(e->module != NULL);
zfree(e->module_ctx);
@ -163,7 +170,7 @@ int scriptingEngineManagerUnregister(const char *engine_name) {
* Lookups the engine with `engine_name` in the engine manager and returns it if
* it exists. Otherwise returns `NULL`.
*/
scriptingEngine *scriptingEngineManagerFind(sds engine_name) {
scriptingEngine *scriptingEngineManagerFind(const char *engine_name) {
dictEntry *entry = dictFind(engineMgr.engines, engine_name);
if (entry) {
return dictGetVal(entry);
@ -176,7 +183,7 @@ sds scriptingEngineGetName(scriptingEngine *engine) {
}
client *scriptingEngineGetClient(scriptingEngine *engine) {
return engine->c;
return engine->client;
}
ValkeyModule *scriptingEngineGetModule(scriptingEngine *engine) {
@ -214,16 +221,20 @@ static void engineTeardownModuleCtx(scriptingEngine *e) {
}
}
compiledFunction **scriptingEngineCallCreateFunctionsLibrary(scriptingEngine *engine,
const char *code,
size_t timeout,
size_t *out_num_compiled_functions,
robj **err) {
compiledFunction **scriptingEngineCallCompileCode(scriptingEngine *engine,
subsystemType type,
const char *code,
size_t timeout,
size_t *out_num_compiled_functions,
robj **err) {
serverAssert(type == VMSE_EVAL || type == VMSE_FUNCTION);
engineSetupModuleCtx(engine, NULL);
compiledFunction **functions = engine->impl.methods.create_functions_library(
compiledFunction **functions = engine->impl.methods.compile_code(
engine->module_ctx,
engine->impl.ctx,
type,
code,
timeout,
out_num_compiled_functions,
@ -234,21 +245,38 @@ compiledFunction **scriptingEngineCallCreateFunctionsLibrary(scriptingEngine *en
return functions;
}
void scriptingEngineCallFreeFunction(scriptingEngine *engine,
subsystemType type,
compiledFunction *compiled_func) {
serverAssert(type == VMSE_EVAL || type == VMSE_FUNCTION);
engineSetupModuleCtx(engine, NULL);
engine->impl.methods.free_function(
engine->module_ctx,
engine->impl.ctx,
type,
compiled_func);
engineTeardownModuleCtx(engine);
}
void scriptingEngineCallFunction(scriptingEngine *engine,
functionCtx *func_ctx,
serverRuntimeCtx *server_ctx,
client *caller,
void *compiled_function,
compiledFunction *compiled_function,
subsystemType type,
robj **keys,
size_t nkeys,
robj **args,
size_t nargs) {
serverAssert(type == VMSE_EVAL || type == VMSE_FUNCTION);
engineSetupModuleCtx(engine, caller);
engine->impl.methods.call_function(
engine->module_ctx,
engine->impl.ctx,
func_ctx,
server_ctx,
compiled_function,
type,
keys,
nkeys,
args,
@ -257,28 +285,34 @@ void scriptingEngineCallFunction(scriptingEngine *engine,
engineTeardownModuleCtx(engine);
}
void scriptingEngineCallFreeFunction(scriptingEngine *engine,
void *compiled_func) {
engineSetupModuleCtx(engine, NULL);
engine->impl.methods.free_function(engine->module_ctx,
engine->impl.ctx,
compiled_func);
engineTeardownModuleCtx(engine);
}
size_t scriptingEngineCallGetFunctionMemoryOverhead(scriptingEngine *engine,
void *compiled_function) {
compiledFunction *compiled_function) {
engineSetupModuleCtx(engine, NULL);
size_t mem = engine->impl.methods.get_function_memory_overhead(
engine->module_ctx, compiled_function);
engine->module_ctx,
compiled_function);
engineTeardownModuleCtx(engine);
return mem;
}
engineMemoryInfo scriptingEngineCallGetMemoryInfo(scriptingEngine *engine) {
callableLazyEvalReset *scriptingEngineCallResetEvalEnvFunc(scriptingEngine *engine,
int async) {
engineSetupModuleCtx(engine, NULL);
callableLazyEvalReset *callback = engine->impl.methods.reset_eval_env(
engine->module_ctx,
engine->impl.ctx,
async);
engineTeardownModuleCtx(engine);
return callback;
}
engineMemoryInfo scriptingEngineCallGetMemoryInfo(scriptingEngine *engine,
subsystemType type) {
engineSetupModuleCtx(engine, NULL);
engineMemoryInfo mem_info = engine->impl.methods.get_memory_info(
engine->module_ctx, engine->impl.ctx);
engine->module_ctx,
engine->impl.ctx,
type);
engineTeardownModuleCtx(engine);
return mem_info;
}

View File

@ -9,9 +9,11 @@ typedef struct scriptingEngine scriptingEngine;
/* ValkeyModule type aliases for scripting engine structs and types. */
typedef struct ValkeyModule ValkeyModule;
typedef ValkeyModuleScriptingEngineCtx engineCtx;
typedef ValkeyModuleScriptingEngineFunctionCtx functionCtx;
typedef ValkeyModuleScriptingEngineServerRuntimeCtx serverRuntimeCtx;
typedef ValkeyModuleScriptingEngineCompiledFunction compiledFunction;
typedef ValkeyModuleScriptingEngineSubsystemType subsystemType;
typedef ValkeyModuleScriptingEngineMemoryInfo engineMemoryInfo;
typedef ValkeyModuleScriptingEngineCallableLazyEvalReset callableLazyEvalReset;
typedef ValkeyModuleScriptingEngineMethods engineMethods;
/*
@ -37,9 +39,8 @@ int scriptingEngineManagerRegister(const char *engine_name,
engineCtx *engine_ctx,
engineMethods *engine_methods);
int scriptingEngineManagerUnregister(const char *engine_name);
scriptingEngine *scriptingEngineManagerFind(sds engine_name);
void scriptingEngineManagerForEachEngine(engineIterCallback callback,
void *context);
scriptingEngine *scriptingEngineManagerFind(const char *engine_name);
void scriptingEngineManagerForEachEngine(engineIterCallback callback, void *context);
/*
* Engine API functions.
@ -51,23 +52,34 @@ ValkeyModule *scriptingEngineGetModule(scriptingEngine *engine);
/*
* API to call engine callback functions.
*/
compiledFunction **scriptingEngineCallCreateFunctionsLibrary(scriptingEngine *engine,
const char *code,
size_t timeout,
size_t *out_num_compiled_functions,
robj **err);
compiledFunction **scriptingEngineCallCompileCode(scriptingEngine *engine,
subsystemType type,
const char *code,
size_t timeout,
size_t *out_num_compiled_functions,
robj **err);
void scriptingEngineCallFreeFunction(scriptingEngine *engine,
subsystemType type,
compiledFunction *compiled_func);
void scriptingEngineCallFunction(scriptingEngine *engine,
functionCtx *func_ctx,
serverRuntimeCtx *server_ctx,
client *caller,
void *compiled_function,
compiledFunction *compiled_function,
subsystemType type,
robj **keys,
size_t nkeys,
robj **args,
size_t nargs);
void scriptingEngineCallFreeFunction(scriptingEngine *engine,
void *compiled_func);
size_t scriptingEngineCallGetFunctionMemoryOverhead(scriptingEngine *engine,
void *compiled_function);
engineMemoryInfo scriptingEngineCallGetMemoryInfo(scriptingEngine *engine);
compiledFunction *compiled_function);
callableLazyEvalReset *scriptingEngineCallResetEvalEnvFunc(scriptingEngine *engine,
int async);
engineMemoryInfo scriptingEngineCallGetMemoryInfo(scriptingEngine *engine,
subsystemType type);
#endif /* _SCRIPTING_ENGINE_H_ */

View File

@ -44,6 +44,9 @@
#include "sds.h"
#include "module.h"
#include "scripting_engine.h"
#include "lua/engine_lua.h"
#include "lua/debug_lua.h"
#include "eval.h"
#include <time.h>
#include <signal.h>
@ -1377,6 +1380,12 @@ void checkChildrenDone(void) {
}
}
static void sumEngineUsedMemory(scriptingEngine *engine, void *context) {
size_t *total_memory = (size_t *)context;
engineMemoryInfo mem_info = scriptingEngineCallGetMemoryInfo(engine, VMSE_ALL);
*total_memory += mem_info.used_memory;
}
/* Called from serverCron and cronUpdateMemoryStats to update cached memory metrics. */
void cronUpdateMemoryStats(void) {
/* Record the max memory used since the server was started. */
@ -1402,8 +1411,9 @@ void cronUpdateMemoryStats(void) {
/* LUA memory isn't part of zmalloc_used, but it is part of the process RSS,
* so we must deduct it in order to be able to calculate correct
* "allocator fragmentation" ratio */
size_t lua_memory = evalMemory();
server.cron_malloc_stats.allocator_resident = server.cron_malloc_stats.process_rss - lua_memory;
size_t engines_memory = 0;
scriptingEngineManagerForEachEngine(sumEngineUsedMemory, &engines_memory);
server.cron_malloc_stats.allocator_resident = server.cron_malloc_stats.process_rss - engines_memory;
}
if (!server.cron_malloc_stats.allocator_active)
server.cron_malloc_stats.allocator_active = server.cron_malloc_stats.allocator_resident;
@ -2911,12 +2921,25 @@ void initServer(void) {
serverPanic("Scripting engine manager initialization failed, check the server logs.");
}
// Since we initialized the scripting engine manager, we need to ensure
// that commands with `CMD_NOSCRIPT` flag are not allowed to run in scripts.
//
server.script_disable_deny_script = 0;
/* Initialize the LUA scripting engine. */
scriptingInit(1);
if (luaEngineInitEngine() != C_OK) {
serverPanic("Lua engine initialization failed, check the server logs.");
exit(1);
}
/* Initialize the functions engine based off of LUA initialization. */
if (functionsInit() == C_ERR) {
serverPanic("Functions initialization failed, check the server logs.");
}
/* Initialize the EVAL scripting component. */
evalInit();
commandlogInit();
latencyMonitorInit();
initSharedQueryBuf();

View File

@ -51,7 +51,6 @@
#include <syslog.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <lua.h>
#include <signal.h>
#ifdef HAVE_LIBSYSTEMD
@ -3507,17 +3506,9 @@ int redis_check_rdb_main(int argc, char **argv, FILE *fp);
int redis_check_aof_main(int argc, char **argv);
/* Scripting */
void scriptingInit(int setup);
int ldbRemoveChild(pid_t pid);
void ldbKillForkedSessions(void);
int ldbPendingChildren(void);
void luaLdbLineHook(lua_State *lua, lua_Debug *ar);
void freeLuaScriptsSync(dict *lua_scripts, list *lua_scripts_lru_list, lua_State *lua);
void freeLuaScriptsAsync(dict *lua_scripts, list *lua_scripts_lru_list, lua_State *lua);
void freeEvalScripts(dict *scripts, list *scripts_lru_list, list *engine_callbacks);
void freeEvalScriptsAsync(dict *scripts, list *scripts_lru_list, list *engine_callbacks);
void freeFunctionsAsync(functionsLibCtx *lib_ctx);
int ldbIsEnabled(void);
void ldbLog(sds entry);
void ldbLogRespReply(char *reply);
void sha1hex(char *digest, char *script, size_t len);
unsigned long evalMemory(void);
dict *evalScriptsDict(void);
@ -3526,11 +3517,6 @@ uint64_t evalGetCommandFlags(client *c, uint64_t orig_flags);
uint64_t fcallGetCommandFlags(client *c, uint64_t orig_flags);
int isInsideYieldingLongCommand(void);
typedef struct luaScript {
uint64_t flags;
robj *body;
listNode *node; /* list node in lua_scripts_lru_list list. */
} luaScript;
/* Cache of recently used small arguments to avoid malloc calls. */
#define LUA_CMD_OBJCACHE_SIZE 32
#define LUA_CMD_OBJCACHE_MAX_LEN 64

View File

@ -800,7 +800,7 @@ typedef void (*ValkeyModuleUserChangedFunc)(uint64_t client_id, void *privdata);
/* Type definitions for implementing scripting engines modules. */
typedef void ValkeyModuleScriptingEngineCtx;
typedef void ValkeyModuleScriptingEngineFunctionCtx;
typedef void ValkeyModuleScriptingEngineServerRuntimeCtx;
/* This struct represents a scripting engine function that results from the
* compilation of a script by the engine implementation.
@ -811,7 +811,7 @@ typedef void ValkeyModuleScriptingEngineFunctionCtx;
*/
typedef struct ValkeyModuleScriptingEngineCompiledFunction {
ValkeyModuleString *name; /* Function name */
void *function; /* Opaque object representing a function, usually it'
void *function; /* Opaque object representing a function, usually it's
the function compiled code. */
ValkeyModuleString *desc; /* Function description */
uint64_t f_flags; /* Function flags */
@ -831,12 +831,46 @@ typedef struct ValkeyModuleScriptingEngineMemoryInfo {
size_t engine_memory_overhead;
} ValkeyModuleScriptingEngineMemoryInfo;
/* The callback function called when `FUNCTION LOAD` command is called to load
* a library of functions.
* This callback function evaluates the source code passed to `FUNCTION LOAD`
* and registers the functions declared in the source code.
typedef enum ValkeyModuleScriptingEngineSubsystemType {
VMSE_EVAL,
VMSE_FUNCTION,
VMSE_ALL
} ValkeyModuleScriptingEngineSubsystemType;
typedef enum ValkeyModuleScriptingEngineExecutionState {
VMSE_STATE_EXECUTING,
VMSE_STATE_KILLED,
} ValkeyModuleScriptingEngineExecutionState;
typedef struct ValkeyModuleScriptingEngineCallableLazyEvalReset {
void *context;
/*
* Callback function used for resetting the EVAL context implemented by an
* engine. This callback will be called by a background thread when it's
* ready for resetting the context.
*
* - `context`: a generic pointer to a context object, stored in the
* callableLazyEvalReset struct.
*
*/
void (*engineLazyEvalResetCallback)(void *context);
} ValkeyModuleScriptingEngineCallableLazyEvalReset;
/* The callback function called when either `EVAL`, `SCRIPT LOAD`, or
* `FUNCTION LOAD` command is called to compile the code.
* This callback function evaluates the source code passed and produces a list
* of pointers to the compiled functions structure.
* In the `EVAL` and `SCRIPT LOAD` case, the list only contains a single
* function.
* In the `FUNCTION LOAD` case, there are as many functions as there are calls
* to the `server.register_function` function in the source code.
*
* - `engine_ctx`: the engine specific context pointer.
* - `module_ctx`: the module runtime context.
*
* - `engine_ctx`: the scripting engine runtime context.
*
* - `type`: the subsystem type. Either EVAL or FUNCTION.
*
* - `code`: string pointer to the source code.
*
@ -850,28 +884,48 @@ typedef struct ValkeyModuleScriptingEngineMemoryInfo {
* Returns an array of compiled function objects, or `NULL` if some error
* occurred.
*/
typedef ValkeyModuleScriptingEngineCompiledFunction **(*ValkeyModuleScriptingEngineCreateFunctionsLibraryFunc)(
typedef ValkeyModuleScriptingEngineCompiledFunction **(*ValkeyModuleScriptingEngineCompileCodeFunc)(
ValkeyModuleCtx *module_ctx,
ValkeyModuleScriptingEngineCtx *engine_ctx,
ValkeyModuleScriptingEngineSubsystemType type,
const char *code,
size_t timeout,
size_t *out_num_compiled_functions,
ValkeyModuleString **err);
/* The callback function called when `FCALL` command is called on a function
* registered in the scripting engine.
/* Free the given function.
*
* - `module_ctx`: the module runtime context.
*
* - `engine_ctx`: the scripting engine runtime context.
*
* - `type`: the subsystem where the function is associated with, either `EVAL`
* or `FUNCTION`.
*
* - `compiled_function`: the compiled function to be freed.
*/
typedef void (*ValkeyModuleScriptingEngineFreeFunctionFunc)(
ValkeyModuleCtx *module_ctx,
ValkeyModuleScriptingEngineCtx *engine_ctx,
ValkeyModuleScriptingEngineSubsystemType type,
ValkeyModuleScriptingEngineCompiledFunction *compiled_function);
/* The callback function called when either `EVAL`, or`FCALL`, command is
* called.
* This callback function executes the `compiled_function` code.
*
* - `module_ctx`: the module runtime context.
*
* - `engine_ctx`: the engine specific context pointer.
* - `engine_ctx`: the scripting engine runtime context.
*
* - `func_ctx`: the context opaque structure that represents the runtime
* context for the function.
* - `server_ctx`: the context opaque structure that represents the server-side
* runtime context for the function.
*
* - `compiled_function`: pointer to the compiled function registered by the
* engine.
*
* - `type`: the subsystem type. Either EVAL or FUNCTION.
*
* - `keys`: the array of key strings passed in the `FCALL` command.
*
* - `nkeys`: the number of elements present in the `keys` array.
@ -883,49 +937,72 @@ typedef ValkeyModuleScriptingEngineCompiledFunction **(*ValkeyModuleScriptingEng
typedef void (*ValkeyModuleScriptingEngineCallFunctionFunc)(
ValkeyModuleCtx *module_ctx,
ValkeyModuleScriptingEngineCtx *engine_ctx,
ValkeyModuleScriptingEngineFunctionCtx *func_ctx,
void *compiled_function,
ValkeyModuleScriptingEngineServerRuntimeCtx *server_ctx,
ValkeyModuleScriptingEngineCompiledFunction *compiled_function,
ValkeyModuleScriptingEngineSubsystemType type,
ValkeyModuleString **keys,
size_t nkeys,
ValkeyModuleString **args,
size_t nargs);
/* Return memory overhead for a given function, such memory is not counted as
* engine memory but as general structs memory that hold different information
*/
typedef size_t (*ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc)(
ValkeyModuleCtx *module_ctx,
void *compiled_function);
ValkeyModuleScriptingEngineCompiledFunction *compiled_function);
/* Free the given function */
typedef void (*ValkeyModuleScriptingEngineFreeFunctionFunc)(
/* The callback function called when `SCRIPT FLUSH` command is called. The
* engine should reset the runtime environment used for EVAL scripts.
*
* - `module_ctx`: the module runtime context.
*
* - `engine_ctx`: the scripting engine runtime context.
*
* - `async`: if has value 1 then the reset is done asynchronously through
* the callback structure returned by this function.
*/
typedef ValkeyModuleScriptingEngineCallableLazyEvalReset *(*ValkeyModuleScriptingEngineResetEvalEnvFunc)(
ValkeyModuleCtx *module_ctx,
ValkeyModuleScriptingEngineCtx *engine_ctx,
void *compiled_function);
int async);
/* Return the current used memory by the engine.
*
* - `module_ctx`: the module runtime context.
*
* - `engine_ctx`: the scripting engine runtime context.
*
* - `type`: the subsystem type.
*/
typedef ValkeyModuleScriptingEngineMemoryInfo (*ValkeyModuleScriptingEngineGetMemoryInfoFunc)(
ValkeyModuleCtx *module_ctx,
ValkeyModuleScriptingEngineCtx *engine_ctx);
ValkeyModuleScriptingEngineCtx *engine_ctx,
ValkeyModuleScriptingEngineSubsystemType type);
typedef struct ValkeyModuleScriptingEngineMethodsV1 {
uint64_t version; /* Version of this structure for ABI compat. */
/* Library create function callback. When a new script is loaded, this
* callback will be called with the script code, and returns a list of
* ValkeyModuleScriptingEngineCompiledFunc objects. */
ValkeyModuleScriptingEngineCreateFunctionsLibraryFunc create_functions_library;
/* Compile code function callback. When a new script is loaded, this
* callback will be called with the script code, compiles it, and returns a
* list of `ValkeyModuleScriptingEngineCompiledFunc` objects. */
ValkeyModuleScriptingEngineCompileCodeFunc compile_code;
/* Function callback to free the memory of a registered engine function. */
ValkeyModuleScriptingEngineFreeFunctionFunc free_function;
/* The callback function called when `FCALL` command is called on a function
* registered in this engine. */
ValkeyModuleScriptingEngineCallFunctionFunc call_function;
/* Function callback to free the memory of a registered engine function. */
ValkeyModuleScriptingEngineFreeFunctionFunc free_function;
/* Function callback to return memory overhead for a given function. */
ValkeyModuleScriptingEngineGetFunctionMemoryOverheadFunc get_function_memory_overhead;
/* The callback function used to reset the runtime environment used
* by the scripting engine for EVAL scripts. */
ValkeyModuleScriptingEngineResetEvalEnvFunc reset_eval_env;
/* Function callback to get the used memory by the engine. */
ValkeyModuleScriptingEngineGetMemoryInfoFunc get_memory_info;
@ -1796,6 +1873,8 @@ VALKEYMODULE_API int (*ValkeyModule_RegisterScriptingEngine)(ValkeyModuleCtx *mo
VALKEYMODULE_API int (*ValkeyModule_UnregisterScriptingEngine)(ValkeyModuleCtx *module_ctx,
const char *engine_name) VALKEYMODULE_ATTR;
VALKEYMODULE_API ValkeyModuleScriptingEngineExecutionState (*ValkeyModule_GetFunctionExecutionState)(ValkeyModuleScriptingEngineServerRuntimeCtx *server_ctx) VALKEYMODULE_ATTR;
#define ValkeyModule_IsAOFClient(id) ((id) == UINT64_MAX)
/* This is included inline inside each Valkey module. */
@ -2165,6 +2244,7 @@ static int ValkeyModule_Init(ValkeyModuleCtx *ctx, const char *name, int ver, in
VALKEYMODULE_GET_API(RdbSave);
VALKEYMODULE_GET_API(RegisterScriptingEngine);
VALKEYMODULE_GET_API(UnregisterScriptingEngine);
VALKEYMODULE_GET_API(GetFunctionExecutionState);
if (ValkeyModule_IsModuleNameBusy && ValkeyModule_IsModuleNameBusy(name)) return VALKEYMODULE_ERR;
ValkeyModule_SetModuleAttribs(ctx, name, ver, apiver);

View File

@ -3,6 +3,7 @@
#include <ctype.h>
#include <errno.h>
#include <string.h>
#include <unistd.h>
/*
* This module implements a very simple stack based scripting language.
@ -28,6 +29,15 @@
* CONSTI 432 # pushes the value 432 to the top of the stack
* RETURN # returns the current value on the top of the stack and marks
* # the end of the function declaration.
*
* FUNCTION sleep # declaration of function 'sleep'
* ARGS 0 # pushes the value in the first argument to the top of the
* # stack
* SLEEP # Pops the current value in the stack and sleeps for `value`
* # seconds
* CONSTI 0 # pushes the value 0 to the top of the stack
* RETURN # returns the current value on the top of the stack and marks
* # the end of the function declaration.
* ```
*/
@ -38,6 +48,7 @@ typedef enum HelloInstKind {
FUNCTION = 0,
CONSTI,
ARGS,
SLEEP,
RETURN,
_NUM_INSTRUCTIONS, // Not a real instruction.
} HelloInstKind;
@ -49,6 +60,7 @@ const char *HelloInstKindStr[] = {
"FUNCTION",
"CONSTI",
"ARGS",
"SLEEP",
"RETURN",
};
@ -185,6 +197,10 @@ static int helloLangParseCode(const char *code,
ValkeyModule_Assert(currentFunc != NULL);
helloLangParseArgs(currentFunc);
break;
case SLEEP:
ValkeyModule_Assert(currentFunc != NULL);
currentFunc->num_instructions++;
break;
case RETURN:
ValkeyModule_Assert(currentFunc != NULL);
currentFunc->num_instructions++;
@ -204,13 +220,40 @@ static int helloLangParseCode(const char *code,
return 0;
}
static ValkeyModuleScriptingEngineExecutionState executeSleepInst(ValkeyModuleScriptingEngineServerRuntimeCtx *server_ctx,
uint32_t seconds) {
uint32_t elapsed_milliseconds = 0;
ValkeyModuleScriptingEngineExecutionState state = VMSE_STATE_EXECUTING;
while(1) {
state = ValkeyModule_GetFunctionExecutionState(server_ctx);
if (state != VMSE_STATE_EXECUTING) {
break;
}
if (elapsed_milliseconds >= (seconds * 1000)) {
break;
}
usleep(1000);
elapsed_milliseconds++;
}
return state;
}
/*
* Executes an HELLO function.
*/
static uint32_t executeHelloLangFunction(HelloFunc *func,
ValkeyModuleString **args, int nargs) {
static ValkeyModuleScriptingEngineExecutionState executeHelloLangFunction(ValkeyModuleScriptingEngineServerRuntimeCtx *server_ctx,
HelloFunc *func,
ValkeyModuleString **args,
int nargs,
uint32_t *result) {
ValkeyModule_Assert(result != NULL);
uint32_t stack[64];
uint32_t val = 0;
int sp = 0;
ValkeyModuleScriptingEngineExecutionState state = VMSE_STATE_EXECUTING;
for (uint32_t pc = 0; pc < func->num_instructions; pc++) {
HelloInst instr = func->instructions[pc];
@ -226,26 +269,34 @@ static uint32_t executeHelloLangFunction(HelloFunc *func,
uint32_t arg = str2int(argStr);
stack[sp++] = arg;
break;
}
}
case SLEEP: {
val = stack[--sp];
state = executeSleepInst(server_ctx, val);
break;
}
case RETURN: {
ValkeyModule_Assert(sp > 0);
uint32_t val = stack[--sp];
val = stack[--sp];
ValkeyModule_Assert(sp == 0);
return val;
}
*result = val;
return state;
}
case FUNCTION:
default:
case _NUM_INSTRUCTIONS:
ValkeyModule_Assert(0);
}
}
ValkeyModule_Assert(0);
return 0;
return state;
}
static ValkeyModuleScriptingEngineMemoryInfo engineGetMemoryInfo(ValkeyModuleCtx *module_ctx,
ValkeyModuleScriptingEngineCtx *engine_ctx) {
ValkeyModuleScriptingEngineCtx *engine_ctx,
ValkeyModuleScriptingEngineSubsystemType type) {
VALKEYMODULE_NOT_USED(module_ctx);
VALKEYMODULE_NOT_USED(type);
HelloLangCtx *ctx = (HelloLangCtx *)engine_ctx;
ValkeyModuleScriptingEngineMemoryInfo mem_info = {0};
@ -270,32 +321,37 @@ static ValkeyModuleScriptingEngineMemoryInfo engineGetMemoryInfo(ValkeyModuleCtx
}
static size_t engineFunctionMemoryOverhead(ValkeyModuleCtx *module_ctx,
void *compiled_function) {
ValkeyModuleScriptingEngineCompiledFunction *compiled_function) {
VALKEYMODULE_NOT_USED(module_ctx);
HelloFunc *func = (HelloFunc *)compiled_function;
HelloFunc *func = (HelloFunc *)compiled_function->function;
return ValkeyModule_MallocSize(func->name);
}
static void engineFreeFunction(ValkeyModuleCtx *module_ctx,
ValkeyModuleScriptingEngineCtx *engine_ctx,
void *compiled_function) {
ValkeyModuleScriptingEngineCtx *engine_ctx,
ValkeyModuleScriptingEngineSubsystemType type,
ValkeyModuleScriptingEngineCompiledFunction *compiled_function) {
VALKEYMODULE_NOT_USED(module_ctx);
VALKEYMODULE_NOT_USED(engine_ctx);
VALKEYMODULE_NOT_USED(type);
HelloLangCtx *ctx = (HelloLangCtx *)engine_ctx;
HelloFunc *func = (HelloFunc *)compiled_function;
HelloFunc *func = (HelloFunc *)compiled_function->function;
ctx->program->functions[func->index] = NULL;
ValkeyModule_Free(func->name);
func->name = NULL;
ValkeyModule_Free(func);
ValkeyModule_Free(compiled_function->name);
ValkeyModule_Free(compiled_function);
}
static ValkeyModuleScriptingEngineCompiledFunction **createHelloLangEngine(ValkeyModuleCtx *module_ctx,
ValkeyModuleScriptingEngineCtx *engine_ctx,
ValkeyModuleScriptingEngineSubsystemType type,
const char *code,
size_t timeout,
size_t *out_num_compiled_functions,
ValkeyModuleString **err) {
VALKEYMODULE_NOT_USED(module_ctx);
VALKEYMODULE_NOT_USED(type);
VALKEYMODULE_NOT_USED(timeout);
VALKEYMODULE_NOT_USED(err);
@ -346,21 +402,43 @@ static ValkeyModuleScriptingEngineCompiledFunction **createHelloLangEngine(Valke
static void
callHelloLangFunction(ValkeyModuleCtx *module_ctx,
ValkeyModuleScriptingEngineCtx *engine_ctx,
ValkeyModuleScriptingEngineFunctionCtx *func_ctx,
void *compiled_function,
ValkeyModuleScriptingEngineServerRuntimeCtx *server_ctx,
ValkeyModuleScriptingEngineCompiledFunction *compiled_function,
ValkeyModuleScriptingEngineSubsystemType type,
ValkeyModuleString **keys, size_t nkeys,
ValkeyModuleString **args, size_t nargs) {
VALKEYMODULE_NOT_USED(engine_ctx);
VALKEYMODULE_NOT_USED(func_ctx);
VALKEYMODULE_NOT_USED(keys);
VALKEYMODULE_NOT_USED(nkeys);
HelloFunc *func = (HelloFunc *)compiled_function;
uint32_t result = executeHelloLangFunction(func, args, nargs);
ValkeyModule_Assert(type == VMSE_EVAL || type == VMSE_FUNCTION);
HelloFunc *func = (HelloFunc *)compiled_function->function;
uint32_t result;
ValkeyModuleScriptingEngineExecutionState state = executeHelloLangFunction(server_ctx, func, args, nargs, &result);
ValkeyModule_Assert(state == VMSE_STATE_KILLED || state == VMSE_STATE_EXECUTING);
if (state == VMSE_STATE_KILLED) {
if (type == VMSE_EVAL) {
ValkeyModule_ReplyWithError(module_ctx, "ERR Script killed by user with SCRIPT KILL.");
}
if (type == VMSE_FUNCTION) {
ValkeyModule_ReplyWithError(module_ctx, "ERR Script killed by user with FUNCTION KILL");
}
}
ValkeyModule_ReplyWithLongLong(module_ctx, result);
}
static ValkeyModuleScriptingEngineCallableLazyEvalReset *helloResetEvalEnv(ValkeyModuleCtx *module_ctx,
ValkeyModuleScriptingEngineCtx *engine_ctx,
int async) {
VALKEYMODULE_NOT_USED(module_ctx);
VALKEYMODULE_NOT_USED(engine_ctx);
VALKEYMODULE_NOT_USED(async);
return NULL;
}
int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx,
ValkeyModuleString **argv,
int argc) {
@ -376,10 +454,11 @@ int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx,
ValkeyModuleScriptingEngineMethods methods = {
.version = VALKEYMODULE_SCRIPTING_ENGINE_ABI_VERSION,
.create_functions_library = createHelloLangEngine,
.compile_code = createHelloLangEngine,
.free_function = engineFreeFunction,
.call_function = callHelloLangFunction,
.get_function_memory_overhead = engineFunctionMemoryOverhead,
.free_function = engineFreeFunction,
.reset_eval_env = helloResetEvalEnv,
.get_memory_info = engineGetMemoryInfo,
};
@ -387,7 +466,6 @@ int ValkeyModule_OnLoad(ValkeyModuleCtx *ctx,
"HELLO",
hello_ctx,
&methods);
return VALKEYMODULE_OK;
}

View File

@ -323,7 +323,7 @@ start_server {tags {"scripting repl external:skip"}} {
test {FUNCTION - creation is replicated to replica} {
r function load [get_no_writes_function_code LUA test test {return 'hello'}]
wait_for_condition 150 100 {
wait_for_condition 150 100 {
[r -1 function list] eq {{library_name test engine LUA functions {{name test description {} flags no-writes}}}}
} else {
fail "Failed waiting for function to replicate to replica"
@ -1062,7 +1062,7 @@ start_server {tags {"scripting"}} {
test {FUNCTION - deny oom} {
r FUNCTION load replace {#!lua name=test
server.register_function('f1', function() return redis.call('set', 'x', '1') end)
server.register_function('f1', function() return redis.call('set', 'x', '1') end)
}
r config set maxmemory 1
@ -1093,7 +1093,7 @@ start_server {tags {"scripting"}} {
server.register_function{function_name='f3', callback=function() return redis.call('get', 'x') end, flags={'allow-stale', 'no-writes'}}
server.register_function{function_name='f4', callback=function() return redis.call('info', 'server') end, flags={'allow-stale', 'no-writes'}}
}
r config set replica-serve-stale-data no
r replicaof 127.0.0.1 1
@ -1166,7 +1166,7 @@ start_server {tags {"scripting"}} {
server.register_function('f3', function() return 1 end)
}} e
assert_match "*Library 'test1' already exists*" $e
r function stats
} {running_script {} engines {LUA {libraries_count 1 functions_count 2}}}
@ -1214,7 +1214,7 @@ start_server {tags {"scripting"}} {
r FUNCTION FLUSH
r FUNCTION load {#!lua name=test1
server.register_function('f1', function()
server.register_function('f1', function()
mt = getmetatable(_G)
original_globals = mt.__index
original_globals['redis'] = function() return 1 end

View File

@ -123,6 +123,57 @@ start_server {tags {"modules"}} {
assert_equal $result 432
}
test {Test function kill} {
set rd [valkey_deferring_client]
r config set busy-reply-threshold 10
r function load REPLACE "#!hello name=mylib\nFUNCTION wait\nARGS 0\nSLEEP\nARGS 0\nRETURN"
$rd fcall wait 0 100
after 1000
catch {r ping} e
assert_match {BUSY*} $e
assert_match {running_script {name wait command {fcall wait 0 100} duration_ms *} engines {*}} [r FUNCTION STATS]
r function kill
after 1000 ;
assert_equal [r ping] "PONG"
assert_error {ERR Script killed by user with FUNCTION KILL*} {$rd read}
$rd close
}
test {Test eval execution} {
set result [r eval "#!hello\nFUNCTION foo\nARGS 0\nRETURN" 0 145]
assert_equal $result 145
}
test {Test evalsha execution} {
set sha [r script load "#!hello\nFUNCTION foo\nARGS 0\nRETURN"]
set result [r evalsha $sha 0 167]
assert_equal $result 167
}
test {Test script exists} {
set sha [r script load "#!hello\nFUNCTION foo\nARGS 0\nRETURN"]
set result [r script exists $sha]
assert_equal $result 1
}
test {Test script flush sync} {
set sha [r script load "#!hello\nFUNCTION foo\nARGS 0\nRETURN"]
set result [r script exists $sha]
assert_equal $result 1
r script flush SYNC
set result [r script exists $sha]
assert_equal $result 0
}
test {Test script flush async} {
set sha [r script load "#!hello\nFUNCTION foo\nARGS 0\nRETURN"]
set result [r script exists $sha]
assert_equal $result 1
r script flush ASYNC
set result [r script exists $sha]
assert_equal $result 0
}
test {Unload scripting engine module} {
set result [r module unload helloengine]
assert_equal $result "OK"

View File

@ -1021,11 +1021,11 @@ start_server {tags {"scripting"}} {
return redis.call("EXISTS", "key")
} 1 key] 0
}
test "Script ACL check" {
r acl setuser bob on {>123} {+@scripting} {+set} {~x*}
assert_equal [r auth bob 123] {OK}
# Check permission granted
assert_equal [run_script {
return redis.acl_check_cmd('set','xx',1)
@ -1035,7 +1035,7 @@ start_server {tags {"scripting"}} {
assert_equal [run_script {
return redis.acl_check_cmd('hset','xx','f',1)
} 1 xx] {}
# Check permission denied unauthorised key
# Note: we don't pass the "yy" key as an argument to the script so key acl checks won't block the script
assert_equal [run_script {
@ -1569,16 +1569,16 @@ start_server {tags {"scripting needs:debug external:skip"}} {
reconnect
assert_equal [r ping] {PONG}
}
test {Test scripting debug lua server invocations} {
r script debug sync
r eval {return 'hello'} 0
r eval {return 'hello'} 0
set cmd "*2\r\n\$6\r\nserver\r\n\$4\r\nping\r\n"
r write $cmd
r flush
set ret [r read]
assert_match {*PONG*} $ret
reconnect
reconnect
assert_equal [r ping] {PONG}
}
}
@ -1962,7 +1962,7 @@ start_server {tags {"scripting"}} {
return 1
} 0
} e
assert_match {*Unexpected engine in script shebang*} $e
assert_match {*Could not find scripting engine*} $e
assert_equal [r eval {#!lua
return 1
@ -1989,7 +1989,7 @@ start_server {tags {"scripting"}} {
test "allow-oom shebang flag" {
r set x 123
r config set maxmemory 1
# Fail to execute deny-oom command in OOM condition (backwards compatibility mode without flags)
@ -2062,7 +2062,7 @@ start_server {tags {"scripting"}} {
} 1 x
}
}
start_server {tags {"external:skip"}} {
r -1 set x "some value"
test "no-writes shebang flag on replica" {
@ -2235,13 +2235,13 @@ start_server {tags {"scripting"}} {
return redis.call('get','x')
} 1 x
}
assert_match {foobar} [
r eval {#!lua flags=allow-stale,no-writes
return redis.call('echo','foobar')
} 0
]
# Test again with EVALSHA
set sha [
r script load {#!lua flags=allow-stale,no-writes
@ -2249,7 +2249,7 @@ start_server {tags {"scripting"}} {
}
]
assert_match {foobar} [r evalsha $sha 0]
r replicaof no one
r config set replica-serve-stale-data yes
set _ {}
@ -2301,7 +2301,7 @@ start_server {tags {"scripting"}} {
assert_equal [s total_error_replies] {1}
assert_match {calls=0*rejected_calls=1,failed_calls=0*} [cmdrstat set r]
assert_match {calls=1*rejected_calls=0,failed_calls=0*} [cmdrstat eval r]
# Returning an error object from lua is handled as a valid RESP error result.
r config resetstat
assert_error {OOM command not allowed when used memory > 'maxmemory'.} {
@ -2323,7 +2323,7 @@ start_server {tags {"scripting"}} {
assert_equal [s total_error_replies] {1}
assert_match {calls=1*rejected_calls=0,failed_calls=1*} [cmdrstat select r]
assert_match {calls=1*rejected_calls=0,failed_calls=1*} [cmdrstat eval r]
# redis.pcall() failure due to error in server command returns lua error table with server error message without '-' prefix
r config resetstat
assert_equal [
@ -2379,7 +2379,7 @@ start_server {tags {"scripting"}} {
assert_match {calls=1*rejected_calls=0,failed_calls=1*} [cmdrstat geoadd r]
assert_match {calls=1*rejected_calls=0,failed_calls=1*} [cmdrstat eval r]
} {} {cluster:skip}
test "LUA redis.error_reply API" {
r config resetstat
assert_error {MY_ERR_CODE custom msg} {