diff --git a/futriix-server/src/server.rs b/futriix-server/src/server.rs new file mode 100644 index 0000000..c348924 --- /dev/null +++ b/futriix-server/src/server.rs @@ -0,0 +1,268 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; +use parking_lot::RwLock; +use crossbeam::queue::SegQueue; +use tokio::net::{TcpListener, TcpStream}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use log::{info, error}; +use colored::Colorize; + +#[derive(Debug, Serialize, Deserialize)] +pub enum Command { + Insert { key: String, value: serde_json::Value }, + Get { key: String }, + Update { key: String, value: serde_json::Value }, + Delete { key: String }, + BeginTransaction, + CommitTransaction, + RollbackTransaction, + CreateIndex { field: String }, + DropIndex { field: String }, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum Response { + Success(Option), + Error(String), +} + +pub struct FutriixServer { + db: Arc>>, + command_queue: Arc>, + transactions: Arc>>>, + indexes: Arc>>>>, +} + +impl FutriixServer { + pub fn new() -> Self { + FutriixServer { + db: Arc::new(RwLock::new(HashMap::new())), + command_queue: Arc::new(SegQueue::new()), + transactions: Arc::new(RwLock::new(Vec::new())), + indexes: Arc::new(RwLock::new(HashMap::new())), + } + } + + pub async fn run(&self, addr: &str) -> Result<(), Box> { + let listener = TcpListener::bind(addr).await?; + + println!(); + info!("{}", "Futriix Server started successfully!".bright_cyan()); + info!("Listening on: {}", addr); + info!("Log file: futriix.log"); + + loop { + let (socket, _) = listener.accept().await?; + let db = self.db.clone(); + let queue = self.command_queue.clone(); + let transactions = self.transactions.clone(); + let indexes = self.indexes.clone(); + + tokio::spawn(async move { + if let Err(e) = Self::handle_connection(socket, db, queue, transactions, indexes).await { + error!("Connection error: {}", e); + } + }); + } + } + + async fn handle_connection( + mut socket: TcpStream, + db: Arc>>, + queue: Arc>, + transactions: Arc>>>, + indexes: Arc>>>>, + ) -> Result<(), Box> { + loop { + let mut len_buf = [0u8; 4]; + if let Err(e) = socket.read_exact(&mut len_buf).await { + if e.kind() == std::io::ErrorKind::UnexpectedEof { + break; + } + return Err(e.into()); + } + let len = u32::from_be_bytes(len_buf) as usize; + + let mut buf = vec![0u8; len]; + socket.read_exact(&mut buf).await?; + + let cmd: Command = rmp_serde::from_slice(&buf)?; + queue.push(cmd); + + while let Some(cmd) = queue.pop() { + let response = match cmd { + Command::Insert { key, value } => { + let mut tx_data = None; + { + let mut transactions = transactions.write(); + if let Some(tx) = transactions.last_mut() { + tx.insert(key.clone(), value.clone()); + tx_data = Some((key.clone(), value.clone())); + } + } + + if tx_data.is_none() { + let mut db = db.write(); + db.insert(key.clone(), value.clone()); + Self::update_indexes(&indexes, &key, &value); + } + Response::Success(None) + }, + Command::Get { key } => { + let db = db.read(); + match db.get(&key) { + Some(value) => Response::Success(Some(value.clone())), + None => Response::Error("Key not found".to_string()), + } + }, + Command::Update { key, value } => { + let mut tx_data = None; + { + let mut transactions = transactions.write(); + if let Some(tx) = transactions.last_mut() { + tx.insert(key.clone(), value.clone()); + tx_data = Some((key.clone(), value.clone())); + } + } + + if tx_data.is_none() { + let mut db = db.write(); + if db.contains_key(&key) { + db.insert(key.clone(), value.clone()); + Self::update_indexes(&indexes, &key, &value); + Response::Success(None) + } else { + Response::Error("Key not found".to_string()) + } + } else { + Response::Success(None) + } + }, + Command::Delete { key } => { + let mut tx_data = None; + { + let mut transactions = transactions.write(); + if let Some(tx) = transactions.last_mut() { + tx.insert(key.clone(), serde_json::Value::Null); + tx_data = Some(key.clone()); + } + } + + if tx_data.is_none() { + let mut db = db.write(); + if db.remove(&key).is_some() { + Self::remove_from_indexes(&indexes, &key); + Response::Success(None) + } else { + Response::Error("Key not found".to_string()) + } + } else { + Response::Success(None) + } + }, + Command::BeginTransaction => { + let mut transactions = transactions.write(); + transactions.push(HashMap::new()); + Response::Success(None) + }, + Command::CommitTransaction => { + let mut transactions = transactions.write(); + if let Some(tx) = transactions.pop() { + let mut db = db.write(); + for (k, v) in tx { + if v.is_null() { + db.remove(&k); + Self::remove_from_indexes(&indexes, &k); + } else { + db.insert(k.clone(), v.clone()); + Self::update_indexes(&indexes, &k, &v); + } + } + Response::Success(None) + } else { + Response::Error("No active transaction".to_string()) + } + }, + Command::RollbackTransaction => { + let mut transactions = transactions.write(); + if transactions.pop().is_some() { + Response::Success(None) + } else { + Response::Error("No active transaction".to_string()) + } + }, + Command::CreateIndex { field } => { + let mut indexes = indexes.write(); + if !indexes.contains_key(&field) { + // Build initial index + let mut index = HashMap::new(); + let db = db.read(); + for (key, value) in db.iter() { + if let Some(field_value) = value.get(&field) { + index.entry(field_value.clone()) + .or_insert_with(Vec::new) + .push(key.clone()); + } + } + indexes.insert(field, index); + Response::Success(None) + } else { + Response::Error("Index already exists".to_string()) + } + }, + Command::DropIndex { field } => { + let mut indexes = indexes.write(); + if indexes.remove(&field).is_some() { + Response::Success(None) + } else { + Response::Error("Index not found".to_string()) + } + }, + }; + + let response_bytes = rmp_serde::to_vec(&response)?; + let len = response_bytes.len() as u32; + socket.write_all(&len.to_be_bytes()).await?; + socket.write_all(&response_bytes).await?; + } + } + Ok(()) + } + + fn update_indexes( + indexes: &Arc>>>>, + key: &str, + value: &serde_json::Value, + ) { + let mut indexes = indexes.write(); + for (field, index) in indexes.iter_mut() { + if let Some(field_value) = value.get(field) { + // Remove old references + for entries in index.values_mut() { + if let Some(pos) = entries.iter().position(|k| k == key) { + entries.remove(pos); + } + } + // Add new reference + index.entry(field_value.clone()) + .or_insert_with(Vec::new) + .push(key.to_string()); + } + } + } + + fn remove_from_indexes( + indexes: &Arc>>>>, + key: &str, + ) { + let mut indexes = indexes.write(); + for index in indexes.values_mut() { + for entries in index.values_mut() { + if let Some(pos) = entries.iter().position(|k| k == key) { + entries.remove(pos); + } + } + } + } +}