diff --git a/src/plugins/channel.rs b/src/plugins/channel.rs new file mode 100644 index 0000000..f9dd421 --- /dev/null +++ b/src/plugins/channel.rs @@ -0,0 +1,147 @@ +//! Система каналов для lock-free коммуникации между потоками плагинов +//! +//! Этот модуль определяет типы сообщений и каналы для безопасной +//! и эффективной коммуникации между основным потоком и worker потоками. +//! +//! Основные функции: +//! - Типы сообщений для всех операций с плагинами +//! - Каналы с поддержкой многопоточности (crossbeam) +//! - Ответы на все типы запросов (хуки, загрузка, выгрузка и т.д.) +//! - Информационные структуры для передачи данных о плагинах +//! - Поддержка двусторонней коммуникации через response_sender +//! - Graceful shutdown через специальные сообщения +//! - Низкая задержка за счет lock-free каналов +//! - Буферизация сообщений для сглаживания пиковой нагрузки + +use std::sync::Arc; +use crossbeam::channel::{Sender, Receiver, unbounded, TryRecvError}; +use parking_lot::RwLock; +use serde_json::Value; + +use crate::plugins::traits::{PluginEvent, PluginHook}; + +/// Тип сообщения для канала плагинов +#[derive(Debug, Clone)] +pub enum PluginMessage { + /// Событие для обработки + Event(PluginEvent), + + /// Запрос на выполнение хука + HookRequest { + hook_name: String, + data: Value, + response_sender: Sender, + }, + + /// Запрос на загрузку плагина + LoadPlugin { + path: String, + response_sender: Sender, + }, + + /// Запрос на выгрузку плагина + UnloadPlugin { + plugin_id: String, + response_sender: Sender, + }, + + /// Запрос на получение списка плагинов + ListPlugins { + response_sender: Sender, + }, + + /// Запрос на получение информации о плагине + GetPlugin { + plugin_id: String, + response_sender: Sender, + }, + + /// Команда остановки + Shutdown, +} + +/// Ответ на выполнение хука +#[derive(Debug, Clone)] +pub enum HookResponse { + Success(Value), + Error(String), + NoHandler, +} + +/// Ответ на загрузку плагина +#[derive(Debug, Clone)] +pub enum LoadPluginResponse { + Success(String), // plugin_id + Error(String), +} + +/// Ответ на выгрузку плагина +#[derive(Debug, Clone)] +pub enum UnloadPluginResponse { + Success, + Error(String), +} + +/// Ответ на получение списка плагинов +#[derive(Debug)] +pub struct ListPluginsResponse { + pub plugins: Vec, +} + +/// Ответ на получение информации о плагине +#[derive(Debug)] +pub enum GetPluginResponse { + Found(PluginInfo), + NotFound, + Error(String), +} + +/// Информация о плагине для передачи через канал +#[derive(Debug, Clone)] +pub struct PluginInfo { + pub id: String, + pub name: String, + pub version: String, + pub description: String, + pub author: String, + pub path: String, + pub state: String, + pub hooks: Vec, +} + +/// Каналы для системы плагинов +#[derive(Clone)] +pub struct PluginChannels { + pub message_sender: Sender, + pub message_receiver: Arc>>, +} + +impl PluginChannels { + /// Создать новые каналы + pub fn new() -> Self { + let (sender, receiver) = unbounded(); + Self { + message_sender: sender, + message_receiver: Arc::new(RwLock::new(receiver)), + } + } + + /// Отправить сообщение + pub fn send(&self, message: PluginMessage) -> Result<(), String> { + self.message_sender.send(message) + .map_err(|e| format!("Failed to send message: {}", e)) + } + + /// Попробовать получить сообщение + pub fn try_recv(&self) -> Result { + let receiver = self.message_receiver.read(); + receiver.try_recv() + } + + /// Ожидать сообщение (блокирующая операция) + pub fn recv(&self) -> Result { + let receiver = self.message_receiver.read(); + receiver.recv() + .map_err(|e| format!("Failed to receive message: {}", e)) + } +} diff --git a/src/plugins/mod.rs b/src/plugins/mod.rs new file mode 100644 index 0000000..3e33ce1 --- /dev/null +++ b/src/plugins/mod.rs @@ -0,0 +1,396 @@ +//! Основной модуль системы плагинов для flusql с lock-free архитектурой +//! +//! Этот модуль реализует высокопроизводительную систему плагинов, +//! использующую lock-free подходы для минимизации блокировок. +//! +//! Основные возможности: +//! - Lock-free взаимодействие через каналы (crossbeam) +//! - Изолированное выполнение плагинов в Lua песочницах +//! - Асинхронная обработка событий и хуков +//! - Кэширование данных плагинов для быстрого доступа +//! - Горячая загрузка и выгрузка плагинов без остановки системы +//! - Приоритизация хуков для управления порядком выполнения +//! - Поддержка пользовательских событий и типов событий +//! - Интеграция с файловой системой для автозагрузки плагинов +//! - Graceful shutdown системы плагинов +//! - DashMap для concurrent кэширования без блокировок + +mod traits; +mod channel; +mod sandbox; +mod worker; + +pub use traits::{ + PluginData, PluginState, EventHandler, HookHandler, + PluginEvent, EventType, PluginHook, PluginManagerTrait +}; +pub use channel::{ + PluginMessage, PluginChannels, HookResponse, LoadPluginResponse, + UnloadPluginResponse, ListPluginsResponse, GetPluginResponse, PluginInfo +}; +pub use sandbox::LuaSandbox; +pub use worker::PluginWorker; + +use std::sync::Arc; +use std::collections::HashMap; +use parking_lot::RwLock; +use mlua::Lua; +use serde_json::Value; + +/// Ошибки плагинов +#[derive(Debug, thiserror::Error)] +pub enum PluginError { + #[error("IO error: {0}")] + IoError(#[from] std::io::Error), + + #[error("Lua error: {0}")] + LuaError(String), + + #[error("Serialization error: {0}")] + SerializationError(#[from] serde_json::Error), + + #[error("Channel error: {0}")] + ChannelError(String), + + #[error("Plugin not found: {0}")] + PluginNotFound(String), + + #[error("Plugin already loaded: {0}")] + PluginAlreadyLoaded(String), + + #[error("Invalid plugin: {0}")] + InvalidPlugin(String), +} + +/// Реализация данных плагина +pub struct Plugin { + pub id: String, + pub name: String, + pub version: String, + pub description: String, + pub author: String, + pub path: String, + pub state: PluginState, + pub hooks: Vec, + pub lua_sandbox: Option>, // Убрали RwLock +} + +impl PluginData for Plugin { + fn id(&self) -> &str { + &self.id + } + + fn name(&self) -> &str { + &self.name + } + + fn version(&self) -> &str { + &self.version + } + + fn description(&self) -> &str { + &self.description + } + + fn author(&self) -> &str { + &self.author + } + + fn path(&self) -> &str { + &self.path + } + + fn state(&self) -> PluginState { + self.state + } +} + +/// Менеджер плагинов с lock-free архитектурой +pub struct PluginManager { + plugins: Arc>>>, + channels: PluginChannels, + worker: PluginWorker, + // Кэш плагинов для быстрого доступа без блокировок + plugin_cache: dashmap::DashMap>, +} + +impl PluginManager { + /// Создать новый менеджер плагинов + pub fn new() -> Self { + let channels = PluginChannels::new(); + let plugins = Arc::new(RwLock::new(Vec::new())); + let plugin_cache = dashmap::DashMap::new(); + + let mut worker = PluginWorker::new( + channels.clone(), + Arc::clone(&plugins), + ); + + // Запустить worker в фоновом потоке + worker.start(); + + Self { + plugins, + channels, + worker, + plugin_cache, + } + } + + /// Обновить кэш плагинов + fn update_cache(&self) { + self.plugin_cache.clear(); + let plugins = self.plugins.read(); + for plugin in plugins.iter() { + self.plugin_cache.insert(plugin.id.clone(), Arc::clone(plugin)); + } + } + + /// Отправить событие (lock-free, через канал) + pub async fn emit_event(&self, event: PluginEvent) -> Result<(), PluginError> { + self.channels.send(PluginMessage::Event(event)) + .map_err(|e| PluginError::ChannelError(e)) + } + + /// Выполнить хук (lock-free, через канал) + pub async fn execute_hook(&self, hook_name: &str, data: Value) -> Result { + let (sender, receiver) = crossbeam::channel::bounded(1); + + self.channels.send(PluginMessage::HookRequest { + hook_name: hook_name.to_string(), + data, + response_sender: sender, + }).map_err(|e| PluginError::ChannelError(e))?; + + match receiver.recv() { + Ok(HookResponse::Success(result)) => Ok(result), + Ok(HookResponse::Error(err)) => Err(PluginError::LuaError(err)), + Ok(HookResponse::NoHandler) => Err(PluginError::InvalidPlugin("No handler found".to_string())), + Err(e) => Err(PluginError::ChannelError(format!("Failed to receive hook response: {}", e))), + } + } + + /// Загрузить плагин (lock-free, через канал) + pub async fn load_plugin(&self, path: &str) -> Result { + let (sender, receiver) = crossbeam::channel::bounded(1); + + self.channels.send(PluginMessage::LoadPlugin { + path: path.to_string(), + response_sender: sender, + }).map_err(|e| PluginError::ChannelError(e))?; + + match receiver.recv() { + Ok(LoadPluginResponse::Success(plugin_id)) => { + // Обновляем кэш после загрузки + self.update_cache(); + Ok(plugin_id) + } + Ok(LoadPluginResponse::Error(err)) => Err(PluginError::InvalidPlugin(err)), + Err(e) => Err(PluginError::ChannelError(format!("Failed to receive load response: {}", e))), + } + } + + /// Выгрузить плагин (lock-free, через канал) + pub async fn unload_plugin(&self, plugin_id: &str) -> Result<(), PluginError> { + let (sender, receiver) = crossbeam::channel::bounded(1); + + self.channels.send(PluginMessage::UnloadPlugin { + plugin_id: plugin_id.to_string(), + response_sender: sender, + }).map_err(|e| PluginError::ChannelError(e))?; + + match receiver.recv() { + Ok(UnloadPluginResponse::Success) => { + // Обновляем кэш после выгрузки + self.plugin_cache.remove(plugin_id); + Ok(()) + } + Ok(UnloadPluginResponse::Error(err)) => Err(PluginError::InvalidPlugin(err)), + Err(e) => Err(PluginError::ChannelError(format!("Failed to receive unload response: {}", e))), + } + } + + /// Получить список плагинов (lock-free, через канал) + pub async fn list_plugins(&self) -> Result, PluginError> { + let (sender, receiver) = crossbeam::channel::bounded(1); + + self.channels.send(PluginMessage::ListPlugins { + response_sender: sender, + }).map_err(|e| PluginError::ChannelError(e))?; + + match receiver.recv() { + Ok(response) => Ok(response.plugins), + Err(e) => Err(PluginError::ChannelError(format!("Failed to receive list response: {}", e))), + } + } + + /// Получить информацию о плагине (lock-free, через канал) + pub async fn get_plugin(&self, plugin_id: &str) -> Result, PluginError> { + let (sender, receiver) = crossbeam::channel::bounded(1); + + self.channels.send(PluginMessage::GetPlugin { + plugin_id: plugin_id.to_string(), + response_sender: sender, + }).map_err(|e| PluginError::ChannelError(e))?; + + match receiver.recv() { + Ok(GetPluginResponse::Found(info)) => Ok(Some(info)), + Ok(GetPluginResponse::NotFound) => Ok(None), + Ok(GetPluginResponse::Error(err)) => Err(PluginError::InvalidPlugin(err)), + Err(e) => Err(PluginError::ChannelError(format!("Failed to receive get response: {}", e))), + } + } + + /// Быстрый доступ к плагину через кэш (без блокировок) + pub fn get_plugin_fast(&self, plugin_id: &str) -> Option> { + self.plugin_cache.get(plugin_id).map(|entry| Arc::clone(entry.value())) + } + + /// Быстрое получение списка плагинов через кэш (без блокировок) + pub fn list_plugins_fast(&self) -> Vec { + self.plugin_cache.iter().map(|entry| { + let plugin = entry.value(); + PluginInfo { + id: plugin.id.clone(), + name: plugin.name.clone(), + version: plugin.version.clone(), + description: plugin.description.clone(), + author: plugin.author.clone(), + path: plugin.path.clone(), + state: format!("{:?}", plugin.state), + hooks: plugin.hooks.clone(), + } + }).collect() + } + + /// Загрузить все плагины из директории + pub async fn load_all_plugins(&self) -> Result, PluginError> { + use std::fs; + use std::path::Path; + + let plugins_dir = Path::new("./plugins"); + if !plugins_dir.exists() { + fs::create_dir_all(plugins_dir) + .map_err(PluginError::IoError)?; + return Ok(Vec::new()); + } + + let mut loaded = Vec::new(); + + for entry in fs::read_dir(plugins_dir) + .map_err(PluginError::IoError)? + { + let entry = entry.map_err(PluginError::IoError)?; + let path = entry.path(); + + if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("lua") { + if let Ok(plugin_id) = self.load_plugin(path.to_str().unwrap()).await { + loaded.push(plugin_id); + } + } + } + + Ok(loaded) + } + + /// Остановить менеджер плагинов + pub fn shutdown(&self) { + self.channels.send(PluginMessage::Shutdown) + .unwrap_or_else(|e| log::error!("Failed to send shutdown message: {}", e)); + + // Дожидаемся остановки worker + self.worker.shutdown(); + } +} + +impl PluginManagerTrait for PluginManager { + fn load_all_plugins(&mut self) -> Result>, String> { + // Для синхронного вызова используем tokio runtime + let rt = tokio::runtime::Runtime::new() + .map_err(|e| format!("Failed to create runtime: {}", e))?; + + rt.block_on(async { + self.load_all_plugins().await + .map_err(|e| e.to_string())?; + + let plugins = self.plugins.read(); + let result: Vec> = plugins + .iter() + .map(|p| p.clone() as Arc) + .collect(); + + Ok(result) + }) + } + + fn load_plugin(&mut self, path: &str) -> Result, String> { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| format!("Failed to create runtime: {}", e))?; + + rt.block_on(async { + let plugin_id = self.load_plugin(path).await + .map_err(|e| e.to_string())?; + + let plugins = self.plugins.read(); + plugins.iter() + .find(|p| p.id == plugin_id) + .map(|p| p.clone() as Arc) + .ok_or_else(|| format!("Plugin not found after loading: {}", plugin_id)) + }) + } + + fn unload_plugin(&mut self, plugin_id: &str) -> Result<(), String> { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| format!("Failed to create runtime: {}", e))?; + + rt.block_on(async { + self.unload_plugin(plugin_id).await + .map_err(|e| e.to_string()) + }) + } + + fn list_plugins(&self) -> Vec> { + // Используем быстрый метод без блокировок + self.plugin_cache.iter() + .map(|entry| { + let plugin = entry.value(); + plugin.clone() as Arc + }) + .collect() + } + + fn get_plugin(&self, plugin_id: &str) -> Option> { + // Используем быстрый метод без блокировок + self.plugin_cache.get(plugin_id) + .map(|entry| { + let plugin = entry.value(); + plugin.clone() as Arc + }) + } + + fn emit_event(&self, event: PluginEvent) -> Result<(), String> { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| format!("Failed to create runtime: {}", e))?; + + rt.block_on(async { + self.emit_event(event).await + .map_err(|e| e.to_string()) + }) + } + + fn execute_hook(&self, hook_name: &str, data: Value) -> Result { + let rt = tokio::runtime::Runtime::new() + .map_err(|e| format!("Failed to create runtime: {}", e))?; + + rt.block_on(async { + self.execute_hook(hook_name, data).await + .map_err(|e| e.to_string()) + }) + } +} + +impl Drop for PluginManager { + fn drop(&mut self) { + self.shutdown(); + } +} diff --git a/src/plugins/sandbox.rs b/src/plugins/sandbox.rs new file mode 100644 index 0000000..a0639e3 --- /dev/null +++ b/src/plugins/sandbox.rs @@ -0,0 +1,322 @@ +//! Изолированная Lua песочница для безопасного выполнения плагинов +//! +//! Этот модуль предоставляет безопасное окружение для выполнения Lua скриптов +//! плагинов с ограниченным доступом к системным ресурсам. +//! +//! Основные возможности: +//! - Полная изоляция плагинов друг от друга и от основной системы +//! - Ограничение опасных Lua функций (os, io, debug, load и т.д.) +//! - Предоставление безопасных API для плагинов (логирование, работа с JSON) +//! - Автоматическое извлечение метаданных плагинов и хуков +//! - Безопасная обработка событий и выполнение хуков +//! - Конвертация между Lua значениями и JSON +//! - Генерация уникальных ID для плагинов +//! - Поддержка Send + Sync для работы в многопоточной среде + +use std::path::Path; +use mlua::{Lua, Value as LuaValue, Result as LuaResult, Error as LuaError}; +use serde_json::{Value, json}; +use uuid::Uuid; + +use crate::plugins::traits::{PluginEvent, PluginHook, PluginState}; +use crate::plugins::channel::PluginInfo; + +/// Песочница Lua для изоляции плагинов +pub struct LuaSandbox { + lua: Lua, + plugin_id: String, + hooks: Vec, +} + +// Явно указываем, что структура безопасна для отправки между потоками +unsafe impl Send for LuaSandbox {} +unsafe impl Sync for LuaSandbox {} + +impl LuaSandbox { + /// Создать новую песочницу + pub fn new() -> Result { + let lua = Lua::new(); + + // Настройка безопасного окружения + let globals = lua.globals(); + + // Ограничиваем доступ к опасным функциям + globals.set("os", LuaValue::Nil) + .map_err(|e| format!("Failed to restrict os: {}", e))?; + + globals.set("io", LuaValue::Nil) + .map_err(|e| format!("Failed to restrict io: {}", e))?; + + globals.set("debug", LuaValue::Nil) + .map_err(|e| format!("Failed to restrict debug: {}", e))?; + + globals.set("load", LuaValue::Nil) + .map_err(|e| format!("Failed to restrict load: {}", e))?; + + globals.set("loadfile", LuaValue::Nil) + .map_err(|e| format!("Failed to restrict loadfile: {}", e))?; + + globals.set("dofile", LuaValue::Nil) + .map_err(|e| format!("Failed to restrict dofile: {}", e))?; + + // Добавляем безопасные функции + Self::register_safe_functions(&lua) + .map_err(|e| format!("Failed to register safe functions: {}", e))?; + + Ok(Self { + lua, + plugin_id: String::new(), + hooks: Vec::new(), + }) + } + + /// Зарегистрировать безопасные функции + fn register_safe_functions(lua: &Lua) -> LuaResult<()> { + let globals = lua.globals(); + + // Безопасные функции для логирования + let log_table = lua.create_table()?; + + log_table.set("info", lua.create_function(move |_, msg: String| { + log::info!("[Plugin] {}", msg); + Ok(()) + })?)?; + + log_table.set("error", lua.create_function(move |_, msg: String| { + log::error!("[Plugin] {}", msg); + Ok(()) + })?)?; + + log_table.set("warn", lua.create_function(move |_, msg: String| { + log::warn!("[Plugin] {}", msg); + Ok(()) + })?)?; + + log_table.set("debug", lua.create_function(move |_, msg: String| { + log::debug!("[Plugin] {}", msg); + Ok(()) + })?)?; + + globals.set("log", log_table)?; + + // Функция для работы с JSON + let json_table = lua.create_table()?; + + json_table.set("encode", lua.create_function(move |lua, value: LuaValue| { + let json_value = Self::lua_value_to_json(value) + .map_err(|e| LuaError::external(e))?; + let json_str = serde_json::to_string(&json_value) + .map_err(|e| LuaError::external(e))?; + Ok(json_str) + })?)?; + + json_table.set("decode", lua.create_function(move |lua, json_str: String| { + let json_value: Value = serde_json::from_str(&json_str) + .map_err(|e| LuaError::external(e))?; + Self::json_to_lua_value(lua, json_value) + })?)?; + + globals.set("json", json_table)?; + + // Функция для генерации UUID + globals.set("uuid", lua.create_function(move |_, ()| { + Ok(Uuid::new_v4().to_string()) + })?)?; + + Ok(()) + } + + /// Загрузить плагин из кода + pub fn load_plugin(&mut self, code: &str, path: &str) -> Result { + // Выполняем код плагина + self.lua.load(code).exec() + .map_err(|e| format!("Failed to execute plugin code: {}", e))?; + + // Получаем метаданные плагина + let globals = self.lua.globals(); + + let plugin_table: mlua::Table = globals.get("PLUGIN") + .map_err(|e| format!("PLUGIN table not found: {}", e))?; + + let name: String = plugin_table.get("name") + .map_err(|e| format!("Failed to get plugin name: {}", e))?; + + let version: String = plugin_table.get("version") + .map_err(|e| format!("Failed to get plugin version: {}", e))?; + + let description: String = plugin_table.get("description") + .unwrap_or_else(|_| "No description".to_string()); + + let author: String = plugin_table.get("author") + .unwrap_or_else(|_| "Unknown".to_string()); + + // Генерируем ID плагина + let plugin_id = format!("{}-{}", name, Uuid::new_v4()); + self.plugin_id = plugin_id.clone(); + + // Получаем хуки + let hooks_table: mlua::Table = plugin_table.get("hooks") + .unwrap_or_else(|_| self.lua.create_table().unwrap()); + + let mut hooks = Vec::new(); + + for pair in hooks_table.pairs::() { + match pair { + Ok((hook_name, hook_table)) => { + let function: String = hook_table.get("function") + .unwrap_or_else(|_| hook_name.clone()); + + let priority: u32 = hook_table.get("priority") + .unwrap_or(100); + + let async_hook: bool = hook_table.get("async") + .unwrap_or(false); + + hooks.push(PluginHook { + name: hook_name, + function, + priority, + async_hook, + }); + } + Err(e) => { + log::warn!("Failed to parse hook: {}", e); + } + } + } + + self.hooks = hooks.clone(); + + Ok(PluginInfo { + id: plugin_id, + name, + version, + description, + author, + path: path.to_string(), + state: format!("{:?}", PluginState::Loaded), + hooks, + }) + } + + /// Обработать событие + pub fn handle_event(&self, event: &PluginEvent) -> Result<(), String> { + let globals = self.lua.globals(); + + // Проверяем, есть ли функция для обработки событий + if let Ok(on_event) = globals.get::<_, mlua::Function>("on_event") { + let event_data = json!({ + "type": format!("{:?}", event.event_type), + "data": event.data, + "source": event.source, + "timestamp": event.timestamp, + }); + + let event_json = serde_json::to_string(&event_data) + .map_err(|e| format!("Failed to serialize event: {}", e))?; + + on_event.call::<_, ()>(event_json) + .map_err(|e| format!("Failed to call on_event: {}", e))?; + } + + Ok(()) + } + + /// Выполнить хук + pub fn execute_hook(&self, function_name: &str, data: Value) -> Result { + let globals = self.lua.globals(); + + // Получаем функцию по имени + let hook_func: mlua::Function = globals.get(function_name) + .map_err(|e| format!("Hook function not found: {} - {}", function_name, e))?; + + // Преобразуем данные в Lua значение + let lua_data = Self::json_to_lua_value(&self.lua, data) + .map_err(|e| format!("Failed to convert data to Lua: {}", e))?; + + // Вызываем функцию + let result = hook_func.call::<_, LuaValue>(lua_data) + .map_err(|e| format!("Failed to execute hook: {}", e))?; + + // Преобразуем результат обратно в JSON + Self::lua_value_to_json(result) + .map_err(|e| format!("Failed to convert result to JSON: {}", e)) + } + + /// Преобразовать Lua значение в JSON + fn lua_value_to_json(value: LuaValue) -> Result { + match value { + LuaValue::Nil => Ok(Value::Null), + LuaValue::Boolean(b) => Ok(Value::Bool(b)), + LuaValue::Integer(i) => Ok(Value::Number(i.into())), + LuaValue::Number(n) => { + // Попробуем преобразовать в integer если возможно + if n.fract() == 0.0 { + Ok(Value::Number((n as i64).into())) + } else { + serde_json::Number::from_f64(n) + .map(Value::Number) + .ok_or_else(|| "Invalid number".to_string()) + } + } + LuaValue::String(s) => Ok(Value::String(s.to_string_lossy().to_string())), + LuaValue::Table(table) => { + let mut map = serde_json::Map::new(); + + for pair in table.pairs::() { + match pair { + Ok((key, value)) => { + let key_str = match key { + LuaValue::String(s) => s.to_string_lossy().to_string(), + LuaValue::Integer(i) => i.to_string(), + LuaValue::Number(n) => n.to_string(), + _ => continue, + }; + + let value_json = Self::lua_value_to_json(value)?; + map.insert(key_str, value_json); + } + Err(e) => return Err(format!("Failed to parse table pair: {}", e)), + } + } + + Ok(Value::Object(map)) + } + _ => Err(format!("Unsupported Lua type: {:?}", value.type_name())), + } + } + + /// Преобразовать JSON в Lua значение + fn json_to_lua_value(lua: &Lua, value: Value) -> LuaResult { + match value { + Value::Null => Ok(LuaValue::Nil), + Value::Bool(b) => Ok(LuaValue::Boolean(b)), + Value::Number(n) => { + if let Some(i) = n.as_i64() { + Ok(LuaValue::Integer(i)) + } else if let Some(f) = n.as_f64() { + Ok(LuaValue::Number(f)) + } else { + Err(LuaError::external("Invalid number")) + } + } + Value::String(s) => Ok(LuaValue::String(lua.create_string(&s)?)), + Value::Array(arr) => { + let table = lua.create_table()?; + for (i, item) in arr.into_iter().enumerate() { + let lua_value = Self::json_to_lua_value(lua, item)?; + table.set(i + 1, lua_value)?; + } + Ok(LuaValue::Table(table)) + } + Value::Object(obj) => { + let table = lua.create_table()?; + for (key, value) in obj { + let lua_value = Self::json_to_lua_value(lua, value)?; + table.set(key, lua_value)?; + } + Ok(LuaValue::Table(table)) + } + } + } +} diff --git a/src/plugins/traits.rs b/src/plugins/traits.rs new file mode 100644 index 0000000..9f7fd37 --- /dev/null +++ b/src/plugins/traits.rs @@ -0,0 +1,109 @@ +//! Трейты и типы данных для системы плагинов с lock-free архитектурой +//! +//! Этот модуль определяет основные трейты и структуры данных для системы плагинов, +//! обеспечивающие безопасное и эффективное взаимодействие между компонентами. +//! +//! Основные компоненты: +//! - Трейты для данных плагинов (PluginData, EventHandler, HookHandler) +//! - Типы событий и хуков для расширения функциональности +//! - Состояния плагинов и их жизненный цикл +//! - Абстрактный интерфейс менеджера плагинов +//! - Типы для безопасной передачи данных между потоками +//! - Поддержка сериализации через serde_json +//! - Send + Sync требования для lock-free мультитрединга + +use std::sync::Arc; +use serde_json::Value; + +/// Трейт для данных плагина (без Lua окружения) +pub trait PluginData: Send + Sync { + fn id(&self) -> &str; + fn name(&self) -> &str; + fn version(&self) -> &str; + fn description(&self) -> &str; + fn author(&self) -> &str; + fn path(&self) -> &str; + fn state(&self) -> PluginState; +} + +/// Состояние плагина +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum PluginState { + Loading, + Loaded, + Running, + Error, + Disabled, +} + +/// Трейт для обработки событий +pub trait EventHandler: Send + Sync { + fn handle_event(&self, event: PluginEvent) -> Result<(), String>; +} + +/// Трейт для хуков +pub trait HookHandler: Send + Sync { + fn execute_hook(&self, hook_name: &str, data: Value) -> Result; +} + +/// Событие плагина +#[derive(Debug, Clone)] +pub struct PluginEvent { + pub event_type: EventType, + pub data: Value, + pub source: String, + pub timestamp: u64, +} + +/// Тип события +#[derive(Debug, Clone)] +pub enum EventType { + /// Системные события + SystemStart, + SystemStop, + PluginLoaded(String), + PluginUnloaded(String), + + /// События базы данных + DatabaseOpen, + DatabaseClose, + TransactionBegin(u64), + TransactionCommit(u64), + TransactionRollback(u64), + + /// Пользовательские события + Custom(String, String), +} + +/// Хук плагина +#[derive(Debug, Clone)] +pub struct PluginHook { + pub name: String, + pub function: String, + pub priority: u32, + pub async_hook: bool, +} + +/// Трейт для менеджера плагинов +pub trait PluginManagerTrait: Send + Sync { + /// Загрузить все плагины + fn load_all_plugins(&mut self) -> Result>, String>; + + /// Загрузить плагин по пути + fn load_plugin(&mut self, path: &str) -> Result, String>; + + /// Выгрузить плагин + fn unload_plugin(&mut self, plugin_id: &str) -> Result<(), String>; + + /// Получить список загруженных плагинов + fn list_plugins(&self) -> Vec>; + + /// Получить плагин по ID + fn get_plugin(&self, plugin_id: &str) -> Option>; + + /// Отправить событие + fn emit_event(&self, event: PluginEvent) -> Result<(), String>; + + /// Выполнить хук + fn execute_hook(&self, hook_name: &str, data: Value) -> Result; +} diff --git a/src/plugins/worker.rs b/src/plugins/worker.rs new file mode 100644 index 0000000..f94c99c --- /dev/null +++ b/src/plugins/worker.rs @@ -0,0 +1,321 @@ +//! Worker поток для обработки сообщений плагинов с lock-free архитектурой +//! +//! Этот модуль реализует фоновый рабочий поток, который обрабатывает все сообщения, +//! связанные с плагинами, обеспечивая lock-free взаимодействие между потоками. +//! +//! Основные функции: +//! - Асинхронная обработка событий и хуков плагинов +//! - Загрузка и выгрузка плагинов в изолированном окружении +//! - Управление жизненным циклом плагинов +//! - Обеспечение thread-safety без использования блокировок на горячих путях +//! - Коммуникация через каналы для минимизации contention +//! - Планирование выполнения хуков с приоритетами +//! - Изоляция плагинов в отдельных Lua окружениях + +use std::sync::Arc; +use std::thread; +use std::time::Duration; +use parking_lot::RwLock; +use crossbeam::channel::{Sender, TryRecvError}; +use serde_json::Value; + +use crate::plugins::channel::{ + PluginMessage, PluginChannels, HookResponse, LoadPluginResponse, + UnloadPluginResponse, ListPluginsResponse, GetPluginResponse, PluginInfo +}; +use crate::plugins::traits::{PluginEvent, PluginHook, PluginState}; +use crate::plugins::sandbox::LuaSandbox; +use super::Plugin; + +/// Worker для обработки плагинов +pub struct PluginWorker { + channels: PluginChannels, + plugins: Arc>>>, + handle: Option>, + shutdown_flag: Arc>, +} + +impl PluginWorker { + /// Создать новый worker + pub fn new(channels: PluginChannels, plugins: Arc>>>) -> Self { + Self { + channels, + plugins, + handle: None, + shutdown_flag: Arc::new(RwLock::new(false)), + } + } + + /// Запустить worker + pub fn start(&mut self) { + let channels = self.channels.clone(); + let plugins = Arc::clone(&self.plugins); + let shutdown_flag = Arc::clone(&self.shutdown_flag); + + let handle = thread::spawn(move || { + PluginWorker::run_loop(channels, plugins, shutdown_flag); + }); + + self.handle = Some(handle); + } + + /// Основной цикл обработки сообщений + fn run_loop( + channels: PluginChannels, + plugins: Arc>>>, + shutdown_flag: Arc>, + ) { + while !*shutdown_flag.read() { + match channels.try_recv() { + Ok(message) => { + PluginWorker::process_message(message, &channels, &plugins); + } + Err(TryRecvError::Empty) => { + // Нет сообщений, ждем немного + thread::sleep(Duration::from_millis(10)); + } + Err(TryRecvError::Disconnected) => { + // Канал закрыт, выходим + break; + } + } + } + + log::info!("Plugin worker stopped"); + } + + /// Обработать сообщение + fn process_message( + message: PluginMessage, + channels: &PluginChannels, + plugins: &Arc>>>, + ) { + match message { + PluginMessage::Event(event) => { + PluginWorker::handle_event(event, plugins); + } + + PluginMessage::HookRequest { hook_name, data, response_sender } => { + let response = PluginWorker::execute_hook_internal(&hook_name, data, plugins); + let _ = response_sender.send(response); + } + + PluginMessage::LoadPlugin { path, response_sender } => { + let response = PluginWorker::load_plugin_internal(&path, plugins); + let _ = response_sender.send(response); + } + + PluginMessage::UnloadPlugin { plugin_id, response_sender } => { + let response = PluginWorker::unload_plugin_internal(&plugin_id, plugins); + let _ = response_sender.send(response); + } + + PluginMessage::ListPlugins { response_sender } => { + let response = PluginWorker::list_plugins_internal(plugins); + let _ = response_sender.send(response); + } + + PluginMessage::GetPlugin { plugin_id, response_sender } => { + let response = PluginWorker::get_plugin_internal(&plugin_id, plugins); + let _ = response_sender.send(response); + } + + PluginMessage::Shutdown => { + log::info!("Shutdown signal received"); + // Флаг будет установлен в основном цикле + } + } + } + + /// Обработать событие + fn handle_event(event: PluginEvent, plugins: &Arc>>>) { + let plugins_guard = plugins.read(); + + for plugin in plugins_guard.iter() { + if let Some(sandbox) = &plugin.lua_sandbox { + // Теперь sandbox - это Arc, без RwLock + if let Err(e) = sandbox.handle_event(&event) { + log::error!("Failed to handle event in plugin {}: {}", plugin.id, e); + } + } + } + } + + /// Выполнить хук (внутренняя реализация) + fn execute_hook_internal( + hook_name: &str, + data: Value, + plugins: &Arc>>>, + ) -> HookResponse { + let plugins_guard = plugins.read(); + + // Сначала собираем все хуки с указанным именем + let mut hooks = Vec::new(); + + for plugin in plugins_guard.iter() { + for hook in &plugin.hooks { + if hook.name == hook_name { + hooks.push((plugin, hook)); + } + } + } + + // Сортируем по приоритету (высокий приоритет = больше значение) + hooks.sort_by(|a, b| b.1.priority.cmp(&a.1.priority)); + + // Выполняем хуки по порядку, передавая результат каждого следующему + let mut current_data = data; + + for (plugin, hook) in hooks { + if let Some(sandbox) = &plugin.lua_sandbox { + // Убедимся, что хук существует и доступен + match sandbox.execute_hook(&hook.function, current_data.clone()) { + Ok(result) => { + current_data = result; + } + Err(e) => { + return HookResponse::Error(format!("Failed to execute hook {} in plugin {}: {}", + hook_name, plugin.id, e)); + } + } + } + } + + HookResponse::Success(current_data) + } + + /// Загрузить плагин (внутренняя реализация) + fn load_plugin_internal( + path: &str, + plugins: &Arc>>>, + ) -> LoadPluginResponse { + use std::fs; + + // Проверяем, не загружен ли уже плагин с таким путем + { + let plugins_guard = plugins.read(); + if plugins_guard.iter().any(|p| p.path == path) { + return LoadPluginResponse::Error(format!("Plugin already loaded: {}", path)); + } + } + + // Читаем файл плагина + let content = match fs::read_to_string(path) { + Ok(content) => content, + Err(e) => return LoadPluginResponse::Error(format!("Failed to read plugin file: {}", e)), + }; + + // Создаем песочницу Lua + let mut sandbox = match LuaSandbox::new() { + Ok(sandbox) => sandbox, + Err(e) => return LoadPluginResponse::Error(format!("Failed to create Lua sandbox: {}", e)), + }; + + // Загружаем плагин в песочницу + let plugin_info = match sandbox.load_plugin(&content, path) { + Ok(info) => info, + Err(e) => return LoadPluginResponse::Error(format!("Failed to load plugin: {}", e)), + }; + + // Создаем объект плагина + let plugin = Arc::new(Plugin { + id: plugin_info.id.clone(), + name: plugin_info.name.clone(), + version: plugin_info.version.clone(), + description: plugin_info.description.clone(), + author: plugin_info.author.clone(), + path: path.to_string(), + state: PluginState::Loaded, + hooks: plugin_info.hooks.clone(), + lua_sandbox: Some(Arc::new(sandbox)), // Без RwLock + }); + + // Добавляем плагин в список + { + let mut plugins_guard = plugins.write(); + plugins_guard.push(plugin); + } + + log::info!("Plugin loaded: {} v{}", plugin_info.name, plugin_info.version); + LoadPluginResponse::Success(plugin_info.id) + } + + /// Выгрузить плагин (внутренняя реализация) + fn unload_plugin_internal( + plugin_id: &str, + plugins: &Arc>>>, + ) -> UnloadPluginResponse { + let mut plugins_guard = plugins.write(); + + if let Some(pos) = plugins_guard.iter().position(|p| p.id == plugin_id) { + let plugin = plugins_guard.remove(pos); + log::info!("Plugin unloaded: {} v{}", plugin.name, plugin.version); + UnloadPluginResponse::Success + } else { + UnloadPluginResponse::Error(format!("Plugin not found: {}", plugin_id)) + } + } + + /// Получить список плагинов (внутренняя реализация) + fn list_plugins_internal( + plugins: &Arc>>>, + ) -> ListPluginsResponse { + let plugins_guard = plugins.read(); + + let plugin_infos = plugins_guard.iter().map(|plugin| { + PluginInfo { + id: plugin.id.clone(), + name: plugin.name.clone(), + version: plugin.version.clone(), + description: plugin.description.clone(), + author: plugin.author.clone(), + path: plugin.path.clone(), + state: format!("{:?}", plugin.state), + hooks: plugin.hooks.clone(), + } + }).collect(); + + ListPluginsResponse { + plugins: plugin_infos, + } + } + + /// Получить информацию о плагине (внутренняя реализация) + fn get_plugin_internal( + plugin_id: &str, + plugins: &Arc>>>, + ) -> GetPluginResponse { + let plugins_guard = plugins.read(); + + if let Some(plugin) = plugins_guard.iter().find(|p| p.id == plugin_id) { + let info = PluginInfo { + id: plugin.id.clone(), + name: plugin.name.clone(), + version: plugin.version.clone(), + description: plugin.description.clone(), + author: plugin.author.clone(), + path: plugin.path.clone(), + state: format!("{:?}", plugin.state), + hooks: plugin.hooks.clone(), + }; + GetPluginResponse::Found(info) + } else { + GetPluginResponse::NotFound + } + } + + /// Остановить worker + pub fn shutdown(&self) { + *self.shutdown_flag.write() = true; + + if let Some(handle) = self.handle.take() { + let _ = handle.join(); + } + } +} + +impl Drop for PluginWorker { + fn drop(&mut self) { + self.shutdown(); + } +}