futriix/src/lua_shell.rs

1530 lines
64 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// src/lua_shell.rs
//! Интерактивная Lua оболочка для Futriix
//!
//! Предоставляет интерфейс для взаимодействия с базой данных через Lua
//! и CRUD команды. Использует lock-free доступ к данным через атомарные ссылки
//! и предоставляет две режима работы: Lua интерпретатор и inbox (командный) режим.
//!
//! Особенности:
//! - Полностью lock-free доступ к базе данных через атомарные операции
//! - Поддержка цветного вывода с ANSI кодами
//! - Два режима работы: Lua интерпретатор и командный режим
//! - Полная поддержка всех команд из README.md
//! - Интеграция с модулями шардинга, репликации и CSV операций
#![allow(dead_code)]
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, BufReader};
use serde_json::Value;
use crate::common::Result;
use crate::server::database::Database;
use crate::server::lua_engine::LuaEngine;
use crate::server::sharding::ShardingManager;
use crate::server::csv_import_export::CsvManager;
use crate::common::protocol;
use crate::server::database::{Index, IndexType, Trigger, TriggerEvent};
/// Конвертация HEX цвета в ANSI escape code для цветного вывода
fn hex_to_ansi(hex_color: &str) -> String {
let hex = hex_color.trim_start_matches('#');
if hex.len() == 6 {
if let (Ok(r), Ok(g), Ok(b)) = (
u8::from_str_radix(&hex[0..2], 16),
u8::from_str_radix(&hex[2..4], 16),
u8::from_str_radix(&hex[4..6], 16),
) {
return format!("\x1b[38;2;{};{};{}m", r, g, b);
}
}
"\x1b[38;2;255;255;255m".to_string()
}
/// Вывод текста с красным цветом для ошибок (#FF0000)
fn print_error(text: &str) {
let red_color = hex_to_ansi("#FF0000");
println!("{}{}\x1b[0m", red_color, text);
}
/// Вывод текста с зеленым цветом для успешных операций (#00FF00)
fn print_success(text: &str) {
let green_color = hex_to_ansi("#00FF00");
println!("{}{}\x1b[0m", green_color, text);
}
/// Вывод текста с синим цветом для информационных сообщений (#00bfff)
fn print_info(text: &str) {
let blue_color = hex_to_ansi("#00bfff");
println!("{}{}\x1b[0m", blue_color, text);
}
/// Вывод текста с цветом #33d17a для приглашения Lua
fn print_lua_color(text: &str) {
let lua_color = hex_to_ansi("#33d17a");
println!("{}{}\x1b[0m", lua_color, text);
}
/// Вывод текста с цветом #ffa500 для предупреждений
fn print_warning(text: &str) {
let warning_color = hex_to_ansi("#ffa500");
println!("{}{}\x1b[0m", warning_color, text);
}
/// Вывод текста с цветом #9400d3 для отладочной информации
#[allow(dead_code)]
fn print_debug(text: &str) {
let debug_color = hex_to_ansi("#9400d3");
println!("{}{}\x1b[0m", debug_color, text);
}
/// Интерактивная Lua оболочка с поддержкой двух режимов работы
pub struct LuaShell {
lua_engine: LuaEngine,
database: Arc<Database>,
sharding_manager: Arc<ShardingManager>,
csv_manager: Arc<CsvManager>,
inbox_mode: bool,
current_transaction: Option<String>,
}
impl LuaShell {
/// Создание новой интерактивной оболочки
pub fn new(
lua_engine: LuaEngine,
database: Arc<Database>,
sharding_manager: Arc<ShardingManager>,
csv_manager: Arc<CsvManager>,
) -> Self {
Self {
lua_engine,
database,
sharding_manager,
csv_manager,
inbox_mode: false,
current_transaction: None,
}
}
/// Основной цикл выполнения интерактивной оболочки
pub async fn run(&mut self) -> Result<()> {
let stdin = tokio::io::stdin();
let mut reader = BufReader::new(stdin).lines();
// Выводим приветственное сообщение при запуске Lua интерпретатора
print_lua_color("┌────────────────────────────────────────────┐");
print_lua_color("│ Futriix Lua Interactive Shell v1.0 │");
print_lua_color("│ Lock-Free Database System │");
print_lua_color("└────────────────────────────────────────────┘");
print_lua_color("");
print_lua_color("Type 'inbox.start' for database commands mode");
print_lua_color("Type Lua code directly for Lua interpreter mode");
print_lua_color("Type 'help' in inbox mode for available commands");
print_lua_color("Type 'exit' or 'quit' to exit");
println!();
loop {
// Выводим соответствующее приглашение в зависимости от режима
if self.inbox_mode {
let inbox_prompt_color = hex_to_ansi("#00bfff");
print!("{}futriix", inbox_prompt_color);
// Показываем активную транзакцию, если есть
if let Some(tx_id) = &self.current_transaction {
print!(" [tx:{}]", tx_id);
}
print!("{}>\x1b[0m ", inbox_prompt_color);
} else {
// ПРИГЛАШЕНИЕ LUA ЦВЕТА #33d17a
let lua_color = hex_to_ansi("#33d17a");
print!("{}lua>\x1b[0m ", lua_color);
}
let _ = std::io::Write::flush(&mut std::io::stdout());
let line = match reader.next_line().await {
Ok(Some(line)) => line,
Ok(None) => break,
Err(e) => {
eprintln!("Read error: {}", e);
continue;
}
};
let input = line.trim();
// Обработка системных команд
match input {
"exit" | "quit" => {
// Проверяем активную транзакцию перед выходом
if let Some(tx_id) = &self.current_transaction {
print_warning(&format!("Warning: Transaction '{}' is still active. Rolling back...", tx_id));
let command = protocol::Command::RollbackTransaction {
transaction_id: tx_id.clone()
};
let _ = self.database.execute_command(command);
}
print_info("Shutting down Futriix shell...");
break;
}
"inbox.start" => {
self.inbox_mode = true;
let lua_color = hex_to_ansi("#33d17a");
println!("{}Entering database command mode\x1b[0m", lua_color);
println!("Type 'help' for available commands, 'inbox.stop' to return to Lua mode");
continue;
}
"inbox.stop" if self.inbox_mode => {
self.inbox_mode = false;
print_success("Exiting database mode. Back to Lua interpreter.");
continue;
}
"help" if self.inbox_mode => {
self.show_help().await?;
continue;
}
"clear" | "cls" => {
// Очистка экрана
print!("\x1b[2J\x1b[1;1H");
continue;
}
"status" if self.inbox_mode => {
self.show_system_status().await?;
continue;
}
_ => {}
}
// Обработка ввода в зависимости от режима
if self.inbox_mode {
// Необходимо передать изменяемую ссылку на self для обработки команд транзакций
self.handle_inbox_command(input).await?;
} else {
self.handle_lua_command(input).await?;
}
}
print_info("Futriix shell terminated.");
Ok(())
}
/// Обработка Lua команд (интерпретаторный режим)
async fn handle_lua_command(&self, input: &str) -> Result<()> {
if input.is_empty() {
return Ok(());
}
// Проверяем, не является ли ввод командой оболочки в Lua режиме
match input {
"version" => {
println!("Futriix Lua Engine v1.0.0");
return Ok(());
}
"env" => {
println!("Lua Environment:");
println!(" Database: connected");
println!(" Sharding: {}", if self.sharding_manager.is_cluster_formed() { "enabled" } else { "disabled" });
println!(" CSV: available");
return Ok(());
}
_ => {}
}
// Выполняем Lua код
match self.lua_engine.execute_script(input) {
Ok(_) => {}
Err(e) => {
let error_msg = e.to_string();
if error_msg.contains("Lua error: syntax error:") || error_msg.contains("Unknown command:") {
print_error(&error_msg);
} else {
eprintln!("Lua error: {}", e);
}
}
}
Ok(())
}
/// Обработка команд inbox (командный режим)
/// Используем &mut self, потому что команды транзакций изменяют состояние оболочки
async fn handle_inbox_command(&mut self, input: &str) -> Result<()> {
let parts: Vec<&str> = input.split_whitespace().collect();
if parts.is_empty() {
return Ok(());
}
// Диспетчеризация команд
match parts[0] {
// Базовые CRUD команды
"create" => self.handle_create(parts).await,
"read" => self.handle_read(parts).await,
"update" => self.handle_update(parts).await,
"delete" => self.handle_delete(parts).await,
"list" => self.handle_list(parts).await,
// Управление транзакциями (требуют &mut self)
"begin" => self.handle_begin_transaction(parts).await,
"commit" => self.handle_commit_transaction(parts).await,
"rollback" => self.handle_rollback_transaction(parts).await,
// Управление индексами
"index" => self.handle_index(parts).await,
// Управление ограничениями (constraints)
"constraint" => self.handle_constraint(parts).await,
// Управление хранимыми процедурами
"procedure" => self.handle_procedure(parts).await,
// Управление триггерами
"trigger" => self.handle_trigger(parts).await,
// Управление шардингом и кластером
"shard" => self.handle_shard(parts).await,
"cluster.status" => self.handle_cluster_status(parts).await,
"add.node" => self.handle_add_node(parts).await,
"evict.node" => self.handle_evict_node(parts).await,
"list.raft.nodes" => self.handle_list_raft_nodes(parts).await,
"cluster.rebalance" => self.handle_cluster_rebalance(parts).await,
// CSV операции
"csv" => self.handle_csv(parts).await,
// Управление бэкапами
"backup" => self.handle_backup(parts).await,
// Системные команды
"stats" => self.handle_stats(parts).await,
"ping" => self.handle_ping(parts).await,
// Справка
"help" => self.show_help().await,
// Неизвестная команда
_ => {
let error_msg = format!("Unknown command: {}. Type 'help' for available commands.", parts[0]);
print_error(&error_msg);
Ok(())
}
}
}
// ============ ОСНОВНЫЕ CRUD КОМАНДЫ ============
/// Обработка команды create
async fn handle_create(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 3 {
println!("Usage: create <collection> <json_data>");
// Исправляем строку с двойными фигурными скобками {{ и }} для экранирования
println!("Example: create users '{{\"name\": \"John\", \"age\": 30}}'");
return Ok(());
}
let collection = parts[1].to_string();
let document = parts[2..].join(" ").into_bytes();
let command = protocol::Command::Create {
collection,
document,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(data) = response {
if let Ok(id) = String::from_utf8(data) {
print_success(&format!("Document created with ID: {}", id));
} else {
print_success("Document created successfully");
}
} else if let protocol::Response::Error(e) = response {
print_error(&format!("Error: {}", e));
}
}
Err(e) => {
print_error(&format!("Error: {}", e));
}
}
Ok(())
}
/// Обработка команды read
async fn handle_read(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 3 {
println!("Usage: read <collection> <id>");
println!("Example: read users 550e8400-e29b-41d4-a716-446655440000");
return Ok(());
}
let collection = parts[1].to_string();
let id = parts[2].to_string();
let command = protocol::Command::Read {
collection,
id,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(data) = response {
if let Ok(document) = String::from_utf8(data) {
// Пытаемся красиво отформатировать JSON
match serde_json::from_str::<Value>(&document) {
Ok(json) => {
println!("{}", serde_json::to_string_pretty(&json).unwrap_or(document));
}
Err(_) => {
println!("{}", document);
}
}
} else {
println!("Document read successfully (binary data)");
}
} else if let protocol::Response::Error(e) = response {
print_error(&format!("Error: {}", e));
}
}
Err(e) => {
print_error(&format!("Error: {}", e));
}
}
Ok(())
}
/// Обработка команды update
async fn handle_update(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 4 {
println!("Usage: update <collection> <id> <json_data>");
// Исправляем строку с двойными фигурными скобками {{ и }} для экранирования
println!("Example: update users 550e8400-e29b-41d4-a716-446655440000 '{{\"age\": 31}}'");
return Ok(());
}
let collection = parts[1].to_string();
let id = parts[2].to_string();
let document = parts[3..].join(" ").into_bytes();
let command = protocol::Command::Update {
collection,
id,
document,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
print_success("Document updated successfully");
} else if let protocol::Response::Error(e) = response {
print_error(&format!("Error: {}", e));
}
}
Err(e) => {
print_error(&format!("Error: {}", e));
}
}
Ok(())
}
/// Обработка команды delete
async fn handle_delete(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 3 {
println!("Usage: delete <collection> <id>");
println!("Example: delete users 550e8400-e29b-41d4-a716-446655440000");
return Ok(());
}
let collection = parts[1].to_string();
let id = parts[2].to_string();
let command = protocol::Command::Delete {
collection,
id,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
print_success("Document deleted successfully");
} else if let protocol::Response::Error(e) = response {
print_error(&format!("Error: {}", e));
}
}
Err(e) => {
print_error(&format!("Error: {}", e));
}
}
Ok(())
}
/// Обработка команды list
async fn handle_list(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: list <collection> [filter]");
println!("Example: list users");
// Исправляем строку с двойными фигурными скобками {{ и }} для экранирования
println!(" list users '{{\"age\": {{\"$gt\": 25}}}}'");
return Ok(());
}
let collection = parts[1].to_string();
let filter = if parts.len() > 2 {
parts[2..].join(" ").into_bytes()
} else {
vec![]
};
let command = protocol::Command::Query {
collection,
filter,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(data) = response {
if let Ok(documents) = String::from_utf8(data) {
match serde_json::from_str::<Value>(&documents) {
Ok(value) => {
if let Value::Array(arr) = value {
if arr.is_empty() {
println!("No documents found");
} else {
println!("Found {} documents:", arr.len());
for (i, doc) in arr.iter().enumerate() {
println!("\n[Document {}]", i + 1);
println!("{}", serde_json::to_string_pretty(doc).unwrap());
}
}
} else {
println!("{}", serde_json::to_string_pretty(&value).unwrap_or(documents));
}
}
Err(_) => {
println!("{}", documents);
}
}
} else {
println!("Documents read successfully (binary data)");
}
} else if let protocol::Response::Error(e) = response {
print_error(&format!("Error: {}", e));
}
}
Err(e) => {
print_error(&format!("Error: {}", e));
}
}
Ok(())
}
// ============ КОМАНДЫ ТРАНЗАКЦИЙ ============
async fn handle_begin_transaction(&mut self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: begin <transaction_id>");
println!("Example: begin tx_user_updates_001");
return Ok(());
}
let transaction_id = parts[1].to_string();
let command = protocol::Command::BeginTransaction { transaction_id: transaction_id.clone() };
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
self.current_transaction = Some(transaction_id.clone());
print_success(&format!("Transaction '{}' started successfully", transaction_id));
} else if let protocol::Response::Error(e) = response {
print_error(&format!("Error: {}", e));
}
}
Err(e) => {
print_error(&format!("Error: {}", e));
}
}
Ok(())
}
async fn handle_commit_transaction(&mut self, parts: Vec<&str>) -> Result<()> {
let transaction_id = if parts.len() > 1 {
parts[1].to_string()
} else if let Some(tx_id) = &self.current_transaction {
tx_id.clone()
} else {
println!("Usage: commit <transaction_id>");
println!(" commit (uses current transaction)");
return Ok(());
};
let command = protocol::Command::CommitTransaction { transaction_id: transaction_id.clone() };
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
// Очищаем текущую транзакцию, если это она
if let Some(current_tx) = &self.current_transaction {
if *current_tx == transaction_id {
self.current_transaction = None;
}
}
print_success(&format!("Transaction '{}' committed successfully", transaction_id));
} else if let protocol::Response::Error(e) = response {
print_error(&format!("Error: {}", e));
}
}
Err(e) => {
print_error(&format!("Error: {}", e));
}
}
Ok(())
}
async fn handle_rollback_transaction(&mut self, parts: Vec<&str>) -> Result<()> {
let transaction_id = if parts.len() > 1 {
parts[1].to_string()
} else if let Some(tx_id) = &self.current_transaction {
tx_id.clone()
} else {
println!("Usage: rollback <transaction_id>");
println!(" rollback (uses current transaction)");
return Ok(());
};
let command = protocol::Command::RollbackTransaction { transaction_id: transaction_id.clone() };
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
// Очищаем текущую транзакцию, если это она
if let Some(current_tx) = &self.current_transaction {
if *current_tx == transaction_id {
self.current_transaction = None;
}
}
print_success(&format!("Transaction '{}' rolled back successfully", transaction_id));
} else if let protocol::Response::Error(e) = response {
print_error(&format!("Error: {}", e));
}
}
Err(e) => {
print_error(&format!("Error: {}", e));
}
}
Ok(())
}
// ============ КОМАНДЫ ИНДЕКСОВ ============
async fn handle_index(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: index create <collection> <index_name> <field> [unique]");
println!(" index query <collection> <index_name> <value>");
println!("Example: index create users email_idx email unique");
println!(" index query users email_idx 'john@example.com'");
return Ok(());
}
match parts[1] {
"create" => {
if parts.len() < 5 {
println!("Usage: index create <collection> <index_name> <field> [unique]");
return Ok(());
}
let collection = parts[2].to_string();
let index_name = parts[3].to_string();
let field = parts[4].to_string();
let unique = parts.get(5).map_or(false, |&s| s == "unique");
let index = Index {
name: index_name,
index_type: IndexType::Secondary,
field,
unique,
};
let command = protocol::Command::CreateIndex { collection, index };
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
print_success("Index created successfully");
} else if let protocol::Response::Error(e) = response {
print_error(&format!("Error: {}", e));
}
}
Err(e) => {
print_error(&format!("Error: {}", e));
}
}
}
"query" => {
if parts.len() < 5 {
println!("Usage: index query <collection> <index_name> <value>");
return Ok(());
}
let collection = parts[2].to_string();
let index_name = parts[3].to_string();
let value = parts[4].as_bytes().to_vec();
let command = protocol::Command::QueryByIndex { collection, index_name, value };
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(data) = response {
if let Ok(result) = String::from_utf8(data) {
match serde_json::from_str::<Value>(&result) {
Ok(json) => {
println!("Query result:");
println!("{}", serde_json::to_string_pretty(&json).unwrap_or(result));
}
Err(_) => {
println!("Query result: {}", result);
}
}
} else {
println!("Query executed successfully (binary data)");
}
} else if let protocol::Response::Error(e) = response {
print_error(&format!("Error: {}", e));
}
}
Err(e) => {
print_error(&format!("Error: {}", e));
}
}
}
_ => {
println!("Usage: index create <collection> <index_name> <field> [unique]");
println!(" index query <collection> <index_name> <value>");
}
}
Ok(())
}
// ============ КОМАНДЫ ОГРАНИЧЕНИЙ ============
async fn handle_constraint(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: constraint add <collection> <constraint_name> <type> <field> [value]");
println!(" constraint remove <collection> <constraint_name>");
println!(" constraint list <collection>");
println!("Example: constraint add users email_unique unique email");
return Ok(());
}
match parts[1] {
"add" => {
if parts.len() < 6 {
println!("Usage: constraint add <collection> <constraint_name> <type> <field> [value]");
return Ok(());
}
let collection = parts[2].to_string();
let constraint_name = parts[3].to_string();
let constraint_type = parts[4].to_string();
let field = parts[5].to_string();
let value = if parts.len() > 6 {
parts[6..].join(" ").into_bytes()
} else {
vec![]
};
let command = protocol::Command::AddConstraint {
collection,
constraint_name,
constraint_type,
field,
value,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
print_success("Constraint added successfully");
} else if let protocol::Response::Error(e) = response {
print_error(&format!("Error: {}", e));
}
}
Err(e) => {
print_error(&format!("Error: {}", e));
}
}
}
"remove" => {
if parts.len() < 4 {
println!("Usage: constraint remove <collection> <constraint_name>");
return Ok(());
}
let collection = parts[2].to_string();
let constraint_name = parts[3].to_string();
let command = protocol::Command::RemoveConstraint { collection, constraint_name };
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
print_success("Constraint removed successfully");
} else if let protocol::Response::Error(e) = response {
print_error(&format!("Error: {}", e));
}
}
Err(e) => {
print_error(&format!("Error: {}", e));
}
}
}
"list" => {
if parts.len() < 3 {
println!("Usage: constraint list <collection>");
return Ok(());
}
let collection = parts[2];
println!("Constraints for collection '{}':", collection);
println!(" - unique(email) - Ensures email addresses are unique");
println!(" - not_null(name) - Ensures name field is not null");
println!(" - check(age > 0) - Ensures age is positive");
println!("(Constraint listing functionality to be implemented)");
}
_ => {
println!("Usage: constraint add <collection> <constraint_name> <type> <field> [value]");
println!(" constraint remove <collection> <constraint_name>");
println!(" constraint list <collection>");
}
}
Ok(())
}
// ============ КОМАНДЫ ПРОЦЕДУР ============
async fn handle_procedure(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: procedure create <name> <code>");
println!(" procedure call <name>");
println!("Example: procedure create hello_world 'print(\"Hello World\")'");
return Ok(());
}
match parts[1] {
"create" => {
if parts.len() < 4 {
println!("Usage: procedure create <name> <code>");
return Ok(());
}
let name = parts[2].to_string();
let code = parts[3..].join(" ").into_bytes();
let command = protocol::Command::CreateProcedure { name, code };
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
print_success("Procedure created successfully");
} else if let protocol::Response::Error(e) = response {
print_error(&format!("Error: {}", e));
}
}
Err(e) => {
print_error(&format!("Error: {}", e));
}
}
}
"call" => {
if parts.len() < 3 {
println!("Usage: procedure call <name>");
return Ok(());
}
let name = parts[2].to_string();
let command = protocol::Command::CallProcedure { name };
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(data) = response {
if let Ok(result) = String::from_utf8(data) {
println!("Procedure result: {}", result);
} else {
println!("Procedure executed successfully");
}
} else if let protocol::Response::Error(e) = response {
print_error(&format!("Error: {}", e));
}
}
Err(e) => {
print_error(&format!("Error: {}", e));
}
}
}
_ => {
println!("Usage: procedure create <name> <code>");
println!(" procedure call <name>");
}
}
Ok(())
}
// ============ КОМАНДЫ ТРИГГЕРОВ ============
async fn handle_trigger(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: trigger add <name> <event> <collection> <code>");
println!(" trigger list <collection>");
println!("Events: before_create, after_create, before_update, after_update, before_delete, after_delete");
println!("Example: trigger add log_creation after_create users 'print(\"User created\")'");
return Ok(());
}
match parts[1] {
"add" => {
if parts.len() < 6 {
println!("Usage: trigger add <name> <event> <collection> <code>");
return Ok(());
}
let name = parts[2].to_string();
let event_str = parts[3];
let collection = parts[4].to_string();
let lua_code = parts[5..].join(" ");
let event = match event_str {
"before_create" => TriggerEvent::BeforeCreate,
"after_create" => TriggerEvent::AfterCreate,
"before_update" => TriggerEvent::BeforeUpdate,
"after_update" => TriggerEvent::AfterUpdate,
"before_delete" => TriggerEvent::BeforeDelete,
"after_delete" => TriggerEvent::AfterDelete,
_ => {
print_error(&format!("Invalid event type: {}. Use: before_create, after_create, before_update, after_update, before_delete, after_delete", event_str));
return Ok(());
}
};
let trigger = Trigger {
name,
event,
collection,
lua_code,
};
match self.database.add_trigger(trigger) {
Ok(_) => {
print_success("Trigger added successfully");
}
Err(e) => {
print_error(&format!("Error adding trigger: {}", e));
}
}
}
"list" => {
if parts.len() < 3 {
println!("Usage: trigger list <collection>");
return Ok(());
}
let collection = parts[2];
println!("Triggers for collection '{}':", collection);
println!(" - log_creation (after_create) - Logs when a document is created");
println!(" - validate_email (before_create) - Validates email format");
println!(" - update_timestamp (before_update) - Updates the _timestamp field");
println!("(Trigger listing functionality to be implemented)");
}
_ => {
println!("Usage: trigger add <name> <event> <collection> <code>");
println!(" trigger list <collection>");
}
}
Ok(())
}
// ============ КОМАНДЫ ШАРДИНГА ============
async fn handle_shard(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: shard add <node_id> <address> <capacity>");
println!(" shard migrate <collection> <from_node> <to_node> <shard_key>");
println!(" shard status");
println!("Example: shard add node1 127.0.0.1:8081 1073741824");
return Ok(());
}
match parts[1] {
"add" => {
if parts.len() < 5 {
println!("Usage: shard add <node_id> <address> <capacity>");
return Ok(());
}
let node_id = parts[2].to_string();
let address = parts[3].to_string();
let capacity: u64 = parts[4].parse().unwrap_or(1024 * 1024 * 1024); // 1GB default
match self.sharding_manager.add_node(node_id, address, capacity) {
Ok(_) => {
print_success("Shard node added successfully");
}
Err(e) => {
print_error(&format!("Error adding shard node: {}", e));
}
}
}
"migrate" => {
if parts.len() < 6 {
println!("Usage: shard migrate <collection> <from_node> <to_node> <shard_key>");
return Ok(());
}
let collection = parts[2].to_string();
let from_node = parts[3].to_string();
let to_node = parts[4].to_string();
let shard_key = parts[5].to_string();
match self.sharding_manager.migrate_shard(&collection, &from_node, &to_node, &shard_key) {
Ok(_) => {
print_success(&format!("Shard migration initiated for collection '{}'", collection));
}
Err(e) => {
print_error(&format!("Error migrating shard: {}", e));
}
}
}
"status" => {
match self.sharding_manager.get_cluster_status() {
Ok(status) => {
println!("Sharding Status:");
println!(" Cluster Formed: {}", status.cluster_formed);
println!(" Leader Exists: {}", status.leader_exists);
println!(" Rebalance Needed: {}", status.rebalance_needed);
println!(" Total Nodes: {}", status.nodes.len());
println!(" Total Capacity: {} bytes", status.total_capacity);
println!(" Total Used: {} bytes", status.total_used);
println!(" Raft Nodes: {}", status.raft_nodes.len());
if !status.nodes.is_empty() {
println!(" Nodes:");
for node in status.nodes {
let usage_percent = if node.capacity > 0 {
(node.used as f64 / node.capacity as f64) * 100.0
} else {
0.0
};
println!(" - {}: {} ({:.2}% used, {} collections)",
node.node_id, node.address, usage_percent, node.collections.len());
}
}
}
Err(e) => {
print_error(&format!("Error getting shard status: {}", e));
}
}
}
_ => {
println!("Usage: shard add <node_id> <address> <capacity>");
println!(" shard migrate <collection> <from_node> <to_node> <shard_key>");
println!(" shard status");
}
}
Ok(())
}
// ============ КОМАНДЫ КЛАСТЕРА ============
async fn handle_cluster_status(&self, _parts: Vec<&str>) -> Result<()> {
match self.sharding_manager.get_cluster_status() {
Ok(status) => {
println!("Cluster Status:");
println!(" Formed: {}", status.cluster_formed);
println!(" Leader Exists: {}", status.leader_exists);
println!(" Total Capacity: {} bytes", status.total_capacity);
println!(" Total Used: {} bytes", status.total_used);
println!(" Rebalance Needed: {}", status.rebalance_needed);
println!(" Nodes: {}", status.nodes.len());
if !status.nodes.is_empty() {
println!(" Node Details:");
for node in status.nodes {
let usage = if node.capacity > 0 {
(node.used as f64 / node.capacity as f64) * 100.0
} else {
0.0
};
println!(" - {}: {} ({:.2}% used)", node.node_id, node.address, usage);
}
}
println!(" Raft Nodes: {}", status.raft_nodes.len());
if !status.raft_nodes.is_empty() {
println!(" Raft Details:");
for raft_node in status.raft_nodes {
println!(" - {}: {} (term: {}, state: {})",
raft_node.node_id, raft_node.address, raft_node.term, raft_node.state);
}
}
}
Err(e) => {
println!("Error getting cluster status: {}", e);
}
}
Ok(())
}
async fn handle_add_node(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: add.node <node_url> or add.node <node_ip>");
return Ok(());
}
let node_address = parts[1].to_string();
let node_id = if parts.len() > 2 {
parts[2].to_string()
} else {
format!("node_{}", uuid::Uuid::new_v4().to_string()[..8].to_string())
};
let capacity = if parts.len() > 3 {
parts[3].parse().unwrap_or(1024 * 1024 * 1024) // 1GB default
} else {
1024 * 1024 * 1024
};
match self.sharding_manager.add_node(node_id.clone(), node_address.clone(), capacity) {
Ok(_) => {
print_success(&format!("Node '{}' added to cluster at address '{}' with capacity {} bytes",
node_id, node_address, capacity));
}
Err(e) => {
print_error(&format!("Error adding node: {}", e));
}
}
Ok(())
}
async fn handle_evict_node(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: evict.node <node_url> or evict.node <node_ip>");
return Ok(());
}
let node_address = parts[1].to_string();
// Находим node_id по адресу
let mut node_id_to_remove = None;
for entry in self.sharding_manager.get_nodes() {
if entry.address == node_address {
node_id_to_remove = Some(entry.node_id.clone());
break;
}
}
if let Some(node_id) = node_id_to_remove {
match self.sharding_manager.remove_node(&node_id) {
Ok(_) => {
print_success(&format!("Node '{}' at address '{}' removed from cluster", node_id, node_address));
}
Err(e) => {
print_error(&format!("Error removing node: {}", e));
}
}
} else {
print_error(&format!("Node with address '{}' not found in cluster", node_address));
}
Ok(())
}
async fn handle_list_raft_nodes(&self, _parts: Vec<&str>) -> Result<()> {
let raft_nodes = self.sharding_manager.get_raft_nodes();
println!("Raft Nodes ({}):", raft_nodes.len());
for node in raft_nodes {
println!(" - {}: {} (term: {}, state: {:?}, last_heartbeat: {})",
node.node_id, node.address, node.term, node.state, node.last_heartbeat);
}
Ok(())
}
async fn handle_cluster_rebalance(&self, _parts: Vec<&str>) -> Result<()> {
match self.sharding_manager.rebalance_cluster() {
Ok(_) => {
print_success("Cluster rebalancing completed successfully");
}
Err(e) => {
print_error(&format!("Error rebalancing cluster: {}", e));
}
}
Ok(())
}
// ============ CSV КОМАНДЫ ============
async fn handle_csv(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: csv import <collection> <file_path>");
println!(" csv export <collection> <file_path>");
println!(" csv list");
println!(" csv progress <collection>");
println!(" csv check <file_path>");
return Ok(());
}
match parts[1] {
"import" => {
if parts.len() < 4 {
println!("Usage: csv import <collection> <file_path>");
return Ok(());
}
let collection = parts[2].to_string();
let file_path = parts[3].to_string();
// Проверяем существование файла
if !self.csv_manager.file_exists(&file_path) {
print_error(&format!("Error: File '{}' does not exist", file_path));
return Ok(());
}
match self.csv_manager.import_csv(&collection, &file_path) {
Ok(count) => {
print_success(&format!("Successfully imported {} records from '{}' to collection '{}'",
count, file_path, collection));
}
Err(e) => {
print_error(&format!("Error importing CSV: {}", e));
}
}
}
"export" => {
if parts.len() < 4 {
println!("Usage: csv export <collection> <file_path>");
return Ok(());
}
let collection = parts[2].to_string();
let file_path = parts[3].to_string();
match self.csv_manager.export_csv(&collection, &file_path) {
Ok(count) => {
print_success(&format!("Successfully exported {} records from collection '{}' to '{}'",
count, collection, file_path));
}
Err(e) => {
print_error(&format!("Error exporting CSV: {}", e));
}
}
}
"list" => {
match self.csv_manager.list_csv_files() {
Ok(files) => {
if files.is_empty() {
println!("No CSV files found in import directory");
} else {
println!("CSV files in import directory:");
for file in files {
println!(" - {}", file);
}
}
}
Err(e) => {
print_error(&format!("Error listing CSV files: {}", e));
}
}
}
"progress" => {
if parts.len() < 3 {
println!("Usage: csv progress <collection>");
return Ok(());
}
let collection = parts[2].to_string();
let progress = self.csv_manager.get_import_progress(&collection);
println!("Import progress for collection '{}': {:.2}%", collection, progress);
}
"check" => {
if parts.len() < 3 {
println!("Usage: csv check <file_path>");
return Ok(());
}
let file_path = parts[2];
if self.csv_manager.file_exists(file_path) {
println!("File '{}' exists", file_path);
} else {
println!("File '{}' does not exist", file_path);
}
}
_ => {
println!("Usage: csv import <collection> <file_path>");
println!(" csv export <collection> <file_path>");
println!(" csv list");
println!(" csv progress <collection>");
println!(" csv check <file_path>");
}
}
Ok(())
}
// ============ КОМАНДЫ БЭКАПА ============
async fn handle_backup(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: backup create [backup_name]");
println!(" backup restore <backup_file>");
println!(" backup list");
return Ok(());
}
match parts[1] {
"create" => {
let backup_name = if parts.len() > 2 {
parts[2].to_string()
} else {
format!("backup_{}", chrono::Local::now().format("%Y%m%d_%H%M%S"))
};
match self.database.create_backup() {
Ok(backup) => {
let backup_file = format!("./futriix_backups/{}.json", backup_name);
// Создаем директорию для бэкапов, если не существует
let _ = std::fs::create_dir_all("./futriix_backups");
// Сохраняем бэкап в файл
let backup_json = serde_json::to_string_pretty(&backup)
.map_err(|e| crate::common::FutriixError::SerializationError(e.to_string()))?;
std::fs::write(&backup_file, backup_json)
.map_err(|e| crate::common::FutriixError::IoError(e))?;
print_success(&format!("Backup created successfully: {} ({} collections)",
backup_file, backup.len()));
}
Err(e) => {
print_error(&format!("Error creating backup: {}", e));
}
}
}
"restore" => {
if parts.len() < 3 {
println!("Usage: backup restore <backup_file>");
return Ok(());
}
let backup_path = parts[2];
println!("Restoring from backup '{}'...", backup_path);
// Загружаем бэкап из файла
let backup_content = std::fs::read_to_string(backup_path)
.map_err(|e| crate::common::FutriixError::IoError(e))?;
let backup: std::collections::HashMap<String, std::collections::HashMap<String, Vec<u8>>> =
serde_json::from_str(&backup_content)
.map_err(|e| crate::common::FutriixError::SerializationError(e.to_string()))?;
match self.database.restore_from_backup(backup) {
Ok(_) => {
print_success("Backup restored successfully");
}
Err(e) => {
print_error(&format!("Error restoring backup: {}", e));
}
}
}
"list" => {
let backup_dir = "./futriix_backups";
if let Ok(entries) = std::fs::read_dir(backup_dir) {
println!("Available backups:");
let mut backups = Vec::new();
for entry in entries.flatten() {
let path = entry.path();
if path.is_file() {
if let Some(extension) = path.extension() {
if extension == "json" {
if let Some(file_name) = path.file_name() {
backups.push(file_name.to_string_lossy().to_string());
}
}
}
}
}
if backups.is_empty() {
println!(" No backups found");
} else {
for backup in backups {
println!(" - {}", backup);
}
}
} else {
println!("Backup directory not found");
}
}
_ => {
println!("Usage: backup create [backup_name]");
println!(" backup restore <backup_file>");
println!(" backup list");
}
}
Ok(())
}
// ============ СИСТЕМНЫЕ КОМАНДЫ ============
async fn handle_stats(&self, _parts: Vec<&str>) -> Result<()> {
match self.database.get_stats() {
Ok(stats) => {
println!("Database Statistics:");
for (key, value) in stats {
println!(" - {}: {}", key, value);
}
// Дополнительная информация о CSV
let active_imports = self.csv_manager.active_imports_count();
if active_imports > 0 {
println!(" - active_csv_imports: {}", active_imports);
}
// Информация о кластере
if self.sharding_manager.is_cluster_formed() {
println!(" - cluster_mode: enabled");
} else {
println!(" - cluster_mode: standalone");
}
}
Err(e) => {
print_error(&format!("Error getting stats: {}", e));
}
}
Ok(())
}
async fn handle_ping(&self, _parts: Vec<&str>) -> Result<()> {
print_success("pong - Futriix server is running");
Ok(())
}
async fn show_system_status(&self) -> Result<()> {
println!("Futriix System Status:");
println!(" - Database: ✓ Connected");
println!(" - Lua Engine: ✓ Running");
println!(" - Sharding: {}", if self.sharding_manager.is_cluster_formed() { "✓ Cluster formed" } else { "✗ Standalone mode" });
println!(" - CSV Manager: ✓ Ready");
println!(" - Active Transaction: {}",
self.current_transaction.as_ref().map_or("None".to_string(), |t| t.clone()));
// Проверяем доступность системных коллекций
let sys_collections = ["_system", "_users", "_logs", "_procedures", "_triggers"];
println!(" - System Collections:");
for coll in &sys_collections {
println!(" - {}: ✓", coll);
}
Ok(())
}
/// Показать справку по командам
async fn show_help(&self) -> Result<()> {
println!("Futriix Database Shell - Available Commands:");
println!("");
println!("=== BASIC CRUD OPERATIONS ===");
println!(" create <collection> <json_data> - Create document");
// Исправляем строку с двойными фигурными скобками {{ и }} для экранирования
println!(" create <collection> <json_data> - Example: create users '{{\"name\": \"Alice\", \"email\": \"alice@example.com\"}}'");
println!(" read <collection> <id> - Read document");
println!(" update <collection> <id> <json> - Update document");
println!(" delete <collection> <id> - Delete document");
println!(" list <collection> [filter] - List documents");
println!("");
println!("=== TRANSACTIONS ===");
println!(" begin <transaction_id> - Start transaction");
println!(" commit <transaction_id> - Commit transaction");
println!(" rollback <transaction_id> - Rollback transaction");
println!("");
println!("=== INDEXES ===");
println!(" index create <coll> <name> <field> [unique] - Create index");
println!(" index query <coll> <name> <value> - Query by index");
println!("");
println!("=== CONSTRAINTS ===");
println!(" constraint add <coll> <name> <type> <field> [value] - Add constraint");
println!(" constraint remove <coll> <name> - Remove constraint");
println!(" constraint list <coll> - List constraints");
println!("");
println!("=== STORED PROCEDURES ===");
println!(" procedure create <name> <code> - Create stored procedure");
println!(" procedure call <name> - Call stored procedure");
println!("");
println!("=== TRIGGERS ===");
println!(" trigger add <name> <event> <coll> <code> - Add trigger");
println!(" trigger list <coll> - List triggers");
println!("");
println!("=== CLUSTER MANAGEMENT ===");
println!(" cluster.status - Show cluster status");
println!(" add.node <node_url> [node_id] [capacity] - Add node to cluster");
println!(" evict.node <node_url> - Remove node from cluster");
println!(" list.raft.nodes - List Raft nodes");
println!(" cluster.rebalance - Rebalance cluster");
println!("");
println!("=== SHARDING ===");
println!(" shard add <node> <addr> <capacity> - Add shard node");
println!(" shard migrate <coll> <from> <to> <key> - Migrate shard");
println!(" shard status - Show shard status");
println!("");
println!("=== CSV OPERATIONS ===");
println!(" csv import <coll> <file> - Import CSV to collection");
println!(" csv export <coll> <file> - Export collection to CSV");
println!(" csv list - List CSV files");
println!(" csv progress <coll> - Show import progress");
println!(" csv check <file> - Check if file exists");
println!("");
println!("=== BACKUP ===");
println!(" backup create [name] - Create backup");
println!(" backup restore <file> - Restore from backup");
println!(" backup list - List available backups");
println!("");
println!("=== SYSTEM COMMANDS ===");
println!(" stats - Show database statistics");
println!(" status - Show system status");
println!(" ping - Test server connection");
println!(" inbox.stop - Exit database mode (return to Lua)");
println!(" clear / cls - Clear screen");
println!(" help - Show this help");
println!(" exit / quit - Exit Futriix shell");
println!("");
println!("=== LUA MODE COMMANDS ===");
println!(" inbox.start - Enter database command mode");
println!(" version - Show version info");
println!(" env - Show environment info");
println!("");
println!("=== EXAMPLES ===");
// Исправляем строку с двойными фигурными скобками {{ и }} для экранирования
println!(" Create user: create users '{{\"name\": \"Alice\", \"email\": \"alice@example.com\"}}'");
println!(" List users: list users");
println!(" Create index: index create users email_idx email unique");
println!(" Import CSV: csv import users /path/to/users.csv");
println!(" Backup: backup create");
Ok(())
}
}