use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::path::Path; use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use crossbeam::queue::SegQueue; use parking_lot::RwLock; use tokio::net::{TcpListener, TcpStream}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::time; use log::{info, error, debug, warn}; use warp::Filter; use rlua::Lua; #[derive(Debug, Serialize, Deserialize, Clone)] pub enum Command { Insert { key: String, value: serde_json::Value }, Get { key: String, version: Option }, Update { key: String, value: serde_json::Value }, Delete { key: String }, BeginTransaction, CommitTransaction(u64), RollbackTransaction(u64), CreateIndex { field: String }, DropIndex { field: String }, SysExec { script_name: String }, Replicate { commands: Vec<(Command, u128)> }, ReplicationEnable, ReplicationDisable, ReplicationAddPeer { addr: String }, ReplicationRemovePeer { addr: String }, ReplicationStatus, BackupCreate { path: String }, BackupRestore { path: String }, LuaExec { code: String }, } #[derive(Debug, Serialize, Deserialize)] pub enum Response { Success(Option), Error(String), ReplicationStatus { enabled: bool, peers: Vec, last_sync: u128, }, BackupStatus { success: bool, message: String }, LuaOutput(String), } #[derive(Debug, Clone, Serialize, Deserialize)] struct VersionedValue { value: serde_json::Value, version: u64, timestamp: u128, tx_id: Option, } pub struct FutriixServer { db: Arc>>>, command_queue: Arc>, transactions: Arc>>>, indexes: Arc>>>>, replication_enabled: bool, peer_nodes: Vec, sync_interval: u64, last_sync_timestamp: Arc>, command_history: Arc>>, next_tx_id: Arc>, } impl FutriixServer { pub fn new(config: &toml::Value) -> Self { debug!("Initializing server with config: {:?}", config); let replication_enabled = config.get("replication") .and_then(|r| r.get("enabled")) .and_then(|v| v.as_bool()) .unwrap_or(false); let peer_nodes = config.get("replication") .and_then(|r| r.get("peer_nodes")) .and_then(|n| n.as_array()) .map(|nodes| { nodes.iter() .filter_map(|n| n.as_str().map(|s| s.to_string())) .collect() }) .unwrap_or_default(); let sync_interval = config.get("replication") .and_then(|r| r.get("sync_interval")) .and_then(|i| i.as_integer()) .map(|i| i as u64) .unwrap_or(1000); FutriixServer { db: Arc::new(RwLock::new(HashMap::new())), command_queue: Arc::new(SegQueue::new()), transactions: Arc::new(RwLock::new(HashMap::new())), indexes: Arc::new(RwLock::new(HashMap::new())), replication_enabled, peer_nodes, sync_interval, last_sync_timestamp: Arc::new(RwLock::new(0)), command_history: Arc::new(RwLock::new(Vec::new())), next_tx_id: Arc::new(RwLock::new(1)), } } pub async fn run(&self, addr: &str) -> Result<(), Box> { info!("Starting server on {}", addr); let listener = TcpListener::bind(addr).await?; let http_port = addr.split(':').last().unwrap_or("8080").parse::().unwrap_or(8080) + 1; let http_addr = format!("0.0.0.0:{}", http_port); self.start_http_api(http_addr.clone()); info!("Futriix Server started successfully!"); info!("Listening on: {}", addr); info!("HTTP API available on: {}", http_addr); if self.replication_enabled { info!("Replication enabled with peers: {:?}", self.peer_nodes); let sync_task = self.start_replication_sync(); tokio::spawn(sync_task); } let scripts_dir = Path::new("scripts"); if !scripts_dir.exists() { if let Err(e) = std::fs::create_dir(scripts_dir) { warn!("Failed to create scripts directory: {}", e); } } let backups_dir = Path::new("backups"); if !backups_dir.exists() { if let Err(e) = std::fs::create_dir(backups_dir) { warn!("Failed to create backups directory: {}", e); } } loop { match listener.accept().await { Ok((socket, _)) => { let server = self.clone(); tokio::spawn(async move { if let Err(e) = server.handle_connection(socket).await { error!("Connection error: {}", e); } }); } Err(e) => { error!("Accept error: {}", e); } } } } fn start_http_api(&self, addr: String) { let db = self.db.clone(); let command_queue_insert = self.command_queue.clone(); let command_queue_update = self.command_queue.clone(); let command_queue_delete = self.command_queue.clone(); tokio::spawn(async move { let get = warp::path!("api" / "get" / String) .and(warp::get()) .map(move |key: String| { let db = db.read(); match db.get(&key) { Some(versions) => { if let Some(latest) = versions.last() { warp::reply::json(&latest.value) } else { warp::reply::json(&serde_json::Value::Null) } }, None => warp::reply::json(&serde_json::Value::Null), } }); let insert = warp::path!("api" / "insert") .and(warp::post()) .and(warp::body::json()) .map(move |data: HashMap| { for (key, value) in data { command_queue_insert.push(Command::Insert { key, value }); } warp::reply::json(&serde_json::json!({"status": "ok"})) }); let update = warp::path!("api" / "update") .and(warp::post()) .and(warp::body::json()) .map(move |data: HashMap| { for (key, value) in data { command_queue_update.push(Command::Update { key, value }); } warp::reply::json(&serde_json::json!({"status": "ok"})) }); let delete = warp::path!("api" / "delete" / String) .and(warp::delete()) .map(move |key: String| { command_queue_delete.push(Command::Delete { key }); warp::reply::json(&serde_json::json!({"status": "ok"})) }); let routes = get.or(insert).or(update).or(delete); warp::serve(routes) .run(addr.parse::().unwrap()) .await; }); } fn start_replication_sync(&self) -> impl std::future::Future + 'static { let peer_nodes = self.peer_nodes.clone(); let command_history = self.command_history.clone(); let last_sync_timestamp = self.last_sync_timestamp.clone(); let sync_interval = self.sync_interval; async move { let mut interval = time::interval(Duration::from_millis(sync_interval)); loop { interval.tick().await; let commands_to_sync = { let history = command_history.read(); let last_sync = *last_sync_timestamp.read(); history.iter() .filter(|(_, ts)| *ts > last_sync) .map(|(cmd, ts)| (cmd.clone(), *ts)) .collect::>() }; if !commands_to_sync.is_empty() { for peer in &peer_nodes { if let Ok(mut stream) = TcpStream::connect(peer).await { let replicate_cmd = Command::Replicate { commands: commands_to_sync.clone(), }; if let Ok(cmd_bytes) = rmp_serde::to_vec(&replicate_cmd) { let len = cmd_bytes.len() as u32; if let Err(e) = stream.write_all(&len.to_be_bytes()).await { warn!("Failed to write command length to peer {}: {}", peer, e); continue; } if let Err(e) = stream.write_all(&cmd_bytes).await { warn!("Failed to write command to peer {}: {}", peer, e); } } } } } } } } async fn handle_connection( &self, mut socket: TcpStream, ) -> Result<(), Box> { loop { let mut len_buf = [0u8; 4]; if let Err(e) = socket.read_exact(&mut len_buf).await { error!("Failed to read command length: {}", e); break; } let len = u32::from_be_bytes(len_buf) as usize; let mut buf = vec![0u8; len]; if let Err(e) = socket.read_exact(&mut buf).await { error!("Failed to read command: {}", e); break; } let cmd: Command = match rmp_serde::from_slice(&buf) { Ok(cmd) => cmd, Err(e) => { error!("Failed to deserialize command: {}", e); break; } }; self.command_queue.push(cmd.clone()); while let Some(cmd) = self.command_queue.pop() { if let Command::Replicate { commands } = cmd { for (cmd, ts) in commands { match cmd { Command::Insert { key, value } | Command::Update { key, value } => { let mut db = self.db.write(); let timestamp = match SystemTime::now() .duration_since(UNIX_EPOCH) { Ok(duration) => duration.as_millis(), Err(e) => { error!("SystemTime error: {}", e); 0 } }; let versioned_value = VersionedValue { value: value.clone(), version: 0, timestamp, tx_id: None, }; db.entry(key.clone()) .or_default() .push(versioned_value); } _ => { let _ = Self::process_command( &self.db, &self.transactions, &self.indexes, &self.next_tx_id, cmd, ); } } *self.last_sync_timestamp.write() = ts; } continue; } let response = Self::process_command( &self.db, &self.transactions, &self.indexes, &self.next_tx_id, cmd.clone(), ); let response_bytes = match rmp_serde::to_vec(&response) { Ok(bytes) => bytes, Err(e) => { error!("Failed to serialize response: {}", e); break; } }; let len = response_bytes.len() as u32; if let Err(e) = socket.write_all(&len.to_be_bytes()).await { error!("Failed to write response length: {}", e); break; } if let Err(e) = socket.write_all(&response_bytes).await { error!("Failed to write response: {}", e); break; } } } Ok(()) } fn process_command( db: &Arc>>>, transactions: &Arc>>>, indexes: &Arc>>>>, next_tx_id: &Arc>, cmd: Command, ) -> Response { match cmd { Command::Insert { key, value } => { let mut db = db.write(); let timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_millis(); let version = db.get(&key).map_or(0, |v| v.last().map_or(0, |vv| vv.version + 1)); let versioned_value = VersionedValue { value: value.clone(), version, timestamp, tx_id: None, }; db.entry(key.clone()) .or_default() .push(versioned_value.clone()); Self::update_indexes(indexes, key, &value); Response::Success(None) }, Command::Get { key, version } => { let db = db.read(); match db.get(&key) { Some(versions) => { let value = if let Some(v) = version { versions.iter().find(|vv| vv.version == v) } else { versions.last() }; match value { Some(vv) => Response::Success(Some(vv.value.clone())), None => Response::Error("Version not found".to_string()), } }, None => Response::Error("Key not found".to_string()), } }, Command::Update { key, value } => { let mut db = db.write(); if db.contains_key(&key) { let timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_millis(); let version = db.get(&key).unwrap().last().unwrap().version + 1; let versioned_value = VersionedValue { value: value.clone(), version, timestamp, tx_id: None, }; db.get_mut(&key).unwrap().push(versioned_value.clone()); Self::update_indexes(indexes, key, &value); Response::Success(None) } else { Response::Error("Key not found".to_string()) } }, Command::Delete { key } => { let mut db = db.write(); if db.remove(&key).is_some() { Self::remove_from_indexes(indexes, key); Response::Success(None) } else { Response::Error("Key not found".to_string()) } }, Command::BeginTransaction => { let tx_id = { let mut next_id = next_tx_id.write(); let id = *next_id; *next_id += 1; id }; let mut transactions = transactions.write(); transactions.insert(tx_id, HashMap::new()); Response::Success(Some(tx_id.into())) }, Command::CommitTransaction(tx_id) => { let mut db = db.write(); let mut transactions = transactions.write(); if let Some(tx_data) = transactions.remove(&tx_id) { for (key, versioned_value) in tx_data { db.entry(key.clone()) .or_default() .push(versioned_value.clone()); Self::update_indexes(indexes, key, &versioned_value.value); } Response::Success(None) } else { Response::Error("Transaction not found".to_string()) } }, Command::RollbackTransaction(tx_id) => { let mut transactions = transactions.write(); if transactions.remove(&tx_id).is_some() { Response::Success(None) } else { Response::Error("Transaction not found".to_string()) } }, Command::CreateIndex { field } => { let mut indexes = indexes.write(); if !indexes.contains_key(&field) { indexes.insert(field, HashMap::new()); Response::Success(None) } else { Response::Error("Index already exists".to_string()) } }, Command::DropIndex { field } => { let mut indexes = indexes.write(); if indexes.remove(&field).is_some() { Response::Success(None) } else { Response::Error("Index not found".to_string()) } }, Command::SysExec { script_name } => { match Self::execute_lua_script(&script_name) { Ok(output) => Response::Success(Some(output.into())), Err(e) => Response::Error(format!("Script execution failed: {}", e)), } }, Command::LuaExec { code } => { match Self::execute_lua_code(&code) { Ok(output) => Response::LuaOutput(output), Err(e) => Response::Error(format!("Lua execution failed: {}", e)), } }, Command::ReplicationEnable => Response::Success(None), Command::ReplicationDisable => Response::Success(None), Command::ReplicationAddPeer { addr: _ } => Response::Success(None), Command::ReplicationRemovePeer { addr: _ } => Response::Success(None), Command::ReplicationStatus => Response::Success(None), Command::Replicate { .. } => Response::Success(None), Command::BackupCreate { path } => { match Self::create_backup(db, &path) { Ok(_) => Response::BackupStatus { success: true, message: format!("Backup created successfully at {}", path), }, Err(e) => Response::BackupStatus { success: false, message: format!("Backup failed: {}", e), }, } }, Command::BackupRestore { path } => { match Self::restore_backup(db, indexes, &path) { Ok(_) => Response::BackupStatus { success: true, message: format!("Backup restored successfully from {}", path), }, Err(e) => Response::BackupStatus { success: false, message: format!("Restore failed: {}", e), }, } }, } } fn execute_lua_code(code: &str) -> Result> { let lua = Lua::new(); let result = lua.load(code).eval::()?; Ok(result.to_str()?.to_string()) } fn create_backup( db: &Arc>>>, path: &str, ) -> Result<(), Box> { let db = db.read(); let serialized = serde_json::to_string(&*db)?; std::fs::write(path, serialized)?; Ok(()) } fn restore_backup( db: &Arc>>>, indexes: &Arc>>>>, path: &str, ) -> Result<(), Box> { let data = std::fs::read_to_string(path)?; let restored: HashMap> = serde_json::from_str(&data)?; let mut db = db.write(); *db = restored; let mut indexes = indexes.write(); indexes.clear(); for (key, versions) in db.iter() { if let Some(latest) = versions.last() { for (field, index) in indexes.iter_mut() { if let Some(field_value) = latest.value.get(field) { index.entry(field_value.clone()) .or_default() .push(key.clone()); } } } } Ok(()) } fn update_indexes( indexes: &Arc>>>>, key: String, value: &serde_json::Value, ) { let mut indexes = indexes.write(); for (field, index) in indexes.iter_mut() { if let Some(field_value) = value.get(field) { index.entry(field_value.clone()) .or_default() .push(key.clone()); } } } fn remove_from_indexes( indexes: &Arc>>>>, key: String, ) { let mut indexes = indexes.write(); for index in indexes.values_mut() { for entries in index.values_mut() { if let Some(pos) = entries.iter().position(|k| k == &key) { entries.remove(pos); } } } } fn execute_lua_script(script_name: &str) -> Result> { let script_path = Path::new("scripts").join(script_name).with_extension("lua"); let output = std::process::Command::new("lua") .arg(&script_path) .output()?; if output.status.success() { Ok(String::from_utf8(output.stdout)?) } else { Err(String::from_utf8(output.stderr)?.into()) } } } impl Clone for FutriixServer { fn clone(&self) -> Self { FutriixServer { db: self.db.clone(), command_queue: self.command_queue.clone(), transactions: self.transactions.clone(), indexes: self.indexes.clone(), replication_enabled: self.replication_enabled, peer_nodes: self.peer_nodes.clone(), sync_interval: self.sync_interval, last_sync_timestamp: self.last_sync_timestamp.clone(), command_history: self.command_history.clone(), next_tx_id: self.next_tx_id.clone(), } } }