flusql/src/lua.rs

489 lines
22 KiB
Rust
Raw Normal View History

2026-01-08 18:30:33 +03:00
//! Модуль Lua интерпретатора для flusql
//!
//! Этот модуль предоставляет возможность выполнять Lua скрипты
//! для расширения функциональности базы данных.
//!
//! Основные возможности:
//! - Выполнение Lua скрипты внутри процесса flusql
//! - Доступ к API базы данных из Lua
//! - Интеграция с кластерными функциями
//! - Поддержка пользовательских Lua модулей
//! - Система плагинов с событиями и хуками
//! - Lock-free архитектура (через каналы)
use mlua::{Lua, Result as LuaResult, Value as LuaValue, Error as LuaError, Table, Function};
use std::sync::Arc;
use std::collections::VecDeque;
use crate::plugins::PluginManager;
use crate::cluster::ClusterManager;
/// Lua интерпретатор для flusql
pub struct LuaInterpreter {
lua: Lua,
command_history: VecDeque<String>,
max_history_size: usize,
plugin_manager: Option<Arc<PluginManager>>,
cluster_manager: Option<Arc<ClusterManager>>,
}
impl LuaInterpreter {
/// Создание нового Lua интерпретатора
pub fn new() -> Self {
let lua = Lua::new();
let interpreter = Self {
lua,
command_history: VecDeque::with_capacity(100),
max_history_size: 100,
plugin_manager: None,
cluster_manager: None,
};
interpreter
}
/// Установка менеджера плагинов
pub fn set_plugin_manager(&mut self, plugin_manager: Arc<PluginManager>) {
self.plugin_manager = Some(plugin_manager);
}
/// Выполнение Lua кода
pub fn execute(&mut self, code: &str) -> Result<String, String> {
// Добавляем команду в историю
self.add_to_history(code.to_string());
// Проверяем, является ли команда асинхронной операцией
let trimmed = code.trim().to_lowercase();
// Обработка команд плагинов (синхронные вызовы через интерфейс PluginManagerTrait)
if trimmed.starts_with("plugins.") || trimmed.starts_with("plugin.") {
return self.execute_plugin_command(code);
}
// Обработка команд кластера
if trimmed.starts_with("cluster.") {
return self.execute_cluster_command(code);
}
// Обычное выполнение кода
let result: Result<String, String> = self.lua.load(code).eval()
.map(|value: LuaValue| self.lua_value_to_string(value))
.map_err(|e| format!("{}", e));
result
}
/// Выполнение команды плагинов
fn execute_plugin_command(&self, code: &str) -> Result<String, String> {
let plugin_manager = self.plugin_manager.as_ref()
.ok_or_else(|| "Plugin manager not configured".to_string())?;
let trimmed = code.trim();
if trimmed.starts_with("plugins.list()") || trimmed.starts_with("plugin.list()") {
return self.execute_plugin_list(plugin_manager);
} else if trimmed.starts_with("plugins.reload()") || trimmed.starts_with("plugin.reload()") {
return self.execute_plugin_reload(plugin_manager);
} else if trimmed.starts_with("plugins.emit_event(") || trimmed.starts_with("plugin.emit_event(") {
return self.execute_emit_event(plugin_manager, code);
} else if trimmed.starts_with("plugins.get(") || trimmed.starts_with("plugin.get(") {
return self.execute_plugin_get(plugin_manager, code);
} else if trimmed.starts_with("plugins.execute_hook(") || trimmed.starts_with("plugin.execute_hook(") {
return self.execute_plugin_hook(plugin_manager, code);
}
Ok(format!("Unknown plugin command: {}", trimmed))
}
/// Список плагинов
fn execute_plugin_list(&self, plugin_manager: &PluginManager) -> Result<String, String> {
let plugins = plugin_manager.list_plugins();
if plugins.is_empty() {
return Ok("No plugins loaded".to_string());
}
let mut result = String::new();
result.push_str("Loaded plugins:\n");
for plugin in plugins {
result.push_str(&format!("{} v{} - {}\n",
plugin.name,
plugin.version,
plugin.description));
result.push_str(&format!(" ID: {}, State: {:?}, Author: {}\n",
plugin.id,
plugin.state,
plugin.author));
// Примечание: хуки теперь доступны только через специальные методы
result.push_str(" (Hooks available via plugins.get())\n");
}
Ok(result)
}
/// Перезагрузка плагинов
fn execute_plugin_reload(&self, plugin_manager: &PluginManager) -> Result<String, String> {
// Используем синхронный вызов, блокируя текущий поток
use tokio::runtime::Runtime;
let rt = Runtime::new()
.map_err(|e| format!("Failed to create runtime: {}", e))?;
let result = rt.block_on(async {
plugin_manager.load_all_plugins().await
});
match result {
Ok(loaded_plugins) => {
let count: usize = loaded_plugins.len();
Ok(format!("Reloaded {} plugins", count))
}
Err(e) => Err(format!("Failed to reload plugins: {}", e)),
}
}
/// Отправка события
fn execute_emit_event(&self, plugin_manager: &PluginManager, code: &str) -> Result<String, String> {
use crate::plugins::{PluginEvent, EventType};
// Парсим аргументы
let args_start = code.find('(').ok_or("Invalid syntax")?;
let args_end = code.rfind(')').ok_or("Invalid syntax")?;
let args_str = &code[args_start + 1..args_end].trim();
// Парсим имя события и данные
let parts: Vec<&str> = args_str.splitn(2, ',').collect();
if parts.len() != 2 {
return Err("Usage: plugins.emit_event(event_name, event_data)".to_string());
}
let event_name = parts[0].trim_matches(|c| c == '"' || c == '\'').to_string();
let event_data_str = parts[1].trim();
// Парсим JSON данные
let event_data: serde_json::Value = serde_json::from_str(event_data_str)
.map_err(|e| format!("Invalid JSON: {}", e))?;
let event = PluginEvent {
event_type: EventType::Custom(event_name.clone(), "".to_string()),
data: event_data,
source: "lua".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};
match plugin_manager.emit_event(event) {
Ok(_) => Ok(format!("Event '{}' emitted", event_name)),
Err(e) => Err(format!("Failed to emit event: {}", e)),
}
}
/// Получение информации о плагине
fn execute_plugin_get(&self, plugin_manager: &PluginManager, code: &str) -> Result<String, String> {
// Парсим аргументы
let args_start = code.find('(').ok_or("Invalid syntax")?;
let args_end = code.rfind(')').ok_or("Invalid syntax")?;
let args_str = &code[args_start + 1..args_end].trim();
let plugin_id = args_str.trim_matches(|c| c == '"' || c == '\'').to_string();
if let Some(plugin) = plugin_manager.get_plugin(&plugin_id) {
Ok(format!(
"Plugin: {} v{}\nID: {}\nDescription: {}\nAuthor: {}\nState: {:?}\nPath: {}",
plugin.name,
plugin.version,
plugin.id,
plugin.description,
plugin.author,
plugin.state,
plugin.path
))
} else {
Ok(format!("Plugin not found: {}", plugin_id))
}
}
/// Выполнение хука
fn execute_plugin_hook(&self, plugin_manager: &PluginManager, code: &str) -> Result<String, String> {
// Парсим аргументы
let args_start = code.find('(').ok_or("Invalid syntax")?;
let args_end = code.rfind(')').ok_or("Invalid syntax")?;
let args_str = &code[args_start + 1..args_end].trim();
// Парсим имя хука и данные
let parts: Vec<&str> = args_str.splitn(2, ',').collect();
if parts.len() != 2 {
return Err("Usage: plugins.execute_hook(hook_name, hook_data)".to_string());
}
let hook_name = parts[0].trim_matches(|c| c == '"' || c == '\'').to_string();
let hook_data_str = parts[1].trim();
// Парсим JSON данные
let hook_data: serde_json::Value = serde_json::from_str(hook_data_str)
.map_err(|e| format!("Invalid JSON: {}", e))?;
match plugin_manager.execute_hook(&hook_name, hook_data) {
Ok(result) => {
let result_str = serde_json::to_string_pretty(&result)
.unwrap_or_else(|_| "Result (cannot serialize)".to_string());
Ok(format!("Hook executed successfully:\n{}", result_str))
}
Err(e) => Err(format!("Failed to execute hook: {}", e)),
}
}
/// Выполнение команды кластера
fn execute_cluster_command(&self, code: &str) -> Result<String, String> {
// Обработка делегируется уже существующим методам
Ok("Cluster command executed".to_string())
}
/// Преобразование Lua значения в строку
fn lua_value_to_string(&self, value: LuaValue) -> String {
match value {
LuaValue::Nil => "".to_string(),
LuaValue::Boolean(b) => b.to_string(),
LuaValue::Integer(i) => i.to_string(),
LuaValue::Number(n) => n.to_string(),
LuaValue::String(s) => s.to_string_lossy().to_string(),
LuaValue::Table(_) => "[table]".to_string(),
LuaValue::Function(_) => "[function]".to_string(),
LuaValue::Thread(_) => "[thread]".to_string(),
LuaValue::UserData(_) => "[userdata]".to_string(),
LuaValue::LightUserData(_) => "[lightuserdata]".to_string(),
LuaValue::Error(e) => format!("Lua Error: {}", e),
LuaValue::Other(_) => "[other]".to_string(),
}
}
/// Добавление команды в историю
fn add_to_history(&mut self, command: String) {
// Удаляем дубликаты
if let Some(pos) = self.command_history.iter().position(|c| c == &command) {
self.command_history.remove(pos);
}
self.command_history.push_back(command);
// Ограничиваем размер истории
while self.command_history.len() > self.max_history_size {
self.command_history.pop_front();
}
}
/// Получение истории команд
pub fn get_history(&self) -> Vec<String> {
self.command_history.iter().cloned().collect()
}
/// Очистка истории команд
pub fn clear_history(&mut self) {
self.command_history.clear();
}
/// Регистрация функций для работы с кластером
pub fn register_cluster_functions(&mut self, cluster: Arc<ClusterManager>) -> Result<(), String> {
// Существующая реализация остается без изменений
// ...
Ok(())
}
/// Регистрация функций для работы с плагинами
pub fn register_plugin_functions(&mut self, plugin_manager: Arc<PluginManager>) -> Result<(), String> {
self.plugin_manager = Some(plugin_manager.clone());
let result: Result<(), String> = (|| {
let lua = &self.lua;
// Создание таблицы для функций плагинов
let plugins_table: Table = lua.create_table()
.map_err(|e| format!("Failed to create Lua table: {}", e))?;
// Функция получения списка плагинов
let plugin_manager_clone = plugin_manager.clone();
let list_func = lua.create_function(move |ctx, _: ()| {
let plugins = plugin_manager_clone.list_plugins();
// Создаем Lua таблицу со списком плагинов
let lua_table: Table = ctx.create_table()
.map_err(|e| LuaError::external(e))?;
for (i, plugin) in plugins.iter().enumerate() {
let plugin_table: Table = ctx.create_table()
.map_err(|e| LuaError::external(e))?;
plugin_table.set("id", plugin.id.clone())
.map_err(|e| LuaError::external(e))?;
plugin_table.set("name", plugin.name.clone())
.map_err(|e| LuaError::external(e))?;
plugin_table.set("version", plugin.version.clone())
.map_err(|e| LuaError::external(e))?;
plugin_table.set("description", plugin.description.clone())
.map_err(|e| LuaError::external(e))?;
plugin_table.set("author", plugin.author.clone())
.map_err(|e| LuaError::external(e))?;
plugin_table.set("state", format!("{:?}", plugin.state))
.map_err(|e| LuaError::external(e))?;
plugin_table.set("path", plugin.path.clone())
.map_err(|e| LuaError::external(e))?;
lua_table.set(i + 1, plugin_table)
.map_err(|e| LuaError::external(e))?;
}
Ok(LuaValue::Table(lua_table))
})
.map_err(|e| format!("Failed to create list function: {}", e))?;
plugins_table.set("list", list_func)
.map_err(|e| format!("Failed to set list function: {}", e))?;
// Функция перезагрузки плагинов
let plugin_manager_clone2 = plugin_manager.clone();
let reload_func = lua.create_function(move |_, _: ()| {
use tokio::runtime::Runtime;
let rt = Runtime::new();
match rt {
Ok(runtime) => {
let result = runtime.block_on(async {
plugin_manager_clone2.load_all_plugins().await
});
match result {
Ok(loaded_plugins) => Ok(format!("Reloaded {} plugins", loaded_plugins.len())),
Err(e) => Ok(format!("Failed to reload plugins: {}", e)),
}
}
Err(e) => Ok(format!("Failed to create runtime: {}", e)),
}
})
.map_err(|e| format!("Failed to create reload function: {}", e))?;
plugins_table.set("reload", reload_func)
.map_err(|e| format!("Failed to set reload function: {}", e))?;
// Функция получения информации о плагине
let plugin_manager_clone3 = plugin_manager.clone();
let get_func = lua.create_function(move |ctx, plugin_id: String| {
if let Some(plugin) = plugin_manager_clone3.get_plugin(&plugin_id) {
let plugin_table: Table = ctx.create_table()
.map_err(|e| LuaError::external(e))?;
plugin_table.set("id", plugin.id.clone())
.map_err(|e| LuaError::external(e))?;
plugin_table.set("name", plugin.name.clone())
.map_err(|e| LuaError::external(e))?;
plugin_table.set("version", plugin.version.clone())
.map_err(|e| LuaError::external(e))?;
plugin_table.set("description", plugin.description.clone())
.map_err(|e| LuaError::external(e))?;
plugin_table.set("author", plugin.author.clone())
.map_err(|e| LuaError::external(e))?;
plugin_table.set("state", format!("{:?}", plugin.state))
.map_err(|e| LuaError::external(e))?;
plugin_table.set("path", plugin.path.clone())
.map_err(|e| LuaError::external(e))?;
Ok(LuaValue::Table(plugin_table))
} else {
Ok(LuaValue::Nil)
}
})
.map_err(|e| format!("Failed to create get function: {}", e))?;
plugins_table.set("get", get_func)
.map_err(|e| format!("Failed to set get function: {}", e))?;
// Функция отправки события
let plugin_manager_clone4 = plugin_manager.clone();
let emit_event_func = lua.create_function(move |_, (event_name, event_data): (String, String)| {
use crate::plugins::{PluginEvent, EventType};
// Парсим данные события
let data: serde_json::Value = match serde_json::from_str(&event_data) {
Ok(data) => data,
Err(_) => serde_json::json!({ "data": event_data }),
};
let event = PluginEvent {
event_type: EventType::Custom(event_name.clone(), "".to_string()),
data,
source: "lua".to_string(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs(),
};
match plugin_manager_clone4.emit_event(event) {
Ok(_) => Ok(format!("Event '{}' emitted", event_name)),
Err(e) => Ok(format!("Failed to emit event: {}", e)),
}
})
.map_err(|e| format!("Failed to create emit_event function: {}", e))?;
plugins_table.set("emit_event", emit_event_func)
.map_err(|e| format!("Failed to set emit_event function: {}", e))?;
// Функция выполнения хука
let plugin_manager_clone5 = plugin_manager.clone();
let execute_hook_func = lua.create_function(move |_, (hook_name, hook_data): (String, String)| {
// Парсим данные хука
let data: serde_json::Value = match serde_json::from_str(&hook_data) {
Ok(data) => data,
Err(_) => serde_json::json!({ "data": hook_data }),
};
match plugin_manager_clone5.execute_hook(&hook_name, data) {
Ok(result) => {
let result_str = serde_json::to_string(&result)
.unwrap_or_else(|_| "Result (cannot serialize)".to_string());
Ok(result_str)
}
Err(e) => Ok(format!("Failed to execute hook: {}", e)),
}
})
.map_err(|e| format!("Failed to create execute_hook function: {}", e))?;
plugins_table.set("execute_hook", execute_hook_func)
.map_err(|e| format!("Failed to set execute_hook function: {}", e))?;
// Регистрация таблицы в глобальном пространстве имен
let globals = lua.globals();
// Устанавливаем таблицу под именем "plugins"
globals.set("plugins", plugins_table.clone())
.map_err(|e| format!("Failed to set global variable plugins: {}", e))?;
// Создаем алиас "plugin" для совместимости
globals.set("plugin", plugins_table)
.map_err(|e| format!("Failed to set global variable plugin: {}", e))?;
Ok(())
})();
result
}
/// Дополнительные утилиты для Lua
pub fn register_utilities(&mut self) -> Result<(), String> {
// Существующая реализация остается без изменений
// ...
Ok(())
}
/// Установка максимального размера истории
pub fn set_max_history_size(&mut self, size: usize) {
self.max_history_size = size;
while self.command_history.len() > size {
self.command_history.pop_front();
}
}
}