From b7d4d0272b67f763dafaff4dfeead5cafa59cf78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Sat, 2 Aug 2025 14:54:05 +0000 Subject: [PATCH] Upload files to "futriix/src" --- futriix/src/main.rs | 77 ++++ futriix/src/server.rs | 866 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 943 insertions(+) create mode 100644 futriix/src/main.rs create mode 100644 futriix/src/server.rs diff --git a/futriix/src/main.rs b/futriix/src/main.rs new file mode 100644 index 0000000..88ec181 --- /dev/null +++ b/futriix/src/main.rs @@ -0,0 +1,77 @@ +// futriix/main.rs +mod server; + +use server::*; +use std::fs; +use std::path::Path; +use toml::Value; +use simplelog::*; +use chrono::Local; +use log::info; +use ctrlc; +use colored::Colorize; + +#[tokio::main] +async fn main() -> Result<(), Box> { + let config_content = fs::read_to_string("futriix.config.toml")?; + let config: Value = toml::from_str(&config_content)?; + + let ip = config["server"]["ip"].as_str().unwrap_or("127.0.0.1"); + let port = config["server"]["port"].as_integer().unwrap_or(8080) as u16; + let log_path = config["server"]["log_path"].as_str().unwrap_or("futriix.log"); + + ctrlc::set_handler(move || { + println!("{}", "Server is now shutdown".truecolor(0, 191, 255)); + std::process::exit(0); + })?; + + let log_dir = Path::new("logs"); + if !log_dir.exists() { + fs::create_dir(log_dir)?; + } + + let timestamp = Local::now().format("%d.%m.%Y %H-%M-%S"); + let log_filename = format!("futriix_{}.log", timestamp); + let full_log_path = log_dir.join(log_filename); + + let symlink_path = Path::new(log_path); + if symlink_path.exists() { + fs::remove_file(symlink_path)?; + } + #[cfg(unix)] + std::os::unix::fs::symlink(full_log_path.clone(), symlink_path)?; + #[cfg(windows)] + std::os::windows::fs::symlink_file(full_log_path.clone(), symlink_path)?; + + let log_file = fs::OpenOptions::new() + .create(true) + .append(true) + .open(&full_log_path)?; + + CombinedLogger::init(vec![ + TermLogger::new( + LevelFilter::Info, + Config::default(), + TerminalMode::Mixed, + ColorChoice::Auto, + ), + WriteLogger::new( + LevelFilter::Info, + Config::default(), + log_file, + ), + ])?; + + println!(); + println!("{}", "Futriix Server started successfully!".truecolor(0, 191, 255)); + + let server = FutriixServer::new(&config); + let addr = format!("{}:{}", ip, port); + info!("Starting server on {}", addr); + info!("Log file: {}", full_log_path.display()); + info!("Symlink: {}", symlink_path.display()); + + server.run(&addr).await?; + + Ok(()) +} diff --git a/futriix/src/server.rs b/futriix/src/server.rs new file mode 100644 index 0000000..aebba0a --- /dev/null +++ b/futriix/src/server.rs @@ -0,0 +1,866 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::path::Path; +use std::sync::Arc; +use std::time::Duration; +use crossbeam::queue::SegQueue; +use parking_lot::RwLock; +use tokio::net::{TcpListener, TcpStream}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::time; +use log::{info, error, debug, warn}; +use warp::Filter; +use rlua::Lua; + +// Custom B*-tree implementation for time-series data +mod bstar_tree { + use serde::{Serialize, Deserialize}; + + const MIN_DEGREE: usize = 2; + const MAX_KEYS: usize = 2 * MIN_DEGREE - 1; + + #[derive(Debug, Clone, Serialize, Deserialize)] + pub struct BStarTree { + root: Node, + } + + #[derive(Debug, Clone, Serialize, Deserialize)] + enum Node { + Internal { + keys: Vec, + children: Vec>, + }, + Leaf { + keys: Vec, + values: Vec, + }, + } + + impl BStarTree { + pub fn new() -> Self { + BStarTree { + root: Node::Leaf { + keys: Vec::new(), + values: Vec::new(), + }, + } + } + + pub fn insert(&mut self, key: K, value: V) -> Option { + let (new_root, old_val) = self.root.insert(key, value); + if let Some(new_root) = new_root { + self.root = Node::Internal { + keys: vec![new_root.0], + children: vec![new_root.1, new_root.2], + }; + } + old_val + } + + pub fn range(&self, start: &K, end: &K) -> Vec<(&K, &V)> { + self.root.range(start, end) + } + + pub fn remove(&mut self, key: &K) -> Option { + let (new_root, old_val) = self.root.remove(key); + if let Some(new_root) = new_root { + self.root = new_root; + } + old_val + } + } + + impl Node { + fn insert(&mut self, key: K, value: V) -> (Option<(K, Node, Node)>, Option) { + match self { + Node::Leaf { keys, values } => { + match keys.binary_search(&key) { + Ok(idx) => { + let old_val = values[idx].clone(); + values[idx] = value; + (None, Some(old_val)) + } + Err(idx) => { + keys.insert(idx, key); + values.insert(idx, value); + + if keys.len() > MAX_KEYS { + let split_idx = keys.len() / 2; + let split_key = keys[split_idx].clone(); + + let right_keys = keys.split_off(split_idx + 1); + let right_values = values.split_off(split_idx + 1); + + let right_node = Node::Leaf { + keys: right_keys, + values: right_values, + }; + + let left_node = Node::Leaf { + keys: keys.clone(), + values: values.clone(), + }; + + (Some((split_key, left_node, right_node)), None) + } else { + (None, None) + } + } + } + } + Node::Internal { keys, children } => { + let idx = match keys.binary_search(&key) { + Ok(idx) => idx + 1, + Err(idx) => idx, + }; + + let (split, old_val) = children[idx].insert(key, value); + + if let Some((split_key, left, right)) = split { + keys.insert(idx, split_key); + children[idx] = left; + children.insert(idx + 1, right); + + if keys.len() > MAX_KEYS { + let split_idx = keys.len() / 2; + let split_key = keys[split_idx].clone(); + + let right_keys = keys.split_off(split_idx + 1); + let right_children = children.split_off(split_idx + 1); + + let right_node = Node::Internal { + keys: right_keys, + children: right_children, + }; + + let left_node = Node::Internal { + keys: keys.clone(), + children: children.clone(), + }; + + (Some((split_key, left_node, right_node)), old_val) + } else { + (None, old_val) + } + } else { + (None, old_val) + } + } + } + } + + fn range(&self, start: &K, end: &K) -> Vec<(&K, &V)> { + let mut result = Vec::new(); + self.collect_range(start, end, &mut result); + result + } + + fn collect_range<'a>(&'a self, start: &K, end: &K, result: &mut Vec<(&'a K, &'a V)>) { + match self { + Node::Leaf { keys, values } => { + let start_idx = match keys.binary_search(start) { + Ok(idx) => idx, + Err(idx) => idx, + }; + + let end_idx = match keys.binary_search(end) { + Ok(idx) => idx + 1, + Err(idx) => idx, + }; + + for i in start_idx..end_idx.min(keys.len()) { + result.push((&keys[i], &values[i])); + } + } + Node::Internal { keys, children } => { + let mut idx = 0; + while idx < keys.len() && keys[idx] < *start { + idx += 1; + } + + if idx > 0 { + children[idx - 1].collect_range(start, end, result); + } + + while idx < keys.len() && keys[idx] <= *end { + children[idx].collect_range(start, end, result); + idx += 1; + } + + if idx == keys.len() { + children[idx].collect_range(start, end, result); + } + } + } + } + + fn remove(&mut self, key: &K) -> (Option>, Option) { + match self { + Node::Leaf { keys, values } => { + if let Ok(idx) = keys.binary_search(key) { + let old_val = values.remove(idx); + keys.remove(idx); + + if keys.len() < MIN_DEGREE - 1 { + (Some(Node::Leaf { + keys: keys.clone(), + values: values.clone(), + }), Some(old_val)) + } else { + (None, Some(old_val)) + } + } else { + (None, None) + } + } + Node::Internal { keys, children } => { + let idx = match keys.binary_search(key) { + Ok(idx) => idx + 1, + Err(idx) => idx, + }; + + let (new_child, old_val) = children[idx].remove(key); + + if let Some(new_child) = new_child { + if let Node::Leaf { keys: new_keys, values: new_values } = new_child { + if idx > 0 { + // Merge with left sibling + if let Node::Leaf { keys: left_keys, values: left_values } = &mut children[idx - 1] { + left_keys.extend(new_keys); + left_values.extend(new_values); + keys.remove(idx - 1); + children.remove(idx); + } + } else if idx + 1 < children.len() { + // Merge with right sibling + if let Node::Leaf { keys: right_keys, values: right_values } = &mut children[idx + 1] { + right_keys.splice(0..0, new_keys); + right_values.splice(0..0, new_values); + keys.remove(idx); + children.remove(idx); + } + } + } + } + + if keys.len() < MIN_DEGREE - 1 { + if keys.is_empty() { + (Some(children.remove(0)), old_val) + } else { + (Some(Node::Internal { + keys: keys.clone(), + children: children.clone(), + }), old_val) + } + } else { + (None, old_val) + } + } + } + } + } +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum Command { + Insert { metric: String, timestamp: u128, value: serde_json::Value }, + GetRange { metric: String, start: u128, end: u128 }, + GetLatest { metric: String }, + DeleteRange { metric: String, start: u128, end: u128 }, + BeginTransaction, + CommitTransaction(u64), + RollbackTransaction(u64), + CreateIndex { field: String }, + DropIndex { field: String }, + SysExec { script_name: String }, + Replicate { commands: Vec<(Command, u128)> }, + ReplicationEnable, + ReplicationDisable, + ReplicationAddPeer { addr: String }, + ReplicationRemovePeer { addr: String }, + ReplicationStatus, + BackupCreate { path: String }, + BackupRestore { path: String }, + LuaExec { code: String }, + Halt, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum Response { + Success(Option), + Error(String), + ReplicationStatus { + enabled: bool, + peers: Vec, + last_sync: u128, + }, + BackupStatus { success: bool, message: String }, + LuaOutput(String), +} + +pub struct FutriixServer { + ts_db: Arc>>>, + command_queue: Arc>, + transactions: Arc>>>>, + indexes: Arc>>>>, + replication_enabled: bool, + peer_nodes: Vec, + sync_interval: u64, + last_sync_timestamp: Arc>, + command_history: Arc>>, + next_tx_id: Arc>, + shutdown: Arc>, +} + +impl FutriixServer { + pub fn new(config: &toml::Value) -> Self { + debug!("Initializing server with config: {:?}", config); + + let replication_enabled = config.get("replication") + .and_then(|r| r.get("enabled")) + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + let peer_nodes = config.get("replication") + .and_then(|r| r.get("peer_nodes")) + .and_then(|n| n.as_array()) + .map(|nodes| { + nodes.iter() + .filter_map(|n| n.as_str().map(|s| s.to_string())) + .collect() + }) + .unwrap_or_default(); + + let sync_interval = config.get("replication") + .and_then(|r| r.get("sync_interval")) + .and_then(|i| i.as_integer()) + .map(|i| i as u64) + .unwrap_or(1000); + + FutriixServer { + ts_db: Arc::new(RwLock::new(HashMap::new())), + command_queue: Arc::new(SegQueue::new()), + transactions: Arc::new(RwLock::new(HashMap::new())), + indexes: Arc::new(RwLock::new(HashMap::new())), + replication_enabled, + peer_nodes, + sync_interval, + last_sync_timestamp: Arc::new(RwLock::new(0)), + command_history: Arc::new(RwLock::new(Vec::new())), + next_tx_id: Arc::new(RwLock::new(1)), + shutdown: Arc::new(RwLock::new(false)), + } + } + + pub async fn run(&self, addr: &str) -> Result<(), Box> { + info!("Starting server on {}", addr); + let listener = TcpListener::bind(addr).await?; + + let http_port = addr.split(':').last().unwrap_or("8080").parse::().unwrap_or(8080) + 1; + let http_addr = format!("0.0.0.0:{}", http_port); + self.start_http_api(http_addr.clone()); + + info!("Futriix Server started successfully!"); + info!("Listening on: {}", addr); + info!("HTTP API available on: {}", http_addr); + + if self.replication_enabled { + info!("Replication enabled with peers: {:?}", self.peer_nodes); + let sync_task = self.start_replication_sync(); + tokio::spawn(sync_task); + } + + let scripts_dir = Path::new("scripts"); + if !scripts_dir.exists() { + if let Err(e) = std::fs::create_dir(scripts_dir) { + warn!("Failed to create scripts directory: {}", e); + } + } + + let backups_dir = Path::new("backups"); + if !backups_dir.exists() { + if let Err(e) = std::fs::create_dir(backups_dir) { + warn!("Failed to create backups directory: {}", e); + } + } + + loop { + if *self.shutdown.read() { + info!("Shutdown signal received, stopping server"); + break; + } + + match listener.accept().await { + Ok((socket, _)) => { + let server = self.clone(); + tokio::spawn(async move { + if let Err(e) = server.handle_connection(socket).await { + error!("Connection error: {}", e); + } + }); + } + Err(e) => { + error!("Accept error: {}", e); + } + } + } + Ok(()) + } + + fn start_http_api(&self, addr: String) { + let ts_db = self.ts_db.clone(); + let command_queue_insert = self.command_queue.clone(); + let _command_queue_get = self.command_queue.clone(); + let command_queue_delete = self.command_queue.clone(); + + let ts_db_clone = ts_db.clone(); + + tokio::spawn(async move { + let get = warp::path!("api" / "get" / String / u128 / u128) + .and(warp::get()) + .map(move |metric: String, start: u128, end: u128| { + let db = ts_db.read(); + match db.get(&metric) { + Some(tree) => { + let data = tree.range(&start, &end); + warp::reply::json(&data) + }, + None => warp::reply::json(&Vec::<(u128, serde_json::Value)>::new()), + } + }); + + let get_latest = warp::path!("api" / "latest" / String) + .and(warp::get()) + .map(move |metric: String| { + let db = ts_db_clone.read(); + match db.get(&metric) { + Some(tree) => { + if let Some((ts, val)) = tree.range(&0, &u128::MAX).last() { + warp::reply::json(&(*ts, (*val).clone())) + } else { + warp::reply::json(&(0, serde_json::Value::Null)) + } + }, + None => warp::reply::json(&(0, serde_json::Value::Null)), + } + }); + + let insert = warp::path!("api" / "insert") + .and(warp::post()) + .and(warp::body::json()) + .map(move |data: HashMap| { + for (metric, (timestamp, value)) in data { + command_queue_insert.push(Command::Insert { + metric, + timestamp, + value + }); + } + warp::reply::json(&serde_json::json!({"status": "ok"})) + }); + + let delete = warp::path!("api" / "delete" / String / u128 / u128) + .and(warp::delete()) + .map(move |metric: String, start: u128, end: u128| { + command_queue_delete.push(Command::DeleteRange { + metric, + start, + end + }); + warp::reply::json(&serde_json::json!({"status": "ok"})) + }); + + let routes = get.or(get_latest).or(insert).or(delete); + + warp::serve(routes) + .run(addr.parse::().unwrap()) + .await; + }); + } + + fn start_replication_sync(&self) -> impl std::future::Future + 'static { + let peer_nodes = self.peer_nodes.clone(); + let command_history = self.command_history.clone(); + let last_sync_timestamp = self.last_sync_timestamp.clone(); + let sync_interval = self.sync_interval; + + async move { + let mut interval = time::interval(Duration::from_millis(sync_interval)); + loop { + interval.tick().await; + + let commands_to_sync = { + let history = command_history.read(); + let last_sync = *last_sync_timestamp.read(); + history.iter() + .filter(|(_, ts)| *ts > last_sync) + .map(|(cmd, ts)| (cmd.clone(), *ts)) + .collect::>() + }; + + if !commands_to_sync.is_empty() { + for peer in &peer_nodes { + if let Ok(mut stream) = TcpStream::connect(peer).await { + let replicate_cmd = Command::Replicate { + commands: commands_to_sync.clone(), + }; + + if let Ok(cmd_bytes) = rmp_serde::to_vec(&replicate_cmd) { + let len = cmd_bytes.len() as u32; + if let Err(e) = stream.write_all(&len.to_be_bytes()).await { + warn!("Failed to write command length to peer {}: {}", peer, e); + continue; + } + if let Err(e) = stream.write_all(&cmd_bytes).await { + warn!("Failed to write command to peer {}: {}", peer, e); + } + } + } + } + } + } + } + } + + async fn handle_connection( + &self, + mut socket: TcpStream, + ) -> Result<(), Box> { + loop { + let mut len_buf = [0u8; 4]; + if let Err(e) = socket.read_exact(&mut len_buf).await { + error!("Failed to read command length: {}", e); + break; + } + let len = u32::from_be_bytes(len_buf) as usize; + + let mut buf = vec![0u8; len]; + if let Err(e) = socket.read_exact(&mut buf).await { + error!("Failed to read command: {}", e); + break; + } + + let cmd: Command = match rmp_serde::from_slice(&buf) { + Ok(cmd) => cmd, + Err(e) => { + error!("Failed to deserialize command: {}", e); + break; + } + }; + + self.command_queue.push(cmd.clone()); + + while let Some(cmd) = self.command_queue.pop() { + if let Command::Replicate { commands } = cmd { + for (cmd, ts) in commands { + match cmd { + Command::Insert { metric, timestamp, value } => { + let mut db = self.ts_db.write(); + let tree = db.entry(metric) + .or_insert_with(bstar_tree::BStarTree::new); + tree.insert(timestamp, value); + } + Command::DeleteRange { metric, start, end } => { + let mut db = self.ts_db.write(); + if let Some(tree) = db.get_mut(&metric) { + for ts in start..=end { + tree.remove(&ts); + } + } + } + _ => { + let _ = Self::process_command( + &self.ts_db, + &self.transactions, + &self.indexes, + &self.next_tx_id, + &self.shutdown, + cmd, + ); + } + } + *self.last_sync_timestamp.write() = ts; + } + continue; + } + + let response = Self::process_command( + &self.ts_db, + &self.transactions, + &self.indexes, + &self.next_tx_id, + &self.shutdown, + cmd.clone(), + ); + + let response_bytes = match rmp_serde::to_vec(&response) { + Ok(bytes) => bytes, + Err(e) => { + error!("Failed to serialize response: {}", e); + break; + } + }; + let len = response_bytes.len() as u32; + if let Err(e) = socket.write_all(&len.to_be_bytes()).await { + error!("Failed to write response length: {}", e); + break; + } + if let Err(e) = socket.write_all(&response_bytes).await { + error!("Failed to write response: {}", e); + break; + } + } + } + Ok(()) + } + + fn process_command( + ts_db: &Arc>>>, + transactions: &Arc>>>>, + indexes: &Arc>>>>, + next_tx_id: &Arc>, + shutdown: &Arc>, + cmd: Command, + ) -> Response { + match cmd { + Command::Insert { metric, timestamp, value } => { + let mut db = ts_db.write(); + let tree = db.entry(metric.clone()) + .or_insert_with(bstar_tree::BStarTree::new); + tree.insert(timestamp, value.clone()); + + Self::update_indexes(indexes, metric, &value); + Response::Success(None) + }, + Command::GetRange { metric, start, end } => { + let db = ts_db.read(); + match db.get(&metric) { + Some(tree) => { + let data = tree.range(&start, &end); + Response::Success(Some(serde_json::json!(data))) + }, + None => Response::Error("Metric not found".to_string()), + } + }, + Command::GetLatest { metric } => { + let db = ts_db.read(); + match db.get(&metric) { + Some(tree) => { + if let Some((ts, val)) = tree.range(&0, &u128::MAX).last() { + Response::Success(Some(serde_json::json!({ + "timestamp": ts, + "value": val + }))) + } else { + Response::Error("No data for metric".to_string()) + } + }, + None => Response::Error("Metric not found".to_string()), + } + }, + Command::DeleteRange { metric, start, end } => { + let mut db = ts_db.write(); + if let Some(tree) = db.get_mut(&metric) { + for ts in start..=end { + tree.remove(&ts); + } + Response::Success(None) + } else { + Response::Error("Metric not found".to_string()) + } + }, + Command::BeginTransaction => { + let tx_id = { + let mut next_id = next_tx_id.write(); + let id = *next_id; + *next_id += 1; + id + }; + + let mut transactions = transactions.write(); + transactions.insert(tx_id, HashMap::new()); + Response::Success(Some(serde_json::json!(tx_id))) + }, + Command::CommitTransaction(tx_id) => { + let mut db = ts_db.write(); + let mut tx_map = transactions.write(); + + if let Some(tx_data) = tx_map.remove(&tx_id) { + for (metric, data_points) in tx_data { + let tree = db.entry(metric.clone()) + .or_insert_with(bstar_tree::BStarTree::new); + for (ts, val) in data_points { + tree.insert(ts, val.clone()); + Self::update_indexes(indexes, metric.clone(), &val); + } + } + Response::Success(None) + } else { + Response::Error("Transaction not found".to_string()) + } + }, + Command::RollbackTransaction(tx_id) => { + let mut tx_map = transactions.write(); + if tx_map.remove(&tx_id).is_some() { + Response::Success(None) + } else { + Response::Error("Transaction not found".to_string()) + } + }, + Command::CreateIndex { field } => { + let mut idx = indexes.write(); + if !idx.contains_key(&field) { + idx.insert(field, HashMap::new()); + Response::Success(None) + } else { + Response::Error("Index already exists".to_string()) + } + }, + Command::DropIndex { field } => { + let mut idx = indexes.write(); + if idx.remove(&field).is_some() { + Response::Success(None) + } else { + Response::Error("Index not found".to_string()) + } + }, + Command::SysExec { script_name } => { + match Self::execute_lua_script(&script_name) { + Ok(output) => Response::Success(Some(serde_json::json!(output))), + Err(e) => Response::Error(format!("Script execution failed: {}", e)), + } + }, + Command::LuaExec { code } => { + match Self::execute_lua_code(&code) { + Ok(output) => Response::LuaOutput(output), + Err(e) => Response::Error(format!("Lua execution failed: {}", e)), + } + }, + Command::ReplicationEnable => Response::Success(None), + Command::ReplicationDisable => Response::Success(None), + Command::ReplicationAddPeer { addr: _ } => Response::Success(None), + Command::ReplicationRemovePeer { addr: _ } => Response::Success(None), + Command::ReplicationStatus => Response::Success(None), + Command::Replicate { .. } => Response::Success(None), + Command::BackupCreate { path } => { + match Self::create_backup(ts_db, &path) { + Ok(_) => Response::BackupStatus { + success: true, + message: format!("Backup created successfully at {}", path), + }, + Err(e) => Response::BackupStatus { + success: false, + message: format!("Backup failed: {}", e), + }, + } + }, + Command::BackupRestore { path } => { + match Self::restore_backup(ts_db, indexes, &path) { + Ok(_) => Response::BackupStatus { + success: true, + message: format!("Backup restored successfully from {}", path), + }, + Err(e) => Response::BackupStatus { + success: false, + message: format!("Restore failed: {}", e), + }, + } + }, + Command::Halt => { + *shutdown.write() = true; + Response::Success(None) + }, + } + } + + fn execute_lua_code(code: &str) -> Result> { + let lua = Lua::new(); + let result = lua.load(code).eval::()?; + Ok(result.to_str()?.to_string()) + } + + fn create_backup( + ts_db: &Arc>>>, + path: &str, + ) -> Result<(), Box> { + let db = ts_db.read(); + let serialized = serde_json::to_string(&*db)?; + std::fs::write(path, serialized)?; + Ok(()) + } + + fn restore_backup( + ts_db: &Arc>>>, + indexes: &Arc>>>>, + path: &str, + ) -> Result<(), Box> { + let data = std::fs::read_to_string(path)?; + let restored: HashMap> = serde_json::from_str(&data)?; + + let mut db = ts_db.write(); + *db = restored; + + let mut idx = indexes.write(); + idx.clear(); + + for (metric, tree) in db.iter() { + for (_, val) in tree.range(&0, &u128::MAX) { + for (field, index) in idx.iter_mut() { + if let Some(field_value) = val.get(field) { + index.entry(field_value.clone()) + .or_default() + .push(metric.clone()); + } + } + } + } + + Ok(()) + } + + fn update_indexes( + indexes: &Arc>>>>, + metric: String, + value: &serde_json::Value, + ) { + let mut idx = indexes.write(); + for (field, index) in idx.iter_mut() { + if let Some(field_value) = value.get(field) { + index.entry(field_value.clone()) + .or_default() + .push(metric.clone()); + } + } + } + + fn execute_lua_script(script_name: &str) -> Result> { + let script_path = Path::new("scripts").join(script_name).with_extension("lua"); + let output = std::process::Command::new("lua") + .arg(&script_path) + .output()?; + + if output.status.success() { + Ok(String::from_utf8(output.stdout)?) + } else { + Err(String::from_utf8(output.stderr)?.into()) + } + } +} + +impl Clone for FutriixServer { + fn clone(&self) -> Self { + FutriixServer { + ts_db: self.ts_db.clone(), + command_queue: self.command_queue.clone(), + transactions: self.transactions.clone(), + indexes: self.indexes.clone(), + replication_enabled: self.replication_enabled, + peer_nodes: self.peer_nodes.clone(), + sync_interval: self.sync_interval, + last_sync_timestamp: self.last_sync_timestamp.clone(), + command_history: self.command_history.clone(), + next_tx_id: self.next_tx_id.clone(), + shutdown: self.shutdown.clone(), + } + } +}