Files
futriix-old/src/lua_shell.rs

1120 lines
42 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// src/lua_shell.rs
//! Интерактивная Lua оболочка для Futriix
//!
//! Предоставляет интерфейс для взаимодействия с базой данных через Lua
//! и CRUD команды. Использует wait-free доступ к данным через атомарные ссылки.
#![allow(dead_code)]
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, BufReader};
use serde_json::Value;
use crate::common::Result;
use crate::server::database::Database;
use crate::server::lua_engine::LuaEngine;
use crate::server::sharding::ShardingManager;
use crate::server::csv_import_export::CsvManager;
use crate::common::protocol;
use crate::server::database::{Index, IndexType, Trigger, TriggerEvent};
/// Конвертация HEX цвета в ANSI escape code
fn hex_to_ansi(hex_color: &str) -> String {
let hex = hex_color.trim_start_matches('#');
if hex.len() == 6 {
if let (Ok(r), Ok(g), Ok(b)) = (
u8::from_str_radix(&hex[0..2], 16),
u8::from_str_radix(&hex[2..4], 16),
u8::from_str_radix(&hex[4..6], 16),
) {
return format!("\x1b[38;2;{};{};{}m", r, g, b);
}
}
"\x1b[38;2;255;255;255m".to_string()
}
/// Вывод текста с красным цветом для ошибок
fn print_error(text: &str) {
let red_color = hex_to_ansi("#FF0000");
println!("{}{}\x1b[0m", red_color, text);
}
/// Вывод текста с зеленым цветом для успеха
fn print_success(text: &str) {
let green_color = hex_to_ansi("#00FF00");
println!("{}{}\x1b[0m", green_color, text);
}
/// Вывод текста с синим цветом для информации
fn print_info(text: &str) {
let blue_color = hex_to_ansi("#00bfff");
println!("{}{}\x1b[0m", blue_color, text);
}
/// Вывод текста с цветом #33d17a для приглашения Lua
fn print_lua_color(text: &str) {
let lua_color = hex_to_ansi("#33d17a"); // Новый цвет #33d17a
println!("{}{}\x1b[0m", lua_color, text);
}
/// Интерактивная Lua оболочка
pub struct LuaShell {
lua_engine: LuaEngine,
database: Arc<Database>,
sharding_manager: Arc<ShardingManager>,
csv_manager: Arc<CsvManager>,
inbox_mode: bool,
}
impl LuaShell {
pub fn new(
lua_engine: LuaEngine,
database: Arc<Database>,
sharding_manager: Arc<ShardingManager>,
csv_manager: Arc<CsvManager>,
) -> Self {
Self {
lua_engine,
database,
sharding_manager,
csv_manager,
inbox_mode: false,
}
}
/// Запуск интерактивной оболочки
pub async fn run(&mut self) -> Result<()> {
let stdin = tokio::io::stdin();
let mut reader = BufReader::new(stdin).lines();
// Выводим приветственное сообщение при запуске Lua интерпретатора
print_lua_color("Lua interpreter started. Type 'inbox.start' for database commands or Lua code.");
println!();
loop {
if self.inbox_mode {
let inbox_prompt_color = hex_to_ansi("#00bfff");
print!("{}futriix:~>\x1b[0m ", inbox_prompt_color);
} else {
// ПРИГЛАШЕНИЕ LUA ТЕПЕРЬ ЦВЕТА #33d17a
let lua_color = hex_to_ansi("#33d17a");
print!("{}lua>\x1b[0m ", lua_color);
}
let _ = std::io::Write::flush(&mut std::io::stdout());
let line = match reader.next_line().await {
Ok(Some(line)) => line,
Ok(None) => break,
Err(e) => {
eprintln!("Read error: {}", e);
continue;
}
};
let input = line.trim();
match input {
"exit" | "quit" => break,
"inbox.start" => {
self.inbox_mode = true;
// ИЗМЕНЕНИЕ: убрана точка и изменен цвет на #33d17a
let lua_color = hex_to_ansi("#33d17a");
println!("{}Entering database mode\x1b[0m", lua_color);
continue;
}
"inbox.stop" if self.inbox_mode => {
self.inbox_mode = false;
print_success("Exiting database mode. Back to Lua interpreter.");
continue;
}
"help" if self.inbox_mode => {
self.show_help().await?;
continue;
}
_ => {}
}
if self.inbox_mode {
self.handle_inbox_command(input).await?;
} else {
self.handle_lua_command(input).await?;
}
}
print_info("Shutting down Futriix server...");
Ok(())
}
/// Обработка Lua команд
async fn handle_lua_command(&self, input: &str) -> Result<()> {
if input.is_empty() {
return Ok(());
}
match self.lua_engine.execute_script(input) {
Ok(_) => {}
Err(e) => {
let error_msg = e.to_string();
if error_msg.contains("Lua error: syntax error:") || error_msg.contains("Unknown command:") {
print_error(&error_msg);
} else {
eprintln!("Lua error: {}", e);
}
}
}
Ok(())
}
/// Обработка команд inbox (CRUD + новые команды)
async fn handle_inbox_command(&self, input: &str) -> Result<()> {
let parts: Vec<&str> = input.split_whitespace().collect();
if parts.is_empty() {
return Ok(());
}
match parts[0] {
// Базовые CRUD команды
"create" => self.handle_create(parts).await,
"read" => self.handle_read(parts).await,
"update" => self.handle_update(parts).await,
"delete" => self.handle_delete(parts).await,
"list" => self.handle_list(parts).await,
// Новые команды для управления кластером
"cluster.status" => self.handle_cluster_status(parts).await,
"add.node" => self.handle_add_node(parts).await,
"evict.node" => self.handle_evict_node(parts).await,
"list.raft.nodes" => self.handle_list_raft_nodes(parts).await,
"cluster.rebalance" => self.handle_cluster_rebalance(parts).await,
// Новые команды для CSV операций
"csv" => self.handle_csv(parts).await,
// НОВЫЕ КОМАНДЫ ИЗ README
"begin" => self.handle_begin_transaction(parts).await,
"commit" => self.handle_commit_transaction(parts).await,
"rollback" => self.handle_rollback_transaction(parts).await,
"index" => self.handle_index(parts).await,
"constraint" => self.handle_constraint(parts).await,
"procedure" => self.handle_procedure(parts).await,
"trigger" => self.handle_trigger(parts).await,
"shard" => self.handle_shard(parts).await,
"backup" => self.handle_backup(parts).await,
"help" => self.show_help().await,
_ => {
let error_msg = format!("Unknown command: {}. Type 'help' for available commands.", parts[0]);
print_error(&error_msg);
Ok(())
}
}
}
// Новые методы для управления кластером
async fn handle_cluster_status(&self, _parts: Vec<&str>) -> Result<()> {
match self.sharding_manager.get_cluster_status() {
Ok(status) => {
println!("Cluster Status:");
println!(" Formed: {}", status.cluster_formed);
println!(" Leader Exists: {}", status.leader_exists);
println!(" Total Capacity: {}", status.total_capacity);
println!(" Total Used: {}", status.total_used);
println!(" Nodes: {}", status.nodes.len());
for node in status.nodes {
println!(" - {}: {} ({}% used)", node.node_id, node.address, (node.used as f64 / node.capacity as f64) * 100.0);
}
println!(" Raft Nodes: {}", status.raft_nodes.len());
for raft_node in status.raft_nodes {
println!(" - {}: {} (term: {}, state: {})", raft_node.node_id, raft_node.address, raft_node.term, raft_node.state);
}
}
Err(e) => {
println!("Error getting cluster status: {}", e);
}
}
Ok(())
}
async fn handle_add_node(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: add.node <node_url> or add.node <node_ip>");
return Ok(());
}
let node_address = parts[1].to_string();
let node_id = format!("node_{}", uuid::Uuid::new_v4().to_string()[..8].to_string());
match self.sharding_manager.add_node(node_id.clone(), node_address.clone(), 1024 * 1024 * 1024) {
Ok(_) => {
println!("Node '{}' added to cluster at address '{}'", node_id, node_address);
}
Err(e) => {
println!("Error adding node: {}", e);
}
}
Ok(())
}
async fn handle_evict_node(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: evict.node <node_url> or evict.node <node_ip>");
return Ok(());
}
let node_address = parts[1].to_string();
// Находим node_id по адресу
let mut node_id_to_remove = None;
for entry in self.sharding_manager.get_nodes() {
if entry.address == node_address {
node_id_to_remove = Some(entry.node_id.clone());
break;
}
}
if let Some(node_id) = node_id_to_remove {
match self.sharding_manager.remove_node(&node_id) {
Ok(_) => {
println!("Node '{}' at address '{}' removed from cluster", node_id, node_address);
}
Err(e) => {
println!("Error removing node: {}", e);
}
}
} else {
println!("Node with address '{}' not found in cluster", node_address);
}
Ok(())
}
async fn handle_list_raft_nodes(&self, _parts: Vec<&str>) -> Result<()> {
let raft_nodes = self.sharding_manager.get_raft_nodes();
println!("Raft Nodes ({}):", raft_nodes.len());
for node in raft_nodes {
println!(" - {}: {} (term: {}, state: {:?}, last_heartbeat: {})",
node.node_id, node.address, node.term, node.state, node.last_heartbeat);
}
Ok(())
}
async fn handle_cluster_rebalance(&self, _parts: Vec<&str>) -> Result<()> {
match self.sharding_manager.rebalance_cluster() {
Ok(_) => {
println!("Cluster rebalancing completed successfully");
}
Err(e) => {
println!("Error rebalancing cluster: {}", e);
}
}
Ok(())
}
// Новый метод для CSV операций
async fn handle_csv(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: csv import <collection> <file_path>");
println!(" csv export <collection> <file_path>");
println!(" csv list");
println!(" csv progress <collection>");
return Ok(());
}
match parts[1] {
"import" => {
if parts.len() < 4 {
println!("Usage: csv import <collection> <file_path>");
return Ok(());
}
let collection = parts[2].to_string();
let file_path = parts[3].to_string();
match self.csv_manager.import_csv(&collection, &file_path) {
Ok(count) => {
println!("Successfully imported {} records from '{}'", count, file_path);
}
Err(e) => {
println!("Error importing CSV: {}", e);
}
}
}
"export" => {
if parts.len() < 4 {
println!("Usage: csv export <collection> <file_path>");
return Ok(());
}
let collection = parts[2].to_string();
let file_path = parts[3].to_string();
match self.csv_manager.export_csv(&collection, &file_path) {
Ok(count) => {
println!("Successfully exported {} records to '{}'", count, file_path);
}
Err(e) => {
println!("Error exporting CSV: {}", e);
}
}
}
"list" => {
match self.csv_manager.list_csv_files() {
Ok(files) => {
if files.is_empty() {
println!("No CSV files found");
} else {
println!("CSV files:");
for file in files {
println!(" - {}", file);
}
}
}
Err(e) => {
println!("Error listing CSV files: {}", e);
}
}
}
"progress" => {
if parts.len() < 3 {
println!("Usage: csv progress <collection>");
return Ok(());
}
let collection = parts[2].to_string();
let progress = self.csv_manager.get_import_progress(&collection);
println!("Import progress for '{}': {:.2}%", collection, progress);
}
_ => {
println!("Usage: csv import <collection> <file_path>");
println!(" csv export <collection> <file_path>");
println!(" csv list");
println!(" csv progress <collection>");
}
}
Ok(())
}
// НОВЫЕ МЕТОДЫ ДЛЯ РЕАЛИЗАЦИИ КОМАНД ИЗ README
async fn handle_begin_transaction(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: begin <transaction_id>");
return Ok(());
}
let transaction_id = parts[1].to_string();
let command = protocol::Command::BeginTransaction { transaction_id };
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
println!("Transaction '{}' started successfully", parts[1]);
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
Ok(())
}
async fn handle_commit_transaction(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: commit <transaction_id>");
return Ok(());
}
let transaction_id = parts[1].to_string();
let command = protocol::Command::CommitTransaction { transaction_id };
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
println!("Transaction '{}' committed successfully", parts[1]);
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
Ok(())
}
async fn handle_rollback_transaction(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: rollback <transaction_id>");
return Ok(());
}
let transaction_id = parts[1].to_string();
let command = protocol::Command::RollbackTransaction { transaction_id };
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
println!("Transaction '{}' rolled back successfully", parts[1]);
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
Ok(())
}
async fn handle_index(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: index create <collection> <index_name> <field> [unique]");
println!(" index query <collection> <index_name> <value>");
return Ok(());
}
match parts[1] {
"create" => {
if parts.len() < 5 {
println!("Usage: index create <collection> <index_name> <field> [unique]");
return Ok(());
}
let collection = parts[2].to_string();
let index_name = parts[3].to_string();
let field = parts[4].to_string();
let unique = parts.get(5).map_or(false, |&s| s == "unique");
let index = Index {
name: index_name,
index_type: IndexType::Secondary,
field,
unique,
};
let command = protocol::Command::CreateIndex { collection, index };
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
println!("Index created successfully");
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
}
"query" => {
if parts.len() < 5 {
println!("Usage: index query <collection> <index_name> <value>");
return Ok(());
}
let collection = parts[2].to_string();
let index_name = parts[3].to_string();
let value = parts[4].as_bytes().to_vec();
let command = protocol::Command::QueryByIndex { collection, index_name, value };
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(data) = response {
if let Ok(result) = String::from_utf8(data) {
println!("Query result: {}", result);
} else {
println!("Query executed successfully");
}
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
}
_ => {
println!("Usage: index create <collection> <index_name> <field> [unique]");
println!(" index query <collection> <index_name> <value>");
}
}
Ok(())
}
async fn handle_constraint(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: constraint add <collection> <constraint_name> <type> <field> [value]");
println!(" constraint remove <collection> <constraint_name>");
println!(" constraint list <collection>");
return Ok(());
}
match parts[1] {
"add" => {
if parts.len() < 6 {
println!("Usage: constraint add <collection> <constraint_name> <type> <field> [value]");
return Ok(());
}
let collection = parts[2].to_string();
let constraint_name = parts[3].to_string();
let constraint_type = parts[4].to_string();
let field = parts[5].to_string();
let value = if parts.len() > 6 {
parts[6..].join(" ").into_bytes()
} else {
vec![]
};
let command = protocol::Command::AddConstraint {
collection,
constraint_name,
constraint_type,
field,
value,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
println!("Constraint added successfully");
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
}
"remove" => {
if parts.len() < 4 {
println!("Usage: constraint remove <collection> <constraint_name>");
return Ok(());
}
let collection = parts[2].to_string();
let constraint_name = parts[3].to_string();
let command = protocol::Command::RemoveConstraint { collection, constraint_name };
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
println!("Constraint removed successfully");
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
}
"list" => {
if parts.len() < 3 {
println!("Usage: constraint list <collection>");
return Ok(());
}
println!("Constraints for collection '{}':", parts[2]);
println!(" - (Constraint listing functionality to be implemented)");
}
_ => {
println!("Usage: constraint add <collection> <constraint_name> <type> <field> [value]");
println!(" constraint remove <collection> <constraint_name>");
println!(" constraint list <collection>");
}
}
Ok(())
}
async fn handle_procedure(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: procedure create <name> <code>");
println!(" procedure call <name>");
return Ok(());
}
match parts[1] {
"create" => {
if parts.len() < 4 {
println!("Usage: procedure create <name> <code>");
return Ok(());
}
let name = parts[2].to_string();
let code = parts[3..].join(" ").into_bytes();
let command = protocol::Command::CreateProcedure { name, code };
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
println!("Procedure created successfully");
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
}
"call" => {
if parts.len() < 3 {
println!("Usage: procedure call <name>");
return Ok(());
}
let name = parts[2].to_string();
let command = protocol::Command::CallProcedure { name };
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(data) = response {
if let Ok(result) = String::from_utf8(data) {
println!("Procedure result: {}", result);
} else {
println!("Procedure executed successfully");
}
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
}
_ => {
println!("Usage: procedure create <name> <code>");
println!(" procedure call <name>");
}
}
Ok(())
}
async fn handle_trigger(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: trigger add <name> <event> <collection> <code>");
return Ok(());
}
match parts[1] {
"add" => {
if parts.len() < 6 {
println!("Usage: trigger add <name> <event> <collection> <code>");
return Ok(());
}
let name = parts[2].to_string();
let event_str = parts[3];
let collection = parts[4].to_string();
let lua_code = parts[5..].join(" ");
let event = match event_str {
"before_create" => TriggerEvent::BeforeCreate,
"after_create" => TriggerEvent::AfterCreate,
"before_update" => TriggerEvent::BeforeUpdate,
"after_update" => TriggerEvent::AfterUpdate,
"before_delete" => TriggerEvent::BeforeDelete,
"after_delete" => TriggerEvent::AfterDelete,
_ => {
println!("Invalid event type. Use: before_create, after_create, before_update, after_update, before_delete, after_delete");
return Ok(());
}
};
let trigger = Trigger {
name,
event,
collection,
lua_code,
};
match self.database.add_trigger(trigger) {
Ok(_) => {
println!("Trigger added successfully");
}
Err(e) => {
println!("Error adding trigger: {}", e);
}
}
}
_ => {
println!("Usage: trigger add <name> <event> <collection> <code>");
}
}
Ok(())
}
async fn handle_shard(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: shard add <node_id> <address> <capacity>");
println!(" shard migrate <collection> <from_node> <to_node> <shard_key>");
println!(" shard status");
return Ok(());
}
match parts[1] {
"add" => {
if parts.len() < 5 {
println!("Usage: shard add <node_id> <address> <capacity>");
return Ok(());
}
let node_id = parts[2].to_string();
let address = parts[3].to_string();
let capacity: u64 = parts[4].parse().unwrap_or(1024);
match self.sharding_manager.add_node(node_id, address, capacity) {
Ok(_) => {
println!("Shard node added successfully");
}
Err(e) => {
println!("Error adding shard node: {}", e);
}
}
}
"migrate" => {
if parts.len() < 6 {
println!("Usage: shard migrate <collection> <from_node> <to_node> <shard_key>");
return Ok(());
}
let collection = parts[2].to_string();
let from_node = parts[3].to_string();
let to_node = parts[4].to_string();
let shard_key = parts[5].to_string();
let command = protocol::Command::MigrateShard {
collection,
from_node,
to_node,
shard_key,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
println!("Shard migration initiated");
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
}
"status" => {
match self.sharding_manager.get_cluster_status() {
Ok(status) => {
println!("Sharding Status:");
println!(" Cluster Formed: {}", status.cluster_formed);
println!(" Total Nodes: {}", status.nodes.len());
println!(" Total Capacity: {}", status.total_capacity);
println!(" Total Used: {}", status.total_used);
}
Err(e) => {
println!("Error getting shard status: {}", e);
}
}
}
_ => {
println!("Usage: shard add <node_id> <address> <capacity>");
println!(" shard migrate <collection> <from_node> <to_node> <shard_key>");
println!(" shard status");
}
}
Ok(())
}
async fn handle_backup(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: backup start");
println!(" backup restore <backup_path>");
return Ok(());
}
match parts[1] {
"start" => {
match self.database.create_backup() {
Ok(backup) => {
let backup_file = format!("backup_{}.json", chrono::Utc::now().format("%Y%m%d_%H%M%S"));
// Здесь должна быть логика сохранения backup в файл
println!("Backup created successfully. Size: {} collections", backup.len());
}
Err(e) => {
println!("Error creating backup: {}", e);
}
}
}
"restore" => {
if parts.len() < 3 {
println!("Usage: backup restore <backup_path>");
return Ok(());
}
let backup_path = parts[2];
println!("Restore functionality for path '{}' to be implemented", backup_path);
}
_ => {
println!("Usage: backup start");
println!(" backup restore <backup_path>");
}
}
Ok(())
}
// Базовые методы CRUD (упрощенные)
async fn handle_create(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 3 {
println!("Usage: create <collection> <json_data>");
return Ok(());
}
let collection = parts[1].to_string();
let document = parts[2..].join(" ").into_bytes();
let command = protocol::Command::Create {
collection,
document,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(data) = response {
if let Ok(id) = String::from_utf8(data) {
println!("Document created with ID: {}", id);
} else {
println!("Document created successfully");
}
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
Ok(())
}
async fn handle_read(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 3 {
println!("Usage: read <collection> <id>");
return Ok(());
}
let collection = parts[1].to_string();
let id = parts[2].to_string();
let command = protocol::Command::Read {
collection,
id,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(data) = response {
if let Ok(document) = String::from_utf8(data) {
println!("{}", document);
} else {
println!("Document read successfully (binary data)");
}
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
Ok(())
}
async fn handle_update(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 4 {
println!("Usage: update <collection> <id> <json_data>");
return Ok(());
}
let collection = parts[1].to_string();
let id = parts[2].to_string();
let document = parts[3..].join(" ").into_bytes();
let command = protocol::Command::Update {
collection,
id,
document,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
println!("Document updated successfully");
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
Ok(())
}
async fn handle_delete(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 3 {
println!("Usage: delete <collection> <id>");
return Ok(());
}
let collection = parts[1].to_string();
let id = parts[2].to_string();
let command = protocol::Command::Delete {
collection,
id,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(_) = response {
println!("Document deleted successfully");
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
Ok(())
}
async fn handle_list(&self, parts: Vec<&str>) -> Result<()> {
if parts.len() < 2 {
println!("Usage: list <collection> [filter]");
return Ok(());
}
let collection = parts[1].to_string();
let filter = if parts.len() > 2 {
parts[2..].join(" ").into_bytes()
} else {
vec![]
};
let command = protocol::Command::Query {
collection,
filter,
};
match self.database.execute_command(command) {
Ok(response) => {
if let protocol::Response::Success(data) = response {
if let Ok(documents) = String::from_utf8(data) {
// Используем std::result::Result вместо нашего Result
let parsed: std::result::Result<Value, _> = serde_json::from_str(&documents);
match parsed {
Ok(value) => {
println!("{}", serde_json::to_string_pretty(&value).unwrap());
}
Err(_) => {
println!("{}", documents);
}
}
} else {
println!("Documents read successfully (binary data)");
}
} else if let protocol::Response::Error(e) = response {
println!("Error: {}", e);
}
}
Err(e) => {
println!("Error: {}", e);
}
}
Ok(())
}
/// Показать справку по командам
async fn show_help(&self) -> Result<()> {
println!("Available commands:");
println!(" Basic CRUD:");
println!(" create <collection> <json_data> - Create document");
println!(" read <collection> <id> - Read document");
println!(" update <collection> <id> <json> - Update document");
println!(" delete <collection> <id> - Delete document");
println!(" list <collection> [filter] - List documents");
println!(" Transactions:");
println!(" begin <transaction_id> - Start transaction");
println!(" commit <transaction_id> - Commit transaction");
println!(" rollback <transaction_id> - Rollback transaction");
println!(" Indexes:");
println!(" index create <coll> <name> <field> [unique] - Create index");
println!(" index query <coll> <name> <value> - Query by index");
println!(" Constraints:");
println!(" constraint add <coll> <name> <type> <field> [value] - Add constraint");
println!(" constraint remove <coll> <name> - Remove constraint");
println!(" constraint list <coll> - List constraints");
println!(" Procedures:");
println!(" procedure create <name> <code> - Create stored procedure");
println!(" procedure call <name> - Call stored procedure");
println!(" Triggers:");
println!(" trigger add <name> <event> <coll> <code> - Add trigger");
println!(" Cluster Management:");
println!(" cluster.status - Show cluster status");
println!(" add.node <node_url> - Add node to cluster");
println!(" evict.node <node_url> - Remove node from cluster");
println!(" list.raft.nodes - List Raft nodes");
println!(" cluster.rebalance - Rebalance cluster");
println!(" Sharding:");
println!(" shard add <node> <addr> <capacity> - Add shard node");
println!(" shard migrate <coll> <from> <to> <key> - Migrate shard");
println!(" shard status - Show shard status");
println!(" CSV Operations:");
println!(" csv import <coll> <file> - Import CSV to collection");
println!(" csv export <coll> <file> - Export collection to CSV");
println!(" csv list - List CSV files");
println!(" csv progress <coll> - Show import progress");
println!(" Backup:");
println!(" backup start - Create backup");
println!(" backup restore <path> - Restore from backup");
println!(" Other:");
println!(" inbox.stop - Exit database mode");
println!(" help - Show this help");
Ok(())
}
}