Update futriix-server/src/server.rs

This commit is contained in:
Григорий Сафронов 2025-07-01 19:42:50 +00:00
parent 40255fce93
commit 6965b41219

View File

@ -2,16 +2,13 @@ use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::path::Path; use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::Duration; use std::time::Duration;
use crossbeam::queue::SegQueue; use crossbeam::queue::SegQueue;
use parking_lot::RwLock; use parking_lot::RwLock;
use tokio::net::{TcpListener, TcpStream}; use tokio::net::{TcpListener, TcpStream};
use tokio::io::AsyncWriteExt; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::time; use tokio::time;
use log::{info, error, warn, debug}; use log::{info, error, debug};
use colored::Colorize;
use warp::Filter;
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
pub enum Command { pub enum Command {
@ -26,15 +23,24 @@ pub enum Command {
DropIndex { field: String }, DropIndex { field: String },
SysExec { script_name: String }, SysExec { script_name: String },
Replicate { commands: Vec<(Command, u128)> }, Replicate { commands: Vec<(Command, u128)> },
ReplicationEnable,
ReplicationDisable,
ReplicationAddPeer { addr: String },
ReplicationRemovePeer { addr: String },
ReplicationStatus,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum Response { pub enum Response {
Success(Option<serde_json::Value>), Success(Option<serde_json::Value>),
Error(String), Error(String),
ReplicationStatus {
enabled: bool,
peers: Vec<String>,
last_sync: u128,
},
} }
#[derive(Debug, Clone)]
pub struct FutriixServer { pub struct FutriixServer {
db: Arc<RwLock<HashMap<String, serde_json::Value>>>, db: Arc<RwLock<HashMap<String, serde_json::Value>>>,
command_queue: Arc<SegQueue<Command>>, command_queue: Arc<SegQueue<Command>>,
@ -54,10 +60,7 @@ impl FutriixServer {
let replication_enabled = config.get("replication") let replication_enabled = config.get("replication")
.and_then(|r| r.get("enabled")) .and_then(|r| r.get("enabled"))
.and_then(|v| v.as_bool()) .and_then(|v| v.as_bool())
.unwrap_or_else(|| { .unwrap_or(false);
warn!("Replication 'enabled' not found in config, defaulting to false");
false
});
let peer_nodes = config.get("replication") let peer_nodes = config.get("replication")
.and_then(|r| r.get("peer_nodes")) .and_then(|r| r.get("peer_nodes"))
@ -67,22 +70,13 @@ impl FutriixServer {
.filter_map(|n| n.as_str().map(|s| s.to_string())) .filter_map(|n| n.as_str().map(|s| s.to_string()))
.collect() .collect()
}) })
.unwrap_or_else(|| { .unwrap_or_else(Vec::new);
warn!("No peer nodes configured, defaulting to empty list");
Vec::new()
});
let sync_interval = config.get("replication") let sync_interval = config.get("replication")
.and_then(|r| r.get("sync_interval")) .and_then(|r| r.get("sync_interval"))
.and_then(|i| i.as_integer()) .and_then(|i| i.as_integer())
.map(|i| i as u64) .map(|i| i as u64)
.unwrap_or_else(|| { .unwrap_or(1000);
warn!("Sync interval not found, defaulting to 1000ms");
1000
});
debug!("Replication config - enabled: {}, peers: {:?}, interval: {}ms",
replication_enabled, peer_nodes, sync_interval);
FutriixServer { FutriixServer {
db: Arc::new(RwLock::new(HashMap::new())), db: Arc::new(RwLock::new(HashMap::new())),
@ -97,219 +91,32 @@ impl FutriixServer {
} }
} }
pub async fn run_rest_api(&self, rest_port: u16) { pub async fn run(&self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
let db_get = self.db.clone();
let db_set = self.db.clone();
let db_update = self.db.clone();
let db_delete = self.db.clone();
let db_list_keys = self.db.clone();
let db_list_commit = self.db.clone();
let transactions_begin = self.transactions.clone();
let transactions_commit = self.transactions.clone();
let transactions_rollback = self.transactions.clone();
let get = warp::path!("get" / String)
.and(warp::get())
.map(move |key: String| {
let db = db_get.read();
match db.get(&key) {
Some(value) => warp::reply::json(&Response::Success(Some(value.clone()))),
None => warp::reply::json(&Response::Error("Key not found".to_string()))
}
});
let set = warp::path!("set" / String)
.and(warp::post())
.and(warp::body::json())
.map(move |key: String, value: serde_json::Value| {
let mut db = db_set.write();
db.insert(key.clone(), value.clone());
warp::reply::json(&Response::Success(None))
});
let update = warp::path!("update" / String)
.and(warp::put())
.and(warp::body::json())
.map(move |key: String, value: serde_json::Value| {
let mut db = db_update.write();
if db.contains_key(&key) {
db.insert(key.clone(), value.clone());
warp::reply::json(&Response::Success(None))
} else {
warp::reply::json(&Response::Error("Key not found".to_string()))
}
});
let delete = warp::path!("delete" / String)
.and(warp::delete())
.map(move |key: String| {
let mut db = db_delete.write();
if db.remove(&key).is_some() {
warp::reply::json(&Response::Success(None))
} else {
warp::reply::json(&Response::Error("Key not found".to_string()))
}
});
let list = warp::path!("list")
.and(warp::get())
.map(move || {
let db = db_list_keys.read();
let keys: Vec<String> = db.keys().cloned().collect();
warp::reply::json(&Response::Success(Some(serde_json::json!(keys))))
});
let begin_tx = warp::path!("transaction" / "begin")
.and(warp::get())
.map(move || {
let mut transactions = transactions_begin.write();
transactions.push(HashMap::new());
warp::reply::json(&Response::Success(None))
});
let commit_tx = warp::path!("transaction" / "commit")
.and(warp::post())
.map(move || {
let mut transactions = transactions_commit.write();
if let Some(tx) = transactions.pop() {
let mut db = db_list_commit.write();
for (k, v) in tx {
if v.is_null() {
db.remove(&k);
} else {
db.insert(k.clone(), v.clone());
}
}
warp::reply::json(&Response::Success(None))
} else {
warp::reply::json(&Response::Error("No active transaction".to_string()))
}
});
let rollback_tx = warp::path!("transaction" / "rollback")
.and(warp::post())
.map(move || {
let mut transactions = transactions_rollback.write();
if transactions.pop().is_some() {
warp::reply::json(&Response::Success(None))
} else {
warp::reply::json(&Response::Error("No active transaction".to_string()))
}
});
let routes = get
.or(set)
.or(update)
.or(delete)
.or(list)
.or(begin_tx)
.or(commit_tx)
.or(rollback_tx);
info!("Starting REST API on port {}", rest_port);
warp::serve(routes).run(([127, 0, 0, 1], rest_port)).await;
}
pub async fn run(self, addr: &str) -> Result<(), Box<dyn std::error::Error>> {
info!("Starting server on {}", addr); info!("Starting server on {}", addr);
let listener = match TcpListener::bind(addr).await { let listener = TcpListener::bind(addr).await?;
Ok(l) => l,
Err(e) => {
error!("Failed to bind to {}: {}", addr, e);
return Err(e.into());
}
};
println!(); info!("Futriix Server started successfully!");
info!("{}", "Futriix Server started successfully!".truecolor(0, 183, 235));
info!("Listening on: {}", addr); info!("Listening on: {}", addr);
info!("Log file: futriix.log");
if self.replication_enabled { if self.replication_enabled {
if self.peer_nodes.is_empty() {
warn!("Replication enabled but no peer nodes configured");
} else {
info!("Replication enabled with peers: {:?}", self.peer_nodes); info!("Replication enabled with peers: {:?}", self.peer_nodes);
let sync_task = self.start_replication_sync(); let sync_task = self.start_replication_sync();
tokio::spawn(sync_task); tokio::spawn(sync_task);
} }
} else {
info!("Replication disabled");
}
println!();
warn!("Replication 'enabled' not found in config, defaulting to false");
warn!("No peer nodes configured, defaulting to empty list");
warn!("Sync interval not found, defaulting to 1000ms");
info!("Starting server on {}", addr);
let rest_port = addr.split(':').last()
.and_then(|p| p.parse::<u16>().ok())
.map(|p| p + 1)
.unwrap_or(8081);
let server = self.clone();
tokio::spawn(async move {
server.run_rest_api(rest_port).await;
});
let scripts_dir = Path::new("scripts"); let scripts_dir = Path::new("scripts");
if !scripts_dir.exists() { if !scripts_dir.exists() {
if let Err(e) = std::fs::create_dir(scripts_dir) { std::fs::create_dir(scripts_dir)?;
error!("Failed to create scripts directory: {}", e);
return Err(e.into());
}
info!("Created scripts directory");
} else if !scripts_dir.is_dir() {
error!("Scripts path exists but is not a directory");
return Err("Scripts path exists but is not a directory".into());
} }
tokio::select! {
res = async {
loop { loop {
let (socket, _) = match listener.accept().await { let (socket, _) = listener.accept().await?;
Ok((s, a)) => { let server = self.clone();
debug!("New connection from {}", a);
(s, a)
},
Err(e) => {
error!("Accept error: {}", e);
continue;
}
};
let db = self.db.clone();
let queue = self.command_queue.clone();
let transactions = self.transactions.clone();
let indexes = self.indexes.clone();
let command_history = self.command_history.clone();
let last_sync_timestamp = self.last_sync_timestamp.clone();
let replication_enabled = self.replication_enabled;
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = Self::handle_connection( server.handle_connection(socket).await.unwrap_or_else(|e| {
socket,
db,
queue,
transactions,
indexes,
command_history,
last_sync_timestamp,
replication_enabled,
).await {
error!("Connection error: {}", e); error!("Connection error: {}", e);
}
}); });
} });
#[allow(unreachable_code)]
Ok::<(), Box<dyn std::error::Error>>(())
} => res,
_ = tokio::signal::ctrl_c() => {
println!();
info!("{}", "Server shutting down".truecolor(0, 183, 235));
Ok(())
}
} }
} }
@ -334,39 +141,17 @@ impl FutriixServer {
}; };
if !commands_to_sync.is_empty() { if !commands_to_sync.is_empty() {
debug!("Syncing {} commands with peers", commands_to_sync.len());
for peer in &peer_nodes { for peer in &peer_nodes {
debug!("Attempting to sync with peer: {}", peer); if let Ok(mut stream) = TcpStream::connect(peer).await {
match TcpStream::connect(peer).await {
Ok(mut stream) => {
let replicate_cmd = Command::Replicate { let replicate_cmd = Command::Replicate {
commands: commands_to_sync.clone(), commands: commands_to_sync.clone(),
}; };
match rmp_serde::to_vec(&replicate_cmd) { if let Ok(cmd_bytes) = rmp_serde::to_vec(&replicate_cmd) {
Ok(cmd_bytes) => {
let len = cmd_bytes.len() as u32; let len = cmd_bytes.len() as u32;
if let Err(e) = stream.write_all(&len.to_be_bytes()).await { if stream.write_all(&len.to_be_bytes()).await.is_ok() {
warn!("Failed to send sync data to {}: {}", peer, e); let _ = stream.write_all(&cmd_bytes).await;
continue;
} }
if let Err(e) = stream.write_all(&cmd_bytes).await {
warn!("Failed to send sync data to {}: {}", peer, e);
continue;
}
if let Some((_, latest_ts)) = commands_to_sync.last() {
*last_sync_timestamp.write() = *latest_ts;
debug!("Updated last sync timestamp to {}", latest_ts);
}
},
Err(e) => {
warn!("Failed to serialize sync command: {}", e);
}
}
},
Err(e) => {
warn!("Failed to connect to peer {}: {}", peer, e);
} }
} }
} }
@ -376,286 +161,157 @@ impl FutriixServer {
} }
async fn handle_connection( async fn handle_connection(
mut socket: tokio::net::TcpStream, &self,
db: Arc<RwLock<HashMap<String, serde_json::Value>>>, mut socket: TcpStream,
queue: Arc<SegQueue<Command>>,
transactions: Arc<RwLock<Vec<HashMap<String, serde_json::Value>>>>,
indexes: Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
command_history: Arc<RwLock<Vec<(Command, u128)>>>,
last_sync_timestamp: Arc<RwLock<u128>>,
replication_enabled: bool,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
use tokio::io::AsyncReadExt;
loop { loop {
let mut len_buf = [0u8; 4]; let mut len_buf = [0u8; 4];
if let Err(e) = socket.read_exact(&mut len_buf).await { socket.read_exact(&mut len_buf).await?;
if e.kind() == std::io::ErrorKind::UnexpectedEof {
debug!("Client disconnected");
break;
}
error!("Read error: {}", e);
return Err(e.into());
}
let len = u32::from_be_bytes(len_buf) as usize; let len = u32::from_be_bytes(len_buf) as usize;
let mut buf = vec![0u8; len]; let mut buf = vec![0u8; len];
if let Err(e) = socket.read_exact(&mut buf).await { socket.read_exact(&mut buf).await?;
error!("Failed to read command: {}", e);
return Err(e.into());
}
let cmd: Command = match rmp_serde::from_slice(&buf) { let cmd: Command = rmp_serde::from_slice(&buf)?;
Ok(c) => c,
Err(e) => {
error!("Failed to deserialize command: {}", e);
return Err(e.into());
}
};
debug!("Received command: {:?}", cmd); self.command_queue.push(cmd.clone());
queue.push(cmd);
while let Some(cmd) = queue.pop() { while let Some(cmd) = self.command_queue.pop() {
if let Command::Replicate { commands } = cmd { if let Command::Replicate { commands } = cmd {
debug!("Processing replication batch ({} commands)", commands.len());
for (cmd, ts) in commands { for (cmd, ts) in commands {
let _response = Self::process_command( match cmd {
Command::Insert { key, value } | Command::Update { key, value } => {
let mut db = self.db.write();
if let Some(existing) = db.get(&key) {
let resolved = self.resolve_conflict(&key, existing, &value);
db.insert(key, resolved);
} else {
db.insert(key, value);
}
}
_ => {
let _ = Self::process_command(
&self.db,
&self.transactions,
&self.indexes,
cmd, cmd,
&db,
&transactions,
&indexes,
); );
}
*last_sync_timestamp.write() = ts; }
debug!("Replicated command with timestamp {}", ts); *self.last_sync_timestamp.write() = ts;
} }
continue; continue;
} }
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map_err(|e| {
error!("SystemTime error: {}", e);
e
})?
.as_millis();
let response = Self::process_command( let response = Self::process_command(
&self.db,
&self.transactions,
&self.indexes,
cmd.clone(), cmd.clone(),
&db,
&transactions,
&indexes,
); );
if replication_enabled && !matches!(cmd, Command::Get { .. }) { let response_bytes = rmp_serde::to_vec(&response)?;
command_history.write().push((cmd, timestamp)); let len = response_bytes.len() as u32;
debug!("Added command to history"); socket.write_all(&len.to_be_bytes()).await?;
socket.write_all(&response_bytes).await?;
}
}
} }
let response_bytes = match rmp_serde::to_vec(&response) { fn resolve_conflict(
Ok(b) => b, &self,
Err(e) => { _key: &str,
error!("Failed to serialize response: {}", e); local_value: &serde_json::Value,
return Err(e.into()); remote_value: &serde_json::Value,
) -> serde_json::Value {
let local_ts = local_value.get("_timestamp").and_then(|v| v.as_u64()).unwrap_or(0);
let remote_ts = remote_value.get("_timestamp").and_then(|v| v.as_u64()).unwrap_or(0);
if remote_ts > local_ts {
remote_value.clone()
} else {
local_value.clone()
} }
};
let len = response_bytes.len() as u32;
if let Err(e) = socket.write_all(&len.to_be_bytes()).await {
error!("Failed to send response length: {}", e);
return Err(e.into());
}
if let Err(e) = socket.write_all(&response_bytes).await {
error!("Failed to send response: {}", e);
return Err(e.into());
}
debug!("Sent response to client");
}
}
Ok(())
} }
fn process_command( fn process_command(
cmd: Command,
db: &Arc<RwLock<HashMap<String, serde_json::Value>>>, db: &Arc<RwLock<HashMap<String, serde_json::Value>>>,
transactions: &Arc<RwLock<Vec<HashMap<String, serde_json::Value>>>>, _transactions: &Arc<RwLock<Vec<HashMap<String, serde_json::Value>>>>,
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>, indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
cmd: Command,
) -> Response { ) -> Response {
match cmd { match cmd {
Command::Insert { key, value } => { Command::Insert { key, value } => {
debug!("Insert command for key: {}", key);
let mut tx_data = None;
{
let mut transactions = transactions.write();
if let Some(tx) = transactions.last_mut() {
tx.insert(key.clone(), value.clone());
tx_data = Some((key.clone(), value.clone()));
debug!("Added to transaction");
}
}
if tx_data.is_none() {
let mut db = db.write(); let mut db = db.write();
db.insert(key.clone(), value.clone()); db.insert(key.clone(), value.clone());
Self::update_indexes(indexes, key.clone(), &value); Self::update_indexes(indexes, key, &value);
debug!("Direct insert to DB");
}
Response::Success(None) Response::Success(None)
}, },
Command::Get { key } => { Command::Get { key } => {
debug!("Get command for key: {}", key);
let db = db.read(); let db = db.read();
match db.get(&key) { match db.get(&key) {
Some(value) => { Some(value) => Response::Success(Some(value.clone())),
debug!("Key found"); None => Response::Error("Key not found".to_string()),
Response::Success(Some(value.clone()))
},
None => {
debug!("Key not found");
Response::Error("Key not found".to_string())
},
} }
}, },
Command::Update { key, value } => { Command::Update { key, value } => {
debug!("Update command for key: {}", key);
let mut tx_data = None;
{
let mut transactions = transactions.write();
if let Some(tx) = transactions.last_mut() {
tx.insert(key.clone(), value.clone());
tx_data = Some((key.clone(), value.clone()));
debug!("Added to transaction");
}
}
if tx_data.is_none() {
let mut db = db.write(); let mut db = db.write();
if db.contains_key(&key) { if db.contains_key(&key) {
db.insert(key.clone(), value.clone()); db.insert(key.clone(), value.clone());
Self::update_indexes(indexes, key.clone(), &value); Self::update_indexes(indexes, key, &value);
debug!("Direct update in DB");
Response::Success(None) Response::Success(None)
} else { } else {
debug!("Key not found for update");
Response::Error("Key not found".to_string()) Response::Error("Key not found".to_string())
} }
} else {
Response::Success(None)
}
}, },
Command::Delete { key } => { Command::Delete { key } => {
debug!("Delete command for key: {}", key);
let mut tx_data = None;
{
let mut transactions = transactions.write();
if let Some(tx) = transactions.last_mut() {
tx.insert(key.clone(), serde_json::Value::Null);
tx_data = Some(key.clone());
debug!("Added to transaction");
}
}
if tx_data.is_none() {
let mut db = db.write(); let mut db = db.write();
if db.remove(&key).is_some() { if db.remove(&key).is_some() {
Self::remove_from_indexes(indexes, key.clone()); Self::remove_from_indexes(indexes, key);
debug!("Direct delete from DB");
Response::Success(None) Response::Success(None)
} else { } else {
debug!("Key not found for deletion");
Response::Error("Key not found".to_string()) Response::Error("Key not found".to_string())
} }
} else {
Response::Success(None)
}
}, },
Command::BeginTransaction => { Command::BeginTransaction => {
debug!("Begin transaction");
let mut transactions = transactions.write();
transactions.push(HashMap::new());
Response::Success(None) Response::Success(None)
}, },
Command::CommitTransaction => { Command::CommitTransaction => {
debug!("Commit transaction");
let mut transactions = transactions.write();
if let Some(tx) = transactions.pop() {
let mut db = db.write();
for (k, v) in tx {
if v.is_null() {
db.remove(&k);
Self::remove_from_indexes(indexes, k.clone());
debug!("Removed key: {}", k);
} else {
db.insert(k.clone(), v.clone());
Self::update_indexes(indexes, k.clone(), &v);
debug!("Committed key: {}", k);
}
}
Response::Success(None) Response::Success(None)
} else {
debug!("No active transaction to commit");
Response::Error("No active transaction".to_string())
}
}, },
Command::RollbackTransaction => { Command::RollbackTransaction => {
debug!("Rollback transaction");
let mut transactions = transactions.write();
if transactions.pop().is_some() {
Response::Success(None) Response::Success(None)
} else {
debug!("No active transaction to rollback");
Response::Error("No active transaction".to_string())
}
}, },
Command::CreateIndex { field } => { Command::CreateIndex { field } => {
debug!("Create index on field: {}", field);
let mut indexes = indexes.write(); let mut indexes = indexes.write();
if !indexes.contains_key(&field) { if !indexes.contains_key(&field) {
let mut index = HashMap::new(); indexes.insert(field, HashMap::new());
let db = db.read();
for (key, value) in db.iter() {
if let Some(field_value) = value.get(&field) {
index.entry(field_value.clone())
.or_insert_with(Vec::new)
.push(key.clone());
}
}
indexes.insert(field, index);
debug!("Index created");
Response::Success(None) Response::Success(None)
} else { } else {
debug!("Index already exists");
Response::Error("Index already exists".to_string()) Response::Error("Index already exists".to_string())
} }
}, },
Command::DropIndex { field } => { Command::DropIndex { field } => {
debug!("Drop index on field: {}", field);
let mut indexes = indexes.write(); let mut indexes = indexes.write();
if indexes.remove(&field).is_some() { if indexes.remove(&field).is_some() {
debug!("Index dropped");
Response::Success(None) Response::Success(None)
} else { } else {
debug!("Index not found");
Response::Error("Index not found".to_string()) Response::Error("Index not found".to_string())
} }
}, },
Command::SysExec { script_name } => { Command::SysExec { script_name } => {
debug!("Execute system script: {}", script_name); #[allow(unused_variables)]
match Self::execute_lua_script(&script_name) { match Self::execute_lua_script(&script_name) {
Ok(output) => { Ok(output) => Response::Success(Some(output.into())),
debug!("Script executed successfully"); Err(e) => Response::Error(format!("Script execution failed: {}", e)),
Response::Success(Some(output.into()))
},
Err(e) => {
debug!("Script execution failed: {}", e);
Response::Error(format!("Script execution failed: {}", e))
},
} }
}, },
Command::Replicate { .. } => { Command::ReplicationEnable => Response::Success(None),
debug!("Replication command processed"); Command::ReplicationDisable => Response::Success(None),
Response::Success(None) Command::ReplicationAddPeer { addr: _ } => Response::Success(None),
}, Command::ReplicationRemovePeer { addr: _ } => Response::Success(None),
Command::ReplicationStatus => Response::Success(None),
Command::Replicate { .. } => Response::Success(None),
} }
} }
@ -667,11 +323,6 @@ impl FutriixServer {
let mut indexes = indexes.write(); let mut indexes = indexes.write();
for (field, index) in indexes.iter_mut() { for (field, index) in indexes.iter_mut() {
if let Some(field_value) = value.get(field) { if let Some(field_value) = value.get(field) {
for entries in index.values_mut() {
if let Some(pos) = entries.iter().position(|k| k == &key) {
entries.remove(pos);
}
}
index.entry(field_value.clone()) index.entry(field_value.clone())
.or_insert_with(Vec::new) .or_insert_with(Vec::new)
.push(key.clone()); .push(key.clone());
@ -693,20 +344,9 @@ impl FutriixServer {
} }
} }
#[allow(dead_code)]
fn execute_lua_script(script_name: &str) -> Result<String, Box<dyn std::error::Error>> { fn execute_lua_script(script_name: &str) -> Result<String, Box<dyn std::error::Error>> {
let scripts_dir = Path::new("scripts"); let script_path = Path::new("scripts").join(script_name).with_extension("lua");
if !scripts_dir.exists() {
return Err("Scripts directory not found".into());
}
let script_path = scripts_dir.join(script_name).with_extension("lua");
if !script_path.starts_with(scripts_dir) {
return Err("Invalid script path".into());
}
info!("Executing Lua script: {}", script_path.display());
let output = std::process::Command::new("lua") let output = std::process::Command::new("lua")
.arg(&script_path) .arg(&script_path)
.output()?; .output()?;
@ -718,3 +358,19 @@ impl FutriixServer {
} }
} }
} }
impl Clone for FutriixServer {
fn clone(&self) -> Self {
FutriixServer {
db: self.db.clone(),
command_queue: self.command_queue.clone(),
transactions: self.transactions.clone(),
indexes: self.indexes.clone(),
replication_enabled: self.replication_enabled,
peer_nodes: self.peer_nodes.clone(),
sync_interval: self.sync_interval,
last_sync_timestamp: self.last_sync_timestamp.clone(),
command_history: self.command_history.clone(),
}
}
}