use colored::Colorize; use rustyline::Editor; use rustyline::error::ReadlineError; use serde_json; use std::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpStream; use toml::Value; mod server { use serde::{Deserialize, Serialize}; use serde_json; #[derive(Debug, Serialize, Deserialize)] pub enum Command { Insert { metric: String, timestamp: u128, value: serde_json::Value }, GetRange { metric: String, start: u128, end: u128 }, GetLatest { metric: String }, DeleteRange { metric: String, start: u128, end: u128 }, BeginTransaction, CommitTransaction(u64), RollbackTransaction(u64), CreateIndex { field: String }, DropIndex { field: String }, SysExec { script_name: String }, ReplicationEnable, ReplicationDisable, ReplicationAddPeer { addr: String }, ReplicationRemovePeer { addr: String }, ReplicationStatus, BackupCreate { path: String }, BackupRestore { path: String }, LuaExec { code: String }, Halt, } #[derive(Debug, Serialize, Deserialize)] pub enum Response { Success(Option), Error(String), ReplicationStatus { enabled: bool, peers: Vec, last_sync: u128, }, BackupStatus { success: bool, message: String }, LuaOutput(String), } } use server::{Command, Response}; async fn send_command(cmd: Command) -> Result> { let config_content = fs::read_to_string("futriix.config.toml")?; let config: Value = toml::from_str(&config_content)?; let ip = config["client"]["ip"].as_str().unwrap_or("127.0.0.1"); let port = config["client"]["port"].as_integer().unwrap_or(8080) as u16; let addr = format!("{}:{}", ip, port); let mut stream = TcpStream::connect(&addr).await?; let cmd_bytes = rmp_serde::to_vec(&cmd)?; let len = cmd_bytes.len() as u32; stream.write_all(&len.to_be_bytes()).await?; stream.write_all(&cmd_bytes).await?; let mut len_buf = [0u8; 4]; stream.read_exact(&mut len_buf).await?; let len = u32::from_be_bytes(len_buf) as usize; let mut buf = vec![0u8; len]; stream.read_exact(&mut buf).await?; Ok(rmp_serde::from_slice(&buf)?) } #[tokio::main] async fn main() -> Result<(), Box> { let config_content = fs::read_to_string("futriix.config.toml")?; let config: Value = toml::from_str(&config_content)?; let ip = config["server"]["ip"].as_str().unwrap_or("127.0.0.1"); let port = config["server"]["port"].as_integer().unwrap_or(8080) as u16; let addr = format!("{}:{}", ip, port); match TcpStream::connect(&addr).await { Ok(_) => { println!("\n{}", "Futriix CLI Client".truecolor(12, 149, 193)); println!("Connected to server at {}", addr.green()); println!("Type 'help' for available commands\n"); }, Err(e) => { eprintln!("{}: Failed to connect to server at {}: {}", "Error".red(), addr, e); eprintln!("Please make sure futriix-server is running"); std::process::exit(1); } } let mut rl = Editor::<()>::new()?; if rl.load_history("futriix-history.txt").is_err() { println!("No previous history."); println!(); } let mut current_tx_id: Option = None; let mut lua_mode = false; loop { let prompt = if lua_mode { "lua> ".bright_yellow() } else if let Some(tx_id) = current_tx_id { format!("futriix:tx{}:~> ", tx_id).bright_cyan() } else { "futriix:~> ".bright_cyan() }; let readline = rl.readline(&prompt); match readline { Ok(line) => { rl.add_history_entry(&line); if line.trim().is_empty() { continue; } if lua_mode { if line.trim() == "exit" { lua_mode = false; continue; } match send_command(Command::LuaExec { code: line }).await { Ok(Response::LuaOutput(output)) => { println!("{}", output); }, Ok(Response::Error(e)) => println!("{}", e.red()), _ => println!("{}", "Unexpected response type".red()), } continue; } let parts: Vec<&str> = line.split_whitespace().collect(); match parts[0].to_lowercase().as_str() { "insert" | "i" => { if parts.len() < 4 { println!("{}", "Usage: insert ".red()); continue; } let metric = parts[1].to_string(); let timestamp = match parts[2].parse::() { Ok(ts) => ts, Err(_) => { println!("{}", "Invalid timestamp".red()); continue; } }; let value_str = parts[3..].join(" "); match serde_json::from_str(&value_str) { Ok(value) => match send_command(Command::Insert { metric, timestamp, value }).await { Ok(Response::Success(_)) => { println!("{}", "Insert successful".bright_green()); println!(); }, Ok(Response::Error(e)) => println!("{}", e.bold().red()), _ => println!("{}", "Unexpected response type".red()), }, Err(e) => println!("{}: {}", "Invalid JSON".red(), e), } }, "get" | "g" => { if parts.len() < 2 { println!("{}", "Usage: get [start end]".red()); continue; } let metric = parts[1].to_string(); if parts.len() == 2 { match send_command(Command::GetLatest { metric }).await { Ok(Response::Success(Some(data))) => { println!("{}", serde_json::to_string_pretty(&data)?); println!(); }, Ok(Response::Success(None)) => { println!("{}", "No data found".yellow()); println!(); }, Ok(Response::Error(e)) => println!("{}", e.bold().red()), _ => println!("{}", "Unexpected response type".red()), } } else if parts.len() == 4 { let start = match parts[2].parse::() { Ok(ts) => ts, Err(_) => { println!("{}", "Invalid start timestamp".red()); continue; } }; let end = match parts[3].parse::() { Ok(ts) => ts, Err(_) => { println!("{}", "Invalid end timestamp".red()); continue; } }; match send_command(Command::GetRange { metric, start, end }).await { Ok(Response::Success(Some(data))) => { println!("{}", serde_json::to_string_pretty(&data)?); println!(); }, Ok(Response::Success(None)) => { println!("{}", "No data in range".yellow()); println!(); }, Ok(Response::Error(e)) => println!("{}", e.bold().red()), _ => println!("{}", "Unexpected response type".red()), } } else { println!("{}", "Usage: get [start end]".red()); } }, "delete" | "d" => { if parts.len() < 4 { println!("{}", "Usage: delete ".red()); continue; } let metric = parts[1].to_string(); let start = match parts[2].parse::() { Ok(ts) => ts, Err(_) => { println!("{}", "Invalid start timestamp".red()); continue; } }; let end = match parts[3].parse::() { Ok(ts) => ts, Err(_) => { println!("{}", "Invalid end timestamp".red()); continue; } }; match send_command(Command::DeleteRange { metric, start, end }).await { Ok(Response::Success(_)) => { println!("{}", "Delete successful".bright_green()); println!(); }, Ok(Response::Error(e)) => println!("{}", e.bold().red()), _ => println!("{}", "Unexpected response type".red()), } }, "begin" | "transaction" | "tx" => { match send_command(Command::BeginTransaction).await { Ok(Response::Success(Some(tx_id))) => { if let Some(id) = tx_id.as_u64() { current_tx_id = Some(id); println!("{}", format!("Transaction {} started", id).bright_green()); println!(); } else { println!("{}", "Invalid transaction ID received".red()); } }, Ok(Response::Success(None)) => println!("{}", "No transaction ID received".red()), Ok(Response::Error(e)) => println!("{}", e.bold().red()), _ => println!("{}", "Unexpected response type".red()), } }, "commit" => { if let Some(tx_id) = current_tx_id { match send_command(Command::CommitTransaction(tx_id)).await { Ok(Response::Success(_)) => { println!("{}", format!("Transaction {} committed", tx_id).bright_green()); println!(); current_tx_id = None; }, Ok(Response::Error(e)) => println!("{}", e.bold().red()), _ => println!("{}", "Unexpected response type".red()), } } else { println!("{}", "No active transaction to commit".red()); } }, "rollback" => { if let Some(tx_id) = current_tx_id { match send_command(Command::RollbackTransaction(tx_id)).await { Ok(Response::Success(_)) => { println!("{}", format!("Transaction {} rolled back", tx_id).bright_green()); println!(); current_tx_id = None; }, Ok(Response::Error(e)) => println!("{}", e.bold().red()), _ => println!("{}", "Unexpected response type".red()), } } else { println!("{}", "No active transaction to rollback".red()); } }, "createindex" => { if parts.len() < 2 { println!("{}", "Usage: createindex ".red()); continue; } let field = parts[1].to_string(); match send_command(Command::CreateIndex { field: field.clone() }).await { Ok(Response::Success(_)) => { println!("{}", format!("Index created on field '{}'", field).bright_green()); println!(); }, Ok(Response::Error(e)) => println!("{}", e.bold().red()), _ => println!("{}", "Unexpected response type".red()), } }, "dropindex" => { if parts.len() < 2 { println!("{}", "Usage: dropindex ".red()); continue; } let field = parts[1].to_string(); match send_command(Command::DropIndex { field }).await { Ok(Response::Success(_)) => { println!("{}", "Index dropped".bright_green()); println!(); }, Ok(Response::Error(e)) => println!("{}", e.bold().red()), _ => println!("{}", "Unexpected response type".red()), } }, "sysexec" => { if parts.len() < 2 { println!("{}", "Usage: sysexec ".red()); continue; } let script_name = parts[1].to_string(); match send_command(Command::SysExec { script_name }).await { Ok(Response::Success(Some(output))) => { println!("{}", "Script output:".bright_green()); println!("{}", serde_json::to_string_pretty(&output)?); println!(); }, Ok(Response::Success(None)) => { println!("{}", "Script executed but no output".yellow()); println!(); }, Ok(Response::Error(e)) => println!("{}", e.bold().red()), _ => println!("{}", "Unexpected response type".red()), } }, "futexec" => { println!("{}", "Entering Lua interactive mode. Type 'exit' to quit.".bright_yellow()); lua_mode = true; }, "replication" => { if parts.len() < 2 { println!("{}", "Usage: replication ".red()); continue; } match parts[1].to_lowercase().as_str() { "on" => { match send_command(Command::ReplicationEnable).await { Ok(Response::Success(_)) => println!("{}", "Replication enabled".bright_green()), Ok(Response::Error(e)) => println!("{}", e.red()), _ => println!("{}", "Unexpected response type".red()), } } "off" => { match send_command(Command::ReplicationDisable).await { Ok(Response::Success(_)) => println!("{}", "Replication disabled".bright_green()), Ok(Response::Error(e)) => println!("{}", e.red()), _ => println!("{}", "Unexpected response type".red()), } } "status" => { match send_command(Command::ReplicationStatus).await { Ok(Response::ReplicationStatus { enabled, peers, last_sync }) => { println!("Status:"); println!(" Enabled: {}", enabled); println!(" Peers: {:?}", peers); println!(" Last sync: {}", last_sync); } Ok(Response::Success(_)) => println!("{}", "Unexpected success response".red()), Ok(Response::Error(e)) => println!("{}", e.red()), _ => println!("{}", "Unexpected response type".red()), } } "add-peer" => { if parts.len() < 3 { println!("{}", "Usage: replication add-peer ".red()); continue; } match send_command(Command::ReplicationAddPeer { addr: parts[2].to_string() }).await { Ok(Response::Success(_)) => println!("{}", "Peer added".bright_green()), Ok(Response::Error(e)) => println!("{}", e.red()), _ => println!("{}", "Unexpected response type".red()), } } "remove-peer" => { if parts.len() < 3 { println!("{}", "Usage: replication remove-peer ".red()); continue; } match send_command(Command::ReplicationRemovePeer { addr: parts[2].to_string() }).await { Ok(Response::Success(_)) => println!("{}", "Peer removed".bright_green()), Ok(Response::Error(e)) => println!("{}", e.red()), _ => println!("{}", "Unexpected response type".red()), } } _ => println!("{}", "Unknown replication command".red()), } }, "futload" => { if parts.len() < 2 { println!("{}", "Usage: futload ".red()); continue; } match send_command(Command::BackupRestore { path: parts[1].to_string() }).await { Ok(Response::BackupStatus { success, message }) => { if success { println!("{}", message.green()); } else { println!("{}", message.red()); } } Ok(Response::Success(_)) => println!("{}", "Unexpected success response".red()), Ok(Response::Error(e)) => println!("{}", e.red()), _ => println!("{}", "Unexpected response type".red()), } }, "futunload" => { if parts.len() < 2 { println!("{}", "Usage: futunload ".red()); continue; } match send_command(Command::BackupCreate { path: parts[1].to_string() }).await { Ok(Response::BackupStatus { success, message }) => { if success { println!("{}", message.green()); } else { println!("{}", message.red()); } } Ok(Response::Success(_)) => println!("{}", "Unexpected success response".red()), Ok(Response::Error(e)) => println!("{}", e.red()), _ => println!("{}", "Unexpected response type".red()), } }, "halt" => { match send_command(Command::Halt).await { Ok(Response::Success(_)) => { println!("{}", "Server shutdown initiated".bright_green()); break; }, Ok(Response::Error(e)) => println!("{}", e.bold().red()), _ => println!("{}", "Unexpected response type".red()), } }, "help" => { println!(); println!("{}", "Available commands:".bold()); println!(" insert (or i) - Insert time-series data"); println!(" get - Get latest data point"); println!(" get - Get data in range"); println!(" delete (or d) - Delete data in range"); println!(" begin (or transaction, tx) - Start transaction"); println!(" commit - Commit current transaction"); println!(" rollback - Rollback current transaction"); println!(" createindex - Create index"); println!(" dropindex - Drop index"); println!(" sysexec - Execute system script"); println!(" futexec - Enter Lua interactive mode"); println!(" replication on/off - Enable/disable replication"); println!(" replication status - Show replication status"); println!(" replication add-peer - Add replication peer"); println!(" replication remove-peer - Remove replication peer"); println!(" futload - Load data from backup file"); println!(" futunload - Save data to backup file"); println!(" halt - Gracefully shutdown the server"); println!(" exit (or quit) - Exit client"); println!(" help - Show this help"); println!(); }, "exit" | "quit" => break, _ => println!("{}: Unknown command. Type 'help' for available commands.", "Error".red()), } }, Err(ReadlineError::Interrupted) => break, Err(ReadlineError::Eof) => break, Err(err) => { println!("{}: {:?}", "Error".red(), err); break; } } } rl.save_history("futriix-history.txt")?; Ok(()) }