From 867076483c7656566831fd6872dfef8c6f64e978 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Sat, 12 Jul 2025 15:21:46 +0000 Subject: [PATCH] Delete futriix-server/src/old-server.rs --- futriix-server/src/old-server.rs | 639 ------------------------------- 1 file changed, 639 deletions(-) delete mode 100644 futriix-server/src/old-server.rs diff --git a/futriix-server/src/old-server.rs b/futriix-server/src/old-server.rs deleted file mode 100644 index d9bc987..0000000 --- a/futriix-server/src/old-server.rs +++ /dev/null @@ -1,639 +0,0 @@ -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::path::Path; -use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; -use crossbeam::queue::SegQueue; -use parking_lot::RwLock; -use tokio::net::{TcpListener, TcpStream}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::time; -use log::{info, error, debug, warn}; -use warp::Filter; - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub enum Command { - Insert { key: String, value: serde_json::Value }, - Get { key: String, version: Option }, - Update { key: String, value: serde_json::Value }, - Delete { key: String }, - BeginTransaction, - CommitTransaction(u64), - RollbackTransaction(u64), - CreateIndex { field: String }, - DropIndex { field: String }, - SysExec { script_name: String }, - Replicate { commands: Vec<(Command, u128)> }, - ReplicationEnable, - ReplicationDisable, - ReplicationAddPeer { addr: String }, - ReplicationRemovePeer { addr: String }, - ReplicationStatus, - BackupCreate { path: String }, - BackupRestore { path: String }, -} - -#[derive(Debug, Serialize, Deserialize)] -pub enum Response { - Success(Option), - Error(String), - ReplicationStatus { - enabled: bool, - peers: Vec, - last_sync: u128, - }, - BackupStatus { success: bool, message: String }, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -struct VersionedValue { - value: serde_json::Value, - version: u64, - timestamp: u128, - tx_id: Option, -} - -pub struct FutriixServer { - db: Arc>>>, - command_queue: Arc>, - transactions: Arc>>>, - indexes: Arc>>>>, - replication_enabled: bool, - peer_nodes: Vec, - sync_interval: u64, - last_sync_timestamp: Arc>, - command_history: Arc>>, - next_tx_id: Arc>, -} - -impl FutriixServer { - pub fn new(config: &toml::Value) -> Self { - debug!("Initializing server with config: {:?}", config); - - let replication_enabled = config.get("replication") - .and_then(|r| r.get("enabled")) - .and_then(|v| v.as_bool()) - .unwrap_or(false); - - let peer_nodes = config.get("replication") - .and_then(|r| r.get("peer_nodes")) - .and_then(|n| n.as_array()) - .map(|nodes| { - nodes.iter() - .filter_map(|n| n.as_str().map(|s| s.to_string())) - .collect() - }) - .unwrap_or_default(); - - let sync_interval = config.get("replication") - .and_then(|r| r.get("sync_interval")) - .and_then(|i| i.as_integer()) - .map(|i| i as u64) - .unwrap_or(1000); - - FutriixServer { - db: Arc::new(RwLock::new(HashMap::new())), - command_queue: Arc::new(SegQueue::new()), - transactions: Arc::new(RwLock::new(HashMap::new())), - indexes: Arc::new(RwLock::new(HashMap::new())), - replication_enabled, - peer_nodes, - sync_interval, - last_sync_timestamp: Arc::new(RwLock::new(0)), - command_history: Arc::new(RwLock::new(Vec::new())), - next_tx_id: Arc::new(RwLock::new(1)), - } - } - - pub async fn run(&self, addr: &str) -> Result<(), Box> { - info!("Starting server on {}", addr); - let listener = TcpListener::bind(addr).await?; - - let http_port = addr.split(':').last().unwrap_or("8080").parse::().unwrap_or(8080) + 1; - let http_addr = format!("0.0.0.0:{}", http_port); - self.start_http_api(http_addr.clone()); - - info!("Futriix Server started successfully!"); - info!("Listening on: {}", addr); - info!("HTTP API available on: {}", http_addr); - - if self.replication_enabled { - info!("Replication enabled with peers: {:?}", self.peer_nodes); - let sync_task = self.start_replication_sync(); - tokio::spawn(sync_task); - } - - let scripts_dir = Path::new("scripts"); - if !scripts_dir.exists() { - if let Err(e) = std::fs::create_dir(scripts_dir) { - warn!("Failed to create scripts directory: {}", e); - } - } - - let backups_dir = Path::new("backups"); - if !backups_dir.exists() { - if let Err(e) = std::fs::create_dir(backups_dir) { - warn!("Failed to create backups directory: {}", e); - } - } - - loop { - match listener.accept().await { - Ok((socket, _)) => { - let server = self.clone(); - tokio::spawn(async move { - if let Err(e) = server.handle_connection(socket).await { - error!("Connection error: {}", e); - } - }); - } - Err(e) => { - error!("Accept error: {}", e); - } - } - } - } - - fn start_http_api(&self, addr: String) { - let db = self.db.clone(); - let command_queue_insert = self.command_queue.clone(); - let command_queue_update = self.command_queue.clone(); - let command_queue_delete = self.command_queue.clone(); - - tokio::spawn(async move { - let get = warp::path!("api" / "get" / String) - .and(warp::get()) - .map(move |key: String| { - let db = db.read(); - match db.get(&key) { - Some(versions) => { - if let Some(latest) = versions.last() { - warp::reply::json(&latest.value) - } else { - warp::reply::json(&serde_json::Value::Null) - } - }, - None => warp::reply::json(&serde_json::Value::Null), - } - }); - - let insert = warp::path!("api" / "insert") - .and(warp::post()) - .and(warp::body::json()) - .map(move |data: HashMap| { - for (key, value) in data { - command_queue_insert.push(Command::Insert { key, value }); - } - warp::reply::json(&serde_json::json!({"status": "ok"})) - }); - - let update = warp::path!("api" / "update") - .and(warp::post()) - .and(warp::body::json()) - .map(move |data: HashMap| { - for (key, value) in data { - command_queue_update.push(Command::Update { key, value }); - } - warp::reply::json(&serde_json::json!({"status": "ok"})) - }); - - let delete = warp::path!("api" / "delete" / String) - .and(warp::delete()) - .map(move |key: String| { - command_queue_delete.push(Command::Delete { key }); - warp::reply::json(&serde_json::json!({"status": "ok"})) - }); - - let routes = get.or(insert).or(update).or(delete); - - warp::serve(routes) - .run(addr.parse::().unwrap()) - .await; - }); - } - - fn start_replication_sync(&self) -> impl std::future::Future + 'static { - let peer_nodes = self.peer_nodes.clone(); - let command_history = self.command_history.clone(); - let last_sync_timestamp = self.last_sync_timestamp.clone(); - let sync_interval = self.sync_interval; - - async move { - let mut interval = time::interval(Duration::from_millis(sync_interval)); - loop { - interval.tick().await; - - let commands_to_sync = { - let history = command_history.read(); - let last_sync = *last_sync_timestamp.read(); - history.iter() - .filter(|(_, ts)| *ts > last_sync) - .map(|(cmd, ts)| (cmd.clone(), *ts)) - .collect::>() - }; - - if !commands_to_sync.is_empty() { - for peer in &peer_nodes { - if let Ok(mut stream) = TcpStream::connect(peer).await { - let replicate_cmd = Command::Replicate { - commands: commands_to_sync.clone(), - }; - - if let Ok(cmd_bytes) = rmp_serde::to_vec(&replicate_cmd) { - let len = cmd_bytes.len() as u32; - if let Err(e) = stream.write_all(&len.to_be_bytes()).await { - warn!("Failed to write command length to peer {}: {}", peer, e); - continue; - } - if let Err(e) = stream.write_all(&cmd_bytes).await { - warn!("Failed to write command to peer {}: {}", peer, e); - } - } - } - } - } - } - } - } - - async fn handle_connection( - &self, - mut socket: TcpStream, - ) -> Result<(), Box> { - loop { - let mut len_buf = [0u8; 4]; - if let Err(e) = socket.read_exact(&mut len_buf).await { - error!("Failed to read command length: {}", e); - break; - } - let len = u32::from_be_bytes(len_buf) as usize; - - let mut buf = vec![0u8; len]; - if let Err(e) = socket.read_exact(&mut buf).await { - error!("Failed to read command: {}", e); - break; - } - - let cmd: Command = match rmp_serde::from_slice(&buf) { - Ok(cmd) => cmd, - Err(e) => { - error!("Failed to deserialize command: {}", e); - break; - } - }; - - self.command_queue.push(cmd.clone()); - - while let Some(cmd) = self.command_queue.pop() { - if let Command::Replicate { commands } = cmd { - for (cmd, ts) in commands { - match cmd { - Command::Insert { key, value } | Command::Update { key, value } => { - let mut db = self.db.write(); - let timestamp = match SystemTime::now() - .duration_since(UNIX_EPOCH) - { - Ok(duration) => duration.as_millis(), - Err(e) => { - error!("SystemTime error: {}", e); - 0 - } - }; - let versioned_value = VersionedValue { - value: value.clone(), - version: 0, - timestamp, - tx_id: None, - }; - db.entry(key.clone()) - .or_default() - .push(versioned_value); - } - _ => { - let _ = Self::process_command( - &self.db, - &self.transactions, - &self.indexes, - &self.next_tx_id, - cmd, - ); - } - } - *self.last_sync_timestamp.write() = ts; - } - continue; - } - - let response = Self::process_command( - &self.db, - &self.transactions, - &self.indexes, - &self.next_tx_id, - cmd.clone(), - ); - - let response_bytes = match rmp_serde::to_vec(&response) { - Ok(bytes) => bytes, - Err(e) => { - error!("Failed to serialize response: {}", e); - break; - } - }; - let len = response_bytes.len() as u32; - if let Err(e) = socket.write_all(&len.to_be_bytes()).await { - error!("Failed to write response length: {}", e); - break; - } - if let Err(e) = socket.write_all(&response_bytes).await { - error!("Failed to write response: {}", e); - break; - } - } - } - Ok(()) - } - - #[allow(dead_code)] - fn resolve_conflict( - &self, - _key: &str, - local_value: &VersionedValue, - remote_value: &VersionedValue, - ) -> VersionedValue { - if remote_value.timestamp > local_value.timestamp { - remote_value.clone() - } else { - local_value.clone() - } - } - - fn process_command( - db: &Arc>>>, - transactions: &Arc>>>, - indexes: &Arc>>>>, - next_tx_id: &Arc>, - cmd: Command, - ) -> Response { - match cmd { - Command::Insert { key, value } => { - let mut db = db.write(); - let timestamp = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_millis(); - let version = db.get(&key).map_or(0, |v| v.last().map_or(0, |vv| vv.version + 1)); - - let versioned_value = VersionedValue { - value: value.clone(), - version, - timestamp, - tx_id: None, - }; - - db.entry(key.clone()) - .or_default() - .push(versioned_value.clone()); - - Self::update_indexes(indexes, key, &value); - Response::Success(None) - }, - Command::Get { key, version } => { - let db = db.read(); - match db.get(&key) { - Some(versions) => { - let value = if let Some(v) = version { - versions.iter().find(|vv| vv.version == v) - } else { - versions.last() - }; - - match value { - Some(vv) => Response::Success(Some(vv.value.clone())), - None => Response::Error("Version not found".to_string()), - } - }, - None => Response::Error("Key not found".to_string()), - } - }, - Command::Update { key, value } => { - let mut db = db.write(); - if db.contains_key(&key) { - let timestamp = SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap_or_default() - .as_millis(); - let version = db.get(&key).unwrap().last().unwrap().version + 1; - - let versioned_value = VersionedValue { - value: value.clone(), - version, - timestamp, - tx_id: None, - }; - - db.get_mut(&key).unwrap().push(versioned_value.clone()); - Self::update_indexes(indexes, key, &value); - Response::Success(None) - } else { - Response::Error("Key not found".to_string()) - } - }, - Command::Delete { key } => { - let mut db = db.write(); - if db.remove(&key).is_some() { - Self::remove_from_indexes(indexes, key); - Response::Success(None) - } else { - Response::Error("Key not found".to_string()) - } - }, - Command::BeginTransaction => { - let tx_id = { - let mut next_id = next_tx_id.write(); - let id = *next_id; - *next_id += 1; - id - }; - - let mut transactions = transactions.write(); - transactions.insert(tx_id, HashMap::new()); - Response::Success(Some(tx_id.into())) - }, - Command::CommitTransaction(tx_id) => { - let mut db = db.write(); - let mut transactions = transactions.write(); - - if let Some(tx_data) = transactions.remove(&tx_id) { - for (key, versioned_value) in tx_data { - db.entry(key.clone()) - .or_default() - .push(versioned_value.clone()); - Self::update_indexes(indexes, key, &versioned_value.value); - } - Response::Success(None) - } else { - Response::Error("Transaction not found".to_string()) - } - }, - Command::RollbackTransaction(tx_id) => { - let mut transactions = transactions.write(); - if transactions.remove(&tx_id).is_some() { - Response::Success(None) - } else { - Response::Error("Transaction not found".to_string()) - } - }, - Command::CreateIndex { field } => { - let mut indexes = indexes.write(); - if !indexes.contains_key(&field) { - indexes.insert(field, HashMap::new()); - Response::Success(None) - } else { - Response::Error("Index already exists".to_string()) - } - }, - Command::DropIndex { field } => { - let mut indexes = indexes.write(); - if indexes.remove(&field).is_some() { - Response::Success(None) - } else { - Response::Error("Index not found".to_string()) - } - }, - Command::SysExec { script_name } => { - match Self::execute_lua_script(&script_name) { - Ok(output) => Response::Success(Some(output.into())), - Err(e) => Response::Error(format!("Script execution failed: {}", e)), - } - }, - Command::ReplicationEnable => Response::Success(None), - Command::ReplicationDisable => Response::Success(None), - Command::ReplicationAddPeer { addr: _ } => Response::Success(None), - Command::ReplicationRemovePeer { addr: _ } => Response::Success(None), - Command::ReplicationStatus => Response::Success(None), - Command::Replicate { .. } => Response::Success(None), - Command::BackupCreate { path } => { - match Self::create_backup(db, &path) { - Ok(_) => Response::BackupStatus { - success: true, - message: format!("Backup created successfully at {}", path), - }, - Err(e) => Response::BackupStatus { - success: false, - message: format!("Backup failed: {}", e), - }, - } - }, - Command::BackupRestore { path } => { - match Self::restore_backup(db, indexes, &path) { - Ok(_) => Response::BackupStatus { - success: true, - message: format!("Backup restored successfully from {}", path), - }, - Err(e) => Response::BackupStatus { - success: false, - message: format!("Restore failed: {}", e), - }, - } - }, - } - } - - fn create_backup( - db: &Arc>>>, - path: &str, - ) -> Result<(), Box> { - let db = db.read(); - let serialized = serde_json::to_string(&*db)?; - std::fs::write(path, serialized)?; - Ok(()) - } - - fn restore_backup( - db: &Arc>>>, - indexes: &Arc>>>>, - path: &str, - ) -> Result<(), Box> { - let data = std::fs::read_to_string(path)?; - let restored: HashMap> = serde_json::from_str(&data)?; - - let mut db = db.write(); - *db = restored; - - let mut indexes = indexes.write(); - indexes.clear(); - - for (key, versions) in db.iter() { - if let Some(latest) = versions.last() { - for (field, index) in indexes.iter_mut() { - if let Some(field_value) = latest.value.get(field) { - index.entry(field_value.clone()) - .or_default() - .push(key.clone()); - } - } - } - } - - Ok(()) - } - - fn update_indexes( - indexes: &Arc>>>>, - key: String, - value: &serde_json::Value, - ) { - let mut indexes = indexes.write(); - for (field, index) in indexes.iter_mut() { - if let Some(field_value) = value.get(field) { - index.entry(field_value.clone()) - .or_default() - .push(key.clone()); - } - } - } - - fn remove_from_indexes( - indexes: &Arc>>>>, - key: String, - ) { - let mut indexes = indexes.write(); - for index in indexes.values_mut() { - for entries in index.values_mut() { - if let Some(pos) = entries.iter().position(|k| k == &key) { - entries.remove(pos); - } - } - } - } - - fn execute_lua_script(script_name: &str) -> Result> { - let script_path = Path::new("scripts").join(script_name).with_extension("lua"); - let output = std::process::Command::new("lua") - .arg(&script_path) - .output()?; - - if output.status.success() { - Ok(String::from_utf8(output.stdout)?) - } else { - Err(String::from_utf8(output.stderr)?.into()) - } - } -} - -impl Clone for FutriixServer { - fn clone(&self) -> Self { - FutriixServer { - db: self.db.clone(), - command_queue: self.command_queue.clone(), - transactions: self.transactions.clone(), - indexes: self.indexes.clone(), - replication_enabled: self.replication_enabled, - peer_nodes: self.peer_nodes.clone(), - sync_interval: self.sync_interval, - last_sync_timestamp: self.last_sync_timestamp.clone(), - command_history: self.command_history.clone(), - next_tx_id: self.next_tx_id.clone(), - } - } -}