// src/lua_shell.rs
//! Interactive Lua shell for Falcot.
//!
//! Provides an interface for interacting with the database through Lua
//! and CRUD commands. Uses wait-free data access via atomic references.
|
||
|
||
#![allow(dead_code)]
|
||
|
||
use std::sync::Arc;
|
||
use tokio::io::{AsyncBufReadExt, BufReader};
|
||
use serde_json::Value;
|
||
|
||
use crate::common::error::Result;
|
||
use crate::server::database::Database;
|
||
use crate::server::lua_engine::LuaEngine;
|
||
use crate::server::replication::ReplicationManager;
|
||
use crate::common::protocol;
|
||
use crate::server::database::{Index, IndexType};
|
||
|
||
/// Converts a `#RRGGBB` hex colour into an ANSI 24-bit foreground escape sequence.
///
/// Leading `#` characters are stripped first. If what remains is not exactly
/// six ASCII hexadecimal digits, the escape sequence for white
/// (`255;255;255`) is returned instead.
fn hex_to_ansi(hex_color: &str) -> String {
    const FALLBACK: &str = "\x1b[38;2;255;255;255m";

    let hex = hex_color.trim_start_matches('#');

    // Validate before slicing: `len()` counts bytes, so a six-byte string with
    // multi-byte characters would make `&hex[0..2]` panic on a non-boundary,
    // and `from_str_radix` alone would also accept a leading `+` sign.
    if hex.len() != 6 || !hex.bytes().all(|b| b.is_ascii_hexdigit()) {
        return FALLBACK.to_string();
    }

    let channel = |range: std::ops::Range<usize>| u8::from_str_radix(&hex[range], 16);

    match (channel(0..2), channel(2..4), channel(4..6)) {
        (Ok(r), Ok(g), Ok(b)) => format!("\x1b[38;2;{};{};{}m", r, g, b),
        _ => FALLBACK.to_string(),
    }
}
|
||
|
||
/// Вывод текста с красным цветом для ошибок
|
||
fn print_error(text: &str) {
|
||
let red_color = hex_to_ansi("#FF0000");
|
||
println!("{}{}\x1b[0m", red_color, text);
|
||
}
|
||
|
||
/// Вывод текста с зеленым цветом для успеха
|
||
fn print_success(text: &str) {
|
||
let green_color = hex_to_ansi("#00FF00");
|
||
println!("{}{}\x1b[0m", green_color, text);
|
||
}
|
||
|
||
/// Вывод текста с синим цветом для информации
|
||
fn print_info(text: &str) {
|
||
let blue_color = hex_to_ansi("#00bfff");
|
||
println!("{}{}\x1b[0m", blue_color, text);
|
||
}
|
||
|
||
/// Интерактивная Lua оболочка
|
||
pub struct LuaShell {
|
||
lua_engine: LuaEngine,
|
||
database: Arc<Database>,
|
||
replication: Arc<ReplicationManager>,
|
||
inbox_mode: bool,
|
||
}
|
||
|
||
impl LuaShell {
|
||
pub fn new(
|
||
lua_engine: LuaEngine,
|
||
database: Arc<Database>,
|
||
replication: Arc<ReplicationManager>,
|
||
) -> Self {
|
||
Self {
|
||
lua_engine,
|
||
database,
|
||
replication,
|
||
inbox_mode: false,
|
||
}
|
||
}
|
||
|
||
/// Запуск интерактивной оболочки
|
||
pub async fn run(&mut self) -> Result<()> {
|
||
let stdin = tokio::io::stdin();
|
||
let mut reader = BufReader::new(stdin).lines();
|
||
|
||
println!();
|
||
print_info("Falcot Database Shell v1.0.0");
|
||
print_info("Type 'inbox.start' to enter database mode.");
|
||
print_info("Type 'help' for available commands.");
|
||
print_info("Type 'exit' to quit.");
|
||
println!();
|
||
|
||
let lua_prompt_color = hex_to_ansi("#90EE90");
|
||
|
||
loop {
|
||
if self.inbox_mode {
|
||
let inbox_prompt_color = hex_to_ansi("#00bfff");
|
||
print!("{}falcot:~>\x1b[0m ", inbox_prompt_color);
|
||
} else {
|
||
print!("{}lua>\x1b[0m ", lua_prompt_color);
|
||
}
|
||
|
||
let _ = std::io::Write::flush(&mut std::io::stdout());
|
||
|
||
let line = match reader.next_line().await {
|
||
Ok(Some(line)) => line,
|
||
Ok(None) => break,
|
||
Err(e) => {
|
||
eprintln!("Read error: {}", e);
|
||
continue;
|
||
}
|
||
};
|
||
|
||
let input = line.trim();
|
||
|
||
match input {
|
||
"exit" | "quit" => break,
|
||
"inbox.start" => {
|
||
self.inbox_mode = true;
|
||
print_success("Entering database mode. Type CRUD commands or 'inbox.stop' to exit.");
|
||
continue;
|
||
}
|
||
"inbox.stop" if self.inbox_mode => {
|
||
self.inbox_mode = false;
|
||
print_success("Exiting database mode. Back to Lua interpreter.");
|
||
continue;
|
||
}
|
||
"help" if self.inbox_mode => {
|
||
self.show_help().await?;
|
||
continue;
|
||
}
|
||
_ => {}
|
||
}
|
||
|
||
if self.inbox_mode {
|
||
self.handle_inbox_command(input).await?;
|
||
} else {
|
||
self.handle_lua_command(input).await?;
|
||
}
|
||
}
|
||
|
||
print_info("Shutting down Falcot server...");
|
||
Ok(())
|
||
}
|
||
|
||
/// Обработка Lua команд
|
||
async fn handle_lua_command(&self, input: &str) -> Result<()> {
|
||
if input.is_empty() {
|
||
return Ok(());
|
||
}
|
||
|
||
match self.lua_engine.execute_script(input) {
|
||
Ok(_) => {}
|
||
Err(e) => {
|
||
let error_msg = e.to_string();
|
||
if error_msg.contains("Lua error: syntax error:") || error_msg.contains("Unknown command:") {
|
||
print_error(&error_msg);
|
||
} else {
|
||
eprintln!("Lua error: {}", e);
|
||
}
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// Обработка команд inbox (CRUD + новые команды)
|
||
async fn handle_inbox_command(&self, input: &str) -> Result<()> {
|
||
let parts: Vec<&str> = input.split_whitespace().collect();
|
||
|
||
if parts.is_empty() {
|
||
return Ok(());
|
||
}
|
||
|
||
match parts[0] {
|
||
// Базовые CRUD команды
|
||
"create" => self.handle_create(parts).await,
|
||
"read" => self.handle_read(parts).await,
|
||
"update" => self.handle_update(parts).await,
|
||
"delete" => self.handle_delete(parts).await,
|
||
"list" => self.handle_list(parts).await,
|
||
"begin" => self.handle_begin(parts).await,
|
||
"commit" => self.handle_commit(parts).await,
|
||
"rollback" => self.handle_rollback(parts).await,
|
||
"procedure" => self.handle_procedure(parts).await,
|
||
"trigger" => self.handle_trigger(parts).await,
|
||
"backup" => self.handle_backup(parts).await,
|
||
"index" => self.handle_index(parts).await,
|
||
|
||
// Новые команды для Constraints
|
||
"constraint" => self.handle_constraint(parts).await,
|
||
|
||
// Новые команды для шардинга
|
||
"shard" => self.handle_shard(parts).await,
|
||
"cluster" => self.handle_cluster(parts).await,
|
||
|
||
// Новые команды для компрессии
|
||
"compression" => self.handle_compression(parts).await,
|
||
|
||
// Команды для глобальных индексов
|
||
"global_index" => self.handle_global_index(parts).await,
|
||
|
||
"help" => self.show_help().await,
|
||
_ => {
|
||
let error_msg = format!("Unknown command: {}. Type 'help' for available commands.", parts[0]);
|
||
print_error(&error_msg);
|
||
Ok(())
|
||
}
|
||
}
|
||
}
|
||
|
||
async fn handle_create(&self, parts: Vec<&str>) -> Result<()> {
|
||
if parts.len() < 3 {
|
||
println!("Usage: create <collection> <json_data>");
|
||
return Ok(());
|
||
}
|
||
|
||
let collection = parts[1].to_string();
|
||
let document = parts[2..].join(" ").into_bytes();
|
||
|
||
let command = protocol::Command::Create {
|
||
collection,
|
||
document,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(data) = response {
|
||
if let Ok(id) = String::from_utf8(data) {
|
||
println!("Document created with ID: {}", id);
|
||
} else {
|
||
println!("Document created successfully");
|
||
}
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn handle_read(&self, parts: Vec<&str>) -> Result<()> {
|
||
if parts.len() < 3 {
|
||
println!("Usage: read <collection> <id>");
|
||
return Ok(());
|
||
}
|
||
|
||
let collection = parts[1].to_string();
|
||
let id = parts[2].to_string();
|
||
|
||
let command = protocol::Command::Read {
|
||
collection,
|
||
id,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(data) = response {
|
||
if let Ok(document) = String::from_utf8(data) {
|
||
println!("{}", document);
|
||
} else {
|
||
println!("Document read successfully (binary data)");
|
||
}
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn handle_update(&self, parts: Vec<&str>) -> Result<()> {
|
||
if parts.len() < 4 {
|
||
println!("Usage: update <collection> <id> <json_data>");
|
||
return Ok(());
|
||
}
|
||
|
||
let collection = parts[1].to_string();
|
||
let id = parts[2].to_string();
|
||
let document = parts[3..].join(" ").into_bytes();
|
||
|
||
let command = protocol::Command::Update {
|
||
collection,
|
||
id,
|
||
document,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(_) = response {
|
||
println!("Document updated successfully");
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn handle_delete(&self, parts: Vec<&str>) -> Result<()> {
|
||
if parts.len() < 3 {
|
||
println!("Usage: delete <collection> <id>");
|
||
return Ok(());
|
||
}
|
||
|
||
let collection = parts[1].to_string();
|
||
let id = parts[2].to_string();
|
||
|
||
let command = protocol::Command::Delete {
|
||
collection,
|
||
id,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(_) = response {
|
||
println!("Document deleted successfully");
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn handle_list(&self, parts: Vec<&str>) -> Result<()> {
|
||
if parts.len() < 2 {
|
||
println!("Usage: list <collection> [filter]");
|
||
return Ok(());
|
||
}
|
||
|
||
let collection = parts[1].to_string();
|
||
let filter = if parts.len() > 2 {
|
||
parts[2..].join(" ").into_bytes()
|
||
} else {
|
||
vec![]
|
||
};
|
||
|
||
let command = protocol::Command::Query {
|
||
collection,
|
||
filter,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(data) = response {
|
||
if let Ok(documents) = String::from_utf8(data) {
|
||
// Используем std::result::Result вместо нашего Result
|
||
let parsed: std::result::Result<Value, _> = serde_json::from_str(&documents);
|
||
match parsed {
|
||
Ok(value) => {
|
||
println!("{}", serde_json::to_string_pretty(&value).unwrap());
|
||
}
|
||
Err(_) => {
|
||
println!("{}", documents);
|
||
}
|
||
}
|
||
} else {
|
||
println!("Documents read successfully (binary data)");
|
||
}
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn handle_begin(&self, parts: Vec<&str>) -> Result<()> {
|
||
if parts.len() < 2 {
|
||
println!("Usage: begin <transaction_id>");
|
||
return Ok(());
|
||
}
|
||
|
||
let transaction_id = parts[1].to_string();
|
||
|
||
let command = protocol::Command::BeginTransaction {
|
||
transaction_id,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(_) = response {
|
||
println!("Transaction started");
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn handle_commit(&self, parts: Vec<&str>) -> Result<()> {
|
||
if parts.len() < 2 {
|
||
println!("Usage: commit <transaction_id>");
|
||
return Ok(());
|
||
}
|
||
|
||
let transaction_id = parts[1].to_string();
|
||
|
||
let command = protocol::Command::CommitTransaction {
|
||
transaction_id,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(_) = response {
|
||
println!("Transaction committed");
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn handle_rollback(&self, parts: Vec<&str>) -> Result<()> {
|
||
if parts.len() < 2 {
|
||
println!("Usage: rollback <transaction_id>");
|
||
return Ok(());
|
||
}
|
||
|
||
let transaction_id = parts[1].to_string();
|
||
|
||
let command = protocol::Command::RollbackTransaction {
|
||
transaction_id,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(_) = response {
|
||
println!("Transaction rolled back");
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn handle_procedure(&self, parts: Vec<&str>) -> Result<()> {
|
||
if parts.len() < 3 {
|
||
println!("Usage: procedure create <name> <lua_code>");
|
||
println!(" procedure call <name>");
|
||
return Ok(());
|
||
}
|
||
|
||
match parts[1] {
|
||
"create" => {
|
||
if parts.len() < 4 {
|
||
println!("Usage: procedure create <name> <lua_code>");
|
||
return Ok(());
|
||
}
|
||
|
||
let name = parts[2].to_string();
|
||
let code = parts[3..].join(" ").into_bytes();
|
||
|
||
let command = protocol::Command::CreateProcedure {
|
||
name,
|
||
code,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(_) = response {
|
||
println!("Procedure created");
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
}
|
||
"call" => {
|
||
if parts.len() < 3 {
|
||
println!("Usage: procedure call <name>");
|
||
return Ok(());
|
||
}
|
||
|
||
let name = parts[2].to_string();
|
||
|
||
let command = protocol::Command::CallProcedure {
|
||
name,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(data) = response {
|
||
if let Ok(result) = String::from_utf8(data) {
|
||
println!("{}", result);
|
||
} else {
|
||
println!("Procedure executed successfully");
|
||
}
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
}
|
||
_ => {
|
||
println!("Usage: procedure create <name> <lua_code>");
|
||
println!(" procedure call <name>");
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// Placeholder for the `trigger` command: prints a "not implemented" notice.
///
/// The argument list is accepted for signature parity with the other
/// handlers but is not used, hence the underscore-prefixed name.
async fn handle_trigger(&self, _parts: Vec<&str>) -> Result<()> {
    println!("Triggers not implemented yet");
    Ok(())
}
|
||
|
||
/// Handles `backup`.
///
/// Asks the database for a backup and prints how many collections and
/// documents it contains. Always returns `Ok(())`.
async fn handle_backup(&self, _parts: Vec<&str>) -> Result<()> {
    let backup = match self.database.create_backup() {
        Ok(backup) => backup,
        Err(e) => {
            println!("Error creating backup: {}", e);
            return Ok(());
        }
    };

    // Add up the sizes of all collections in the backup.
    let total_documents = backup
        .values()
        .fold(0usize, |count, coll| count + coll.len());

    println!(
        "Backup created: {} collections, {} documents",
        backup.len(),
        total_documents
    );

    Ok(())
}
|
||
|
||
async fn handle_index(&self, parts: Vec<&str>) -> Result<()> {
|
||
if parts.len() < 4 {
|
||
println!("Usage: index create <collection> <name> <field> [unique]");
|
||
println!(" index query <collection> <index_name> <value>");
|
||
return Ok(());
|
||
}
|
||
|
||
match parts[1] {
|
||
"create" => {
|
||
if parts.len() < 5 {
|
||
println!("Usage: index create <collection> <name> <field> [unique]");
|
||
return Ok(());
|
||
}
|
||
|
||
let collection = parts[2].to_string();
|
||
let name = parts[3].to_string();
|
||
let field = parts[4].to_string();
|
||
let unique = parts.get(5).map_or(false, |&s| s == "true" || s == "unique");
|
||
|
||
let index = Index {
|
||
name: name.clone(),
|
||
index_type: IndexType::Secondary,
|
||
field,
|
||
unique,
|
||
};
|
||
|
||
let command = protocol::Command::CreateIndex {
|
||
collection,
|
||
index,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(_) = response {
|
||
println!("Index '{}' created", name);
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
}
|
||
"query" => {
|
||
if parts.len() < 5 {
|
||
println!("Usage: index query <collection> <index_name> <value>");
|
||
return Ok(());
|
||
}
|
||
|
||
let collection = parts[2].to_string();
|
||
let index_name = parts[3].to_string();
|
||
let value = parts[4..].join(" ").into_bytes();
|
||
|
||
let command = protocol::Command::QueryByIndex {
|
||
collection,
|
||
index_name,
|
||
value,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(data) = response {
|
||
if let Ok(result) = String::from_utf8(data) {
|
||
// Используем std::result::Result вместо нашего Result
|
||
let parsed: std::result::Result<Vec<String>, _> = serde_json::from_str(&result);
|
||
match parsed {
|
||
Ok(ids) => {
|
||
println!("Found {} documents:", ids.len());
|
||
for id in ids {
|
||
println!(" - {}", id);
|
||
}
|
||
}
|
||
Err(_) => {
|
||
println!("Result: {}", result);
|
||
}
|
||
}
|
||
} else {
|
||
println!("Query executed successfully (binary result)");
|
||
}
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
}
|
||
_ => {
|
||
println!("Usage: index create <collection> <name> <field> [unique]");
|
||
println!(" index query <collection> <index_name> <value>");
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn handle_constraint(&self, parts: Vec<&str>) -> Result<()> {
|
||
if parts.len() < 2 {
|
||
println!("Usage: constraint add <collection> <name> <type> <field> [value]");
|
||
println!(" constraint remove <collection> <name>");
|
||
return Ok(());
|
||
}
|
||
|
||
match parts[1] {
|
||
"add" => {
|
||
if parts.len() < 6 {
|
||
println!("Usage: constraint add <collection> <name> <type> <field> [value]");
|
||
return Ok(());
|
||
}
|
||
|
||
let collection = parts[2].to_string();
|
||
let name = parts[3].to_string();
|
||
let constraint_type = parts[4].to_string();
|
||
let field = parts[5].to_string();
|
||
let value = if parts.len() > 6 {
|
||
parts[6..].join(" ").into_bytes()
|
||
} else {
|
||
vec![]
|
||
};
|
||
|
||
let command = protocol::Command::AddConstraint {
|
||
collection,
|
||
constraint_name: name,
|
||
constraint_type,
|
||
field,
|
||
value,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(_) = response {
|
||
println!("Constraint added");
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
}
|
||
"remove" => {
|
||
if parts.len() < 4 {
|
||
println!("Usage: constraint remove <collection> <name>");
|
||
return Ok(());
|
||
}
|
||
|
||
let collection = parts[2].to_string();
|
||
let name = parts[3].to_string();
|
||
|
||
let command = protocol::Command::RemoveConstraint {
|
||
collection,
|
||
constraint_name: name,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(_) = response {
|
||
println!("Constraint removed");
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
}
|
||
_ => {
|
||
println!("Usage: constraint add <collection> <name> <type> <field> [value]");
|
||
println!(" constraint remove <collection> <name>");
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn handle_shard(&self, parts: Vec<&str>) -> Result<()> {
|
||
if parts.len() < 2 {
|
||
println!("Usage: shard add <node_id> <address> <capacity>");
|
||
println!(" shard remove <node_id>");
|
||
println!(" shard migrate <collection> <from_node> <to_node> <shard_key>");
|
||
return Ok(());
|
||
}
|
||
|
||
match parts[1] {
|
||
"add" => {
|
||
if parts.len() < 5 {
|
||
println!("Usage: shard add <node_id> <address> <capacity>");
|
||
return Ok(());
|
||
}
|
||
|
||
let node_id = parts[2].to_string();
|
||
let address = parts[3].to_string();
|
||
let capacity = parts[4].parse().unwrap_or(1000);
|
||
|
||
let command = protocol::Command::AddShardNode {
|
||
node_id,
|
||
address,
|
||
capacity,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(_) = response {
|
||
println!("Shard node added");
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
}
|
||
"remove" => {
|
||
if parts.len() < 3 {
|
||
println!("Usage: shard remove <node_id>");
|
||
return Ok(());
|
||
}
|
||
|
||
let node_id = parts[2].to_string();
|
||
|
||
let command = protocol::Command::RemoveShardNode {
|
||
node_id,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(_) = response {
|
||
println!("Shard node removed");
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
}
|
||
"migrate" => {
|
||
if parts.len() < 6 {
|
||
println!("Usage: shard migrate <collection> <from_node> <to_node> <shard_key>");
|
||
return Ok(());
|
||
}
|
||
|
||
let collection = parts[2].to_string();
|
||
let from_node = parts[3].to_string();
|
||
let to_node = parts[4].to_string();
|
||
let shard_key = parts[5].to_string();
|
||
|
||
let command = protocol::Command::MigrateShard {
|
||
collection,
|
||
from_node,
|
||
to_node,
|
||
shard_key,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(_) = response {
|
||
println!("Shard migration started");
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
}
|
||
_ => {
|
||
println!("Usage: shard add <node_id> <address> <capacity>");
|
||
println!(" shard remove <node_id>");
|
||
println!(" shard migrate <collection> <from_node> <to_node> <shard_key>");
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn handle_cluster(&self, parts: Vec<&str>) -> Result<()> {
|
||
if parts.len() < 2 {
|
||
println!("Usage: cluster rebalance");
|
||
println!(" cluster status");
|
||
return Ok(());
|
||
}
|
||
|
||
match parts[1] {
|
||
"rebalance" => {
|
||
let command = protocol::Command::RebalanceCluster;
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(_) = response {
|
||
println!("Cluster rebalancing started");
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
}
|
||
"status" => {
|
||
let command = protocol::Command::GetClusterStatus;
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(data) = response {
|
||
if let Ok(status) = protocol::deserialize::<protocol::ClusterStatus>(&data) {
|
||
println!("Cluster Status:");
|
||
println!(" Total Capacity: {}", status.total_capacity);
|
||
println!(" Total Used: {}", status.total_used);
|
||
println!(" Rebalance Needed: {}", status.rebalance_needed);
|
||
println!(" Nodes: {}", status.nodes.len());
|
||
for node in status.nodes {
|
||
println!(" - {}: {}% used", node.node_id, (node.used as f64 / node.capacity as f64) * 100.0);
|
||
}
|
||
} else {
|
||
println!("Status: {:?}", data);
|
||
}
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
}
|
||
_ => {
|
||
println!("Usage: cluster rebalance");
|
||
println!(" cluster status");
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn handle_compression(&self, parts: Vec<&str>) -> Result<()> {
|
||
if parts.len() < 3 {
|
||
println!("Usage: compression enable <collection> <algorithm>");
|
||
println!(" compression disable <collection>");
|
||
return Ok(());
|
||
}
|
||
|
||
match parts[1] {
|
||
"enable" => {
|
||
if parts.len() < 4 {
|
||
println!("Usage: compression enable <collection> <algorithm>");
|
||
return Ok(());
|
||
}
|
||
|
||
let collection = parts[2].to_string();
|
||
let algorithm = parts[3].to_string();
|
||
|
||
let command = protocol::Command::EnableCompression {
|
||
collection,
|
||
algorithm,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(_) = response {
|
||
println!("Compression enabled");
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
}
|
||
"disable" => {
|
||
if parts.len() < 3 {
|
||
println!("Usage: compression disable <collection>");
|
||
return Ok(());
|
||
}
|
||
|
||
let collection = parts[2].to_string();
|
||
|
||
let command = protocol::Command::DisableCompression {
|
||
collection,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(_) = response {
|
||
println!("Compression disabled");
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
}
|
||
_ => {
|
||
println!("Usage: compression enable <collection> <algorithm>");
|
||
println!(" compression disable <collection>");
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn handle_global_index(&self, parts: Vec<&str>) -> Result<()> {
|
||
if parts.len() < 2 {
|
||
println!("Usage: global_index create <name> <field> [unique]");
|
||
println!(" global_index query <name> <value>");
|
||
return Ok(());
|
||
}
|
||
|
||
match parts[1] {
|
||
"create" => {
|
||
if parts.len() < 4 {
|
||
println!("Usage: global_index create <name> <field> [unique]");
|
||
return Ok(());
|
||
}
|
||
|
||
let name = parts[2].to_string();
|
||
let field = parts[3].to_string();
|
||
let unique = parts.get(4).map_or(false, |&s| s == "true" || s == "unique");
|
||
|
||
let command = protocol::Command::CreateGlobalIndex {
|
||
name,
|
||
field,
|
||
unique,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(_) = response {
|
||
println!("Global index created");
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
}
|
||
"query" => {
|
||
if parts.len() < 4 {
|
||
println!("Usage: global_index query <name> <value>");
|
||
return Ok(());
|
||
}
|
||
|
||
let name = parts[2].to_string();
|
||
let value = parts[3..].join(" ").into_bytes();
|
||
|
||
let command = protocol::Command::QueryGlobalIndex {
|
||
index_name: name,
|
||
value,
|
||
};
|
||
|
||
match self.database.execute_command(command) {
|
||
Ok(response) => {
|
||
if let protocol::Response::Success(data) = response {
|
||
if let Ok(result) = String::from_utf8(data) {
|
||
println!("Result: {}", result);
|
||
} else {
|
||
println!("Query executed successfully (binary result)");
|
||
}
|
||
} else if let protocol::Response::Error(e) = response {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
Err(e) => {
|
||
println!("Error: {}", e);
|
||
}
|
||
}
|
||
}
|
||
_ => {
|
||
println!("Usage: global_index create <name> <field> [unique]");
|
||
println!(" global_index query <name> <value>");
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// Показать справку по командам
|
||
async fn show_help(&self) -> Result<()> {
|
||
println!("Available commands:");
|
||
println!(" Basic CRUD:");
|
||
println!(" create <collection> <json_data> - Create document");
|
||
println!(" read <collection> <id> - Read document");
|
||
println!(" update <collection> <id> <json> - Update document");
|
||
println!(" delete <collection> <id> - Delete document");
|
||
println!(" list <collection> [filter] - List documents");
|
||
println!(" Transactions:");
|
||
println!(" begin <tx_id> - Start transaction");
|
||
println!(" commit <tx_id> - Commit transaction");
|
||
println!(" rollback <tx_id> - Rollback transaction");
|
||
println!(" Procedures:");
|
||
println!(" procedure create <name> <code> - Create stored procedure");
|
||
println!(" procedure call <name> - Call stored procedure");
|
||
println!(" Indexes:");
|
||
println!(" index create <coll> <name> <field> [unique] - Create index");
|
||
println!(" index query <coll> <name> <value> - Query by index");
|
||
println!(" Constraints:");
|
||
println!(" constraint add <coll> <name> <type> <field> [value] - Add constraint");
|
||
println!(" constraint remove <coll> <name> - Remove constraint");
|
||
println!(" Sharding:");
|
||
println!(" shard add <node_id> <addr> <cap> - Add shard node");
|
||
println!(" shard remove <node_id> - Remove shard node");
|
||
println!(" shard migrate <coll> <from> <to> <key> - Migrate shard");
|
||
println!(" Cluster:");
|
||
println!(" cluster rebalance - Rebalance cluster");
|
||
println!(" cluster status - Show cluster status");
|
||
println!(" Compression:");
|
||
println!(" compression enable <coll> <algo> - Enable compression");
|
||
println!(" compression disable <coll> - Disable compression");
|
||
println!(" Global Indexes:");
|
||
println!(" global_index create <name> <field> [unique] - Create global index");
|
||
println!(" global_index query <name> <value> - Query global index");
|
||
println!(" Other:");
|
||
println!(" backup - Create backup");
|
||
println!(" inbox.stop - Exit database mode");
|
||
println!(" help - Show this help");
|
||
|
||
Ok(())
|
||
}
|
||
}
|