diff --git a/futriix-cli/src/main.rs b/futriix-cli/src/main.rs new file mode 100644 index 0000000..71e2e43 --- /dev/null +++ b/futriix-cli/src/main.rs @@ -0,0 +1,499 @@ +use colored::Colorize; +use rustyline::Editor; +use rustyline::error::ReadlineError; +use serde_json; +use std::fs; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::net::TcpStream; +use toml::Value; + +mod server { + use serde::{Deserialize, Serialize}; + use serde_json; + + #[derive(Debug, Serialize, Deserialize)] + pub enum Command { + Insert { metric: String, timestamp: u128, value: serde_json::Value }, + GetRange { metric: String, start: u128, end: u128 }, + GetLatest { metric: String }, + DeleteRange { metric: String, start: u128, end: u128 }, + BeginTransaction, + CommitTransaction(u64), + RollbackTransaction(u64), + CreateIndex { field: String }, + DropIndex { field: String }, + SysExec { script_name: String }, + ReplicationEnable, + ReplicationDisable, + ReplicationAddPeer { addr: String }, + ReplicationRemovePeer { addr: String }, + ReplicationStatus, + BackupCreate { path: String }, + BackupRestore { path: String }, + LuaExec { code: String }, + Halt, + } + + #[derive(Debug, Serialize, Deserialize)] + pub enum Response { + Success(Option), + Error(String), + ReplicationStatus { + enabled: bool, + peers: Vec, + last_sync: u128, + }, + BackupStatus { success: bool, message: String }, + LuaOutput(String), + } +} + +use server::{Command, Response}; + +async fn send_command(cmd: Command) -> Result> { + let config_content = fs::read_to_string("futriix.config.toml")?; + let config: Value = toml::from_str(&config_content)?; + + let ip = config["client"]["ip"].as_str().unwrap_or("127.0.0.1"); + let port = config["client"]["port"].as_integer().unwrap_or(8080) as u16; + let addr = format!("{}:{}", ip, port); + + let mut stream = TcpStream::connect(&addr).await?; + + let cmd_bytes = rmp_serde::to_vec(&cmd)?; + let len = cmd_bytes.len() as u32; + stream.write_all(&len.to_be_bytes()).await?; + stream.write_all(&cmd_bytes).await?; + + let mut len_buf = [0u8; 4]; + stream.read_exact(&mut len_buf).await?; + let len = u32::from_be_bytes(len_buf) as usize; + + let mut buf = vec![0u8; len]; + stream.read_exact(&mut buf).await?; + + Ok(rmp_serde::from_slice(&buf)?) +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let config_content = fs::read_to_string("futriix.config.toml")?; + let config: Value = toml::from_str(&config_content)?; + + let ip = config["server"]["ip"].as_str().unwrap_or("127.0.0.1"); + let port = config["server"]["port"].as_integer().unwrap_or(8080) as u16; + let addr = format!("{}:{}", ip, port); + + match TcpStream::connect(&addr).await { + Ok(_) => { + println!("\n{}", "Futriix CLI Client".truecolor(12, 149, 193)); + println!("Connected to server at {}", addr.green()); + println!("Type 'help' for available commands\n"); + }, + Err(e) => { + eprintln!("{}: Failed to connect to server at {}: {}", + "Error".red(), addr, e); + eprintln!("Please make sure futriix-server is running"); + std::process::exit(1); + } + } + + let mut rl = Editor::<()>::new()?; + if rl.load_history("futriix-history.txt").is_err() { + println!("No previous history."); + println!(); + } + + let mut current_tx_id: Option = None; + let mut lua_mode = false; + + loop { + let prompt = if lua_mode { + "lua> ".bright_yellow() + } else if let Some(tx_id) = current_tx_id { + format!("futriix:tx{}:~> ", tx_id).bright_cyan() + } else { + "futriix:~> ".bright_cyan() + }; + + let readline = rl.readline(&prompt); + match readline { + Ok(line) => { + rl.add_history_entry(&line); + + if line.trim().is_empty() { + continue; + } + + if lua_mode { + if line.trim() == "exit" { + lua_mode = false; + continue; + } + + match send_command(Command::LuaExec { code: line }).await { + Ok(Response::LuaOutput(output)) => { + println!("{}", output); + }, + Ok(Response::Error(e)) => println!("{}", e.red()), + _ => println!("{}", "Unexpected response type".red()), + } + continue; + } + + let parts: Vec<&str> = line.split_whitespace().collect(); + match parts[0].to_lowercase().as_str() { + "insert" | "i" => { + if parts.len() < 4 { + println!("{}", "Usage: insert ".red()); + continue; + } + let metric = parts[1].to_string(); + let timestamp = match parts[2].parse::() { + Ok(ts) => ts, + Err(_) => { + println!("{}", "Invalid timestamp".red()); + continue; + } + }; + let value_str = parts[3..].join(" "); + match serde_json::from_str(&value_str) { + Ok(value) => match send_command(Command::Insert { + metric, + timestamp, + value + }).await { + Ok(Response::Success(_)) => { + println!("{}", "Insert successful".bright_green()); + println!(); + }, + Ok(Response::Error(e)) => println!("{}", e.bold().red()), + _ => println!("{}", "Unexpected response type".red()), + }, + Err(e) => println!("{}: {}", "Invalid JSON".red(), e), + } + }, + "get" | "g" => { + if parts.len() < 2 { + println!("{}", "Usage: get [start end]".red()); + continue; + } + let metric = parts[1].to_string(); + if parts.len() == 2 { + match send_command(Command::GetLatest { metric }).await { + Ok(Response::Success(Some(data))) => { + println!("{}", serde_json::to_string_pretty(&data)?); + println!(); + }, + Ok(Response::Success(None)) => { + println!("{}", "No data found".yellow()); + println!(); + }, + Ok(Response::Error(e)) => println!("{}", e.bold().red()), + _ => println!("{}", "Unexpected response type".red()), + } + } else if parts.len() == 4 { + let start = match parts[2].parse::() { + Ok(ts) => ts, + Err(_) => { + println!("{}", "Invalid start timestamp".red()); + continue; + } + }; + let end = match parts[3].parse::() { + Ok(ts) => ts, + Err(_) => { + println!("{}", "Invalid end timestamp".red()); + continue; + } + }; + match send_command(Command::GetRange { metric, start, end }).await { + Ok(Response::Success(Some(data))) => { + println!("{}", serde_json::to_string_pretty(&data)?); + println!(); + }, + Ok(Response::Success(None)) => { + println!("{}", "No data in range".yellow()); + println!(); + }, + Ok(Response::Error(e)) => println!("{}", e.bold().red()), + _ => println!("{}", "Unexpected response type".red()), + } + } else { + println!("{}", "Usage: get [start end]".red()); + } + }, + "delete" | "d" => { + if parts.len() < 4 { + println!("{}", "Usage: delete ".red()); + continue; + } + let metric = parts[1].to_string(); + let start = match parts[2].parse::() { + Ok(ts) => ts, + Err(_) => { + println!("{}", "Invalid start timestamp".red()); + continue; + } + }; + let end = match parts[3].parse::() { + Ok(ts) => ts, + Err(_) => { + println!("{}", "Invalid end timestamp".red()); + continue; + } + }; + match send_command(Command::DeleteRange { metric, start, end }).await { + Ok(Response::Success(_)) => { + println!("{}", "Delete successful".bright_green()); + println!(); + }, + Ok(Response::Error(e)) => println!("{}", e.bold().red()), + _ => println!("{}", "Unexpected response type".red()), + } + }, + "begin" | "transaction" | "tx" => { + match send_command(Command::BeginTransaction).await { + Ok(Response::Success(Some(tx_id))) => { + if let Some(id) = tx_id.as_u64() { + current_tx_id = Some(id); + println!("{}", format!("Transaction {} started", id).bright_green()); + println!(); + } else { + println!("{}", "Invalid transaction ID received".red()); + } + }, + Ok(Response::Success(None)) => println!("{}", "No transaction ID received".red()), + Ok(Response::Error(e)) => println!("{}", e.bold().red()), + _ => println!("{}", "Unexpected response type".red()), + } + }, + "commit" => { + if let Some(tx_id) = current_tx_id { + match send_command(Command::CommitTransaction(tx_id)).await { + Ok(Response::Success(_)) => { + println!("{}", format!("Transaction {} committed", tx_id).bright_green()); + println!(); + current_tx_id = None; + }, + Ok(Response::Error(e)) => println!("{}", e.bold().red()), + _ => println!("{}", "Unexpected response type".red()), + } + } else { + println!("{}", "No active transaction to commit".red()); + } + }, + "rollback" => { + if let Some(tx_id) = current_tx_id { + match send_command(Command::RollbackTransaction(tx_id)).await { + Ok(Response::Success(_)) => { + println!("{}", format!("Transaction {} rolled back", tx_id).bright_green()); + println!(); + current_tx_id = None; + }, + Ok(Response::Error(e)) => println!("{}", e.bold().red()), + _ => println!("{}", "Unexpected response type".red()), + } + } else { + println!("{}", "No active transaction to rollback".red()); + } + }, + "createindex" => { + if parts.len() < 2 { + println!("{}", "Usage: createindex ".red()); + continue; + } + let field = parts[1].to_string(); + match send_command(Command::CreateIndex { field: field.clone() }).await { + Ok(Response::Success(_)) => { + println!("{}", format!("Index created on field '{}'", field).bright_green()); + println!(); + }, + Ok(Response::Error(e)) => println!("{}", e.bold().red()), + _ => println!("{}", "Unexpected response type".red()), + } + }, + "dropindex" => { + if parts.len() < 2 { + println!("{}", "Usage: dropindex ".red()); + continue; + } + let field = parts[1].to_string(); + match send_command(Command::DropIndex { field }).await { + Ok(Response::Success(_)) => { + println!("{}", "Index dropped".bright_green()); + println!(); + }, + Ok(Response::Error(e)) => println!("{}", e.bold().red()), + _ => println!("{}", "Unexpected response type".red()), + } + }, + "sysexec" => { + if parts.len() < 2 { + println!("{}", "Usage: sysexec ".red()); + continue; + } + let script_name = parts[1].to_string(); + match send_command(Command::SysExec { script_name }).await { + Ok(Response::Success(Some(output))) => { + println!("{}", "Script output:".bright_green()); + println!("{}", serde_json::to_string_pretty(&output)?); + println!(); + }, + Ok(Response::Success(None)) => { + println!("{}", "Script executed but no output".yellow()); + println!(); + }, + Ok(Response::Error(e)) => println!("{}", e.bold().red()), + _ => println!("{}", "Unexpected response type".red()), + } + }, + "futexec" => { + println!("{}", "Entering Lua interactive mode. Type 'exit' to quit.".bright_yellow()); + lua_mode = true; + }, + "replication" => { + if parts.len() < 2 { + println!("{}", "Usage: replication ".red()); + continue; + } + match parts[1].to_lowercase().as_str() { + "on" => { + match send_command(Command::ReplicationEnable).await { + Ok(Response::Success(_)) => println!("{}", "Replication enabled".bright_green()), + Ok(Response::Error(e)) => println!("{}", e.red()), + _ => println!("{}", "Unexpected response type".red()), + } + } + "off" => { + match send_command(Command::ReplicationDisable).await { + Ok(Response::Success(_)) => println!("{}", "Replication disabled".bright_green()), + Ok(Response::Error(e)) => println!("{}", e.red()), + _ => println!("{}", "Unexpected response type".red()), + } + } + "status" => { + match send_command(Command::ReplicationStatus).await { + Ok(Response::ReplicationStatus { enabled, peers, last_sync }) => { + println!("Status:"); + println!(" Enabled: {}", enabled); + println!(" Peers: {:?}", peers); + println!(" Last sync: {}", last_sync); + } + Ok(Response::Success(_)) => println!("{}", "Unexpected success response".red()), + Ok(Response::Error(e)) => println!("{}", e.red()), + _ => println!("{}", "Unexpected response type".red()), + } + } + "add-peer" => { + if parts.len() < 3 { + println!("{}", "Usage: replication add-peer ".red()); + continue; + } + match send_command(Command::ReplicationAddPeer { addr: parts[2].to_string() }).await { + Ok(Response::Success(_)) => println!("{}", "Peer added".bright_green()), + Ok(Response::Error(e)) => println!("{}", e.red()), + _ => println!("{}", "Unexpected response type".red()), + } + } + "remove-peer" => { + if parts.len() < 3 { + println!("{}", "Usage: replication remove-peer ".red()); + continue; + } + match send_command(Command::ReplicationRemovePeer { addr: parts[2].to_string() }).await { + Ok(Response::Success(_)) => println!("{}", "Peer removed".bright_green()), + Ok(Response::Error(e)) => println!("{}", e.red()), + _ => println!("{}", "Unexpected response type".red()), + } + } + _ => println!("{}", "Unknown replication command".red()), + } + }, + "futload" => { + if parts.len() < 2 { + println!("{}", "Usage: futload ".red()); + continue; + } + match send_command(Command::BackupRestore { path: parts[1].to_string() }).await { + Ok(Response::BackupStatus { success, message }) => { + if success { + println!("{}", message.green()); + } else { + println!("{}", message.red()); + } + } + Ok(Response::Success(_)) => println!("{}", "Unexpected success response".red()), + Ok(Response::Error(e)) => println!("{}", e.red()), + _ => println!("{}", "Unexpected response type".red()), + } + }, + "futunload" => { + if parts.len() < 2 { + println!("{}", "Usage: futunload ".red()); + continue; + } + match send_command(Command::BackupCreate { path: parts[1].to_string() }).await { + Ok(Response::BackupStatus { success, message }) => { + if success { + println!("{}", message.green()); + } else { + println!("{}", message.red()); + } + } + Ok(Response::Success(_)) => println!("{}", "Unexpected success response".red()), + Ok(Response::Error(e)) => println!("{}", e.red()), + _ => println!("{}", "Unexpected response type".red()), + } + }, + "halt" => { + match send_command(Command::Halt).await { + Ok(Response::Success(_)) => { + println!("{}", "Server shutdown initiated".bright_green()); + break; + }, + Ok(Response::Error(e)) => println!("{}", e.bold().red()), + _ => println!("{}", "Unexpected response type".red()), + } + }, + "help" => { + println!(); + println!("{}", "Available commands:".bold()); + println!(" insert (or i) - Insert time-series data"); + println!(" get - Get latest data point"); + println!(" get - Get data in range"); + println!(" delete (or d) - Delete data in range"); + println!(" begin (or transaction, tx) - Start transaction"); + println!(" commit - Commit current transaction"); + println!(" rollback - Rollback current transaction"); + println!(" createindex - Create index"); + println!(" dropindex - Drop index"); + println!(" sysexec - Execute system script"); + println!(" futexec - Enter Lua interactive mode"); + println!(" replication on/off - Enable/disable replication"); + println!(" replication status - Show replication status"); + println!(" replication add-peer - Add replication peer"); + println!(" replication remove-peer - Remove replication peer"); + println!(" futload - Load data from backup file"); + println!(" futunload - Save data to backup file"); + println!(" halt - Gracefully shutdown the server"); + println!(" exit (or quit) - Exit client"); + println!(" help - Show this help"); + println!(); + }, + "exit" | "quit" => break, + _ => println!("{}: Unknown command. Type 'help' for available commands.", "Error".red()), + } + }, + Err(ReadlineError::Interrupted) => break, + Err(ReadlineError::Eof) => break, + Err(err) => { + println!("{}: {:?}", "Error".red(), err); + break; + } + } + } + + rl.save_history("futriix-history.txt")?; + Ok(()) +}