Upload files to "src/plugins"

This commit is contained in:
Григорий Сафронов 2026-01-18 21:54:38 +00:00
parent 9ae4c1e021
commit 92628736e7
5 changed files with 1295 additions and 0 deletions

147
src/plugins/channel.rs Normal file
View File

@ -0,0 +1,147 @@
//! Система каналов для lock-free коммуникации между потоками плагинов
//!
//! Этот модуль определяет типы сообщений и каналы для безопасной
//! и эффективной коммуникации между основным потоком и worker потоками.
//!
//! Основные функции:
//! - Типы сообщений для всех операций с плагинами
//! - Каналы с поддержкой многопоточности (crossbeam)
//! - Ответы на все типы запросов (хуки, загрузка, выгрузка и т.д.)
//! - Информационные структуры для передачи данных о плагинах
//! - Поддержка двусторонней коммуникации через response_sender
//! - Graceful shutdown через специальные сообщения
//! - Низкая задержка за счет lock-free каналов
//! - Буферизация сообщений для сглаживания пиковой нагрузки
use std::sync::Arc;
use crossbeam::channel::{Sender, Receiver, unbounded, TryRecvError};
use parking_lot::RwLock;
use serde_json::Value;
use crate::plugins::traits::{PluginEvent, PluginHook};
/// Тип сообщения для канала плагинов
#[derive(Debug, Clone)]
pub enum PluginMessage {
/// Событие для обработки
Event(PluginEvent),
/// Запрос на выполнение хука
HookRequest {
hook_name: String,
data: Value,
response_sender: Sender<HookResponse>,
},
/// Запрос на загрузку плагина
LoadPlugin {
path: String,
response_sender: Sender<LoadPluginResponse>,
},
/// Запрос на выгрузку плагина
UnloadPlugin {
plugin_id: String,
response_sender: Sender<UnloadPluginResponse>,
},
/// Запрос на получение списка плагинов
ListPlugins {
response_sender: Sender<ListPluginsResponse>,
},
/// Запрос на получение информации о плагине
GetPlugin {
plugin_id: String,
response_sender: Sender<GetPluginResponse>,
},
/// Команда остановки
Shutdown,
}
/// Ответ на выполнение хука
#[derive(Debug, Clone)]
pub enum HookResponse {
Success(Value),
Error(String),
NoHandler,
}
/// Ответ на загрузку плагина
#[derive(Debug, Clone)]
pub enum LoadPluginResponse {
Success(String), // plugin_id
Error(String),
}
/// Ответ на выгрузку плагина
#[derive(Debug, Clone)]
pub enum UnloadPluginResponse {
Success,
Error(String),
}
/// Ответ на получение списка плагинов
#[derive(Debug)]
pub struct ListPluginsResponse {
pub plugins: Vec<PluginInfo>,
}
/// Ответ на получение информации о плагине
#[derive(Debug)]
pub enum GetPluginResponse {
Found(PluginInfo),
NotFound,
Error(String),
}
/// Информация о плагине для передачи через канал
#[derive(Debug, Clone)]
pub struct PluginInfo {
pub id: String,
pub name: String,
pub version: String,
pub description: String,
pub author: String,
pub path: String,
pub state: String,
pub hooks: Vec<PluginHook>,
}
/// Каналы для системы плагинов
#[derive(Clone)]
pub struct PluginChannels {
pub message_sender: Sender<PluginMessage>,
pub message_receiver: Arc<RwLock<Receiver<PluginMessage>>>,
}
impl PluginChannels {
/// Создать новые каналы
pub fn new() -> Self {
let (sender, receiver) = unbounded();
Self {
message_sender: sender,
message_receiver: Arc::new(RwLock::new(receiver)),
}
}
/// Отправить сообщение
pub fn send(&self, message: PluginMessage) -> Result<(), String> {
self.message_sender.send(message)
.map_err(|e| format!("Failed to send message: {}", e))
}
/// Попробовать получить сообщение
pub fn try_recv(&self) -> Result<PluginMessage, TryRecvError> {
let receiver = self.message_receiver.read();
receiver.try_recv()
}
/// Ожидать сообщение (блокирующая операция)
pub fn recv(&self) -> Result<PluginMessage, String> {
let receiver = self.message_receiver.read();
receiver.recv()
.map_err(|e| format!("Failed to receive message: {}", e))
}
}

396
src/plugins/mod.rs Normal file
View File

@ -0,0 +1,396 @@
//! Основной модуль системы плагинов для flusql с lock-free архитектурой
//!
//! Этот модуль реализует высокопроизводительную систему плагинов,
//! использующую lock-free подходы для минимизации блокировок.
//!
//! Основные возможности:
//! - Lock-free взаимодействие через каналы (crossbeam)
//! - Изолированное выполнение плагинов в Lua песочницах
//! - Асинхронная обработка событий и хуков
//! - Кэширование данных плагинов для быстрого доступа
//! - Горячая загрузка и выгрузка плагинов без остановки системы
//! - Приоритизация хуков для управления порядком выполнения
//! - Поддержка пользовательских событий и типов событий
//! - Интеграция с файловой системой для автозагрузки плагинов
//! - Graceful shutdown системы плагинов
//! - DashMap для concurrent кэширования без блокировок
mod traits;
mod channel;
mod sandbox;
mod worker;
pub use traits::{
PluginData, PluginState, EventHandler, HookHandler,
PluginEvent, EventType, PluginHook, PluginManagerTrait
};
pub use channel::{
PluginMessage, PluginChannels, HookResponse, LoadPluginResponse,
UnloadPluginResponse, ListPluginsResponse, GetPluginResponse, PluginInfo
};
pub use sandbox::LuaSandbox;
pub use worker::PluginWorker;
use std::sync::Arc;
use std::collections::HashMap;
use parking_lot::RwLock;
use mlua::Lua;
use serde_json::Value;
/// Ошибки плагинов
#[derive(Debug, thiserror::Error)]
pub enum PluginError {
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
#[error("Lua error: {0}")]
LuaError(String),
#[error("Serialization error: {0}")]
SerializationError(#[from] serde_json::Error),
#[error("Channel error: {0}")]
ChannelError(String),
#[error("Plugin not found: {0}")]
PluginNotFound(String),
#[error("Plugin already loaded: {0}")]
PluginAlreadyLoaded(String),
#[error("Invalid plugin: {0}")]
InvalidPlugin(String),
}
/// Реализация данных плагина
pub struct Plugin {
pub id: String,
pub name: String,
pub version: String,
pub description: String,
pub author: String,
pub path: String,
pub state: PluginState,
pub hooks: Vec<PluginHook>,
pub lua_sandbox: Option<Arc<LuaSandbox>>, // Убрали RwLock
}
impl PluginData for Plugin {
fn id(&self) -> &str {
&self.id
}
fn name(&self) -> &str {
&self.name
}
fn version(&self) -> &str {
&self.version
}
fn description(&self) -> &str {
&self.description
}
fn author(&self) -> &str {
&self.author
}
fn path(&self) -> &str {
&self.path
}
fn state(&self) -> PluginState {
self.state
}
}
/// Менеджер плагинов с lock-free архитектурой
pub struct PluginManager {
plugins: Arc<RwLock<Vec<Arc<Plugin>>>>,
channels: PluginChannels,
worker: PluginWorker,
// Кэш плагинов для быстрого доступа без блокировок
plugin_cache: dashmap::DashMap<String, Arc<Plugin>>,
}
impl PluginManager {
/// Создать новый менеджер плагинов
pub fn new() -> Self {
let channels = PluginChannels::new();
let plugins = Arc::new(RwLock::new(Vec::new()));
let plugin_cache = dashmap::DashMap::new();
let mut worker = PluginWorker::new(
channels.clone(),
Arc::clone(&plugins),
);
// Запустить worker в фоновом потоке
worker.start();
Self {
plugins,
channels,
worker,
plugin_cache,
}
}
/// Обновить кэш плагинов
fn update_cache(&self) {
self.plugin_cache.clear();
let plugins = self.plugins.read();
for plugin in plugins.iter() {
self.plugin_cache.insert(plugin.id.clone(), Arc::clone(plugin));
}
}
/// Отправить событие (lock-free, через канал)
pub async fn emit_event(&self, event: PluginEvent) -> Result<(), PluginError> {
self.channels.send(PluginMessage::Event(event))
.map_err(|e| PluginError::ChannelError(e))
}
/// Выполнить хук (lock-free, через канал)
pub async fn execute_hook(&self, hook_name: &str, data: Value) -> Result<Value, PluginError> {
let (sender, receiver) = crossbeam::channel::bounded(1);
self.channels.send(PluginMessage::HookRequest {
hook_name: hook_name.to_string(),
data,
response_sender: sender,
}).map_err(|e| PluginError::ChannelError(e))?;
match receiver.recv() {
Ok(HookResponse::Success(result)) => Ok(result),
Ok(HookResponse::Error(err)) => Err(PluginError::LuaError(err)),
Ok(HookResponse::NoHandler) => Err(PluginError::InvalidPlugin("No handler found".to_string())),
Err(e) => Err(PluginError::ChannelError(format!("Failed to receive hook response: {}", e))),
}
}
/// Загрузить плагин (lock-free, через канал)
pub async fn load_plugin(&self, path: &str) -> Result<String, PluginError> {
let (sender, receiver) = crossbeam::channel::bounded(1);
self.channels.send(PluginMessage::LoadPlugin {
path: path.to_string(),
response_sender: sender,
}).map_err(|e| PluginError::ChannelError(e))?;
match receiver.recv() {
Ok(LoadPluginResponse::Success(plugin_id)) => {
// Обновляем кэш после загрузки
self.update_cache();
Ok(plugin_id)
}
Ok(LoadPluginResponse::Error(err)) => Err(PluginError::InvalidPlugin(err)),
Err(e) => Err(PluginError::ChannelError(format!("Failed to receive load response: {}", e))),
}
}
/// Выгрузить плагин (lock-free, через канал)
pub async fn unload_plugin(&self, plugin_id: &str) -> Result<(), PluginError> {
let (sender, receiver) = crossbeam::channel::bounded(1);
self.channels.send(PluginMessage::UnloadPlugin {
plugin_id: plugin_id.to_string(),
response_sender: sender,
}).map_err(|e| PluginError::ChannelError(e))?;
match receiver.recv() {
Ok(UnloadPluginResponse::Success) => {
// Обновляем кэш после выгрузки
self.plugin_cache.remove(plugin_id);
Ok(())
}
Ok(UnloadPluginResponse::Error(err)) => Err(PluginError::InvalidPlugin(err)),
Err(e) => Err(PluginError::ChannelError(format!("Failed to receive unload response: {}", e))),
}
}
/// Получить список плагинов (lock-free, через канал)
pub async fn list_plugins(&self) -> Result<Vec<PluginInfo>, PluginError> {
let (sender, receiver) = crossbeam::channel::bounded(1);
self.channels.send(PluginMessage::ListPlugins {
response_sender: sender,
}).map_err(|e| PluginError::ChannelError(e))?;
match receiver.recv() {
Ok(response) => Ok(response.plugins),
Err(e) => Err(PluginError::ChannelError(format!("Failed to receive list response: {}", e))),
}
}
/// Получить информацию о плагине (lock-free, через канал)
pub async fn get_plugin(&self, plugin_id: &str) -> Result<Option<PluginInfo>, PluginError> {
let (sender, receiver) = crossbeam::channel::bounded(1);
self.channels.send(PluginMessage::GetPlugin {
plugin_id: plugin_id.to_string(),
response_sender: sender,
}).map_err(|e| PluginError::ChannelError(e))?;
match receiver.recv() {
Ok(GetPluginResponse::Found(info)) => Ok(Some(info)),
Ok(GetPluginResponse::NotFound) => Ok(None),
Ok(GetPluginResponse::Error(err)) => Err(PluginError::InvalidPlugin(err)),
Err(e) => Err(PluginError::ChannelError(format!("Failed to receive get response: {}", e))),
}
}
/// Быстрый доступ к плагину через кэш (без блокировок)
pub fn get_plugin_fast(&self, plugin_id: &str) -> Option<Arc<Plugin>> {
self.plugin_cache.get(plugin_id).map(|entry| Arc::clone(entry.value()))
}
/// Быстрое получение списка плагинов через кэш (без блокировок)
pub fn list_plugins_fast(&self) -> Vec<PluginInfo> {
self.plugin_cache.iter().map(|entry| {
let plugin = entry.value();
PluginInfo {
id: plugin.id.clone(),
name: plugin.name.clone(),
version: plugin.version.clone(),
description: plugin.description.clone(),
author: plugin.author.clone(),
path: plugin.path.clone(),
state: format!("{:?}", plugin.state),
hooks: plugin.hooks.clone(),
}
}).collect()
}
/// Загрузить все плагины из директории
pub async fn load_all_plugins(&self) -> Result<Vec<String>, PluginError> {
use std::fs;
use std::path::Path;
let plugins_dir = Path::new("./plugins");
if !plugins_dir.exists() {
fs::create_dir_all(plugins_dir)
.map_err(PluginError::IoError)?;
return Ok(Vec::new());
}
let mut loaded = Vec::new();
for entry in fs::read_dir(plugins_dir)
.map_err(PluginError::IoError)?
{
let entry = entry.map_err(PluginError::IoError)?;
let path = entry.path();
if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("lua") {
if let Ok(plugin_id) = self.load_plugin(path.to_str().unwrap()).await {
loaded.push(plugin_id);
}
}
}
Ok(loaded)
}
/// Остановить менеджер плагинов
pub fn shutdown(&self) {
self.channels.send(PluginMessage::Shutdown)
.unwrap_or_else(|e| log::error!("Failed to send shutdown message: {}", e));
// Дожидаемся остановки worker
self.worker.shutdown();
}
}
impl PluginManagerTrait for PluginManager {
fn load_all_plugins(&mut self) -> Result<Vec<Arc<dyn PluginData>>, String> {
// Для синхронного вызова используем tokio runtime
let rt = tokio::runtime::Runtime::new()
.map_err(|e| format!("Failed to create runtime: {}", e))?;
rt.block_on(async {
self.load_all_plugins().await
.map_err(|e| e.to_string())?;
let plugins = self.plugins.read();
let result: Vec<Arc<dyn PluginData>> = plugins
.iter()
.map(|p| p.clone() as Arc<dyn PluginData>)
.collect();
Ok(result)
})
}
fn load_plugin(&mut self, path: &str) -> Result<Arc<dyn PluginData>, String> {
let rt = tokio::runtime::Runtime::new()
.map_err(|e| format!("Failed to create runtime: {}", e))?;
rt.block_on(async {
let plugin_id = self.load_plugin(path).await
.map_err(|e| e.to_string())?;
let plugins = self.plugins.read();
plugins.iter()
.find(|p| p.id == plugin_id)
.map(|p| p.clone() as Arc<dyn PluginData>)
.ok_or_else(|| format!("Plugin not found after loading: {}", plugin_id))
})
}
fn unload_plugin(&mut self, plugin_id: &str) -> Result<(), String> {
let rt = tokio::runtime::Runtime::new()
.map_err(|e| format!("Failed to create runtime: {}", e))?;
rt.block_on(async {
self.unload_plugin(plugin_id).await
.map_err(|e| e.to_string())
})
}
fn list_plugins(&self) -> Vec<Arc<dyn PluginData>> {
// Используем быстрый метод без блокировок
self.plugin_cache.iter()
.map(|entry| {
let plugin = entry.value();
plugin.clone() as Arc<dyn PluginData>
})
.collect()
}
fn get_plugin(&self, plugin_id: &str) -> Option<Arc<dyn PluginData>> {
// Используем быстрый метод без блокировок
self.plugin_cache.get(plugin_id)
.map(|entry| {
let plugin = entry.value();
plugin.clone() as Arc<dyn PluginData>
})
}
fn emit_event(&self, event: PluginEvent) -> Result<(), String> {
let rt = tokio::runtime::Runtime::new()
.map_err(|e| format!("Failed to create runtime: {}", e))?;
rt.block_on(async {
self.emit_event(event).await
.map_err(|e| e.to_string())
})
}
fn execute_hook(&self, hook_name: &str, data: Value) -> Result<Value, String> {
let rt = tokio::runtime::Runtime::new()
.map_err(|e| format!("Failed to create runtime: {}", e))?;
rt.block_on(async {
self.execute_hook(hook_name, data).await
.map_err(|e| e.to_string())
})
}
}
impl Drop for PluginManager {
fn drop(&mut self) {
self.shutdown();
}
}

322
src/plugins/sandbox.rs Normal file
View File

@ -0,0 +1,322 @@
//! Изолированная Lua песочница для безопасного выполнения плагинов
//!
//! Этот модуль предоставляет безопасное окружение для выполнения Lua скриптов
//! плагинов с ограниченным доступом к системным ресурсам.
//!
//! Основные возможности:
//! - Полная изоляция плагинов друг от друга и от основной системы
//! - Ограничение опасных Lua функций (os, io, debug, load и т.д.)
//! - Предоставление безопасных API для плагинов (логирование, работа с JSON)
//! - Автоматическое извлечение метаданных плагинов и хуков
//! - Безопасная обработка событий и выполнение хуков
//! - Конвертация между Lua значениями и JSON
//! - Генерация уникальных ID для плагинов
//! - Поддержка Send + Sync для работы в многопоточной среде
use std::path::Path;
use mlua::{Lua, Value as LuaValue, Result as LuaResult, Error as LuaError};
use serde_json::{Value, json};
use uuid::Uuid;
use crate::plugins::traits::{PluginEvent, PluginHook, PluginState};
use crate::plugins::channel::PluginInfo;
/// Песочница Lua для изоляции плагинов
pub struct LuaSandbox {
lua: Lua,
plugin_id: String,
hooks: Vec<PluginHook>,
}
// Явно указываем, что структура безопасна для отправки между потоками
unsafe impl Send for LuaSandbox {}
unsafe impl Sync for LuaSandbox {}
impl LuaSandbox {
/// Создать новую песочницу
pub fn new() -> Result<Self, String> {
let lua = Lua::new();
// Настройка безопасного окружения
let globals = lua.globals();
// Ограничиваем доступ к опасным функциям
globals.set("os", LuaValue::Nil)
.map_err(|e| format!("Failed to restrict os: {}", e))?;
globals.set("io", LuaValue::Nil)
.map_err(|e| format!("Failed to restrict io: {}", e))?;
globals.set("debug", LuaValue::Nil)
.map_err(|e| format!("Failed to restrict debug: {}", e))?;
globals.set("load", LuaValue::Nil)
.map_err(|e| format!("Failed to restrict load: {}", e))?;
globals.set("loadfile", LuaValue::Nil)
.map_err(|e| format!("Failed to restrict loadfile: {}", e))?;
globals.set("dofile", LuaValue::Nil)
.map_err(|e| format!("Failed to restrict dofile: {}", e))?;
// Добавляем безопасные функции
Self::register_safe_functions(&lua)
.map_err(|e| format!("Failed to register safe functions: {}", e))?;
Ok(Self {
lua,
plugin_id: String::new(),
hooks: Vec::new(),
})
}
/// Зарегистрировать безопасные функции
fn register_safe_functions(lua: &Lua) -> LuaResult<()> {
let globals = lua.globals();
// Безопасные функции для логирования
let log_table = lua.create_table()?;
log_table.set("info", lua.create_function(move |_, msg: String| {
log::info!("[Plugin] {}", msg);
Ok(())
})?)?;
log_table.set("error", lua.create_function(move |_, msg: String| {
log::error!("[Plugin] {}", msg);
Ok(())
})?)?;
log_table.set("warn", lua.create_function(move |_, msg: String| {
log::warn!("[Plugin] {}", msg);
Ok(())
})?)?;
log_table.set("debug", lua.create_function(move |_, msg: String| {
log::debug!("[Plugin] {}", msg);
Ok(())
})?)?;
globals.set("log", log_table)?;
// Функция для работы с JSON
let json_table = lua.create_table()?;
json_table.set("encode", lua.create_function(move |lua, value: LuaValue| {
let json_value = Self::lua_value_to_json(value)
.map_err(|e| LuaError::external(e))?;
let json_str = serde_json::to_string(&json_value)
.map_err(|e| LuaError::external(e))?;
Ok(json_str)
})?)?;
json_table.set("decode", lua.create_function(move |lua, json_str: String| {
let json_value: Value = serde_json::from_str(&json_str)
.map_err(|e| LuaError::external(e))?;
Self::json_to_lua_value(lua, json_value)
})?)?;
globals.set("json", json_table)?;
// Функция для генерации UUID
globals.set("uuid", lua.create_function(move |_, ()| {
Ok(Uuid::new_v4().to_string())
})?)?;
Ok(())
}
/// Загрузить плагин из кода
pub fn load_plugin(&mut self, code: &str, path: &str) -> Result<PluginInfo, String> {
// Выполняем код плагина
self.lua.load(code).exec()
.map_err(|e| format!("Failed to execute plugin code: {}", e))?;
// Получаем метаданные плагина
let globals = self.lua.globals();
let plugin_table: mlua::Table = globals.get("PLUGIN")
.map_err(|e| format!("PLUGIN table not found: {}", e))?;
let name: String = plugin_table.get("name")
.map_err(|e| format!("Failed to get plugin name: {}", e))?;
let version: String = plugin_table.get("version")
.map_err(|e| format!("Failed to get plugin version: {}", e))?;
let description: String = plugin_table.get("description")
.unwrap_or_else(|_| "No description".to_string());
let author: String = plugin_table.get("author")
.unwrap_or_else(|_| "Unknown".to_string());
// Генерируем ID плагина
let plugin_id = format!("{}-{}", name, Uuid::new_v4());
self.plugin_id = plugin_id.clone();
// Получаем хуки
let hooks_table: mlua::Table = plugin_table.get("hooks")
.unwrap_or_else(|_| self.lua.create_table().unwrap());
let mut hooks = Vec::new();
for pair in hooks_table.pairs::<String, mlua::Table>() {
match pair {
Ok((hook_name, hook_table)) => {
let function: String = hook_table.get("function")
.unwrap_or_else(|_| hook_name.clone());
let priority: u32 = hook_table.get("priority")
.unwrap_or(100);
let async_hook: bool = hook_table.get("async")
.unwrap_or(false);
hooks.push(PluginHook {
name: hook_name,
function,
priority,
async_hook,
});
}
Err(e) => {
log::warn!("Failed to parse hook: {}", e);
}
}
}
self.hooks = hooks.clone();
Ok(PluginInfo {
id: plugin_id,
name,
version,
description,
author,
path: path.to_string(),
state: format!("{:?}", PluginState::Loaded),
hooks,
})
}
/// Обработать событие
pub fn handle_event(&self, event: &PluginEvent) -> Result<(), String> {
let globals = self.lua.globals();
// Проверяем, есть ли функция для обработки событий
if let Ok(on_event) = globals.get::<_, mlua::Function>("on_event") {
let event_data = json!({
"type": format!("{:?}", event.event_type),
"data": event.data,
"source": event.source,
"timestamp": event.timestamp,
});
let event_json = serde_json::to_string(&event_data)
.map_err(|e| format!("Failed to serialize event: {}", e))?;
on_event.call::<_, ()>(event_json)
.map_err(|e| format!("Failed to call on_event: {}", e))?;
}
Ok(())
}
/// Выполнить хук
pub fn execute_hook(&self, function_name: &str, data: Value) -> Result<Value, String> {
let globals = self.lua.globals();
// Получаем функцию по имени
let hook_func: mlua::Function = globals.get(function_name)
.map_err(|e| format!("Hook function not found: {} - {}", function_name, e))?;
// Преобразуем данные в Lua значение
let lua_data = Self::json_to_lua_value(&self.lua, data)
.map_err(|e| format!("Failed to convert data to Lua: {}", e))?;
// Вызываем функцию
let result = hook_func.call::<_, LuaValue>(lua_data)
.map_err(|e| format!("Failed to execute hook: {}", e))?;
// Преобразуем результат обратно в JSON
Self::lua_value_to_json(result)
.map_err(|e| format!("Failed to convert result to JSON: {}", e))
}
/// Преобразовать Lua значение в JSON
fn lua_value_to_json(value: LuaValue) -> Result<Value, String> {
match value {
LuaValue::Nil => Ok(Value::Null),
LuaValue::Boolean(b) => Ok(Value::Bool(b)),
LuaValue::Integer(i) => Ok(Value::Number(i.into())),
LuaValue::Number(n) => {
// Попробуем преобразовать в integer если возможно
if n.fract() == 0.0 {
Ok(Value::Number((n as i64).into()))
} else {
serde_json::Number::from_f64(n)
.map(Value::Number)
.ok_or_else(|| "Invalid number".to_string())
}
}
LuaValue::String(s) => Ok(Value::String(s.to_string_lossy().to_string())),
LuaValue::Table(table) => {
let mut map = serde_json::Map::new();
for pair in table.pairs::<LuaValue, LuaValue>() {
match pair {
Ok((key, value)) => {
let key_str = match key {
LuaValue::String(s) => s.to_string_lossy().to_string(),
LuaValue::Integer(i) => i.to_string(),
LuaValue::Number(n) => n.to_string(),
_ => continue,
};
let value_json = Self::lua_value_to_json(value)?;
map.insert(key_str, value_json);
}
Err(e) => return Err(format!("Failed to parse table pair: {}", e)),
}
}
Ok(Value::Object(map))
}
_ => Err(format!("Unsupported Lua type: {:?}", value.type_name())),
}
}
/// Преобразовать JSON в Lua значение
fn json_to_lua_value(lua: &Lua, value: Value) -> LuaResult<LuaValue> {
match value {
Value::Null => Ok(LuaValue::Nil),
Value::Bool(b) => Ok(LuaValue::Boolean(b)),
Value::Number(n) => {
if let Some(i) = n.as_i64() {
Ok(LuaValue::Integer(i))
} else if let Some(f) = n.as_f64() {
Ok(LuaValue::Number(f))
} else {
Err(LuaError::external("Invalid number"))
}
}
Value::String(s) => Ok(LuaValue::String(lua.create_string(&s)?)),
Value::Array(arr) => {
let table = lua.create_table()?;
for (i, item) in arr.into_iter().enumerate() {
let lua_value = Self::json_to_lua_value(lua, item)?;
table.set(i + 1, lua_value)?;
}
Ok(LuaValue::Table(table))
}
Value::Object(obj) => {
let table = lua.create_table()?;
for (key, value) in obj {
let lua_value = Self::json_to_lua_value(lua, value)?;
table.set(key, lua_value)?;
}
Ok(LuaValue::Table(table))
}
}
}
}

109
src/plugins/traits.rs Normal file
View File

@ -0,0 +1,109 @@
//! Трейты и типы данных для системы плагинов с lock-free архитектурой
//!
//! Этот модуль определяет основные трейты и структуры данных для системы плагинов,
//! обеспечивающие безопасное и эффективное взаимодействие между компонентами.
//!
//! Основные компоненты:
//! - Трейты для данных плагинов (PluginData, EventHandler, HookHandler)
//! - Типы событий и хуков для расширения функциональности
//! - Состояния плагинов и их жизненный цикл
//! - Абстрактный интерфейс менеджера плагинов
//! - Типы для безопасной передачи данных между потоками
//! - Поддержка сериализации через serde_json
//! - Send + Sync требования для lock-free мультитрединга
use std::sync::Arc;
use serde_json::Value;
/// Трейт для данных плагина (без Lua окружения)
pub trait PluginData: Send + Sync {
fn id(&self) -> &str;
fn name(&self) -> &str;
fn version(&self) -> &str;
fn description(&self) -> &str;
fn author(&self) -> &str;
fn path(&self) -> &str;
fn state(&self) -> PluginState;
}
/// Состояние плагина
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PluginState {
Loading,
Loaded,
Running,
Error,
Disabled,
}
/// Трейт для обработки событий
pub trait EventHandler: Send + Sync {
fn handle_event(&self, event: PluginEvent) -> Result<(), String>;
}
/// Трейт для хуков
pub trait HookHandler: Send + Sync {
fn execute_hook(&self, hook_name: &str, data: Value) -> Result<Value, String>;
}
/// Событие плагина
#[derive(Debug, Clone)]
pub struct PluginEvent {
pub event_type: EventType,
pub data: Value,
pub source: String,
pub timestamp: u64,
}
/// Тип события
#[derive(Debug, Clone)]
pub enum EventType {
/// Системные события
SystemStart,
SystemStop,
PluginLoaded(String),
PluginUnloaded(String),
/// События базы данных
DatabaseOpen,
DatabaseClose,
TransactionBegin(u64),
TransactionCommit(u64),
TransactionRollback(u64),
/// Пользовательские события
Custom(String, String),
}
/// Хук плагина
#[derive(Debug, Clone)]
pub struct PluginHook {
pub name: String,
pub function: String,
pub priority: u32,
pub async_hook: bool,
}
/// Трейт для менеджера плагинов
pub trait PluginManagerTrait: Send + Sync {
/// Загрузить все плагины
fn load_all_plugins(&mut self) -> Result<Vec<Arc<dyn PluginData>>, String>;
/// Загрузить плагин по пути
fn load_plugin(&mut self, path: &str) -> Result<Arc<dyn PluginData>, String>;
/// Выгрузить плагин
fn unload_plugin(&mut self, plugin_id: &str) -> Result<(), String>;
/// Получить список загруженных плагинов
fn list_plugins(&self) -> Vec<Arc<dyn PluginData>>;
/// Получить плагин по ID
fn get_plugin(&self, plugin_id: &str) -> Option<Arc<dyn PluginData>>;
/// Отправить событие
fn emit_event(&self, event: PluginEvent) -> Result<(), String>;
/// Выполнить хук
fn execute_hook(&self, hook_name: &str, data: Value) -> Result<Value, String>;
}

321
src/plugins/worker.rs Normal file
View File

@ -0,0 +1,321 @@
//! Worker поток для обработки сообщений плагинов с lock-free архитектурой
//!
//! Этот модуль реализует фоновый рабочий поток, который обрабатывает все сообщения,
//! связанные с плагинами, обеспечивая lock-free взаимодействие между потоками.
//!
//! Основные функции:
//! - Асинхронная обработка событий и хуков плагинов
//! - Загрузка и выгрузка плагинов в изолированном окружении
//! - Управление жизненным циклом плагинов
//! - Обеспечение thread-safety без использования блокировок на горячих путях
//! - Коммуникация через каналы для минимизации contention
//! - Планирование выполнения хуков с приоритетами
//! - Изоляция плагинов в отдельных Lua окружениях
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use parking_lot::RwLock;
use crossbeam::channel::{Sender, TryRecvError};
use serde_json::Value;
use crate::plugins::channel::{
PluginMessage, PluginChannels, HookResponse, LoadPluginResponse,
UnloadPluginResponse, ListPluginsResponse, GetPluginResponse, PluginInfo
};
use crate::plugins::traits::{PluginEvent, PluginHook, PluginState};
use crate::plugins::sandbox::LuaSandbox;
use super::Plugin;
/// Worker для обработки плагинов
pub struct PluginWorker {
channels: PluginChannels,
plugins: Arc<RwLock<Vec<Arc<Plugin>>>>,
handle: Option<thread::JoinHandle<()>>,
shutdown_flag: Arc<RwLock<bool>>,
}
impl PluginWorker {
/// Создать новый worker
pub fn new(channels: PluginChannels, plugins: Arc<RwLock<Vec<Arc<Plugin>>>>) -> Self {
Self {
channels,
plugins,
handle: None,
shutdown_flag: Arc::new(RwLock::new(false)),
}
}
/// Запустить worker
pub fn start(&mut self) {
let channels = self.channels.clone();
let plugins = Arc::clone(&self.plugins);
let shutdown_flag = Arc::clone(&self.shutdown_flag);
let handle = thread::spawn(move || {
PluginWorker::run_loop(channels, plugins, shutdown_flag);
});
self.handle = Some(handle);
}
/// Основной цикл обработки сообщений
fn run_loop(
channels: PluginChannels,
plugins: Arc<RwLock<Vec<Arc<Plugin>>>>,
shutdown_flag: Arc<RwLock<bool>>,
) {
while !*shutdown_flag.read() {
match channels.try_recv() {
Ok(message) => {
PluginWorker::process_message(message, &channels, &plugins);
}
Err(TryRecvError::Empty) => {
// Нет сообщений, ждем немного
thread::sleep(Duration::from_millis(10));
}
Err(TryRecvError::Disconnected) => {
// Канал закрыт, выходим
break;
}
}
}
log::info!("Plugin worker stopped");
}
/// Обработать сообщение
fn process_message(
message: PluginMessage,
channels: &PluginChannels,
plugins: &Arc<RwLock<Vec<Arc<Plugin>>>>,
) {
match message {
PluginMessage::Event(event) => {
PluginWorker::handle_event(event, plugins);
}
PluginMessage::HookRequest { hook_name, data, response_sender } => {
let response = PluginWorker::execute_hook_internal(&hook_name, data, plugins);
let _ = response_sender.send(response);
}
PluginMessage::LoadPlugin { path, response_sender } => {
let response = PluginWorker::load_plugin_internal(&path, plugins);
let _ = response_sender.send(response);
}
PluginMessage::UnloadPlugin { plugin_id, response_sender } => {
let response = PluginWorker::unload_plugin_internal(&plugin_id, plugins);
let _ = response_sender.send(response);
}
PluginMessage::ListPlugins { response_sender } => {
let response = PluginWorker::list_plugins_internal(plugins);
let _ = response_sender.send(response);
}
PluginMessage::GetPlugin { plugin_id, response_sender } => {
let response = PluginWorker::get_plugin_internal(&plugin_id, plugins);
let _ = response_sender.send(response);
}
PluginMessage::Shutdown => {
log::info!("Shutdown signal received");
// Флаг будет установлен в основном цикле
}
}
}
/// Обработать событие
fn handle_event(event: PluginEvent, plugins: &Arc<RwLock<Vec<Arc<Plugin>>>>) {
let plugins_guard = plugins.read();
for plugin in plugins_guard.iter() {
if let Some(sandbox) = &plugin.lua_sandbox {
// Теперь sandbox - это Arc<LuaSandbox>, без RwLock
if let Err(e) = sandbox.handle_event(&event) {
log::error!("Failed to handle event in plugin {}: {}", plugin.id, e);
}
}
}
}
/// Выполнить хук (внутренняя реализация)
fn execute_hook_internal(
hook_name: &str,
data: Value,
plugins: &Arc<RwLock<Vec<Arc<Plugin>>>>,
) -> HookResponse {
let plugins_guard = plugins.read();
// Сначала собираем все хуки с указанным именем
let mut hooks = Vec::new();
for plugin in plugins_guard.iter() {
for hook in &plugin.hooks {
if hook.name == hook_name {
hooks.push((plugin, hook));
}
}
}
// Сортируем по приоритету (высокий приоритет = больше значение)
hooks.sort_by(|a, b| b.1.priority.cmp(&a.1.priority));
// Выполняем хуки по порядку, передавая результат каждого следующему
let mut current_data = data;
for (plugin, hook) in hooks {
if let Some(sandbox) = &plugin.lua_sandbox {
// Убедимся, что хук существует и доступен
match sandbox.execute_hook(&hook.function, current_data.clone()) {
Ok(result) => {
current_data = result;
}
Err(e) => {
return HookResponse::Error(format!("Failed to execute hook {} in plugin {}: {}",
hook_name, plugin.id, e));
}
}
}
}
HookResponse::Success(current_data)
}
/// Загрузить плагин (внутренняя реализация)
fn load_plugin_internal(
path: &str,
plugins: &Arc<RwLock<Vec<Arc<Plugin>>>>,
) -> LoadPluginResponse {
use std::fs;
// Проверяем, не загружен ли уже плагин с таким путем
{
let plugins_guard = plugins.read();
if plugins_guard.iter().any(|p| p.path == path) {
return LoadPluginResponse::Error(format!("Plugin already loaded: {}", path));
}
}
// Читаем файл плагина
let content = match fs::read_to_string(path) {
Ok(content) => content,
Err(e) => return LoadPluginResponse::Error(format!("Failed to read plugin file: {}", e)),
};
// Создаем песочницу Lua
let mut sandbox = match LuaSandbox::new() {
Ok(sandbox) => sandbox,
Err(e) => return LoadPluginResponse::Error(format!("Failed to create Lua sandbox: {}", e)),
};
// Загружаем плагин в песочницу
let plugin_info = match sandbox.load_plugin(&content, path) {
Ok(info) => info,
Err(e) => return LoadPluginResponse::Error(format!("Failed to load plugin: {}", e)),
};
// Создаем объект плагина
let plugin = Arc::new(Plugin {
id: plugin_info.id.clone(),
name: plugin_info.name.clone(),
version: plugin_info.version.clone(),
description: plugin_info.description.clone(),
author: plugin_info.author.clone(),
path: path.to_string(),
state: PluginState::Loaded,
hooks: plugin_info.hooks.clone(),
lua_sandbox: Some(Arc::new(sandbox)), // Без RwLock
});
// Добавляем плагин в список
{
let mut plugins_guard = plugins.write();
plugins_guard.push(plugin);
}
log::info!("Plugin loaded: {} v{}", plugin_info.name, plugin_info.version);
LoadPluginResponse::Success(plugin_info.id)
}
/// Выгрузить плагин (внутренняя реализация)
fn unload_plugin_internal(
plugin_id: &str,
plugins: &Arc<RwLock<Vec<Arc<Plugin>>>>,
) -> UnloadPluginResponse {
let mut plugins_guard = plugins.write();
if let Some(pos) = plugins_guard.iter().position(|p| p.id == plugin_id) {
let plugin = plugins_guard.remove(pos);
log::info!("Plugin unloaded: {} v{}", plugin.name, plugin.version);
UnloadPluginResponse::Success
} else {
UnloadPluginResponse::Error(format!("Plugin not found: {}", plugin_id))
}
}
/// Получить список плагинов (внутренняя реализация)
fn list_plugins_internal(
plugins: &Arc<RwLock<Vec<Arc<Plugin>>>>,
) -> ListPluginsResponse {
let plugins_guard = plugins.read();
let plugin_infos = plugins_guard.iter().map(|plugin| {
PluginInfo {
id: plugin.id.clone(),
name: plugin.name.clone(),
version: plugin.version.clone(),
description: plugin.description.clone(),
author: plugin.author.clone(),
path: plugin.path.clone(),
state: format!("{:?}", plugin.state),
hooks: plugin.hooks.clone(),
}
}).collect();
ListPluginsResponse {
plugins: plugin_infos,
}
}
/// Получить информацию о плагине (внутренняя реализация)
fn get_plugin_internal(
plugin_id: &str,
plugins: &Arc<RwLock<Vec<Arc<Plugin>>>>,
) -> GetPluginResponse {
let plugins_guard = plugins.read();
if let Some(plugin) = plugins_guard.iter().find(|p| p.id == plugin_id) {
let info = PluginInfo {
id: plugin.id.clone(),
name: plugin.name.clone(),
version: plugin.version.clone(),
description: plugin.description.clone(),
author: plugin.author.clone(),
path: plugin.path.clone(),
state: format!("{:?}", plugin.state),
hooks: plugin.hooks.clone(),
};
GetPluginResponse::Found(info)
} else {
GetPluginResponse::NotFound
}
}
/// Остановить worker
pub fn shutdown(&self) {
*self.shutdown_flag.write() = true;
if let Some(handle) = self.handle.take() {
let _ = handle.join();
}
}
}
impl Drop for PluginWorker {
fn drop(&mut self) {
self.shutdown();
}
}