// src/lua_shell.rs //! Интерактивная Lua оболочка для Futriix //! //! Предоставляет интерфейс для взаимодействия с базой данных через Lua //! и CRUD команды. Использует wait-free доступ к данным через атомарные ссылки. #![allow(dead_code)] use std::sync::Arc; use tokio::io::{AsyncBufReadExt, BufReader}; use serde_json::Value; use crate::common::Result; use crate::server::database::Database; use crate::server::lua_engine::LuaEngine; use crate::server::sharding::ShardingManager; use crate::server::csv_import_export::CsvManager; use crate::common::protocol; use crate::server::database::{Index, IndexType}; /// Конвертация HEX цвета в ANSI escape code fn hex_to_ansi(hex_color: &str) -> String { let hex = hex_color.trim_start_matches('#'); if hex.len() == 6 { if let (Ok(r), Ok(g), Ok(b)) = ( u8::from_str_radix(&hex[0..2], 16), u8::from_str_radix(&hex[2..4], 16), u8::from_str_radix(&hex[4..6], 16), ) { return format!("\x1b[38;2;{};{};{}m", r, g, b); } } "\x1b[38;2;255;255;255m".to_string() } /// Вывод текста с красным цветом для ошибок fn print_error(text: &str) { let red_color = hex_to_ansi("#FF0000"); println!("{}{}\x1b[0m", red_color, text); } /// Вывод текста с зеленым цветом для успеха fn print_success(text: &str) { let green_color = hex_to_ansi("#00FF00"); println!("{}{}\x1b[0m", green_color, text); } /// Вывод текста с синим цветом для информации fn print_info(text: &str) { let blue_color = hex_to_ansi("#00bfff"); println!("{}{}\x1b[0m", blue_color, text); } /// Вывод текста с цветом #33d17a для приглашения Lua fn print_lua_color(text: &str) { let lua_color = hex_to_ansi("#33d17a"); // Новый цвет #33d17a println!("{}{}\x1b[0m", lua_color, text); } /// Интерактивная Lua оболочка pub struct LuaShell { lua_engine: LuaEngine, database: Arc, sharding_manager: Arc, csv_manager: Arc, inbox_mode: bool, } impl LuaShell { pub fn new( lua_engine: LuaEngine, database: Arc, sharding_manager: Arc, csv_manager: Arc, ) -> Self { Self { lua_engine, database, sharding_manager, csv_manager, inbox_mode: false, } } /// Запуск интерактивной оболочки pub async fn run(&mut self) -> Result<()> { let stdin = tokio::io::stdin(); let mut reader = BufReader::new(stdin).lines(); // Выводим приветственное сообщение при запуске Lua интерпретатора print_lua_color("Lua interpreter started. Type 'inbox.start' for database commands or Lua code."); println!(); loop { if self.inbox_mode { let inbox_prompt_color = hex_to_ansi("#00bfff"); print!("{}futriix:~>\x1b[0m ", inbox_prompt_color); } else { // ПРИГЛАШЕНИЕ LUA ТЕПЕРЬ ЦВЕТА #33d17a let lua_color = hex_to_ansi("#33d17a"); print!("{}lua>\x1b[0m ", lua_color); } let _ = std::io::Write::flush(&mut std::io::stdout()); let line = match reader.next_line().await { Ok(Some(line)) => line, Ok(None) => break, Err(e) => { eprintln!("Read error: {}", e); continue; } }; let input = line.trim(); match input { "exit" | "quit" => break, "inbox.start" => { self.inbox_mode = true; // ИЗМЕНЕНИЕ: убрана точка и изменен цвет на #33d17a let lua_color = hex_to_ansi("#33d17a"); println!("{}Entering database mode\x1b[0m", lua_color); continue; } "inbox.stop" if self.inbox_mode => { self.inbox_mode = false; print_success("Exiting database mode. Back to Lua interpreter."); continue; } "help" if self.inbox_mode => { self.show_help().await?; continue; } _ => {} } if self.inbox_mode { self.handle_inbox_command(input).await?; } else { self.handle_lua_command(input).await?; } } print_info("Shutting down Futriix server..."); Ok(()) } /// Обработка Lua команд async fn handle_lua_command(&self, input: &str) -> Result<()> { if input.is_empty() { return Ok(()); } match self.lua_engine.execute_script(input) { Ok(_) => {} Err(e) => { let error_msg = e.to_string(); if error_msg.contains("Lua error: syntax error:") || error_msg.contains("Unknown command:") { print_error(&error_msg); } else { eprintln!("Lua error: {}", e); } } } Ok(()) } /// Обработка команд inbox (CRUD + новые команды) async fn handle_inbox_command(&self, input: &str) -> Result<()> { let parts: Vec<&str> = input.split_whitespace().collect(); if parts.is_empty() { return Ok(()); } match parts[0] { // Базовые CRUD команды "create" => self.handle_create(parts).await, "read" => self.handle_read(parts).await, "update" => self.handle_update(parts).await, "delete" => self.handle_delete(parts).await, "list" => self.handle_list(parts).await, // Новые команды для управления кластером "cluster.status" => self.handle_cluster_status(parts).await, "add.node" => self.handle_add_node(parts).await, "evict.node" => self.handle_evict_node(parts).await, "list.raft.nodes" => self.handle_list_raft_nodes(parts).await, "cluster.rebalance" => self.handle_cluster_rebalance(parts).await, // Новые команды для CSV операций "csv" => self.handle_csv(parts).await, "help" => self.show_help().await, _ => { let error_msg = format!("Unknown command: {}. Type 'help' for available commands.", parts[0]); print_error(&error_msg); Ok(()) } } } // Новые методы для управления кластером async fn handle_cluster_status(&self, _parts: Vec<&str>) -> Result<()> { match self.sharding_manager.get_cluster_status() { Ok(status) => { println!("Cluster Status:"); println!(" Formed: {}", status.cluster_formed); println!(" Leader Exists: {}", status.leader_exists); println!(" Total Capacity: {}", status.total_capacity); println!(" Total Used: {}", status.total_used); println!(" Nodes: {}", status.nodes.len()); for node in status.nodes { println!(" - {}: {} ({}% used)", node.node_id, node.address, (node.used as f64 / node.capacity as f64) * 100.0); } println!(" Raft Nodes: {}", status.raft_nodes.len()); for raft_node in status.raft_nodes { println!(" - {}: {} (term: {}, state: {})", raft_node.node_id, raft_node.address, raft_node.term, raft_node.state); } } Err(e) => { println!("Error getting cluster status: {}", e); } } Ok(()) } async fn handle_add_node(&self, parts: Vec<&str>) -> Result<()> { if parts.len() < 2 { println!("Usage: add.node or add.node "); return Ok(()); } let node_address = parts[1].to_string(); let node_id = format!("node_{}", uuid::Uuid::new_v4().to_string()[..8].to_string()); match self.sharding_manager.add_node(node_id.clone(), node_address.clone(), 1024 * 1024 * 1024) { Ok(_) => { println!("Node '{}' added to cluster at address '{}'", node_id, node_address); } Err(e) => { println!("Error adding node: {}", e); } } Ok(()) } async fn handle_evict_node(&self, parts: Vec<&str>) -> Result<()> { if parts.len() < 2 { println!("Usage: evict.node or evict.node "); return Ok(()); } let node_address = parts[1].to_string(); // Находим node_id по адресу let mut node_id_to_remove = None; for entry in self.sharding_manager.get_nodes() { if entry.address == node_address { node_id_to_remove = Some(entry.node_id.clone()); break; } } if let Some(node_id) = node_id_to_remove { match self.sharding_manager.remove_node(&node_id) { Ok(_) => { println!("Node '{}' at address '{}' removed from cluster", node_id, node_address); } Err(e) => { println!("Error removing node: {}", e); } } } else { println!("Node with address '{}' not found in cluster", node_address); } Ok(()) } async fn handle_list_raft_nodes(&self, _parts: Vec<&str>) -> Result<()> { let raft_nodes = self.sharding_manager.get_raft_nodes(); println!("Raft Nodes ({}):", raft_nodes.len()); for node in raft_nodes { println!(" - {}: {} (term: {}, state: {:?}, last_heartbeat: {})", node.node_id, node.address, node.term, node.state, node.last_heartbeat); } Ok(()) } async fn handle_cluster_rebalance(&self, _parts: Vec<&str>) -> Result<()> { match self.sharding_manager.rebalance_cluster() { Ok(_) => { println!("Cluster rebalancing completed successfully"); } Err(e) => { println!("Error rebalancing cluster: {}", e); } } Ok(()) } // Новый метод для CSV операций async fn handle_csv(&self, parts: Vec<&str>) -> Result<()> { if parts.len() < 2 { println!("Usage: csv import "); println!(" csv export "); println!(" csv list"); println!(" csv progress "); return Ok(()); } match parts[1] { "import" => { if parts.len() < 4 { println!("Usage: csv import "); return Ok(()); } let collection = parts[2].to_string(); let file_path = parts[3].to_string(); match self.csv_manager.import_csv(&collection, &file_path) { Ok(count) => { println!("Successfully imported {} records from '{}'", count, file_path); } Err(e) => { println!("Error importing CSV: {}", e); } } } "export" => { if parts.len() < 4 { println!("Usage: csv export "); return Ok(()); } let collection = parts[2].to_string(); let file_path = parts[3].to_string(); match self.csv_manager.export_csv(&collection, &file_path) { Ok(count) => { println!("Successfully exported {} records to '{}'", count, file_path); } Err(e) => { println!("Error exporting CSV: {}", e); } } } "list" => { match self.csv_manager.list_csv_files() { Ok(files) => { if files.is_empty() { println!("No CSV files found"); } else { println!("CSV files:"); for file in files { println!(" - {}", file); } } } Err(e) => { println!("Error listing CSV files: {}", e); } } } "progress" => { if parts.len() < 3 { println!("Usage: csv progress "); return Ok(()); } let collection = parts[2].to_string(); let progress = self.csv_manager.get_import_progress(&collection); println!("Import progress for '{}': {:.2}%", collection, progress); } _ => { println!("Usage: csv import "); println!(" csv export "); println!(" csv list"); println!(" csv progress "); } } Ok(()) } // Базовые методы CRUD (упрощенные) async fn handle_create(&self, parts: Vec<&str>) -> Result<()> { if parts.len() < 3 { println!("Usage: create "); return Ok(()); } let collection = parts[1].to_string(); let document = parts[2..].join(" ").into_bytes(); let command = protocol::Command::Create { collection, document, }; match self.database.execute_command(command) { Ok(response) => { if let protocol::Response::Success(data) = response { if let Ok(id) = String::from_utf8(data) { println!("Document created with ID: {}", id); } else { println!("Document created successfully"); } } else if let protocol::Response::Error(e) = response { println!("Error: {}", e); } } Err(e) => { println!("Error: {}", e); } } Ok(()) } async fn handle_read(&self, parts: Vec<&str>) -> Result<()> { if parts.len() < 3 { println!("Usage: read "); return Ok(()); } let collection = parts[1].to_string(); let id = parts[2].to_string(); let command = protocol::Command::Read { collection, id, }; match self.database.execute_command(command) { Ok(response) => { if let protocol::Response::Success(data) = response { if let Ok(document) = String::from_utf8(data) { println!("{}", document); } else { println!("Document read successfully (binary data)"); } } else if let protocol::Response::Error(e) = response { println!("Error: {}", e); } } Err(e) => { println!("Error: {}", e); } } Ok(()) } async fn handle_update(&self, parts: Vec<&str>) -> Result<()> { if parts.len() < 4 { println!("Usage: update "); return Ok(()); } let collection = parts[1].to_string(); let id = parts[2].to_string(); let document = parts[3..].join(" ").into_bytes(); let command = protocol::Command::Update { collection, id, document, }; match self.database.execute_command(command) { Ok(response) => { if let protocol::Response::Success(_) = response { println!("Document updated successfully"); } else if let protocol::Response::Error(e) = response { println!("Error: {}", e); } } Err(e) => { println!("Error: {}", e); } } Ok(()) } async fn handle_delete(&self, parts: Vec<&str>) -> Result<()> { if parts.len() < 3 { println!("Usage: delete "); return Ok(()); } let collection = parts[1].to_string(); let id = parts[2].to_string(); let command = protocol::Command::Delete { collection, id, }; match self.database.execute_command(command) { Ok(response) => { if let protocol::Response::Success(_) = response { println!("Document deleted successfully"); } else if let protocol::Response::Error(e) = response { println!("Error: {}", e); } } Err(e) => { println!("Error: {}", e); } } Ok(()) } async fn handle_list(&self, parts: Vec<&str>) -> Result<()> { if parts.len() < 2 { println!("Usage: list [filter]"); return Ok(()); } let collection = parts[1].to_string(); let filter = if parts.len() > 2 { parts[2..].join(" ").into_bytes() } else { vec![] }; let command = protocol::Command::Query { collection, filter, }; match self.database.execute_command(command) { Ok(response) => { if let protocol::Response::Success(data) = response { if let Ok(documents) = String::from_utf8(data) { // Используем std::result::Result вместо нашего Result let parsed: std::result::Result = serde_json::from_str(&documents); match parsed { Ok(value) => { println!("{}", serde_json::to_string_pretty(&value).unwrap()); } Err(_) => { println!("{}", documents); } } } else { println!("Documents read successfully (binary data)"); } } else if let protocol::Response::Error(e) = response { println!("Error: {}", e); } } Err(e) => { println!("Error: {}", e); } } Ok(()) } /// Показать справку по командам async fn show_help(&self) -> Result<()> { println!("Available commands:"); println!(" Basic CRUD:"); println!(" create - Create document"); println!(" read - Read document"); println!(" update - Update document"); println!(" delete - Delete document"); println!(" list [filter] - List documents"); println!(" Cluster Management:"); println!(" cluster.status - Show cluster status"); println!(" add.node - Add node to cluster"); println!(" evict.node - Remove node from cluster"); println!(" list.raft.nodes - List Raft nodes"); println!(" cluster.rebalance - Rebalance cluster (shards and nodes)"); println!(" CSV Operations:"); println!(" csv import - Import CSV to collection"); println!(" csv export - Export collection to CSV"); println!(" csv list - List CSV files"); println!(" csv progress - Show import progress"); println!(" Other:"); println!(" inbox.stop - Exit database mode"); println!(" help - Show this help"); Ok(()) } }