From e0649e0b5d4bb02a5eb81079a9acb0e1b760f91e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Sat, 12 Jul 2025 15:21:25 +0000 Subject: [PATCH] Upload files to "futriix-server/src" --- futriix-server/src/main.rs | 86 ++++ futriix-server/src/server.rs | 761 +++++++++++++++++++++++++++++++++++ 2 files changed, 847 insertions(+) create mode 100644 futriix-server/src/main.rs create mode 100644 futriix-server/src/server.rs diff --git a/futriix-server/src/main.rs b/futriix-server/src/main.rs new file mode 100644 index 0000000..ce260e4 --- /dev/null +++ b/futriix-server/src/main.rs @@ -0,0 +1,86 @@ +mod server; + +use server::*; +use std::fs; +use std::path::Path; +use toml::Value; +use simplelog::*; +use chrono::Local; +use log::info; +use ctrlc; +use colored::Colorize; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Чтение конфигурации + let config_content = fs::read_to_string("futriix.config.toml")?; + let config: Value = toml::from_str(&config_content)?; + + let ip = config["server"]["ip"].as_str().unwrap_or("127.0.0.1"); + let port = config["server"]["port"].as_integer().unwrap_or(8080) as u16; + let log_path = config["server"]["log_path"].as_str().unwrap_or("futriix.log"); + + // Настройка обработчика Ctrl+C + ctrlc::set_handler(move || { + println!("{}", "Server is now shutdown".truecolor(0, 191, 255)); + std::process::exit(0); + })?; + + // Создаем директорию для логов, если ее нет + let log_dir = Path::new("logs"); + if !log_dir.exists() { + fs::create_dir(log_dir)?; + } + + // Генерируем имя файла лога с датой и временем + let timestamp = Local::now().format("%Y-%m-%d_%H-%M-%S"); + let log_filename = format!("futriix_{}.log", timestamp); + let full_log_path = log_dir.join(log_filename); + + // Создаем симлинк на текущий лог-файл + let symlink_path = Path::new(log_path); + if symlink_path.exists() { + fs::remove_file(symlink_path)?; + } + #[cfg(unix)] + std::os::unix::fs::symlink(full_log_path.clone(), symlink_path)?; + #[cfg(windows)] + std::os::windows::fs::symlink_file(full_log_path.clone(), symlink_path)?; + + // Инициализация логгера с записью в файл и консоль + let log_file = fs::OpenOptions::new() + .create(true) + .append(true) + .open(&full_log_path)?; + + CombinedLogger::init(vec![ + TermLogger::new( + LevelFilter::Info, + Config::default(), + TerminalMode::Mixed, + ColorChoice::Auto, + ), + WriteLogger::new( + LevelFilter::Info, + Config::default(), + log_file, + ), + ])?; + + // Пустая строка перед запуском (только в консоль) + println!(); + + // Первое информационное сообщение + println!("{}", "Futriix Server started successfully!".truecolor(0, 191, 255)); + + // Запуск сервера + let server = FutriixServer::new(&config); + let addr = format!("{}:{}", ip, port); + info!("Starting server on {}", addr); + info!("Log file: {}", full_log_path.display()); + info!("Symlink: {}", symlink_path.display()); + + server.run(&addr).await?; + + Ok(()) +} diff --git a/futriix-server/src/server.rs b/futriix-server/src/server.rs new file mode 100644 index 0000000..46b7399 --- /dev/null +++ b/futriix-server/src/server.rs @@ -0,0 +1,761 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::path::Path; +use std::sync::Arc; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use crossbeam::queue::SegQueue; +use parking_lot::RwLock; +use tokio::net::{TcpListener, TcpStream}; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::time; +use log::{info, error, debug, warn}; +use warp::Filter; + +#[derive(Debug, Clone)] +struct BStarTreeNode { + keys: Vec, + values: Vec, + children: Vec>>, + is_leaf: bool, +} + +impl BStarTreeNode { + fn new(is_leaf: bool) -> Self { + BStarTreeNode { + keys: Vec::new(), + values: Vec::new(), + children: Vec::new(), + is_leaf, + } + } + + #[allow(dead_code)] + fn search(&self, timestamp: u128) -> Option { + let mut i = 0; + while i < self.keys.len() && timestamp > self.keys[i] { + i += 1; + } + + if i < self.keys.len() && timestamp == self.keys[i] { + return Some(self.values[i].clone()); + } + + if self.is_leaf { + None + } else { + self.children[i].read().search(timestamp) + } + } + + fn insert(&mut self, timestamp: u128, value: serde_json::Value) { + let mut i = 0; + while i < self.keys.len() && timestamp > self.keys[i] { + i += 1; + } + + if self.is_leaf { + self.keys.insert(i, timestamp); + self.values.insert(i, value); + } else { + let child = self.children[i].clone(); + let mut child_guard = child.write(); + + if child_guard.keys.len() == 2 * 3 - 1 { + self.split_child(i, &mut child_guard); + if timestamp > self.keys[i] { + i += 1; + } + } + self.children[i].write().insert(timestamp, value); + } + } + + fn split_child(&mut self, i: usize, child: &mut BStarTreeNode) { + let mut new_node = BStarTreeNode::new(child.is_leaf); + new_node.keys = child.keys.split_off(3 / 2); + new_node.values = child.values.split_off(3 / 2); + + if !child.is_leaf { + new_node.children = child.children.split_off(3 / 2); + } + + let middle_key = child.keys.pop().unwrap(); + let middle_value = child.values.pop().unwrap(); + + self.keys.insert(i, middle_key); + self.values.insert(i, middle_value); + self.children.insert(i + 1, Arc::new(RwLock::new(new_node))); + } +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub enum Command { + Insert { key: String, value: serde_json::Value }, + Get { key: String, version: Option }, + Update { key: String, value: serde_json::Value }, + Delete { key: String }, + BeginTransaction, + CommitTransaction(u64), + RollbackTransaction(u64), + CreateIndex { field: String }, + DropIndex { field: String }, + SysExec { script_name: String }, + Replicate { commands: Vec<(Command, u128)> }, + ReplicationEnable, + ReplicationDisable, + ReplicationAddPeer { addr: String }, + ReplicationRemovePeer { addr: String }, + ReplicationStatus, + BackupCreate { path: String }, + BackupRestore { path: String }, + TimeSeriesInsert { key: String, value: serde_json::Value, timestamp: Option }, + TimeSeriesQuery { key: String, start: u128, end: u128 }, + FutExec { code: String }, +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum Response { + Success(Option), + Error(String), + ReplicationStatus { + enabled: bool, + peers: Vec, + last_sync: u128, + }, + BackupStatus { success: bool, message: String }, + TimeSeriesData(Vec<(u128, serde_json::Value)>), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +struct VersionedValue { + value: serde_json::Value, + version: u64, + timestamp: u128, + tx_id: Option, +} + +pub struct FutriixServer { + db: Arc>>>, + time_series_db: Arc>>>>, + command_queue: Arc>, + transactions: Arc>>>, + indexes: Arc>>>>, + replication_enabled: bool, + peer_nodes: Vec, + sync_interval: u64, + last_sync_timestamp: Arc>, + command_history: Arc>>, + next_tx_id: Arc>, +} + +impl FutriixServer { + pub fn new(config: &toml::Value) -> Self { + debug!("Initializing server with config: {:?}", config); + + let replication_enabled = config.get("replication") + .and_then(|r| r.get("enabled")) + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + let peer_nodes = config.get("replication") + .and_then(|r| r.get("peer_nodes")) + .and_then(|n| n.as_array()) + .map(|nodes| { + nodes.iter() + .filter_map(|n| n.as_str().map(|s| s.to_string())) + .collect() + }) + .unwrap_or_default(); + + let sync_interval = config.get("replication") + .and_then(|r| r.get("sync_interval")) + .and_then(|i| i.as_integer()) + .map(|i| i as u64) + .unwrap_or(1000); + + FutriixServer { + db: Arc::new(RwLock::new(HashMap::new())), + time_series_db: Arc::new(RwLock::new(HashMap::new())), + command_queue: Arc::new(SegQueue::new()), + transactions: Arc::new(RwLock::new(HashMap::new())), + indexes: Arc::new(RwLock::new(HashMap::new())), + replication_enabled, + peer_nodes, + sync_interval, + last_sync_timestamp: Arc::new(RwLock::new(0)), + command_history: Arc::new(RwLock::new(Vec::new())), + next_tx_id: Arc::new(RwLock::new(1)), + } + } + + pub async fn run(&self, addr: &str) -> Result<(), Box> { + info!("Starting server on {}", addr); + let listener = TcpListener::bind(addr).await?; + + let http_port = addr.split(':').last().unwrap_or("8080").parse::().unwrap_or(8080) + 1; + let http_addr = format!("0.0.0.0:{}", http_port); + self.start_http_api(http_addr.clone()); + + info!("Futriix Server started successfully!"); + info!("Listening on: {}", addr); + info!("HTTP API available on: {}", http_addr); + + if self.replication_enabled { + info!("Replication enabled with peers: {:?}", self.peer_nodes); + let sync_task = self.start_replication_sync(); + tokio::spawn(sync_task); + } + + let scripts_dir = Path::new("scripts"); + if !scripts_dir.exists() { + if let Err(e) = std::fs::create_dir(scripts_dir) { + warn!("Failed to create scripts directory: {}", e); + } + } + + let backups_dir = Path::new("backups"); + if !backups_dir.exists() { + if let Err(e) = std::fs::create_dir(backups_dir) { + warn!("Failed to create backups directory: {}", e); + } + } + + loop { + match listener.accept().await { + Ok((socket, _)) => { + let server = self.clone(); + tokio::spawn(async move { + if let Err(e) = server.handle_connection(socket).await { + error!("Connection error: {}", e); + } + }); + } + Err(e) => { + error!("Accept error: {}", e); + } + } + } + } + + fn start_http_api(&self, addr: String) { + let db = self.db.clone(); + let command_queue_insert = self.command_queue.clone(); + let command_queue_update = self.command_queue.clone(); + let command_queue_delete = self.command_queue.clone(); + + tokio::spawn(async move { + let get = warp::path!("api" / "get" / String) + .and(warp::get()) + .map(move |key: String| { + let db = db.read(); + match db.get(&key) { + Some(versions) => { + if let Some(latest) = versions.last() { + warp::reply::json(&latest.value) + } else { + warp::reply::json(&serde_json::Value::Null) + } + }, + None => warp::reply::json(&serde_json::Value::Null), + } + }); + + let insert = warp::path!("api" / "insert") + .and(warp::post()) + .and(warp::body::json()) + .map(move |data: HashMap| { + for (key, value) in data { + command_queue_insert.push(Command::Insert { key, value }); + } + warp::reply::json(&serde_json::json!({"status": "ok"})) + }); + + let update = warp::path!("api" / "update") + .and(warp::post()) + .and(warp::body::json()) + .map(move |data: HashMap| { + for (key, value) in data { + command_queue_update.push(Command::Update { key, value }); + } + warp::reply::json(&serde_json::json!({"status": "ok"})) + }); + + let delete = warp::path!("api" / "delete" / String) + .and(warp::delete()) + .map(move |key: String| { + command_queue_delete.push(Command::Delete { key }); + warp::reply::json(&serde_json::json!({"status": "ok"})) + }); + + let routes = get.or(insert).or(update).or(delete); + + warp::serve(routes) + .run(addr.parse::().unwrap()) + .await; + }); + } + + fn start_replication_sync(&self) -> impl std::future::Future + 'static { + let peer_nodes = self.peer_nodes.clone(); + let command_history = self.command_history.clone(); + let last_sync_timestamp = self.last_sync_timestamp.clone(); + let sync_interval = self.sync_interval; + + async move { + let mut interval = time::interval(Duration::from_millis(sync_interval)); + loop { + interval.tick().await; + + let commands_to_sync = { + let history = command_history.read(); + let last_sync = *last_sync_timestamp.read(); + history.iter() + .filter(|(_, ts)| *ts > last_sync) + .map(|(cmd, ts)| (cmd.clone(), *ts)) + .collect::>() + }; + + if !commands_to_sync.is_empty() { + for peer in &peer_nodes { + if let Ok(mut stream) = TcpStream::connect(peer).await { + let replicate_cmd = Command::Replicate { + commands: commands_to_sync.clone(), + }; + + if let Ok(cmd_bytes) = rmp_serde::to_vec(&replicate_cmd) { + let len = cmd_bytes.len() as u32; + if let Err(e) = stream.write_all(&len.to_be_bytes()).await { + warn!("Failed to write command length to peer {}: {}", peer, e); + continue; + } + if let Err(e) = stream.write_all(&cmd_bytes).await { + warn!("Failed to write command to peer {}: {}", peer, e); + } + } + } + } + } + } + } + } + + async fn handle_connection( + &self, + mut socket: TcpStream, + ) -> Result<(), Box> { + loop { + let mut len_buf = [0u8; 4]; + if let Err(e) = socket.read_exact(&mut len_buf).await { + error!("Failed to read command length: {}", e); + break; + } + let len = u32::from_be_bytes(len_buf) as usize; + + let mut buf = vec![0u8; len]; + if let Err(e) = socket.read_exact(&mut buf).await { + error!("Failed to read command: {}", e); + break; + } + + let cmd: Command = match rmp_serde::from_slice(&buf) { + Ok(cmd) => cmd, + Err(e) => { + error!("Failed to deserialize command: {}", e); + break; + } + }; + + self.command_queue.push(cmd.clone()); + + while let Some(cmd) = self.command_queue.pop() { + if let Command::Replicate { commands } = cmd { + for (cmd, ts) in commands { + match cmd { + Command::Insert { key, value } | Command::Update { key, value } => { + let mut db = self.db.write(); + let timestamp = match SystemTime::now() + .duration_since(UNIX_EPOCH) + { + Ok(duration) => duration.as_millis(), + Err(e) => { + error!("SystemTime error: {}", e); + 0 + } + }; + let versioned_value = VersionedValue { + value: value.clone(), + version: 0, + timestamp, + tx_id: None, + }; + db.entry(key.clone()) + .or_default() + .push(versioned_value); + } + _ => { + let response = self.process_command_internal(cmd); + if let Err(e) = Self::send_response(&mut socket, &response).await { + error!("Failed to send response: {}", e); + break; + } + } + } + *self.last_sync_timestamp.write() = ts; + } + continue; + } + + let response = self.process_command_internal(cmd); + if let Err(e) = Self::send_response(&mut socket, &response).await { + error!("Failed to send response: {}", e); + break; + } + } + } + Ok(()) + } + + async fn send_response(socket: &mut TcpStream, response: &Response) -> Result<(), Box> { + let response_bytes = rmp_serde::to_vec(response)?; + let len = response_bytes.len() as u32; + socket.write_all(&len.to_be_bytes()).await?; + socket.write_all(&response_bytes).await?; + Ok(()) + } + + fn process_command_internal(&self, cmd: Command) -> Response { + match cmd { + Command::Insert { key, value } => { + let mut db = self.db.write(); + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); + let version = db.get(&key).map_or(0, |v| v.last().map_or(0, |vv| vv.version + 1)); + + let versioned_value = VersionedValue { + value: value.clone(), + version, + timestamp, + tx_id: None, + }; + + db.entry(key.clone()) + .or_default() + .push(versioned_value.clone()); + + Self::update_indexes(&self.indexes, key, &value); + Response::Success(None) + }, + Command::Get { key, version } => { + let db = self.db.read(); + match db.get(&key) { + Some(versions) => { + let value = if let Some(v) = version { + versions.iter().find(|vv| vv.version == v) + } else { + versions.last() + }; + + match value { + Some(vv) => Response::Success(Some(vv.value.clone())), + None => Response::Error("Version not found".to_string()), + } + }, + None => Response::Error("Key not found".to_string()), + } + }, + Command::Update { key, value } => { + let mut db = self.db.write(); + if db.contains_key(&key) { + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis(); + let version = db.get(&key).unwrap().last().unwrap().version + 1; + + let versioned_value = VersionedValue { + value: value.clone(), + version, + timestamp, + tx_id: None, + }; + + db.get_mut(&key).unwrap().push(versioned_value.clone()); + Self::update_indexes(&self.indexes, key, &value); + Response::Success(None) + } else { + Response::Error("Key not found".to_string()) + } + }, + Command::Delete { key } => { + let mut db = self.db.write(); + if db.remove(&key).is_some() { + Self::remove_from_indexes(&self.indexes, key); + Response::Success(None) + } else { + Response::Error("Key not found".to_string()) + } + }, + Command::BeginTransaction => { + let tx_id = { + let mut next_id = self.next_tx_id.write(); + let id = *next_id; + *next_id += 1; + id + }; + + let mut transactions = self.transactions.write(); + transactions.insert(tx_id, HashMap::new()); + Response::Success(Some(tx_id.into())) + }, + Command::CommitTransaction(tx_id) => { + let mut db = self.db.write(); + let mut transactions = self.transactions.write(); + + if let Some(tx_data) = transactions.remove(&tx_id) { + for (key, versioned_value) in tx_data { + db.entry(key.clone()) + .or_default() + .push(versioned_value.clone()); + Self::update_indexes(&self.indexes, key, &versioned_value.value); + } + Response::Success(None) + } else { + Response::Error("Transaction not found".to_string()) + } + }, + Command::RollbackTransaction(tx_id) => { + let mut transactions = self.transactions.write(); + if transactions.remove(&tx_id).is_some() { + Response::Success(None) + } else { + Response::Error("Transaction not found".to_string()) + } + }, + Command::CreateIndex { field } => { + let mut indexes = self.indexes.write(); + if !indexes.contains_key(&field) { + indexes.insert(field, HashMap::new()); + Response::Success(None) + } else { + Response::Error("Index already exists".to_string()) + } + }, + Command::DropIndex { field } => { + let mut indexes = self.indexes.write(); + if indexes.remove(&field).is_some() { + Response::Success(None) + } else { + Response::Error("Index not found".to_string()) + } + }, + Command::SysExec { script_name } => { + match Self::execute_lua_script(&script_name) { + Ok(output) => Response::Success(Some(output.into())), + Err(e) => Response::Error(format!("Script execution failed: {}", e)), + } + }, + Command::TimeSeriesInsert { key, value, timestamp } => { + let timestamp = timestamp.unwrap_or_else(|| { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_millis() + }); + + let mut ts_db = self.time_series_db.write(); + let root = ts_db.entry(key.clone()) + .or_insert_with(|| Arc::new(RwLock::new(BStarTreeNode::new(true)))); + + root.write().insert(timestamp, value); + Response::Success(None) + }, + Command::TimeSeriesQuery { key, start, end } => { + let ts_db = self.time_series_db.read(); + if let Some(root) = ts_db.get(&key) { + let mut result = Vec::new(); + let root = root.read(); + Self::query_range(&root, start, end, &mut result); + Response::TimeSeriesData(result) + } else { + Response::Error("Time series key not found".to_string()) + } + }, + Command::FutExec { code: _ } => { + Response::Error("Lua execution is not supported in this version".to_string()) + }, + Command::ReplicationEnable => { + Response::Error("Replication enable/disable must be configured in config file".to_string()) + }, + Command::ReplicationDisable => { + Response::Error("Replication enable/disable must be configured in config file".to_string()) + }, + Command::ReplicationAddPeer { addr } => { + let mut peers = self.peer_nodes.clone(); + if !peers.contains(&addr) { + peers.push(addr); + Response::Success(None) + } else { + Response::Error("Peer already exists".to_string()) + } + }, + Command::ReplicationRemovePeer { addr } => { + let mut peers = self.peer_nodes.clone(); + if let Some(pos) = peers.iter().position(|p| p == &addr) { + peers.remove(pos); + Response::Success(None) + } else { + Response::Error("Peer not found".to_string()) + } + }, + Command::ReplicationStatus => Response::ReplicationStatus { + enabled: self.replication_enabled, + peers: self.peer_nodes.clone(), + last_sync: *self.last_sync_timestamp.read(), + }, + Command::Replicate { .. } => Response::Success(None), + Command::BackupCreate { path } => { + match Self::create_backup(&self.db, &path) { + Ok(_) => Response::BackupStatus { + success: true, + message: format!("Backup created successfully at {}", path), + }, + Err(e) => Response::BackupStatus { + success: false, + message: format!("Backup failed: {}", e), + }, + } + }, + Command::BackupRestore { path } => { + match Self::restore_backup(&self.db, &self.indexes, &path) { + Ok(_) => Response::BackupStatus { + success: true, + message: format!("Backup restored successfully from {}", path), + }, + Err(e) => Response::BackupStatus { + success: false, + message: format!("Restore failed: {}", e), + }, + } + }, + } + } + + fn query_range(node: &BStarTreeNode, start: u128, end: u128, result: &mut Vec<(u128, serde_json::Value)>) { + let mut i = 0; + while i < node.keys.len() && node.keys[i] < start { + i += 1; + } + + while i < node.keys.len() && node.keys[i] <= end { + result.push((node.keys[i], node.values[i].clone())); + i += 1; + } + + if !node.is_leaf { + for j in 0..node.children.len() { + Self::query_range(&node.children[j].read(), start, end, result); + } + } + } + + fn create_backup( + db: &Arc>>>, + path: &str, + ) -> Result<(), Box> { + let db = db.read(); + let serialized = serde_json::to_string(&*db)?; + std::fs::write(path, serialized)?; + Ok(()) + } + + fn restore_backup( + db: &Arc>>>, + indexes: &Arc>>>>, + path: &str, + ) -> Result<(), Box> { + let data = std::fs::read_to_string(path)?; + let restored: HashMap> = serde_json::from_str(&data)?; + + let mut db = db.write(); + *db = restored; + + let mut indexes = indexes.write(); + indexes.clear(); + + for (key, versions) in db.iter() { + if let Some(latest) = versions.last() { + for (field, index) in indexes.iter_mut() { + if let Some(field_value) = latest.value.get(field) { + index.entry(field_value.clone()) + .or_default() + .push(key.clone()); + } + } + } + } + + Ok(()) + } + + fn update_indexes( + indexes: &Arc>>>>, + key: String, + value: &serde_json::Value, + ) { + let mut indexes = indexes.write(); + for (field, index) in indexes.iter_mut() { + if let Some(field_value) = value.get(field) { + index.entry(field_value.clone()) + .or_default() + .push(key.clone()); + } + } + } + + fn remove_from_indexes( + indexes: &Arc>>>>, + key: String, + ) { + let mut indexes = indexes.write(); + for index in indexes.values_mut() { + for entries in index.values_mut() { + if let Some(pos) = entries.iter().position(|k| k == &key) { + entries.remove(pos); + } + } + } + } + + fn execute_lua_script(script_name: &str) -> Result> { + let script_path = Path::new("scripts").join(script_name).with_extension("lua"); + let output = std::process::Command::new("lua") + .arg(&script_path) + .output()?; + + if output.status.success() { + Ok(String::from_utf8(output.stdout)?) + } else { + Err(String::from_utf8(output.stderr)?.into()) + } + } +} + +impl Clone for FutriixServer { + fn clone(&self) -> Self { + FutriixServer { + db: self.db.clone(), + time_series_db: self.time_series_db.clone(), + command_queue: self.command_queue.clone(), + transactions: self.transactions.clone(), + indexes: self.indexes.clone(), + replication_enabled: self.replication_enabled, + peer_nodes: self.peer_nodes.clone(), + sync_interval: self.sync_interval, + last_sync_timestamp: self.last_sync_timestamp.clone(), + command_history: self.command_history.clone(), + next_tx_id: self.next_tx_id.clone(), + } + } +}