From 133fb06ea7304aa8aac5bd9801fd35094e9cc08d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Mon, 1 Dec 2025 17:58:17 +0000 Subject: [PATCH] Added backup cli-commands, raft-leader error fixed --- src/lua_shell.rs | 1119 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 1119 insertions(+) create mode 100644 src/lua_shell.rs diff --git a/src/lua_shell.rs b/src/lua_shell.rs new file mode 100644 index 0000000..8075150 --- /dev/null +++ b/src/lua_shell.rs @@ -0,0 +1,1119 @@ +// src/lua_shell.rs +//! Интерактивная Lua оболочка для Futriix +//! +//! Предоставляет интерфейс для взаимодействия с базой данных через Lua +//! и CRUD команды. Использует wait-free доступ к данным через атомарные ссылки. + +#![allow(dead_code)] + +use std::sync::Arc; +use tokio::io::{AsyncBufReadExt, BufReader}; +use serde_json::Value; + +use crate::common::Result; +use crate::server::database::Database; +use crate::server::lua_engine::LuaEngine; +use crate::server::sharding::ShardingManager; +use crate::server::csv_import_export::CsvManager; +use crate::common::protocol; +use crate::server::database::{Index, IndexType, Trigger, TriggerEvent}; + +/// Конвертация HEX цвета в ANSI escape code +fn hex_to_ansi(hex_color: &str) -> String { + let hex = hex_color.trim_start_matches('#'); + + if hex.len() == 6 { + if let (Ok(r), Ok(g), Ok(b)) = ( + u8::from_str_radix(&hex[0..2], 16), + u8::from_str_radix(&hex[2..4], 16), + u8::from_str_radix(&hex[4..6], 16), + ) { + return format!("\x1b[38;2;{};{};{}m", r, g, b); + } + } + + "\x1b[38;2;255;255;255m".to_string() +} + +/// Вывод текста с красным цветом для ошибок +fn print_error(text: &str) { + let red_color = hex_to_ansi("#FF0000"); + println!("{}{}\x1b[0m", red_color, text); +} + +/// Вывод текста с зеленым цветом для успеха +fn print_success(text: &str) { + let green_color = hex_to_ansi("#00FF00"); + println!("{}{}\x1b[0m", green_color, text); +} + +/// Вывод текста с синим цветом для информации +fn print_info(text: &str) { + let blue_color = hex_to_ansi("#00bfff"); + println!("{}{}\x1b[0m", blue_color, text); +} + +/// Вывод текста с цветом #33d17a для приглашения Lua +fn print_lua_color(text: &str) { + let lua_color = hex_to_ansi("#33d17a"); // Новый цвет #33d17a + println!("{}{}\x1b[0m", lua_color, text); +} + +/// Интерактивная Lua оболочка +pub struct LuaShell { + lua_engine: LuaEngine, + database: Arc, + sharding_manager: Arc, + csv_manager: Arc, + inbox_mode: bool, +} + +impl LuaShell { + pub fn new( + lua_engine: LuaEngine, + database: Arc, + sharding_manager: Arc, + csv_manager: Arc, + ) -> Self { + Self { + lua_engine, + database, + sharding_manager, + csv_manager, + inbox_mode: false, + } + } + + /// Запуск интерактивной оболочки + pub async fn run(&mut self) -> Result<()> { + let stdin = tokio::io::stdin(); + let mut reader = BufReader::new(stdin).lines(); + + // Выводим приветственное сообщение при запуске Lua интерпретатора + print_lua_color("Lua interpreter started. Type 'inbox.start' for database commands or Lua code."); + println!(); + + loop { + if self.inbox_mode { + let inbox_prompt_color = hex_to_ansi("#00bfff"); + print!("{}futriix:~>\x1b[0m ", inbox_prompt_color); + } else { + // ПРИГЛАШЕНИЕ LUA ТЕПЕРЬ ЦВЕТА #33d17a + let lua_color = hex_to_ansi("#33d17a"); + print!("{}lua>\x1b[0m ", lua_color); + } + + let _ = std::io::Write::flush(&mut std::io::stdout()); + + let line = match reader.next_line().await { + Ok(Some(line)) => line, + Ok(None) => break, + Err(e) => { + eprintln!("Read error: {}", e); + continue; + } + }; + + let input = line.trim(); + + match input { + "exit" | "quit" => break, + "inbox.start" => { + self.inbox_mode = true; + // ИЗМЕНЕНИЕ: убрана точка и изменен цвет на #33d17a + let lua_color = hex_to_ansi("#33d17a"); + println!("{}Entering database mode\x1b[0m", lua_color); + continue; + } + "inbox.stop" if self.inbox_mode => { + self.inbox_mode = false; + print_success("Exiting database mode. Back to Lua interpreter."); + continue; + } + "help" if self.inbox_mode => { + self.show_help().await?; + continue; + } + _ => {} + } + + if self.inbox_mode { + self.handle_inbox_command(input).await?; + } else { + self.handle_lua_command(input).await?; + } + } + + print_info("Shutting down Futriix server..."); + Ok(()) + } + + /// Обработка Lua команд + async fn handle_lua_command(&self, input: &str) -> Result<()> { + if input.is_empty() { + return Ok(()); + } + + match self.lua_engine.execute_script(input) { + Ok(_) => {} + Err(e) => { + let error_msg = e.to_string(); + if error_msg.contains("Lua error: syntax error:") || error_msg.contains("Unknown command:") { + print_error(&error_msg); + } else { + eprintln!("Lua error: {}", e); + } + } + } + + Ok(()) + } + + /// Обработка команд inbox (CRUD + новые команды) + async fn handle_inbox_command(&self, input: &str) -> Result<()> { + let parts: Vec<&str> = input.split_whitespace().collect(); + + if parts.is_empty() { + return Ok(()); + } + + match parts[0] { + // Базовые CRUD команды + "create" => self.handle_create(parts).await, + "read" => self.handle_read(parts).await, + "update" => self.handle_update(parts).await, + "delete" => self.handle_delete(parts).await, + "list" => self.handle_list(parts).await, + + // Новые команды для управления кластером + "cluster.status" => self.handle_cluster_status(parts).await, + "add.node" => self.handle_add_node(parts).await, + "evict.node" => self.handle_evict_node(parts).await, + "list.raft.nodes" => self.handle_list_raft_nodes(parts).await, + "cluster.rebalance" => self.handle_cluster_rebalance(parts).await, + + // Новые команды для CSV операций + "csv" => self.handle_csv(parts).await, + + // НОВЫЕ КОМАНДЫ ИЗ README + "begin" => self.handle_begin_transaction(parts).await, + "commit" => self.handle_commit_transaction(parts).await, + "rollback" => self.handle_rollback_transaction(parts).await, + "index" => self.handle_index(parts).await, + "constraint" => self.handle_constraint(parts).await, + "procedure" => self.handle_procedure(parts).await, + "trigger" => self.handle_trigger(parts).await, + "shard" => self.handle_shard(parts).await, + "backup" => self.handle_backup(parts).await, + + "help" => self.show_help().await, + _ => { + let error_msg = format!("Unknown command: {}. Type 'help' for available commands.", parts[0]); + print_error(&error_msg); + Ok(()) + } + } + } + + // Новые методы для управления кластером + async fn handle_cluster_status(&self, _parts: Vec<&str>) -> Result<()> { + match self.sharding_manager.get_cluster_status() { + Ok(status) => { + println!("Cluster Status:"); + println!(" Formed: {}", status.cluster_formed); + println!(" Leader Exists: {}", status.leader_exists); + println!(" Total Capacity: {}", status.total_capacity); + println!(" Total Used: {}", status.total_used); + println!(" Nodes: {}", status.nodes.len()); + for node in status.nodes { + println!(" - {}: {} ({}% used)", node.node_id, node.address, (node.used as f64 / node.capacity as f64) * 100.0); + } + println!(" Raft Nodes: {}", status.raft_nodes.len()); + for raft_node in status.raft_nodes { + println!(" - {}: {} (term: {}, state: {})", raft_node.node_id, raft_node.address, raft_node.term, raft_node.state); + } + } + Err(e) => { + println!("Error getting cluster status: {}", e); + } + } + Ok(()) + } + + async fn handle_add_node(&self, parts: Vec<&str>) -> Result<()> { + if parts.len() < 2 { + println!("Usage: add.node or add.node "); + return Ok(()); + } + + let node_address = parts[1].to_string(); + let node_id = format!("node_{}", uuid::Uuid::new_v4().to_string()[..8].to_string()); + + match self.sharding_manager.add_node(node_id.clone(), node_address.clone(), 1024 * 1024 * 1024) { + Ok(_) => { + println!("Node '{}' added to cluster at address '{}'", node_id, node_address); + } + Err(e) => { + println!("Error adding node: {}", e); + } + } + + Ok(()) + } + + async fn handle_evict_node(&self, parts: Vec<&str>) -> Result<()> { + if parts.len() < 2 { + println!("Usage: evict.node or evict.node "); + return Ok(()); + } + + let node_address = parts[1].to_string(); + + // Находим node_id по адресу + let mut node_id_to_remove = None; + for entry in self.sharding_manager.get_nodes() { + if entry.address == node_address { + node_id_to_remove = Some(entry.node_id.clone()); + break; + } + } + + if let Some(node_id) = node_id_to_remove { + match self.sharding_manager.remove_node(&node_id) { + Ok(_) => { + println!("Node '{}' at address '{}' removed from cluster", node_id, node_address); + } + Err(e) => { + println!("Error removing node: {}", e); + } + } + } else { + println!("Node with address '{}' not found in cluster", node_address); + } + + Ok(()) + } + + async fn handle_list_raft_nodes(&self, _parts: Vec<&str>) -> Result<()> { + let raft_nodes = self.sharding_manager.get_raft_nodes(); + println!("Raft Nodes ({}):", raft_nodes.len()); + for node in raft_nodes { + println!(" - {}: {} (term: {}, state: {:?}, last_heartbeat: {})", + node.node_id, node.address, node.term, node.state, node.last_heartbeat); + } + Ok(()) + } + + async fn handle_cluster_rebalance(&self, _parts: Vec<&str>) -> Result<()> { + match self.sharding_manager.rebalance_cluster() { + Ok(_) => { + println!("Cluster rebalancing completed successfully"); + } + Err(e) => { + println!("Error rebalancing cluster: {}", e); + } + } + Ok(()) + } + + // Новый метод для CSV операций + async fn handle_csv(&self, parts: Vec<&str>) -> Result<()> { + if parts.len() < 2 { + println!("Usage: csv import "); + println!(" csv export "); + println!(" csv list"); + println!(" csv progress "); + return Ok(()); + } + + match parts[1] { + "import" => { + if parts.len() < 4 { + println!("Usage: csv import "); + return Ok(()); + } + + let collection = parts[2].to_string(); + let file_path = parts[3].to_string(); + + match self.csv_manager.import_csv(&collection, &file_path) { + Ok(count) => { + println!("Successfully imported {} records from '{}'", count, file_path); + } + Err(e) => { + println!("Error importing CSV: {}", e); + } + } + } + "export" => { + if parts.len() < 4 { + println!("Usage: csv export "); + return Ok(()); + } + + let collection = parts[2].to_string(); + let file_path = parts[3].to_string(); + + match self.csv_manager.export_csv(&collection, &file_path) { + Ok(count) => { + println!("Successfully exported {} records to '{}'", count, file_path); + } + Err(e) => { + println!("Error exporting CSV: {}", e); + } + } + } + "list" => { + match self.csv_manager.list_csv_files() { + Ok(files) => { + if files.is_empty() { + println!("No CSV files found"); + } else { + println!("CSV files:"); + for file in files { + println!(" - {}", file); + } + } + } + Err(e) => { + println!("Error listing CSV files: {}", e); + } + } + } + "progress" => { + if parts.len() < 3 { + println!("Usage: csv progress "); + return Ok(()); + } + + let collection = parts[2].to_string(); + let progress = self.csv_manager.get_import_progress(&collection); + println!("Import progress for '{}': {:.2}%", collection, progress); + } + _ => { + println!("Usage: csv import "); + println!(" csv export "); + println!(" csv list"); + println!(" csv progress "); + } + } + + Ok(()) + } + + // НОВЫЕ МЕТОДЫ ДЛЯ РЕАЛИЗАЦИИ КОМАНД ИЗ README + + async fn handle_begin_transaction(&self, parts: Vec<&str>) -> Result<()> { + if parts.len() < 2 { + println!("Usage: begin "); + return Ok(()); + } + + let transaction_id = parts[1].to_string(); + let command = protocol::Command::BeginTransaction { transaction_id }; + + match self.database.execute_command(command) { + Ok(response) => { + if let protocol::Response::Success(_) = response { + println!("Transaction '{}' started successfully", parts[1]); + } else if let protocol::Response::Error(e) = response { + println!("Error: {}", e); + } + } + Err(e) => { + println!("Error: {}", e); + } + } + + Ok(()) + } + + async fn handle_commit_transaction(&self, parts: Vec<&str>) -> Result<()> { + if parts.len() < 2 { + println!("Usage: commit "); + return Ok(()); + } + + let transaction_id = parts[1].to_string(); + let command = protocol::Command::CommitTransaction { transaction_id }; + + match self.database.execute_command(command) { + Ok(response) => { + if let protocol::Response::Success(_) = response { + println!("Transaction '{}' committed successfully", parts[1]); + } else if let protocol::Response::Error(e) = response { + println!("Error: {}", e); + } + } + Err(e) => { + println!("Error: {}", e); + } + } + + Ok(()) + } + + async fn handle_rollback_transaction(&self, parts: Vec<&str>) -> Result<()> { + if parts.len() < 2 { + println!("Usage: rollback "); + return Ok(()); + } + + let transaction_id = parts[1].to_string(); + let command = protocol::Command::RollbackTransaction { transaction_id }; + + match self.database.execute_command(command) { + Ok(response) => { + if let protocol::Response::Success(_) = response { + println!("Transaction '{}' rolled back successfully", parts[1]); + } else if let protocol::Response::Error(e) = response { + println!("Error: {}", e); + } + } + Err(e) => { + println!("Error: {}", e); + } + } + + Ok(()) + } + + async fn handle_index(&self, parts: Vec<&str>) -> Result<()> { + if parts.len() < 2 { + println!("Usage: index create [unique]"); + println!(" index query "); + return Ok(()); + } + + match parts[1] { + "create" => { + if parts.len() < 5 { + println!("Usage: index create [unique]"); + return Ok(()); + } + + let collection = parts[2].to_string(); + let index_name = parts[3].to_string(); + let field = parts[4].to_string(); + let unique = parts.get(5).map_or(false, |&s| s == "unique"); + + let index = Index { + name: index_name, + index_type: IndexType::Secondary, + field, + unique, + }; + + let command = protocol::Command::CreateIndex { collection, index }; + + match self.database.execute_command(command) { + Ok(response) => { + if let protocol::Response::Success(_) = response { + println!("Index created successfully"); + } else if let protocol::Response::Error(e) = response { + println!("Error: {}", e); + } + } + Err(e) => { + println!("Error: {}", e); + } + } + } + "query" => { + if parts.len() < 5 { + println!("Usage: index query "); + return Ok(()); + } + + let collection = parts[2].to_string(); + let index_name = parts[3].to_string(); + let value = parts[4].as_bytes().to_vec(); + + let command = protocol::Command::QueryByIndex { collection, index_name, value }; + + match self.database.execute_command(command) { + Ok(response) => { + if let protocol::Response::Success(data) = response { + if let Ok(result) = String::from_utf8(data) { + println!("Query result: {}", result); + } else { + println!("Query executed successfully"); + } + } else if let protocol::Response::Error(e) = response { + println!("Error: {}", e); + } + } + Err(e) => { + println!("Error: {}", e); + } + } + } + _ => { + println!("Usage: index create [unique]"); + println!(" index query "); + } + } + + Ok(()) + } + + async fn handle_constraint(&self, parts: Vec<&str>) -> Result<()> { + if parts.len() < 2 { + println!("Usage: constraint add [value]"); + println!(" constraint remove "); + println!(" constraint list "); + return Ok(()); + } + + match parts[1] { + "add" => { + if parts.len() < 6 { + println!("Usage: constraint add [value]"); + return Ok(()); + } + + let collection = parts[2].to_string(); + let constraint_name = parts[3].to_string(); + let constraint_type = parts[4].to_string(); + let field = parts[5].to_string(); + let value = if parts.len() > 6 { + parts[6..].join(" ").into_bytes() + } else { + vec![] + }; + + let command = protocol::Command::AddConstraint { + collection, + constraint_name, + constraint_type, + field, + value, + }; + + match self.database.execute_command(command) { + Ok(response) => { + if let protocol::Response::Success(_) = response { + println!("Constraint added successfully"); + } else if let protocol::Response::Error(e) = response { + println!("Error: {}", e); + } + } + Err(e) => { + println!("Error: {}", e); + } + } + } + "remove" => { + if parts.len() < 4 { + println!("Usage: constraint remove "); + return Ok(()); + } + + let collection = parts[2].to_string(); + let constraint_name = parts[3].to_string(); + + let command = protocol::Command::RemoveConstraint { collection, constraint_name }; + + match self.database.execute_command(command) { + Ok(response) => { + if let protocol::Response::Success(_) = response { + println!("Constraint removed successfully"); + } else if let protocol::Response::Error(e) = response { + println!("Error: {}", e); + } + } + Err(e) => { + println!("Error: {}", e); + } + } + } + "list" => { + if parts.len() < 3 { + println!("Usage: constraint list "); + return Ok(()); + } + + println!("Constraints for collection '{}':", parts[2]); + println!(" - (Constraint listing functionality to be implemented)"); + } + _ => { + println!("Usage: constraint add [value]"); + println!(" constraint remove "); + println!(" constraint list "); + } + } + + Ok(()) + } + + async fn handle_procedure(&self, parts: Vec<&str>) -> Result<()> { + if parts.len() < 2 { + println!("Usage: procedure create "); + println!(" procedure call "); + return Ok(()); + } + + match parts[1] { + "create" => { + if parts.len() < 4 { + println!("Usage: procedure create "); + return Ok(()); + } + + let name = parts[2].to_string(); + let code = parts[3..].join(" ").into_bytes(); + + let command = protocol::Command::CreateProcedure { name, code }; + + match self.database.execute_command(command) { + Ok(response) => { + if let protocol::Response::Success(_) = response { + println!("Procedure created successfully"); + } else if let protocol::Response::Error(e) = response { + println!("Error: {}", e); + } + } + Err(e) => { + println!("Error: {}", e); + } + } + } + "call" => { + if parts.len() < 3 { + println!("Usage: procedure call "); + return Ok(()); + } + + let name = parts[2].to_string(); + let command = protocol::Command::CallProcedure { name }; + + match self.database.execute_command(command) { + Ok(response) => { + if let protocol::Response::Success(data) = response { + if let Ok(result) = String::from_utf8(data) { + println!("Procedure result: {}", result); + } else { + println!("Procedure executed successfully"); + } + } else if let protocol::Response::Error(e) = response { + println!("Error: {}", e); + } + } + Err(e) => { + println!("Error: {}", e); + } + } + } + _ => { + println!("Usage: procedure create "); + println!(" procedure call "); + } + } + + Ok(()) + } + + async fn handle_trigger(&self, parts: Vec<&str>) -> Result<()> { + if parts.len() < 2 { + println!("Usage: trigger add "); + return Ok(()); + } + + match parts[1] { + "add" => { + if parts.len() < 6 { + println!("Usage: trigger add "); + return Ok(()); + } + + let name = parts[2].to_string(); + let event_str = parts[3]; + let collection = parts[4].to_string(); + let lua_code = parts[5..].join(" "); + + let event = match event_str { + "before_create" => TriggerEvent::BeforeCreate, + "after_create" => TriggerEvent::AfterCreate, + "before_update" => TriggerEvent::BeforeUpdate, + "after_update" => TriggerEvent::AfterUpdate, + "before_delete" => TriggerEvent::BeforeDelete, + "after_delete" => TriggerEvent::AfterDelete, + _ => { + println!("Invalid event type. Use: before_create, after_create, before_update, after_update, before_delete, after_delete"); + return Ok(()); + } + }; + + let trigger = Trigger { + name, + event, + collection, + lua_code, + }; + + match self.database.add_trigger(trigger) { + Ok(_) => { + println!("Trigger added successfully"); + } + Err(e) => { + println!("Error adding trigger: {}", e); + } + } + } + _ => { + println!("Usage: trigger add "); + } + } + + Ok(()) + } + + async fn handle_shard(&self, parts: Vec<&str>) -> Result<()> { + if parts.len() < 2 { + println!("Usage: shard add
"); + println!(" shard migrate "); + println!(" shard status"); + return Ok(()); + } + + match parts[1] { + "add" => { + if parts.len() < 5 { + println!("Usage: shard add
"); + return Ok(()); + } + + let node_id = parts[2].to_string(); + let address = parts[3].to_string(); + let capacity: u64 = parts[4].parse().unwrap_or(1024); + + match self.sharding_manager.add_node(node_id, address, capacity) { + Ok(_) => { + println!("Shard node added successfully"); + } + Err(e) => { + println!("Error adding shard node: {}", e); + } + } + } + "migrate" => { + if parts.len() < 6 { + println!("Usage: shard migrate "); + return Ok(()); + } + + let collection = parts[2].to_string(); + let from_node = parts[3].to_string(); + let to_node = parts[4].to_string(); + let shard_key = parts[5].to_string(); + + let command = protocol::Command::MigrateShard { + collection, + from_node, + to_node, + shard_key, + }; + + match self.database.execute_command(command) { + Ok(response) => { + if let protocol::Response::Success(_) = response { + println!("Shard migration initiated"); + } else if let protocol::Response::Error(e) = response { + println!("Error: {}", e); + } + } + Err(e) => { + println!("Error: {}", e); + } + } + } + "status" => { + match self.sharding_manager.get_cluster_status() { + Ok(status) => { + println!("Sharding Status:"); + println!(" Cluster Formed: {}", status.cluster_formed); + println!(" Total Nodes: {}", status.nodes.len()); + println!(" Total Capacity: {}", status.total_capacity); + println!(" Total Used: {}", status.total_used); + } + Err(e) => { + println!("Error getting shard status: {}", e); + } + } + } + _ => { + println!("Usage: shard add
"); + println!(" shard migrate "); + println!(" shard status"); + } + } + + Ok(()) + } + + async fn handle_backup(&self, parts: Vec<&str>) -> Result<()> { + if parts.len() < 2 { + println!("Usage: backup start"); + println!(" backup restore "); + return Ok(()); + } + + match parts[1] { + "start" => { + match self.database.create_backup() { + Ok(backup) => { + let backup_file = format!("backup_{}.json", chrono::Utc::now().format("%Y%m%d_%H%M%S")); + // Здесь должна быть логика сохранения backup в файл + println!("Backup created successfully. Size: {} collections", backup.len()); + } + Err(e) => { + println!("Error creating backup: {}", e); + } + } + } + "restore" => { + if parts.len() < 3 { + println!("Usage: backup restore "); + return Ok(()); + } + + let backup_path = parts[2]; + println!("Restore functionality for path '{}' to be implemented", backup_path); + } + _ => { + println!("Usage: backup start"); + println!(" backup restore "); + } + } + + Ok(()) + } + + // Базовые методы CRUD (упрощенные) + async fn handle_create(&self, parts: Vec<&str>) -> Result<()> { + if parts.len() < 3 { + println!("Usage: create "); + return Ok(()); + } + + let collection = parts[1].to_string(); + let document = parts[2..].join(" ").into_bytes(); + + let command = protocol::Command::Create { + collection, + document, + }; + + match self.database.execute_command(command) { + Ok(response) => { + if let protocol::Response::Success(data) = response { + if let Ok(id) = String::from_utf8(data) { + println!("Document created with ID: {}", id); + } else { + println!("Document created successfully"); + } + } else if let protocol::Response::Error(e) = response { + println!("Error: {}", e); + } + } + Err(e) => { + println!("Error: {}", e); + } + } + + Ok(()) + } + + async fn handle_read(&self, parts: Vec<&str>) -> Result<()> { + if parts.len() < 3 { + println!("Usage: read "); + return Ok(()); + } + + let collection = parts[1].to_string(); + let id = parts[2].to_string(); + + let command = protocol::Command::Read { + collection, + id, + }; + + match self.database.execute_command(command) { + Ok(response) => { + if let protocol::Response::Success(data) = response { + if let Ok(document) = String::from_utf8(data) { + println!("{}", document); + } else { + println!("Document read successfully (binary data)"); + } + } else if let protocol::Response::Error(e) = response { + println!("Error: {}", e); + } + } + Err(e) => { + println!("Error: {}", e); + } + } + + Ok(()) + } + + async fn handle_update(&self, parts: Vec<&str>) -> Result<()> { + if parts.len() < 4 { + println!("Usage: update "); + return Ok(()); + } + + let collection = parts[1].to_string(); + let id = parts[2].to_string(); + let document = parts[3..].join(" ").into_bytes(); + + let command = protocol::Command::Update { + collection, + id, + document, + }; + + match self.database.execute_command(command) { + Ok(response) => { + if let protocol::Response::Success(_) = response { + println!("Document updated successfully"); + } else if let protocol::Response::Error(e) = response { + println!("Error: {}", e); + } + } + Err(e) => { + println!("Error: {}", e); + } + } + + Ok(()) + } + + async fn handle_delete(&self, parts: Vec<&str>) -> Result<()> { + if parts.len() < 3 { + println!("Usage: delete "); + return Ok(()); + } + + let collection = parts[1].to_string(); + let id = parts[2].to_string(); + + let command = protocol::Command::Delete { + collection, + id, + }; + + match self.database.execute_command(command) { + Ok(response) => { + if let protocol::Response::Success(_) = response { + println!("Document deleted successfully"); + } else if let protocol::Response::Error(e) = response { + println!("Error: {}", e); + } + } + Err(e) => { + println!("Error: {}", e); + } + } + + Ok(()) + } + + async fn handle_list(&self, parts: Vec<&str>) -> Result<()> { + if parts.len() < 2 { + println!("Usage: list [filter]"); + return Ok(()); + } + + let collection = parts[1].to_string(); + let filter = if parts.len() > 2 { + parts[2..].join(" ").into_bytes() + } else { + vec![] + }; + + let command = protocol::Command::Query { + collection, + filter, + }; + + match self.database.execute_command(command) { + Ok(response) => { + if let protocol::Response::Success(data) = response { + if let Ok(documents) = String::from_utf8(data) { + // Используем std::result::Result вместо нашего Result + let parsed: std::result::Result = serde_json::from_str(&documents); + match parsed { + Ok(value) => { + println!("{}", serde_json::to_string_pretty(&value).unwrap()); + } + Err(_) => { + println!("{}", documents); + } + } + } else { + println!("Documents read successfully (binary data)"); + } + } else if let protocol::Response::Error(e) = response { + println!("Error: {}", e); + } + } + Err(e) => { + println!("Error: {}", e); + } + } + + Ok(()) + } + + /// Показать справку по командам + async fn show_help(&self) -> Result<()> { + println!("Available commands:"); + println!(" Basic CRUD:"); + println!(" create - Create document"); + println!(" read - Read document"); + println!(" update - Update document"); + println!(" delete - Delete document"); + println!(" list [filter] - List documents"); + println!(" Transactions:"); + println!(" begin - Start transaction"); + println!(" commit - Commit transaction"); + println!(" rollback - Rollback transaction"); + println!(" Indexes:"); + println!(" index create [unique] - Create index"); + println!(" index query - Query by index"); + println!(" Constraints:"); + println!(" constraint add [value] - Add constraint"); + println!(" constraint remove - Remove constraint"); + println!(" constraint list - List constraints"); + println!(" Procedures:"); + println!(" procedure create - Create stored procedure"); + println!(" procedure call - Call stored procedure"); + println!(" Triggers:"); + println!(" trigger add - Add trigger"); + println!(" Cluster Management:"); + println!(" cluster.status - Show cluster status"); + println!(" add.node - Add node to cluster"); + println!(" evict.node - Remove node from cluster"); + println!(" list.raft.nodes - List Raft nodes"); + println!(" cluster.rebalance - Rebalance cluster"); + println!(" Sharding:"); + println!(" shard add - Add shard node"); + println!(" shard migrate - Migrate shard"); + println!(" shard status - Show shard status"); + println!(" CSV Operations:"); + println!(" csv import - Import CSV to collection"); + println!(" csv export - Export collection to CSV"); + println!(" csv list - List CSV files"); + println!(" csv progress - Show import progress"); + println!(" Backup:"); + println!(" backup start - Create backup"); + println!(" backup restore - Restore from backup"); + println!(" Other:"); + println!(" inbox.stop - Exit database mode"); + println!(" help - Show this help"); + + Ok(()) + } +}