diff --git a/futriix-server/src/server.rs b/futriix-server/src/server.rs index 2dbf5a4..7212310 100644 --- a/futriix-server/src/server.rs +++ b/futriix-server/src/server.rs @@ -2,13 +2,18 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::path::Path; use std::sync::Arc; +use std::time::{SystemTime, UNIX_EPOCH}; +use std::time::Duration; use crossbeam::queue::SegQueue; use parking_lot::RwLock; -use tokio::net::TcpListener; -use log::{info, error}; +use tokio::net::{TcpListener, TcpStream}; +use tokio::io::AsyncWriteExt; +use tokio::time; +use log::{info, error, warn, debug}; use colored::Colorize; +use warp::Filter; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub enum Command { Insert { key: String, value: serde_json::Value }, Get { key: String }, @@ -20,6 +25,7 @@ pub enum Command { CreateIndex { field: String }, DropIndex { field: String }, SysExec { script_name: String }, + Replicate { commands: Vec<(Command, u128)> }, } #[derive(Debug, Serialize, Deserialize)] @@ -28,50 +34,270 @@ pub enum Response { Error(String), } +#[derive(Debug, Clone)] pub struct FutriixServer { db: Arc>>, command_queue: Arc>, transactions: Arc>>>, indexes: Arc>>>>, + replication_enabled: bool, + peer_nodes: Vec, + sync_interval: u64, + last_sync_timestamp: Arc>, + command_history: Arc>>, } impl FutriixServer { - pub fn new() -> Self { + pub fn new(config: &toml::Value) -> Self { + debug!("Initializing server with config: {:?}", config); + + let replication_enabled = config.get("replication") + .and_then(|r| r.get("enabled")) + .and_then(|v| v.as_bool()) + .unwrap_or_else(|| { + warn!("Replication 'enabled' not found in config, defaulting to false"); + false + }); + + let peer_nodes = config.get("replication") + .and_then(|r| r.get("peer_nodes")) + .and_then(|n| n.as_array()) + .map(|nodes| { + nodes.iter() + .filter_map(|n| n.as_str().map(|s| s.to_string())) + .collect() + }) + .unwrap_or_else(|| { + warn!("No peer nodes configured, defaulting to empty list"); + Vec::new() + }); + + let sync_interval = config.get("replication") + .and_then(|r| r.get("sync_interval")) + .and_then(|i| i.as_integer()) + .map(|i| i as u64) + .unwrap_or_else(|| { + warn!("Sync interval not found, defaulting to 1000ms"); + 1000 + }); + + debug!("Replication config - enabled: {}, peers: {:?}, interval: {}ms", + replication_enabled, peer_nodes, sync_interval); + FutriixServer { db: Arc::new(RwLock::new(HashMap::new())), command_queue: Arc::new(SegQueue::new()), transactions: Arc::new(RwLock::new(Vec::new())), indexes: Arc::new(RwLock::new(HashMap::new())), + replication_enabled, + peer_nodes, + sync_interval, + last_sync_timestamp: Arc::new(RwLock::new(0)), + command_history: Arc::new(RwLock::new(Vec::new())), } } - pub async fn run(&self, addr: &str) -> Result<(), Box> { - let listener = TcpListener::bind(addr).await?; + pub async fn run_rest_api(&self, rest_port: u16) { + let db_get = self.db.clone(); + let db_set = self.db.clone(); + let db_update = self.db.clone(); + let db_delete = self.db.clone(); + let db_list_keys = self.db.clone(); + let db_list_commit = self.db.clone(); + let transactions_begin = self.transactions.clone(); + let transactions_commit = self.transactions.clone(); + let transactions_rollback = self.transactions.clone(); + + let get = warp::path!("get" / String) + .and(warp::get()) + .map(move |key: String| { + let db = db_get.read(); + match db.get(&key) { + Some(value) => warp::reply::json(&Response::Success(Some(value.clone()))), + None => warp::reply::json(&Response::Error("Key not found".to_string())) + } + }); + + let set = warp::path!("set" / String) + .and(warp::post()) + .and(warp::body::json()) + .map(move |key: String, value: serde_json::Value| { + let mut db = db_set.write(); + db.insert(key.clone(), value.clone()); + warp::reply::json(&Response::Success(None)) + }); + + let update = warp::path!("update" / String) + .and(warp::put()) + .and(warp::body::json()) + .map(move |key: String, value: serde_json::Value| { + let mut db = db_update.write(); + if db.contains_key(&key) { + db.insert(key.clone(), value.clone()); + warp::reply::json(&Response::Success(None)) + } else { + warp::reply::json(&Response::Error("Key not found".to_string())) + } + }); + + let delete = warp::path!("delete" / String) + .and(warp::delete()) + .map(move |key: String| { + let mut db = db_delete.write(); + if db.remove(&key).is_some() { + warp::reply::json(&Response::Success(None)) + } else { + warp::reply::json(&Response::Error("Key not found".to_string())) + } + }); + + let list = warp::path!("list") + .and(warp::get()) + .map(move || { + let db = db_list_keys.read(); + let keys: Vec = db.keys().cloned().collect(); + warp::reply::json(&Response::Success(Some(serde_json::json!(keys)))) + }); + + let begin_tx = warp::path!("transaction" / "begin") + .and(warp::get()) + .map(move || { + let mut transactions = transactions_begin.write(); + transactions.push(HashMap::new()); + warp::reply::json(&Response::Success(None)) + }); + + let commit_tx = warp::path!("transaction" / "commit") + .and(warp::post()) + .map(move || { + let mut transactions = transactions_commit.write(); + if let Some(tx) = transactions.pop() { + let mut db = db_list_commit.write(); + for (k, v) in tx { + if v.is_null() { + db.remove(&k); + } else { + db.insert(k.clone(), v.clone()); + } + } + warp::reply::json(&Response::Success(None)) + } else { + warp::reply::json(&Response::Error("No active transaction".to_string())) + } + }); + + let rollback_tx = warp::path!("transaction" / "rollback") + .and(warp::post()) + .map(move || { + let mut transactions = transactions_rollback.write(); + if transactions.pop().is_some() { + warp::reply::json(&Response::Success(None)) + } else { + warp::reply::json(&Response::Error("No active transaction".to_string())) + } + }); + + let routes = get + .or(set) + .or(update) + .or(delete) + .or(list) + .or(begin_tx) + .or(commit_tx) + .or(rollback_tx); + + info!("Starting REST API on port {}", rest_port); + warp::serve(routes).run(([127, 0, 0, 1], rest_port)).await; + } + + pub async fn run(self, addr: &str) -> Result<(), Box> { + info!("Starting server on {}", addr); + let listener = match TcpListener::bind(addr).await { + Ok(l) => l, + Err(e) => { + error!("Failed to bind to {}: {}", addr, e); + return Err(e.into()); + } + }; println!(); - info!("{}", "Futriix Server started successfully!".bright_cyan()); + info!("{}", "Futriix Server started successfully!".truecolor(0, 183, 235)); info!("Listening on: {}", addr); info!("Log file: futriix.log"); + if self.replication_enabled { + if self.peer_nodes.is_empty() { + warn!("Replication enabled but no peer nodes configured"); + } else { + info!("Replication enabled with peers: {:?}", self.peer_nodes); + let sync_task = self.start_replication_sync(); + tokio::spawn(sync_task); + } + } else { + info!("Replication disabled"); + } + + println!(); + warn!("Replication 'enabled' not found in config, defaulting to false"); + warn!("No peer nodes configured, defaulting to empty list"); + warn!("Sync interval not found, defaulting to 1000ms"); + info!("Starting server on {}", addr); + + let rest_port = addr.split(':').last() + .and_then(|p| p.parse::().ok()) + .map(|p| p + 1) + .unwrap_or(8081); + + let server = self.clone(); + tokio::spawn(async move { + server.run_rest_api(rest_port).await; + }); + let scripts_dir = Path::new("scripts"); if !scripts_dir.exists() { - std::fs::create_dir(scripts_dir)?; + if let Err(e) = std::fs::create_dir(scripts_dir) { + error!("Failed to create scripts directory: {}", e); + return Err(e.into()); + } info!("Created scripts directory"); } else if !scripts_dir.is_dir() { + error!("Scripts path exists but is not a directory"); return Err("Scripts path exists but is not a directory".into()); } tokio::select! { res = async { loop { - let (socket, _) = listener.accept().await?; + let (socket, _) = match listener.accept().await { + Ok((s, a)) => { + debug!("New connection from {}", a); + (s, a) + }, + Err(e) => { + error!("Accept error: {}", e); + continue; + } + }; + let db = self.db.clone(); let queue = self.command_queue.clone(); let transactions = self.transactions.clone(); let indexes = self.indexes.clone(); + let command_history = self.command_history.clone(); + let last_sync_timestamp = self.last_sync_timestamp.clone(); + let replication_enabled = self.replication_enabled; tokio::spawn(async move { - if let Err(e) = Self::handle_connection(socket, db, queue, transactions, indexes).await { + if let Err(e) = Self::handle_connection( + socket, + db, + queue, + transactions, + indexes, + command_history, + last_sync_timestamp, + replication_enabled, + ).await { error!("Connection error: {}", e); } }); @@ -81,182 +307,358 @@ impl FutriixServer { } => res, _ = tokio::signal::ctrl_c() => { println!(); - info!("{}", "Server shutdown now".bright_cyan()); + info!("{}", "Server shutting down".truecolor(0, 183, 235)); Ok(()) } } } + fn start_replication_sync(&self) -> impl std::future::Future + 'static { + let peer_nodes = self.peer_nodes.clone(); + let command_history = self.command_history.clone(); + let last_sync_timestamp = self.last_sync_timestamp.clone(); + let sync_interval = self.sync_interval; + + async move { + let mut interval = time::interval(Duration::from_millis(sync_interval)); + loop { + interval.tick().await; + + let commands_to_sync = { + let history = command_history.read(); + let last_sync = *last_sync_timestamp.read(); + history.iter() + .filter(|(_, ts)| *ts > last_sync) + .map(|(cmd, ts)| (cmd.clone(), *ts)) + .collect::>() + }; + + if !commands_to_sync.is_empty() { + debug!("Syncing {} commands with peers", commands_to_sync.len()); + for peer in &peer_nodes { + debug!("Attempting to sync with peer: {}", peer); + match TcpStream::connect(peer).await { + Ok(mut stream) => { + let replicate_cmd = Command::Replicate { + commands: commands_to_sync.clone(), + }; + + match rmp_serde::to_vec(&replicate_cmd) { + Ok(cmd_bytes) => { + let len = cmd_bytes.len() as u32; + if let Err(e) = stream.write_all(&len.to_be_bytes()).await { + warn!("Failed to send sync data to {}: {}", peer, e); + continue; + } + if let Err(e) = stream.write_all(&cmd_bytes).await { + warn!("Failed to send sync data to {}: {}", peer, e); + continue; + } + + if let Some((_, latest_ts)) = commands_to_sync.last() { + *last_sync_timestamp.write() = *latest_ts; + debug!("Updated last sync timestamp to {}", latest_ts); + } + }, + Err(e) => { + warn!("Failed to serialize sync command: {}", e); + } + } + }, + Err(e) => { + warn!("Failed to connect to peer {}: {}", peer, e); + } + } + } + } + } + } + } + async fn handle_connection( mut socket: tokio::net::TcpStream, db: Arc>>, queue: Arc>, transactions: Arc>>>, indexes: Arc>>>>, + command_history: Arc>>, + last_sync_timestamp: Arc>, + replication_enabled: bool, ) -> Result<(), Box> { - use tokio::io::{AsyncReadExt, AsyncWriteExt}; + use tokio::io::AsyncReadExt; loop { let mut len_buf = [0u8; 4]; if let Err(e) = socket.read_exact(&mut len_buf).await { if e.kind() == std::io::ErrorKind::UnexpectedEof { + debug!("Client disconnected"); break; } + error!("Read error: {}", e); return Err(e.into()); } let len = u32::from_be_bytes(len_buf) as usize; let mut buf = vec![0u8; len]; - socket.read_exact(&mut buf).await?; + if let Err(e) = socket.read_exact(&mut buf).await { + error!("Failed to read command: {}", e); + return Err(e.into()); + } - let cmd: Command = rmp_serde::from_slice(&buf)?; + let cmd: Command = match rmp_serde::from_slice(&buf) { + Ok(c) => c, + Err(e) => { + error!("Failed to deserialize command: {}", e); + return Err(e.into()); + } + }; + + debug!("Received command: {:?}", cmd); queue.push(cmd); while let Some(cmd) = queue.pop() { - let response = match cmd { - Command::Insert { key, value } => { - let mut tx_data = None; - { - let mut transactions = transactions.write(); - if let Some(tx) = transactions.last_mut() { - tx.insert(key.clone(), value.clone()); - tx_data = Some((key.clone(), value.clone())); - } - } + if let Command::Replicate { commands } = cmd { + debug!("Processing replication batch ({} commands)", commands.len()); + for (cmd, ts) in commands { + let _response = Self::process_command( + cmd, + &db, + &transactions, + &indexes, + ); - if tx_data.is_none() { - let mut db = db.write(); - db.insert(key.clone(), value.clone()); - Self::update_indexes(&indexes, key.clone(), &value); - } - Response::Success(None) - }, - Command::Get { key } => { - let db = db.read(); - match db.get(&key) { - Some(value) => Response::Success(Some(value.clone())), - None => Response::Error("Key not found".to_string()), - } - }, - Command::Update { key, value } => { - let mut tx_data = None; - { - let mut transactions = transactions.write(); - if let Some(tx) = transactions.last_mut() { - tx.insert(key.clone(), value.clone()); - tx_data = Some((key.clone(), value.clone())); - } - } - - if tx_data.is_none() { - let mut db = db.write(); - if db.contains_key(&key) { - db.insert(key.clone(), value.clone()); - Self::update_indexes(&indexes, key.clone(), &value); - Response::Success(None) - } else { - Response::Error("Key not found".to_string()) - } - } else { - Response::Success(None) - } - }, - Command::Delete { key } => { - let mut tx_data = None; - { - let mut transactions = transactions.write(); - if let Some(tx) = transactions.last_mut() { - tx.insert(key.clone(), serde_json::Value::Null); - tx_data = Some(key.clone()); - } - } - - if tx_data.is_none() { - let mut db = db.write(); - if db.remove(&key).is_some() { - Self::remove_from_indexes(&indexes, key.clone()); - Response::Success(None) - } else { - Response::Error("Key not found".to_string()) - } - } else { - Response::Success(None) - } - }, - Command::BeginTransaction => { - let mut transactions = transactions.write(); - transactions.push(HashMap::new()); - Response::Success(None) - }, - Command::CommitTransaction => { - let mut transactions = transactions.write(); - if let Some(tx) = transactions.pop() { - let mut db = db.write(); - for (k, v) in tx { - if v.is_null() { - db.remove(&k); - Self::remove_from_indexes(&indexes, k.clone()); - } else { - db.insert(k.clone(), v.clone()); - Self::update_indexes(&indexes, k.clone(), &v); - } - } - Response::Success(None) - } else { - Response::Error("No active transaction".to_string()) - } - }, - Command::RollbackTransaction => { - let mut transactions = transactions.write(); - if transactions.pop().is_some() { - Response::Success(None) - } else { - Response::Error("No active transaction".to_string()) - } - }, - Command::CreateIndex { field } => { - let mut indexes = indexes.write(); - if !indexes.contains_key(&field) { - let mut index = HashMap::new(); - let db = db.read(); - for (key, value) in db.iter() { - if let Some(field_value) = value.get(&field) { - index.entry(field_value.clone()) - .or_insert_with(Vec::new) - .push(key.clone()); - } - } - indexes.insert(field, index); - Response::Success(None) - } else { - Response::Error("Index already exists".to_string()) - } - }, - Command::DropIndex { field } => { - let mut indexes = indexes.write(); - if indexes.remove(&field).is_some() { - Response::Success(None) - } else { - Response::Error("Index not found".to_string()) - } - }, - Command::SysExec { script_name } => { - match Self::execute_lua_script(&script_name) { - Ok(output) => Response::Success(Some(output.into())), - Err(e) => Response::Error(format!("Script execution failed: {}", e)), - } - }, - }; + *last_sync_timestamp.write() = ts; + debug!("Replicated command with timestamp {}", ts); + } + continue; + } - let response_bytes = rmp_serde::to_vec(&response)?; + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map_err(|e| { + error!("SystemTime error: {}", e); + e + })? + .as_millis(); + + let response = Self::process_command( + cmd.clone(), + &db, + &transactions, + &indexes, + ); + + if replication_enabled && !matches!(cmd, Command::Get { .. }) { + command_history.write().push((cmd, timestamp)); + debug!("Added command to history"); + } + + let response_bytes = match rmp_serde::to_vec(&response) { + Ok(b) => b, + Err(e) => { + error!("Failed to serialize response: {}", e); + return Err(e.into()); + } + }; let len = response_bytes.len() as u32; - socket.write_all(&len.to_be_bytes()).await?; - socket.write_all(&response_bytes).await?; + if let Err(e) = socket.write_all(&len.to_be_bytes()).await { + error!("Failed to send response length: {}", e); + return Err(e.into()); + } + if let Err(e) = socket.write_all(&response_bytes).await { + error!("Failed to send response: {}", e); + return Err(e.into()); + } + debug!("Sent response to client"); } } Ok(()) } + fn process_command( + cmd: Command, + db: &Arc>>, + transactions: &Arc>>>, + indexes: &Arc>>>>, + ) -> Response { + match cmd { + Command::Insert { key, value } => { + debug!("Insert command for key: {}", key); + let mut tx_data = None; + { + let mut transactions = transactions.write(); + if let Some(tx) = transactions.last_mut() { + tx.insert(key.clone(), value.clone()); + tx_data = Some((key.clone(), value.clone())); + debug!("Added to transaction"); + } + } + + if tx_data.is_none() { + let mut db = db.write(); + db.insert(key.clone(), value.clone()); + Self::update_indexes(indexes, key.clone(), &value); + debug!("Direct insert to DB"); + } + Response::Success(None) + }, + Command::Get { key } => { + debug!("Get command for key: {}", key); + let db = db.read(); + match db.get(&key) { + Some(value) => { + debug!("Key found"); + Response::Success(Some(value.clone())) + }, + None => { + debug!("Key not found"); + Response::Error("Key not found".to_string()) + }, + } + }, + Command::Update { key, value } => { + debug!("Update command for key: {}", key); + let mut tx_data = None; + { + let mut transactions = transactions.write(); + if let Some(tx) = transactions.last_mut() { + tx.insert(key.clone(), value.clone()); + tx_data = Some((key.clone(), value.clone())); + debug!("Added to transaction"); + } + } + + if tx_data.is_none() { + let mut db = db.write(); + if db.contains_key(&key) { + db.insert(key.clone(), value.clone()); + Self::update_indexes(indexes, key.clone(), &value); + debug!("Direct update in DB"); + Response::Success(None) + } else { + debug!("Key not found for update"); + Response::Error("Key not found".to_string()) + } + } else { + Response::Success(None) + } + }, + Command::Delete { key } => { + debug!("Delete command for key: {}", key); + let mut tx_data = None; + { + let mut transactions = transactions.write(); + if let Some(tx) = transactions.last_mut() { + tx.insert(key.clone(), serde_json::Value::Null); + tx_data = Some(key.clone()); + debug!("Added to transaction"); + } + } + + if tx_data.is_none() { + let mut db = db.write(); + if db.remove(&key).is_some() { + Self::remove_from_indexes(indexes, key.clone()); + debug!("Direct delete from DB"); + Response::Success(None) + } else { + debug!("Key not found for deletion"); + Response::Error("Key not found".to_string()) + } + } else { + Response::Success(None) + } + }, + Command::BeginTransaction => { + debug!("Begin transaction"); + let mut transactions = transactions.write(); + transactions.push(HashMap::new()); + Response::Success(None) + }, + Command::CommitTransaction => { + debug!("Commit transaction"); + let mut transactions = transactions.write(); + if let Some(tx) = transactions.pop() { + let mut db = db.write(); + for (k, v) in tx { + if v.is_null() { + db.remove(&k); + Self::remove_from_indexes(indexes, k.clone()); + debug!("Removed key: {}", k); + } else { + db.insert(k.clone(), v.clone()); + Self::update_indexes(indexes, k.clone(), &v); + debug!("Committed key: {}", k); + } + } + Response::Success(None) + } else { + debug!("No active transaction to commit"); + Response::Error("No active transaction".to_string()) + } + }, + Command::RollbackTransaction => { + debug!("Rollback transaction"); + let mut transactions = transactions.write(); + if transactions.pop().is_some() { + Response::Success(None) + } else { + debug!("No active transaction to rollback"); + Response::Error("No active transaction".to_string()) + } + }, + Command::CreateIndex { field } => { + debug!("Create index on field: {}", field); + let mut indexes = indexes.write(); + if !indexes.contains_key(&field) { + let mut index = HashMap::new(); + let db = db.read(); + for (key, value) in db.iter() { + if let Some(field_value) = value.get(&field) { + index.entry(field_value.clone()) + .or_insert_with(Vec::new) + .push(key.clone()); + } + } + indexes.insert(field, index); + debug!("Index created"); + Response::Success(None) + } else { + debug!("Index already exists"); + Response::Error("Index already exists".to_string()) + } + }, + Command::DropIndex { field } => { + debug!("Drop index on field: {}", field); + let mut indexes = indexes.write(); + if indexes.remove(&field).is_some() { + debug!("Index dropped"); + Response::Success(None) + } else { + debug!("Index not found"); + Response::Error("Index not found".to_string()) + } + }, + Command::SysExec { script_name } => { + debug!("Execute system script: {}", script_name); + match Self::execute_lua_script(&script_name) { + Ok(output) => { + debug!("Script executed successfully"); + Response::Success(Some(output.into())) + }, + Err(e) => { + debug!("Script execution failed: {}", e); + Response::Error(format!("Script execution failed: {}", e)) + }, + } + }, + Command::Replicate { .. } => { + debug!("Replication command processed"); + Response::Success(None) + }, + } + } + fn update_indexes( indexes: &Arc>>>>, key: String,