From 21035afb8f61f77b26d1a7e734a61daefb9202ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Sun, 15 Jun 2025 19:37:58 +0000 Subject: [PATCH] Update futriix-server/src/server.rs --- futriix-server/src/server.rs | 106 ++++++++++++++++++++++++++--------- 1 file changed, 78 insertions(+), 28 deletions(-) diff --git a/futriix-server/src/server.rs b/futriix-server/src/server.rs index c348924..2dbf5a4 100644 --- a/futriix-server/src/server.rs +++ b/futriix-server/src/server.rs @@ -1,10 +1,10 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use std::path::Path; use std::sync::Arc; -use parking_lot::RwLock; use crossbeam::queue::SegQueue; -use tokio::net::{TcpListener, TcpStream}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use parking_lot::RwLock; +use tokio::net::TcpListener; use log::{info, error}; use colored::Colorize; @@ -19,6 +19,7 @@ pub enum Command { RollbackTransaction, CreateIndex { field: String }, DropIndex { field: String }, + SysExec { script_name: String }, } #[derive(Debug, Serialize, Deserialize)] @@ -52,28 +53,49 @@ impl FutriixServer { info!("Listening on: {}", addr); info!("Log file: futriix.log"); - loop { - let (socket, _) = listener.accept().await?; - let db = self.db.clone(); - let queue = self.command_queue.clone(); - let transactions = self.transactions.clone(); - let indexes = self.indexes.clone(); + let scripts_dir = Path::new("scripts"); + if !scripts_dir.exists() { + std::fs::create_dir(scripts_dir)?; + info!("Created scripts directory"); + } else if !scripts_dir.is_dir() { + return Err("Scripts path exists but is not a directory".into()); + } - tokio::spawn(async move { - if let Err(e) = Self::handle_connection(socket, db, queue, transactions, indexes).await { - error!("Connection error: {}", e); + tokio::select! { + res = async { + loop { + let (socket, _) = listener.accept().await?; + let db = self.db.clone(); + let queue = self.command_queue.clone(); + let transactions = self.transactions.clone(); + let indexes = self.indexes.clone(); + + tokio::spawn(async move { + if let Err(e) = Self::handle_connection(socket, db, queue, transactions, indexes).await { + error!("Connection error: {}", e); + } + }); } - }); + #[allow(unreachable_code)] + Ok::<(), Box>(()) + } => res, + _ = tokio::signal::ctrl_c() => { + println!(); + info!("{}", "Server shutdown now".bright_cyan()); + Ok(()) + } } } async fn handle_connection( - mut socket: TcpStream, + mut socket: tokio::net::TcpStream, db: Arc>>, queue: Arc>, transactions: Arc>>>, indexes: Arc>>>>, ) -> Result<(), Box> { + use tokio::io::{AsyncReadExt, AsyncWriteExt}; + loop { let mut len_buf = [0u8; 4]; if let Err(e) = socket.read_exact(&mut len_buf).await { @@ -105,7 +127,7 @@ impl FutriixServer { if tx_data.is_none() { let mut db = db.write(); db.insert(key.clone(), value.clone()); - Self::update_indexes(&indexes, &key, &value); + Self::update_indexes(&indexes, key.clone(), &value); } Response::Success(None) }, @@ -130,7 +152,7 @@ impl FutriixServer { let mut db = db.write(); if db.contains_key(&key) { db.insert(key.clone(), value.clone()); - Self::update_indexes(&indexes, &key, &value); + Self::update_indexes(&indexes, key.clone(), &value); Response::Success(None) } else { Response::Error("Key not found".to_string()) @@ -152,7 +174,7 @@ impl FutriixServer { if tx_data.is_none() { let mut db = db.write(); if db.remove(&key).is_some() { - Self::remove_from_indexes(&indexes, &key); + Self::remove_from_indexes(&indexes, key.clone()); Response::Success(None) } else { Response::Error("Key not found".to_string()) @@ -173,10 +195,10 @@ impl FutriixServer { for (k, v) in tx { if v.is_null() { db.remove(&k); - Self::remove_from_indexes(&indexes, &k); + Self::remove_from_indexes(&indexes, k.clone()); } else { db.insert(k.clone(), v.clone()); - Self::update_indexes(&indexes, &k, &v); + Self::update_indexes(&indexes, k.clone(), &v); } } Response::Success(None) @@ -195,7 +217,6 @@ impl FutriixServer { Command::CreateIndex { field } => { let mut indexes = indexes.write(); if !indexes.contains_key(&field) { - // Build initial index let mut index = HashMap::new(); let db = db.read(); for (key, value) in db.iter() { @@ -219,6 +240,12 @@ impl FutriixServer { Response::Error("Index not found".to_string()) } }, + Command::SysExec { script_name } => { + match Self::execute_lua_script(&script_name) { + Ok(output) => Response::Success(Some(output.into())), + Err(e) => Response::Error(format!("Script execution failed: {}", e)), + } + }, }; let response_bytes = rmp_serde::to_vec(&response)?; @@ -232,37 +259,60 @@ impl FutriixServer { fn update_indexes( indexes: &Arc>>>>, - key: &str, + key: String, value: &serde_json::Value, ) { let mut indexes = indexes.write(); for (field, index) in indexes.iter_mut() { if let Some(field_value) = value.get(field) { - // Remove old references for entries in index.values_mut() { - if let Some(pos) = entries.iter().position(|k| k == key) { + if let Some(pos) = entries.iter().position(|k| k == &key) { entries.remove(pos); } } - // Add new reference index.entry(field_value.clone()) .or_insert_with(Vec::new) - .push(key.to_string()); + .push(key.clone()); } } } fn remove_from_indexes( indexes: &Arc>>>>, - key: &str, + key: String, ) { let mut indexes = indexes.write(); for index in indexes.values_mut() { for entries in index.values_mut() { - if let Some(pos) = entries.iter().position(|k| k == key) { + if let Some(pos) = entries.iter().position(|k| k == &key) { entries.remove(pos); } } } } -} + + fn execute_lua_script(script_name: &str) -> Result> { + let scripts_dir = Path::new("scripts"); + if !scripts_dir.exists() { + return Err("Scripts directory not found".into()); + } + + let script_path = scripts_dir.join(script_name).with_extension("lua"); + + if !script_path.starts_with(scripts_dir) { + return Err("Invalid script path".into()); + } + + info!("Executing Lua script: {}", script_path.display()); + + let output = std::process::Command::new("lua") + .arg(&script_path) + .output()?; + + if output.status.success() { + Ok(String::from_utf8(output.stdout)?) + } else { + Err(String::from_utf8(output.stderr)?.into()) + } + } +} \ No newline at end of file