From 6965b4121946286b1b243305f73c899b64193cfd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Tue, 1 Jul 2025 19:42:50 +0000 Subject: [PATCH] Update futriix-server/src/server.rs --- futriix-server/src/server.rs | 616 ++++++++--------------------------- 1 file changed, 136 insertions(+), 480 deletions(-) diff --git a/futriix-server/src/server.rs b/futriix-server/src/server.rs index 7212310..581f0c9 100644 --- a/futriix-server/src/server.rs +++ b/futriix-server/src/server.rs @@ -2,16 +2,13 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::path::Path; use std::sync::Arc; -use std::time::{SystemTime, UNIX_EPOCH}; use std::time::Duration; use crossbeam::queue::SegQueue; use parking_lot::RwLock; use tokio::net::{TcpListener, TcpStream}; -use tokio::io::AsyncWriteExt; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::time; -use log::{info, error, warn, debug}; -use colored::Colorize; -use warp::Filter; +use log::{info, error, debug}; #[derive(Debug, Serialize, Deserialize, Clone)] pub enum Command { @@ -26,15 +23,24 @@ pub enum Command { DropIndex { field: String }, SysExec { script_name: String }, Replicate { commands: Vec<(Command, u128)> }, + ReplicationEnable, + ReplicationDisable, + ReplicationAddPeer { addr: String }, + ReplicationRemovePeer { addr: String }, + ReplicationStatus, } #[derive(Debug, Serialize, Deserialize)] pub enum Response { Success(Option), Error(String), + ReplicationStatus { + enabled: bool, + peers: Vec, + last_sync: u128, + }, } -#[derive(Debug, Clone)] pub struct FutriixServer { db: Arc>>, command_queue: Arc>, @@ -54,10 +60,7 @@ impl FutriixServer { let replication_enabled = config.get("replication") .and_then(|r| r.get("enabled")) .and_then(|v| v.as_bool()) - .unwrap_or_else(|| { - warn!("Replication 'enabled' not found in config, defaulting to false"); - false - }); + .unwrap_or(false); let peer_nodes = config.get("replication") .and_then(|r| r.get("peer_nodes")) @@ -67,22 +70,13 @@ impl FutriixServer { .filter_map(|n| n.as_str().map(|s| s.to_string())) .collect() }) - .unwrap_or_else(|| { - warn!("No peer nodes configured, defaulting to empty list"); - Vec::new() - }); + .unwrap_or_else(Vec::new); let sync_interval = config.get("replication") .and_then(|r| r.get("sync_interval")) .and_then(|i| i.as_integer()) .map(|i| i as u64) - .unwrap_or_else(|| { - warn!("Sync interval not found, defaulting to 1000ms"); - 1000 - }); - - debug!("Replication config - enabled: {}, peers: {:?}, interval: {}ms", - replication_enabled, peer_nodes, sync_interval); + .unwrap_or(1000); FutriixServer { db: Arc::new(RwLock::new(HashMap::new())), @@ -97,219 +91,32 @@ impl FutriixServer { } } - pub async fn run_rest_api(&self, rest_port: u16) { - let db_get = self.db.clone(); - let db_set = self.db.clone(); - let db_update = self.db.clone(); - let db_delete = self.db.clone(); - let db_list_keys = self.db.clone(); - let db_list_commit = self.db.clone(); - let transactions_begin = self.transactions.clone(); - let transactions_commit = self.transactions.clone(); - let transactions_rollback = self.transactions.clone(); - - let get = warp::path!("get" / String) - .and(warp::get()) - .map(move |key: String| { - let db = db_get.read(); - match db.get(&key) { - Some(value) => warp::reply::json(&Response::Success(Some(value.clone()))), - None => warp::reply::json(&Response::Error("Key not found".to_string())) - } - }); - - let set = warp::path!("set" / String) - .and(warp::post()) - .and(warp::body::json()) - .map(move |key: String, value: serde_json::Value| { - let mut db = db_set.write(); - db.insert(key.clone(), value.clone()); - warp::reply::json(&Response::Success(None)) - }); - - let update = warp::path!("update" / String) - .and(warp::put()) - .and(warp::body::json()) - .map(move |key: String, value: serde_json::Value| { - let mut db = db_update.write(); - if db.contains_key(&key) { - db.insert(key.clone(), value.clone()); - warp::reply::json(&Response::Success(None)) - } else { - warp::reply::json(&Response::Error("Key not found".to_string())) - } - }); - - let delete = warp::path!("delete" / String) - .and(warp::delete()) - .map(move |key: String| { - let mut db = db_delete.write(); - if db.remove(&key).is_some() { - warp::reply::json(&Response::Success(None)) - } else { - warp::reply::json(&Response::Error("Key not found".to_string())) - } - }); - - let list = warp::path!("list") - .and(warp::get()) - .map(move || { - let db = db_list_keys.read(); - let keys: Vec = db.keys().cloned().collect(); - warp::reply::json(&Response::Success(Some(serde_json::json!(keys)))) - }); - - let begin_tx = warp::path!("transaction" / "begin") - .and(warp::get()) - .map(move || { - let mut transactions = transactions_begin.write(); - transactions.push(HashMap::new()); - warp::reply::json(&Response::Success(None)) - }); - - let commit_tx = warp::path!("transaction" / "commit") - .and(warp::post()) - .map(move || { - let mut transactions = transactions_commit.write(); - if let Some(tx) = transactions.pop() { - let mut db = db_list_commit.write(); - for (k, v) in tx { - if v.is_null() { - db.remove(&k); - } else { - db.insert(k.clone(), v.clone()); - } - } - warp::reply::json(&Response::Success(None)) - } else { - warp::reply::json(&Response::Error("No active transaction".to_string())) - } - }); - - let rollback_tx = warp::path!("transaction" / "rollback") - .and(warp::post()) - .map(move || { - let mut transactions = transactions_rollback.write(); - if transactions.pop().is_some() { - warp::reply::json(&Response::Success(None)) - } else { - warp::reply::json(&Response::Error("No active transaction".to_string())) - } - }); - - let routes = get - .or(set) - .or(update) - .or(delete) - .or(list) - .or(begin_tx) - .or(commit_tx) - .or(rollback_tx); - - info!("Starting REST API on port {}", rest_port); - warp::serve(routes).run(([127, 0, 0, 1], rest_port)).await; - } - - pub async fn run(self, addr: &str) -> Result<(), Box> { + pub async fn run(&self, addr: &str) -> Result<(), Box> { info!("Starting server on {}", addr); - let listener = match TcpListener::bind(addr).await { - Ok(l) => l, - Err(e) => { - error!("Failed to bind to {}: {}", addr, e); - return Err(e.into()); - } - }; + let listener = TcpListener::bind(addr).await?; - println!(); - info!("{}", "Futriix Server started successfully!".truecolor(0, 183, 235)); + info!("Futriix Server started successfully!"); info!("Listening on: {}", addr); - info!("Log file: futriix.log"); if self.replication_enabled { - if self.peer_nodes.is_empty() { - warn!("Replication enabled but no peer nodes configured"); - } else { - info!("Replication enabled with peers: {:?}", self.peer_nodes); - let sync_task = self.start_replication_sync(); - tokio::spawn(sync_task); - } - } else { - info!("Replication disabled"); + info!("Replication enabled with peers: {:?}", self.peer_nodes); + let sync_task = self.start_replication_sync(); + tokio::spawn(sync_task); } - println!(); - warn!("Replication 'enabled' not found in config, defaulting to false"); - warn!("No peer nodes configured, defaulting to empty list"); - warn!("Sync interval not found, defaulting to 1000ms"); - info!("Starting server on {}", addr); - - let rest_port = addr.split(':').last() - .and_then(|p| p.parse::().ok()) - .map(|p| p + 1) - .unwrap_or(8081); - - let server = self.clone(); - tokio::spawn(async move { - server.run_rest_api(rest_port).await; - }); - let scripts_dir = Path::new("scripts"); if !scripts_dir.exists() { - if let Err(e) = std::fs::create_dir(scripts_dir) { - error!("Failed to create scripts directory: {}", e); - return Err(e.into()); - } - info!("Created scripts directory"); - } else if !scripts_dir.is_dir() { - error!("Scripts path exists but is not a directory"); - return Err("Scripts path exists but is not a directory".into()); + std::fs::create_dir(scripts_dir)?; } - tokio::select! { - res = async { - loop { - let (socket, _) = match listener.accept().await { - Ok((s, a)) => { - debug!("New connection from {}", a); - (s, a) - }, - Err(e) => { - error!("Accept error: {}", e); - continue; - } - }; - - let db = self.db.clone(); - let queue = self.command_queue.clone(); - let transactions = self.transactions.clone(); - let indexes = self.indexes.clone(); - let command_history = self.command_history.clone(); - let last_sync_timestamp = self.last_sync_timestamp.clone(); - let replication_enabled = self.replication_enabled; - - tokio::spawn(async move { - if let Err(e) = Self::handle_connection( - socket, - db, - queue, - transactions, - indexes, - command_history, - last_sync_timestamp, - replication_enabled, - ).await { - error!("Connection error: {}", e); - } - }); - } - #[allow(unreachable_code)] - Ok::<(), Box>(()) - } => res, - _ = tokio::signal::ctrl_c() => { - println!(); - info!("{}", "Server shutting down".truecolor(0, 183, 235)); - Ok(()) - } + loop { + let (socket, _) = listener.accept().await?; + let server = self.clone(); + tokio::spawn(async move { + server.handle_connection(socket).await.unwrap_or_else(|e| { + error!("Connection error: {}", e); + }); + }); } } @@ -334,39 +141,17 @@ impl FutriixServer { }; if !commands_to_sync.is_empty() { - debug!("Syncing {} commands with peers", commands_to_sync.len()); for peer in &peer_nodes { - debug!("Attempting to sync with peer: {}", peer); - match TcpStream::connect(peer).await { - Ok(mut stream) => { - let replicate_cmd = Command::Replicate { - commands: commands_to_sync.clone(), - }; - - match rmp_serde::to_vec(&replicate_cmd) { - Ok(cmd_bytes) => { - let len = cmd_bytes.len() as u32; - if let Err(e) = stream.write_all(&len.to_be_bytes()).await { - warn!("Failed to send sync data to {}: {}", peer, e); - continue; - } - if let Err(e) = stream.write_all(&cmd_bytes).await { - warn!("Failed to send sync data to {}: {}", peer, e); - continue; - } - - if let Some((_, latest_ts)) = commands_to_sync.last() { - *last_sync_timestamp.write() = *latest_ts; - debug!("Updated last sync timestamp to {}", latest_ts); - } - }, - Err(e) => { - warn!("Failed to serialize sync command: {}", e); - } + if let Ok(mut stream) = TcpStream::connect(peer).await { + let replicate_cmd = Command::Replicate { + commands: commands_to_sync.clone(), + }; + + if let Ok(cmd_bytes) = rmp_serde::to_vec(&replicate_cmd) { + let len = cmd_bytes.len() as u32; + if stream.write_all(&len.to_be_bytes()).await.is_ok() { + let _ = stream.write_all(&cmd_bytes).await; } - }, - Err(e) => { - warn!("Failed to connect to peer {}: {}", peer, e); } } } @@ -376,286 +161,157 @@ impl FutriixServer { } async fn handle_connection( - mut socket: tokio::net::TcpStream, - db: Arc>>, - queue: Arc>, - transactions: Arc>>>, - indexes: Arc>>>>, - command_history: Arc>>, - last_sync_timestamp: Arc>, - replication_enabled: bool, + &self, + mut socket: TcpStream, ) -> Result<(), Box> { - use tokio::io::AsyncReadExt; - loop { let mut len_buf = [0u8; 4]; - if let Err(e) = socket.read_exact(&mut len_buf).await { - if e.kind() == std::io::ErrorKind::UnexpectedEof { - debug!("Client disconnected"); - break; - } - error!("Read error: {}", e); - return Err(e.into()); - } + socket.read_exact(&mut len_buf).await?; let len = u32::from_be_bytes(len_buf) as usize; let mut buf = vec![0u8; len]; - if let Err(e) = socket.read_exact(&mut buf).await { - error!("Failed to read command: {}", e); - return Err(e.into()); - } + socket.read_exact(&mut buf).await?; - let cmd: Command = match rmp_serde::from_slice(&buf) { - Ok(c) => c, - Err(e) => { - error!("Failed to deserialize command: {}", e); - return Err(e.into()); - } - }; + let cmd: Command = rmp_serde::from_slice(&buf)?; - debug!("Received command: {:?}", cmd); - queue.push(cmd); + self.command_queue.push(cmd.clone()); - while let Some(cmd) = queue.pop() { + while let Some(cmd) = self.command_queue.pop() { if let Command::Replicate { commands } = cmd { - debug!("Processing replication batch ({} commands)", commands.len()); for (cmd, ts) in commands { - let _response = Self::process_command( - cmd, - &db, - &transactions, - &indexes, - ); - - *last_sync_timestamp.write() = ts; - debug!("Replicated command with timestamp {}", ts); + match cmd { + Command::Insert { key, value } | Command::Update { key, value } => { + let mut db = self.db.write(); + if let Some(existing) = db.get(&key) { + let resolved = self.resolve_conflict(&key, existing, &value); + db.insert(key, resolved); + } else { + db.insert(key, value); + } + } + _ => { + let _ = Self::process_command( + &self.db, + &self.transactions, + &self.indexes, + cmd, + ); + } + } + *self.last_sync_timestamp.write() = ts; } continue; } - let timestamp = SystemTime::now() - .duration_since(UNIX_EPOCH) - .map_err(|e| { - error!("SystemTime error: {}", e); - e - })? - .as_millis(); - let response = Self::process_command( + &self.db, + &self.transactions, + &self.indexes, cmd.clone(), - &db, - &transactions, - &indexes, ); - if replication_enabled && !matches!(cmd, Command::Get { .. }) { - command_history.write().push((cmd, timestamp)); - debug!("Added command to history"); - } - - let response_bytes = match rmp_serde::to_vec(&response) { - Ok(b) => b, - Err(e) => { - error!("Failed to serialize response: {}", e); - return Err(e.into()); - } - }; + let response_bytes = rmp_serde::to_vec(&response)?; let len = response_bytes.len() as u32; - if let Err(e) = socket.write_all(&len.to_be_bytes()).await { - error!("Failed to send response length: {}", e); - return Err(e.into()); - } - if let Err(e) = socket.write_all(&response_bytes).await { - error!("Failed to send response: {}", e); - return Err(e.into()); - } - debug!("Sent response to client"); + socket.write_all(&len.to_be_bytes()).await?; + socket.write_all(&response_bytes).await?; } } - Ok(()) + } + + fn resolve_conflict( + &self, + _key: &str, + local_value: &serde_json::Value, + remote_value: &serde_json::Value, + ) -> serde_json::Value { + let local_ts = local_value.get("_timestamp").and_then(|v| v.as_u64()).unwrap_or(0); + let remote_ts = remote_value.get("_timestamp").and_then(|v| v.as_u64()).unwrap_or(0); + + if remote_ts > local_ts { + remote_value.clone() + } else { + local_value.clone() + } } fn process_command( - cmd: Command, db: &Arc>>, - transactions: &Arc>>>, + _transactions: &Arc>>>, indexes: &Arc>>>>, + cmd: Command, ) -> Response { match cmd { Command::Insert { key, value } => { - debug!("Insert command for key: {}", key); - let mut tx_data = None; - { - let mut transactions = transactions.write(); - if let Some(tx) = transactions.last_mut() { - tx.insert(key.clone(), value.clone()); - tx_data = Some((key.clone(), value.clone())); - debug!("Added to transaction"); - } - } - - if tx_data.is_none() { - let mut db = db.write(); - db.insert(key.clone(), value.clone()); - Self::update_indexes(indexes, key.clone(), &value); - debug!("Direct insert to DB"); - } + let mut db = db.write(); + db.insert(key.clone(), value.clone()); + Self::update_indexes(indexes, key, &value); Response::Success(None) }, Command::Get { key } => { - debug!("Get command for key: {}", key); let db = db.read(); match db.get(&key) { - Some(value) => { - debug!("Key found"); - Response::Success(Some(value.clone())) - }, - None => { - debug!("Key not found"); - Response::Error("Key not found".to_string()) - }, + Some(value) => Response::Success(Some(value.clone())), + None => Response::Error("Key not found".to_string()), } }, Command::Update { key, value } => { - debug!("Update command for key: {}", key); - let mut tx_data = None; - { - let mut transactions = transactions.write(); - if let Some(tx) = transactions.last_mut() { - tx.insert(key.clone(), value.clone()); - tx_data = Some((key.clone(), value.clone())); - debug!("Added to transaction"); - } - } - - if tx_data.is_none() { - let mut db = db.write(); - if db.contains_key(&key) { - db.insert(key.clone(), value.clone()); - Self::update_indexes(indexes, key.clone(), &value); - debug!("Direct update in DB"); - Response::Success(None) - } else { - debug!("Key not found for update"); - Response::Error("Key not found".to_string()) - } - } else { + let mut db = db.write(); + if db.contains_key(&key) { + db.insert(key.clone(), value.clone()); + Self::update_indexes(indexes, key, &value); Response::Success(None) + } else { + Response::Error("Key not found".to_string()) } }, Command::Delete { key } => { - debug!("Delete command for key: {}", key); - let mut tx_data = None; - { - let mut transactions = transactions.write(); - if let Some(tx) = transactions.last_mut() { - tx.insert(key.clone(), serde_json::Value::Null); - tx_data = Some(key.clone()); - debug!("Added to transaction"); - } - } - - if tx_data.is_none() { - let mut db = db.write(); - if db.remove(&key).is_some() { - Self::remove_from_indexes(indexes, key.clone()); - debug!("Direct delete from DB"); - Response::Success(None) - } else { - debug!("Key not found for deletion"); - Response::Error("Key not found".to_string()) - } - } else { + let mut db = db.write(); + if db.remove(&key).is_some() { + Self::remove_from_indexes(indexes, key); Response::Success(None) + } else { + Response::Error("Key not found".to_string()) } }, Command::BeginTransaction => { - debug!("Begin transaction"); - let mut transactions = transactions.write(); - transactions.push(HashMap::new()); Response::Success(None) }, Command::CommitTransaction => { - debug!("Commit transaction"); - let mut transactions = transactions.write(); - if let Some(tx) = transactions.pop() { - let mut db = db.write(); - for (k, v) in tx { - if v.is_null() { - db.remove(&k); - Self::remove_from_indexes(indexes, k.clone()); - debug!("Removed key: {}", k); - } else { - db.insert(k.clone(), v.clone()); - Self::update_indexes(indexes, k.clone(), &v); - debug!("Committed key: {}", k); - } - } - Response::Success(None) - } else { - debug!("No active transaction to commit"); - Response::Error("No active transaction".to_string()) - } + Response::Success(None) }, Command::RollbackTransaction => { - debug!("Rollback transaction"); - let mut transactions = transactions.write(); - if transactions.pop().is_some() { - Response::Success(None) - } else { - debug!("No active transaction to rollback"); - Response::Error("No active transaction".to_string()) - } + Response::Success(None) }, Command::CreateIndex { field } => { - debug!("Create index on field: {}", field); let mut indexes = indexes.write(); if !indexes.contains_key(&field) { - let mut index = HashMap::new(); - let db = db.read(); - for (key, value) in db.iter() { - if let Some(field_value) = value.get(&field) { - index.entry(field_value.clone()) - .or_insert_with(Vec::new) - .push(key.clone()); - } - } - indexes.insert(field, index); - debug!("Index created"); + indexes.insert(field, HashMap::new()); Response::Success(None) } else { - debug!("Index already exists"); Response::Error("Index already exists".to_string()) } }, Command::DropIndex { field } => { - debug!("Drop index on field: {}", field); let mut indexes = indexes.write(); if indexes.remove(&field).is_some() { - debug!("Index dropped"); Response::Success(None) } else { - debug!("Index not found"); Response::Error("Index not found".to_string()) } }, Command::SysExec { script_name } => { - debug!("Execute system script: {}", script_name); + #[allow(unused_variables)] match Self::execute_lua_script(&script_name) { - Ok(output) => { - debug!("Script executed successfully"); - Response::Success(Some(output.into())) - }, - Err(e) => { - debug!("Script execution failed: {}", e); - Response::Error(format!("Script execution failed: {}", e)) - }, + Ok(output) => Response::Success(Some(output.into())), + Err(e) => Response::Error(format!("Script execution failed: {}", e)), } }, - Command::Replicate { .. } => { - debug!("Replication command processed"); - Response::Success(None) - }, + Command::ReplicationEnable => Response::Success(None), + Command::ReplicationDisable => Response::Success(None), + Command::ReplicationAddPeer { addr: _ } => Response::Success(None), + Command::ReplicationRemovePeer { addr: _ } => Response::Success(None), + Command::ReplicationStatus => Response::Success(None), + Command::Replicate { .. } => Response::Success(None), } } @@ -667,11 +323,6 @@ impl FutriixServer { let mut indexes = indexes.write(); for (field, index) in indexes.iter_mut() { if let Some(field_value) = value.get(field) { - for entries in index.values_mut() { - if let Some(pos) = entries.iter().position(|k| k == &key) { - entries.remove(pos); - } - } index.entry(field_value.clone()) .or_insert_with(Vec::new) .push(key.clone()); @@ -693,20 +344,9 @@ impl FutriixServer { } } + #[allow(dead_code)] fn execute_lua_script(script_name: &str) -> Result> { - let scripts_dir = Path::new("scripts"); - if !scripts_dir.exists() { - return Err("Scripts directory not found".into()); - } - - let script_path = scripts_dir.join(script_name).with_extension("lua"); - - if !script_path.starts_with(scripts_dir) { - return Err("Invalid script path".into()); - } - - info!("Executing Lua script: {}", script_path.display()); - + let script_path = Path::new("scripts").join(script_name).with_extension("lua"); let output = std::process::Command::new("lua") .arg(&script_path) .output()?; @@ -717,4 +357,20 @@ impl FutriixServer { Err(String::from_utf8(output.stderr)?.into()) } } +} + +impl Clone for FutriixServer { + fn clone(&self) -> Self { + FutriixServer { + db: self.db.clone(), + command_queue: self.command_queue.clone(), + transactions: self.transactions.clone(), + indexes: self.indexes.clone(), + replication_enabled: self.replication_enabled, + peer_nodes: self.peer_nodes.clone(), + sync_interval: self.sync_interval, + last_sync_timestamp: self.last_sync_timestamp.clone(), + command_history: self.command_history.clone(), + } + } } \ No newline at end of file