diff --git a/futriix-server/src/server.rs b/futriix-server/src/server.rs new file mode 100644 index 0000000..bebd194 --- /dev/null +++ b/futriix-server/src/server.rs @@ -0,0 +1,640 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::path::Path; +use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use crossbeam::queue::SegQueue; +use parking_lot::RwLock; +use tokio::net::{TcpListener, TcpStream}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::time; +use log::{info, error, debug, warn}; +use warp::Filter; +use rlua::Lua; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum Command { + Insert { key: String, value: serde_json::Value }, + Get { key: String, version: Option }, + Update { key: String, value: serde_json::Value }, + Delete { key: String }, + BeginTransaction, + CommitTransaction(u64), + RollbackTransaction(u64), + CreateIndex { field: String }, + DropIndex { field: String }, + SysExec { script_name: String }, + Replicate { commands: Vec<(Command, u128)> }, + ReplicationEnable, + ReplicationDisable, + ReplicationAddPeer { addr: String }, + ReplicationRemovePeer { addr: String }, + ReplicationStatus, + BackupCreate { path: String }, + BackupRestore { path: String }, + LuaExec { code: String }, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum Response { + Success(Option), + Error(String), + ReplicationStatus { + enabled: bool, + peers: Vec, + last_sync: u128, + }, + BackupStatus { success: bool, message: String }, + LuaOutput(String), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct VersionedValue { + value: serde_json::Value, + version: u64, + timestamp: u128, + tx_id: Option, +} + +pub struct FutriixServer { + db: Arc>>>, + command_queue: Arc>, + transactions: Arc>>>, + indexes: Arc>>>>, + replication_enabled: bool, + peer_nodes: Vec, + sync_interval: u64, + last_sync_timestamp: Arc>, + command_history: Arc>>, + next_tx_id: Arc>, +} + +impl FutriixServer { + pub fn new(config: &toml::Value) -> Self { + debug!("Initializing server with config: {:?}", config); + + let replication_enabled = config.get("replication") + .and_then(|r| r.get("enabled")) + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + let peer_nodes = config.get("replication") + .and_then(|r| r.get("peer_nodes")) + .and_then(|n| n.as_array()) + .map(|nodes| { + nodes.iter() + .filter_map(|n| n.as_str().map(|s| s.to_string())) + .collect() + }) + .unwrap_or_default(); + + let sync_interval = config.get("replication") + .and_then(|r| r.get("sync_interval")) + .and_then(|i| i.as_integer()) + .map(|i| i as u64) + .unwrap_or(1000); + + FutriixServer { + db: Arc::new(RwLock::new(HashMap::new())), + command_queue: Arc::new(SegQueue::new()), + transactions: Arc::new(RwLock::new(HashMap::new())), + indexes: Arc::new(RwLock::new(HashMap::new())), + replication_enabled, + peer_nodes, + sync_interval, + last_sync_timestamp: Arc::new(RwLock::new(0)), + command_history: Arc::new(RwLock::new(Vec::new())), + next_tx_id: Arc::new(RwLock::new(1)), + } + } + + pub async fn run(&self, addr: &str) -> Result<(), Box> { + info!("Starting server on {}", addr); + let listener = TcpListener::bind(addr).await?; + + let http_port = addr.split(':').last().unwrap_or("8080").parse::().unwrap_or(8080) + 1; + let http_addr = format!("0.0.0.0:{}", http_port); + self.start_http_api(http_addr.clone()); + + info!("Futriix Server started successfully!"); + info!("Listening on: {}", addr); + info!("HTTP API available on: {}", http_addr); + + if self.replication_enabled { + info!("Replication enabled with peers: {:?}", self.peer_nodes); + let sync_task = self.start_replication_sync(); + tokio::spawn(sync_task); + } + + let scripts_dir = Path::new("scripts"); + if !scripts_dir.exists() { + if let Err(e) = std::fs::create_dir(scripts_dir) { + warn!("Failed to create scripts directory: {}", e); + } + } + + let backups_dir = Path::new("backups"); + if !backups_dir.exists() { + if let Err(e) = std::fs::create_dir(backups_dir) { + warn!("Failed to create backups directory: {}", e); + } + } + + loop { + match listener.accept().await { + Ok((socket, _)) => { + let server = self.clone(); + tokio::spawn(async move { + if let Err(e) = server.handle_connection(socket).await { + error!("Connection error: {}", e); + } + }); + } + Err(e) => { + error!("Accept error: {}", e); + } + } + } + } + + fn start_http_api(&self, addr: String) { + let db = self.db.clone(); + let command_queue_insert = self.command_queue.clone(); + let command_queue_update = self.command_queue.clone(); + let command_queue_delete = self.command_queue.clone(); + + tokio::spawn(async move { + let get = warp::path!("api" / "get" / String) + .and(warp::get()) + .map(move |key: String| { + let db = db.read(); + match db.get(&key) { + Some(versions) => { + if let Some(latest) = versions.last() { + warp::reply::json(&latest.value) + } else { + warp::reply::json(&serde_json::Value::Null) + } + }, + None => warp::reply::json(&serde_json::Value::Null), + } + }); + + let insert = warp::path!("api" / "insert") + .and(warp::post()) + .and(warp::body::json()) + .map(move |data: HashMap| { + for (key, value) in data { + command_queue_insert.push(Command::Insert { key, value }); + } + warp::reply::json(&serde_json::json!({"status": "ok"})) + }); + + let update = warp::path!("api" / "update") + .and(warp::post()) + .and(warp::body::json()) + .map(move |data: HashMap| { + for (key, value) in data { + command_queue_update.push(Command::Update { key, value }); + } + warp::reply::json(&serde_json::json!({"status": "ok"})) + }); + + let delete = warp::path!("api" / "delete" / String) + .and(warp::delete()) + .map(move |key: String| { + command_queue_delete.push(Command::Delete { key }); + warp::reply::json(&serde_json::json!({"status": "ok"})) + }); + + let routes = get.or(insert).or(update).or(delete); + + warp::serve(routes) + .run(addr.parse::().unwrap()) + .await; + }); + } + + fn start_replication_sync(&self) -> impl std::future::Future + 'static { + let peer_nodes = self.peer_nodes.clone(); + let command_history = self.command_history.clone(); + let last_sync_timestamp = self.last_sync_timestamp.clone(); + let sync_interval = self.sync_interval; + + async move { + let mut interval = time::interval(Duration::from_millis(sync_interval)); + loop { + interval.tick().await; + + let commands_to_sync = { + let history = command_history.read(); + let last_sync = *last_sync_timestamp.read(); + history.iter() + .filter(|(_, ts)| *ts > last_sync) + .map(|(cmd, ts)| (cmd.clone(), *ts)) + .collect::>() + }; + + if !commands_to_sync.is_empty() { + for peer in &peer_nodes { + if let Ok(mut stream) = TcpStream::connect(peer).await { + let replicate_cmd = Command::Replicate { + commands: commands_to_sync.clone(), + }; + + if let Ok(cmd_bytes) = rmp_serde::to_vec(&replicate_cmd) { + let len = cmd_bytes.len() as u32; + if let Err(e) = stream.write_all(&len.to_be_bytes()).await { + warn!("Failed to write command length to peer {}: {}", peer, e); + continue; + } + if let Err(e) = stream.write_all(&cmd_bytes).await { + warn!("Failed to write command to peer {}: {}", peer, e); + } + } + } + } + } + } + } + } + + async fn handle_connection( + &self, + mut socket: TcpStream, + ) -> Result<(), Box> { + loop { + let mut len_buf = [0u8; 4]; + if let Err(e) = socket.read_exact(&mut len_buf).await { + error!("Failed to read command length: {}", e); + break; + } + let len = u32::from_be_bytes(len_buf) as usize; + + let mut buf = vec![0u8; len]; + if let Err(e) = socket.read_exact(&mut buf).await { + error!("Failed to read command: {}", e); + break; + } + + let cmd: Command = match rmp_serde::from_slice(&buf) { + Ok(cmd) => cmd, + Err(e) => { + error!("Failed to deserialize command: {}", e); + break; + } + }; + + self.command_queue.push(cmd.clone()); + + while let Some(cmd) = self.command_queue.pop() { + if let Command::Replicate { commands } = cmd { + for (cmd, ts) in commands { + match cmd { + Command::Insert { key, value } | Command::Update { key, value } => { + let mut db = self.db.write(); + let timestamp = match SystemTime::now() + .duration_since(UNIX_EPOCH) + { + Ok(duration) => duration.as_millis(), + Err(e) => { + error!("SystemTime error: {}", e); + 0 + } + }; + let versioned_value = VersionedValue { + value: value.clone(), + version: 0, + timestamp, + tx_id: None, + }; + db.entry(key.clone()) + .or_default() + .push(versioned_value); + } + _ => { + let _ = Self::process_command( + &self.db, + &self.transactions, + &self.indexes, + &self.next_tx_id, + cmd, + ); + } + } + *self.last_sync_timestamp.write() = ts; + } + continue; + } + + let response = Self::process_command( + &self.db, + &self.transactions, + &self.indexes, + &self.next_tx_id, + cmd.clone(), + ); + + let response_bytes = match rmp_serde::to_vec(&response) { + Ok(bytes) => bytes, + Err(e) => { + error!("Failed to serialize response: {}", e); + break; + } + }; + let len = response_bytes.len() as u32; + if let Err(e) = socket.write_all(&len.to_be_bytes()).await { + error!("Failed to write response length: {}", e); + break; + } + if let Err(e) = socket.write_all(&response_bytes).await { + error!("Failed to write response: {}", e); + break; + } + } + } + Ok(()) + } + + fn process_command( + db: &Arc>>>, + transactions: &Arc>>>, + indexes: &Arc>>>>, + next_tx_id: &Arc>, + cmd: Command, + ) -> Response { + match cmd { + Command::Insert { key, value } => { + let mut db = db.write(); + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); + let version = db.get(&key).map_or(0, |v| v.last().map_or(0, |vv| vv.version + 1)); + + let versioned_value = VersionedValue { + value: value.clone(), + version, + timestamp, + tx_id: None, + }; + + db.entry(key.clone()) + .or_default() + .push(versioned_value.clone()); + + Self::update_indexes(indexes, key, &value); + Response::Success(None) + }, + Command::Get { key, version } => { + let db = db.read(); + match db.get(&key) { + Some(versions) => { + let value = if let Some(v) = version { + versions.iter().find(|vv| vv.version == v) + } else { + versions.last() + }; + + match value { + Some(vv) => Response::Success(Some(vv.value.clone())), + None => Response::Error("Version not found".to_string()), + } + }, + None => Response::Error("Key not found".to_string()), + } + }, + Command::Update { key, value } => { + let mut db = db.write(); + if db.contains_key(&key) { + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); + let version = db.get(&key).unwrap().last().unwrap().version + 1; + + let versioned_value = VersionedValue { + value: value.clone(), + version, + timestamp, + tx_id: None, + }; + + db.get_mut(&key).unwrap().push(versioned_value.clone()); + Self::update_indexes(indexes, key, &value); + Response::Success(None) + } else { + Response::Error("Key not found".to_string()) + } + }, + Command::Delete { key } => { + let mut db = db.write(); + if db.remove(&key).is_some() { + Self::remove_from_indexes(indexes, key); + Response::Success(None) + } else { + Response::Error("Key not found".to_string()) + } + }, + Command::BeginTransaction => { + let tx_id = { + let mut next_id = next_tx_id.write(); + let id = *next_id; + *next_id += 1; + id + }; + + let mut transactions = transactions.write(); + transactions.insert(tx_id, HashMap::new()); + Response::Success(Some(tx_id.into())) + }, + Command::CommitTransaction(tx_id) => { + let mut db = db.write(); + let mut transactions = transactions.write(); + + if let Some(tx_data) = transactions.remove(&tx_id) { + for (key, versioned_value) in tx_data { + db.entry(key.clone()) + .or_default() + .push(versioned_value.clone()); + Self::update_indexes(indexes, key, &versioned_value.value); + } + Response::Success(None) + } else { + Response::Error("Transaction not found".to_string()) + } + }, + Command::RollbackTransaction(tx_id) => { + let mut transactions = transactions.write(); + if transactions.remove(&tx_id).is_some() { + Response::Success(None) + } else { + Response::Error("Transaction not found".to_string()) + } + }, + Command::CreateIndex { field } => { + let mut indexes = indexes.write(); + if !indexes.contains_key(&field) { + indexes.insert(field, HashMap::new()); + Response::Success(None) + } else { + Response::Error("Index already exists".to_string()) + } + }, + Command::DropIndex { field } => { + let mut indexes = indexes.write(); + if indexes.remove(&field).is_some() { + Response::Success(None) + } else { + Response::Error("Index not found".to_string()) + } + }, + Command::SysExec { script_name } => { + match Self::execute_lua_script(&script_name) { + Ok(output) => Response::Success(Some(output.into())), + Err(e) => Response::Error(format!("Script execution failed: {}", e)), + } + }, + Command::LuaExec { code } => { + match Self::execute_lua_code(&code) { + Ok(output) => Response::LuaOutput(output), + Err(e) => Response::Error(format!("Lua execution failed: {}", e)), + } + }, + Command::ReplicationEnable => Response::Success(None), + Command::ReplicationDisable => Response::Success(None), + Command::ReplicationAddPeer { addr: _ } => Response::Success(None), + Command::ReplicationRemovePeer { addr: _ } => Response::Success(None), + Command::ReplicationStatus => Response::Success(None), + Command::Replicate { .. } => Response::Success(None), + Command::BackupCreate { path } => { + match Self::create_backup(db, &path) { + Ok(_) => Response::BackupStatus { + success: true, + message: format!("Backup created successfully at {}", path), + }, + Err(e) => Response::BackupStatus { + success: false, + message: format!("Backup failed: {}", e), + }, + } + }, + Command::BackupRestore { path } => { + match Self::restore_backup(db, indexes, &path) { + Ok(_) => Response::BackupStatus { + success: true, + message: format!("Backup restored successfully from {}", path), + }, + Err(e) => Response::BackupStatus { + success: false, + message: format!("Restore failed: {}", e), + }, + } + }, + } + } + + fn execute_lua_code(code: &str) -> Result> { + let lua = Lua::new(); + let result = lua.load(code).eval::()?; + Ok(result.to_str()?.to_string()) + } + + fn create_backup( + db: &Arc>>>, + path: &str, + ) -> Result<(), Box> { + let db = db.read(); + let serialized = serde_json::to_string(&*db)?; + std::fs::write(path, serialized)?; + Ok(()) + } + + fn restore_backup( + db: &Arc>>>, + indexes: &Arc>>>>, + path: &str, + ) -> Result<(), Box> { + let data = std::fs::read_to_string(path)?; + let restored: HashMap> = serde_json::from_str(&data)?; + + let mut db = db.write(); + *db = restored; + + let mut indexes = indexes.write(); + indexes.clear(); + + for (key, versions) in db.iter() { + if let Some(latest) = versions.last() { + for (field, index) in indexes.iter_mut() { + if let Some(field_value) = latest.value.get(field) { + index.entry(field_value.clone()) + .or_default() + .push(key.clone()); + } + } + } + } + + Ok(()) + } + + fn update_indexes( + indexes: &Arc>>>>, + key: String, + value: &serde_json::Value, + ) { + let mut indexes = indexes.write(); + for (field, index) in indexes.iter_mut() { + if let Some(field_value) = value.get(field) { + index.entry(field_value.clone()) + .or_default() + .push(key.clone()); + } + } + } + + fn remove_from_indexes( + indexes: &Arc>>>>, + key: String, + ) { + let mut indexes = indexes.write(); + for index in indexes.values_mut() { + for entries in index.values_mut() { + if let Some(pos) = entries.iter().position(|k| k == &key) { + entries.remove(pos); + } + } + } + } + + fn execute_lua_script(script_name: &str) -> Result> { + let script_path = Path::new("scripts").join(script_name).with_extension("lua"); + let output = std::process::Command::new("lua") + .arg(&script_path) + .output()?; + + if output.status.success() { + Ok(String::from_utf8(output.stdout)?) + } else { + Err(String::from_utf8(output.stderr)?.into()) + } + } +} + +impl Clone for FutriixServer { + fn clone(&self) -> Self { + FutriixServer { + db: self.db.clone(), + command_queue: self.command_queue.clone(), + transactions: self.transactions.clone(), + indexes: self.indexes.clone(), + replication_enabled: self.replication_enabled, + peer_nodes: self.peer_nodes.clone(), + sync_interval: self.sync_interval, + last_sync_timestamp: self.last_sync_timestamp.clone(), + command_history: self.command_history.clone(), + next_tx_id: self.next_tx_id.clone(), + } + } +}