Update futriix-server/src/server.rs

This commit is contained in:
Григорий Сафронов 2025-06-15 19:37:58 +00:00
parent e697967f1c
commit 21035afb8f

View File

@ -1,10 +1,10 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use parking_lot::RwLock;
use crossbeam::queue::SegQueue; use crossbeam::queue::SegQueue;
use tokio::net::{TcpListener, TcpStream}; use parking_lot::RwLock;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::TcpListener;
use log::{info, error}; use log::{info, error};
use colored::Colorize; use colored::Colorize;
@ -19,6 +19,7 @@ pub enum Command {
RollbackTransaction, RollbackTransaction,
CreateIndex { field: String }, CreateIndex { field: String },
DropIndex { field: String }, DropIndex { field: String },
SysExec { script_name: String },
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
@ -52,28 +53,49 @@ impl FutriixServer {
info!("Listening on: {}", addr); info!("Listening on: {}", addr);
info!("Log file: futriix.log"); info!("Log file: futriix.log");
loop { let scripts_dir = Path::new("scripts");
let (socket, _) = listener.accept().await?; if !scripts_dir.exists() {
let db = self.db.clone(); std::fs::create_dir(scripts_dir)?;
let queue = self.command_queue.clone(); info!("Created scripts directory");
let transactions = self.transactions.clone(); } else if !scripts_dir.is_dir() {
let indexes = self.indexes.clone(); return Err("Scripts path exists but is not a directory".into());
}
tokio::spawn(async move { tokio::select! {
if let Err(e) = Self::handle_connection(socket, db, queue, transactions, indexes).await { res = async {
error!("Connection error: {}", e); loop {
let (socket, _) = listener.accept().await?;
let db = self.db.clone();
let queue = self.command_queue.clone();
let transactions = self.transactions.clone();
let indexes = self.indexes.clone();
tokio::spawn(async move {
if let Err(e) = Self::handle_connection(socket, db, queue, transactions, indexes).await {
error!("Connection error: {}", e);
}
});
} }
}); #[allow(unreachable_code)]
Ok::<(), Box<dyn std::error::Error>>(())
} => res,
_ = tokio::signal::ctrl_c() => {
println!();
info!("{}", "Server shutdown now".bright_cyan());
Ok(())
}
} }
} }
async fn handle_connection( async fn handle_connection(
mut socket: TcpStream, mut socket: tokio::net::TcpStream,
db: Arc<RwLock<HashMap<String, serde_json::Value>>>, db: Arc<RwLock<HashMap<String, serde_json::Value>>>,
queue: Arc<SegQueue<Command>>, queue: Arc<SegQueue<Command>>,
transactions: Arc<RwLock<Vec<HashMap<String, serde_json::Value>>>>, transactions: Arc<RwLock<Vec<HashMap<String, serde_json::Value>>>>,
indexes: Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>, indexes: Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
loop { loop {
let mut len_buf = [0u8; 4]; let mut len_buf = [0u8; 4];
if let Err(e) = socket.read_exact(&mut len_buf).await { if let Err(e) = socket.read_exact(&mut len_buf).await {
@ -105,7 +127,7 @@ impl FutriixServer {
if tx_data.is_none() { if tx_data.is_none() {
let mut db = db.write(); let mut db = db.write();
db.insert(key.clone(), value.clone()); db.insert(key.clone(), value.clone());
Self::update_indexes(&indexes, &key, &value); Self::update_indexes(&indexes, key.clone(), &value);
} }
Response::Success(None) Response::Success(None)
}, },
@ -130,7 +152,7 @@ impl FutriixServer {
let mut db = db.write(); let mut db = db.write();
if db.contains_key(&key) { if db.contains_key(&key) {
db.insert(key.clone(), value.clone()); db.insert(key.clone(), value.clone());
Self::update_indexes(&indexes, &key, &value); Self::update_indexes(&indexes, key.clone(), &value);
Response::Success(None) Response::Success(None)
} else { } else {
Response::Error("Key not found".to_string()) Response::Error("Key not found".to_string())
@ -152,7 +174,7 @@ impl FutriixServer {
if tx_data.is_none() { if tx_data.is_none() {
let mut db = db.write(); let mut db = db.write();
if db.remove(&key).is_some() { if db.remove(&key).is_some() {
Self::remove_from_indexes(&indexes, &key); Self::remove_from_indexes(&indexes, key.clone());
Response::Success(None) Response::Success(None)
} else { } else {
Response::Error("Key not found".to_string()) Response::Error("Key not found".to_string())
@ -173,10 +195,10 @@ impl FutriixServer {
for (k, v) in tx { for (k, v) in tx {
if v.is_null() { if v.is_null() {
db.remove(&k); db.remove(&k);
Self::remove_from_indexes(&indexes, &k); Self::remove_from_indexes(&indexes, k.clone());
} else { } else {
db.insert(k.clone(), v.clone()); db.insert(k.clone(), v.clone());
Self::update_indexes(&indexes, &k, &v); Self::update_indexes(&indexes, k.clone(), &v);
} }
} }
Response::Success(None) Response::Success(None)
@ -195,7 +217,6 @@ impl FutriixServer {
Command::CreateIndex { field } => { Command::CreateIndex { field } => {
let mut indexes = indexes.write(); let mut indexes = indexes.write();
if !indexes.contains_key(&field) { if !indexes.contains_key(&field) {
// Build initial index
let mut index = HashMap::new(); let mut index = HashMap::new();
let db = db.read(); let db = db.read();
for (key, value) in db.iter() { for (key, value) in db.iter() {
@ -219,6 +240,12 @@ impl FutriixServer {
Response::Error("Index not found".to_string()) Response::Error("Index not found".to_string())
} }
}, },
Command::SysExec { script_name } => {
match Self::execute_lua_script(&script_name) {
Ok(output) => Response::Success(Some(output.into())),
Err(e) => Response::Error(format!("Script execution failed: {}", e)),
}
},
}; };
let response_bytes = rmp_serde::to_vec(&response)?; let response_bytes = rmp_serde::to_vec(&response)?;
@ -232,37 +259,60 @@ impl FutriixServer {
fn update_indexes( fn update_indexes(
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>, indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
key: &str, key: String,
value: &serde_json::Value, value: &serde_json::Value,
) { ) {
let mut indexes = indexes.write(); let mut indexes = indexes.write();
for (field, index) in indexes.iter_mut() { for (field, index) in indexes.iter_mut() {
if let Some(field_value) = value.get(field) { if let Some(field_value) = value.get(field) {
// Remove old references
for entries in index.values_mut() { for entries in index.values_mut() {
if let Some(pos) = entries.iter().position(|k| k == key) { if let Some(pos) = entries.iter().position(|k| k == &key) {
entries.remove(pos); entries.remove(pos);
} }
} }
// Add new reference
index.entry(field_value.clone()) index.entry(field_value.clone())
.or_insert_with(Vec::new) .or_insert_with(Vec::new)
.push(key.to_string()); .push(key.clone());
} }
} }
} }
fn remove_from_indexes( fn remove_from_indexes(
indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>, indexes: &Arc<RwLock<HashMap<String, HashMap<serde_json::Value, Vec<String>>>>>,
key: &str, key: String,
) { ) {
let mut indexes = indexes.write(); let mut indexes = indexes.write();
for index in indexes.values_mut() { for index in indexes.values_mut() {
for entries in index.values_mut() { for entries in index.values_mut() {
if let Some(pos) = entries.iter().position(|k| k == key) { if let Some(pos) = entries.iter().position(|k| k == &key) {
entries.remove(pos); entries.remove(pos);
} }
} }
} }
} }
}
fn execute_lua_script(script_name: &str) -> Result<String, Box<dyn std::error::Error>> {
let scripts_dir = Path::new("scripts");
if !scripts_dir.exists() {
return Err("Scripts directory not found".into());
}
let script_path = scripts_dir.join(script_name).with_extension("lua");
if !script_path.starts_with(scripts_dir) {
return Err("Invalid script path".into());
}
info!("Executing Lua script: {}", script_path.display());
let output = std::process::Command::new("lua")
.arg(&script_path)
.output()?;
if output.status.success() {
Ok(String::from_utf8(output.stdout)?)
} else {
Err(String::from_utf8(output.stderr)?.into())
}
}
}