From 4955d8c4d099f4351b893ed04f51e51c6c347dac Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Sun, 6 Jul 2025 13:43:46 +0000 Subject: [PATCH] Delete futriix-server/src/old-server.rs --- futriix-server/src/old-server.rs | 376 ------------------------------- 1 file changed, 376 deletions(-) delete mode 100644 futriix-server/src/old-server.rs diff --git a/futriix-server/src/old-server.rs b/futriix-server/src/old-server.rs deleted file mode 100644 index 581f0c9..0000000 --- a/futriix-server/src/old-server.rs +++ /dev/null @@ -1,376 +0,0 @@ -use serde::{Deserialize, Serialize}; -use std::collections::HashMap; -use std::path::Path; -use std::sync::Arc; -use std::time::Duration; -use crossbeam::queue::SegQueue; -use parking_lot::RwLock; -use tokio::net::{TcpListener, TcpStream}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; -use tokio::time; -use log::{info, error, debug}; - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub enum Command { - Insert { key: String, value: serde_json::Value }, - Get { key: String }, - Update { key: String, value: serde_json::Value }, - Delete { key: String }, - BeginTransaction, - CommitTransaction, - RollbackTransaction, - CreateIndex { field: String }, - DropIndex { field: String }, - SysExec { script_name: String }, - Replicate { commands: Vec<(Command, u128)> }, - ReplicationEnable, - ReplicationDisable, - ReplicationAddPeer { addr: String }, - ReplicationRemovePeer { addr: String }, - ReplicationStatus, -} - -#[derive(Debug, Serialize, Deserialize)] -pub enum Response { - Success(Option), - Error(String), - ReplicationStatus { - enabled: bool, - peers: Vec, - last_sync: u128, - }, -} - -pub struct FutriixServer { - db: Arc>>, - command_queue: Arc>, - transactions: Arc>>>, - indexes: Arc>>>>, - replication_enabled: bool, - peer_nodes: Vec, - sync_interval: u64, - last_sync_timestamp: Arc>, - command_history: Arc>>, -} - -impl FutriixServer { - pub fn new(config: &toml::Value) -> Self { - debug!("Initializing server with config: {:?}", config); - - let replication_enabled = config.get("replication") - .and_then(|r| r.get("enabled")) - .and_then(|v| v.as_bool()) - .unwrap_or(false); - - let peer_nodes = config.get("replication") - .and_then(|r| r.get("peer_nodes")) - .and_then(|n| n.as_array()) - .map(|nodes| { - nodes.iter() - .filter_map(|n| n.as_str().map(|s| s.to_string())) - .collect() - }) - .unwrap_or_else(Vec::new); - - let sync_interval = config.get("replication") - .and_then(|r| r.get("sync_interval")) - .and_then(|i| i.as_integer()) - .map(|i| i as u64) - .unwrap_or(1000); - - FutriixServer { - db: Arc::new(RwLock::new(HashMap::new())), - command_queue: Arc::new(SegQueue::new()), - transactions: Arc::new(RwLock::new(Vec::new())), - indexes: Arc::new(RwLock::new(HashMap::new())), - replication_enabled, - peer_nodes, - sync_interval, - last_sync_timestamp: Arc::new(RwLock::new(0)), - command_history: Arc::new(RwLock::new(Vec::new())), - } - } - - pub async fn run(&self, addr: &str) -> Result<(), Box> { - info!("Starting server on {}", addr); - let listener = TcpListener::bind(addr).await?; - - info!("Futriix Server started successfully!"); - info!("Listening on: {}", addr); - - if self.replication_enabled { - info!("Replication enabled with peers: {:?}", self.peer_nodes); - let sync_task = self.start_replication_sync(); - tokio::spawn(sync_task); - } - - let scripts_dir = Path::new("scripts"); - if !scripts_dir.exists() { - std::fs::create_dir(scripts_dir)?; - } - - loop { - let (socket, _) = listener.accept().await?; - let server = self.clone(); - tokio::spawn(async move { - server.handle_connection(socket).await.unwrap_or_else(|e| { - error!("Connection error: {}", e); - }); - }); - } - } - - fn start_replication_sync(&self) -> impl std::future::Future + 'static { - let peer_nodes = self.peer_nodes.clone(); - let command_history = self.command_history.clone(); - let last_sync_timestamp = self.last_sync_timestamp.clone(); - let sync_interval = self.sync_interval; - - async move { - let mut interval = time::interval(Duration::from_millis(sync_interval)); - loop { - interval.tick().await; - - let commands_to_sync = { - let history = command_history.read(); - let last_sync = *last_sync_timestamp.read(); - history.iter() - .filter(|(_, ts)| *ts > last_sync) - .map(|(cmd, ts)| (cmd.clone(), *ts)) - .collect::>() - }; - - if !commands_to_sync.is_empty() { - for peer in &peer_nodes { - if let Ok(mut stream) = TcpStream::connect(peer).await { - let replicate_cmd = Command::Replicate { - commands: commands_to_sync.clone(), - }; - - if let Ok(cmd_bytes) = rmp_serde::to_vec(&replicate_cmd) { - let len = cmd_bytes.len() as u32; - if stream.write_all(&len.to_be_bytes()).await.is_ok() { - let _ = stream.write_all(&cmd_bytes).await; - } - } - } - } - } - } - } - } - - async fn handle_connection( - &self, - mut socket: TcpStream, - ) -> Result<(), Box> { - loop { - let mut len_buf = [0u8; 4]; - socket.read_exact(&mut len_buf).await?; - let len = u32::from_be_bytes(len_buf) as usize; - - let mut buf = vec![0u8; len]; - socket.read_exact(&mut buf).await?; - - let cmd: Command = rmp_serde::from_slice(&buf)?; - - self.command_queue.push(cmd.clone()); - - while let Some(cmd) = self.command_queue.pop() { - if let Command::Replicate { commands } = cmd { - for (cmd, ts) in commands { - match cmd { - Command::Insert { key, value } | Command::Update { key, value } => { - let mut db = self.db.write(); - if let Some(existing) = db.get(&key) { - let resolved = self.resolve_conflict(&key, existing, &value); - db.insert(key, resolved); - } else { - db.insert(key, value); - } - } - _ => { - let _ = Self::process_command( - &self.db, - &self.transactions, - &self.indexes, - cmd, - ); - } - } - *self.last_sync_timestamp.write() = ts; - } - continue; - } - - let response = Self::process_command( - &self.db, - &self.transactions, - &self.indexes, - cmd.clone(), - ); - - let response_bytes = rmp_serde::to_vec(&response)?; - let len = response_bytes.len() as u32; - socket.write_all(&len.to_be_bytes()).await?; - socket.write_all(&response_bytes).await?; - } - } - } - - fn resolve_conflict( - &self, - _key: &str, - local_value: &serde_json::Value, - remote_value: &serde_json::Value, - ) -> serde_json::Value { - let local_ts = local_value.get("_timestamp").and_then(|v| v.as_u64()).unwrap_or(0); - let remote_ts = remote_value.get("_timestamp").and_then(|v| v.as_u64()).unwrap_or(0); - - if remote_ts > local_ts { - remote_value.clone() - } else { - local_value.clone() - } - } - - fn process_command( - db: &Arc>>, - _transactions: &Arc>>>, - indexes: &Arc>>>>, - cmd: Command, - ) -> Response { - match cmd { - Command::Insert { key, value } => { - let mut db = db.write(); - db.insert(key.clone(), value.clone()); - Self::update_indexes(indexes, key, &value); - Response::Success(None) - }, - Command::Get { key } => { - let db = db.read(); - match db.get(&key) { - Some(value) => Response::Success(Some(value.clone())), - None => Response::Error("Key not found".to_string()), - } - }, - Command::Update { key, value } => { - let mut db = db.write(); - if db.contains_key(&key) { - db.insert(key.clone(), value.clone()); - Self::update_indexes(indexes, key, &value); - Response::Success(None) - } else { - Response::Error("Key not found".to_string()) - } - }, - Command::Delete { key } => { - let mut db = db.write(); - if db.remove(&key).is_some() { - Self::remove_from_indexes(indexes, key); - Response::Success(None) - } else { - Response::Error("Key not found".to_string()) - } - }, - Command::BeginTransaction => { - Response::Success(None) - }, - Command::CommitTransaction => { - Response::Success(None) - }, - Command::RollbackTransaction => { - Response::Success(None) - }, - Command::CreateIndex { field } => { - let mut indexes = indexes.write(); - if !indexes.contains_key(&field) { - indexes.insert(field, HashMap::new()); - Response::Success(None) - } else { - Response::Error("Index already exists".to_string()) - } - }, - Command::DropIndex { field } => { - let mut indexes = indexes.write(); - if indexes.remove(&field).is_some() { - Response::Success(None) - } else { - Response::Error("Index not found".to_string()) - } - }, - Command::SysExec { script_name } => { - #[allow(unused_variables)] - match Self::execute_lua_script(&script_name) { - Ok(output) => Response::Success(Some(output.into())), - Err(e) => Response::Error(format!("Script execution failed: {}", e)), - } - }, - Command::ReplicationEnable => Response::Success(None), - Command::ReplicationDisable => Response::Success(None), - Command::ReplicationAddPeer { addr: _ } => Response::Success(None), - Command::ReplicationRemovePeer { addr: _ } => Response::Success(None), - Command::ReplicationStatus => Response::Success(None), - Command::Replicate { .. } => Response::Success(None), - } - } - - fn update_indexes( - indexes: &Arc>>>>, - key: String, - value: &serde_json::Value, - ) { - let mut indexes = indexes.write(); - for (field, index) in indexes.iter_mut() { - if let Some(field_value) = value.get(field) { - index.entry(field_value.clone()) - .or_insert_with(Vec::new) - .push(key.clone()); - } - } - } - - fn remove_from_indexes( - indexes: &Arc>>>>, - key: String, - ) { - let mut indexes = indexes.write(); - for index in indexes.values_mut() { - for entries in index.values_mut() { - if let Some(pos) = entries.iter().position(|k| k == &key) { - entries.remove(pos); - } - } - } - } - - #[allow(dead_code)] - fn execute_lua_script(script_name: &str) -> Result> { - let script_path = Path::new("scripts").join(script_name).with_extension("lua"); - let output = std::process::Command::new("lua") - .arg(&script_path) - .output()?; - - if output.status.success() { - Ok(String::from_utf8(output.stdout)?) - } else { - Err(String::from_utf8(output.stderr)?.into()) - } - } -} - -impl Clone for FutriixServer { - fn clone(&self) -> Self { - FutriixServer { - db: self.db.clone(), - command_queue: self.command_queue.clone(), - transactions: self.transactions.clone(), - indexes: self.indexes.clone(), - replication_enabled: self.replication_enabled, - peer_nodes: self.peer_nodes.clone(), - sync_interval: self.sync_interval, - last_sync_timestamp: self.last_sync_timestamp.clone(), - command_history: self.command_history.clone(), - } - } -} \ No newline at end of file