From 867e9e5f4b7ab2dd8ff352c4164b33b8d802b663 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=93=D1=80=D0=B8=D0=B3=D0=BE=D1=80=D0=B8=D0=B9=20=D0=A1?= =?UTF-8?q?=D0=B0=D1=84=D1=80=D0=BE=D0=BD=D0=BE=D0=B2?= Date: Mon, 22 Sep 2025 20:12:48 +0000 Subject: [PATCH] Delete src/db.rs --- src/db.rs | 517 ------------------------------------------------------ 1 file changed, 517 deletions(-) delete mode 100644 src/db.rs diff --git a/src/db.rs b/src/db.rs deleted file mode 100644 index 6a4b7e5..0000000 --- a/src/db.rs +++ /dev/null @@ -1,517 +0,0 @@ -use ansi_term::Colour; -use rmp_serde::{Deserializer, Serializer}; -use serde::{Deserialize, Serialize}; -use std::collections::{BTreeMap, HashMap}; -use std::fs::{self, File, OpenOptions}; -use std::io::{BufReader, BufWriter, Write}; -use std::path::PathBuf; -use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; -use std::sync::Arc; -use std::time::Duration; -use parking_lot::RwLock; -use tokio::sync::mpsc; -use tokio::time::interval; -use uuid::Uuid; - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct Document { - pub id: String, - pub data: HashMap, - pub version: u64, -} - -#[derive(Debug)] -pub struct Index { - primary: RwLock>, // key -> document_id - secondary: RwLock>>, // field_value -> document_ids -} - -#[derive(Debug)] -pub struct Transaction { - pub operations: Vec, - pub committed: AtomicBool, -} - -#[derive(Debug, Serialize, Deserialize)] -pub enum Operation { - Insert { doc: Document }, - Update { doc_id: String, data: HashMap }, - Delete { doc_id: String }, -} - -#[derive(Debug)] -pub struct Shard { - pub data: RwLock>, - pub index: Index, - pub wal: Option, -} - -#[derive(Debug)] -pub struct Wal { - file: std::sync::Mutex, - sequence: AtomicU64, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct NodeInfo { - pub id: String, - pub address: String, - pub port: u16, - pub status: NodeStatus, - pub last_seen: u64, - pub version: u64, -} - -#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)] -pub enum NodeStatus { - Alive, - Suspect, - Dead, -} - -#[derive(Debug, Serialize, Deserialize, Clone)] -pub struct GossipMessage { - pub from_node: String, - pub nodes: Vec, - pub sequence: u64, - pub timestamp: u64, -} - -#[derive(Debug)] -pub struct GossipManager { - pub node_id: String, - pub nodes: RwLock>, - pub sequence: AtomicU64, - pub tx: mpsc::Sender, - pub rx: mpsc::Receiver, - pub config: DatabaseConfig, -} - -#[derive(Debug)] -pub struct FutriixDB { - pub shards: Vec>, - pub config: DatabaseConfig, - pub transactions: RwLock>, - pub lua_modules: RwLock>, // name -> lua code - pub gossip: Arc, -} - -#[derive(Debug, Deserialize, Clone)] -pub struct DatabaseConfig { - pub data_dir: String, - pub wal_enabled: bool, - pub wal_dir: String, - pub replication_enabled: bool, - pub replication_nodes: Vec, - pub sharding_enabled: bool, - pub shards: usize, - pub cluster_mode: bool, - pub gossip_port: u16, - pub gossip_interval_ms: u64, - pub node_id: Option, -} - -impl FutriixDB { - pub fn new(config: &DatabaseConfig) -> Result> { - // Создаем директории если не существуют - fs::create_dir_all(&config.data_dir)?; - if config.wal_enabled { - fs::create_dir_all(&config.wal_dir)?; - } - - let mut shards = Vec::new(); - for i in 0..config.shards { - let shard = Arc::new(Shard::new(config, i)?); - shards.push(shard); - } - - // Инициализируем GOSSIP менеджер - let gossip = GossipManager::new(config)?; - - Ok(Self { - shards, - config: config.clone(), - transactions: RwLock::new(HashMap::new()), - lua_modules: RwLock::new(HashMap::new()), - gossip: Arc::new(gossip), - }) - } - - pub fn insert(&self, key: &str, data: HashMap) -> Result<(), String> { - let shard = self.get_shard(key); - let mut shard_data = shard.data.write(); - - let document = Document { - id: key.to_string(), - data, - version: 0, - }; - - if shard_data.contains_key(key) { - return Err("Key already exists".to_string()); - } - - // Добавляем в WAL если включен - if let Some(wal) = &shard.wal { - if let Err(e) = wal.write_operation(&Operation::Insert { doc: document.clone() }) { - return Err(format!("WAL write error: {}", e)); - } - } - - shard_data.insert(key.to_string(), document); - Ok(()) - } - - pub fn get(&self, key: &str) -> Result { - let shard = self.get_shard(key); - let shard_data = shard.data.read(); - - shard_data.get(key) - .cloned() - .ok_or_else(|| "Key not found".to_string()) - } - - pub fn update(&self, key: &str, data: HashMap) -> Result<(), String> { - let shard = self.get_shard(key); - let mut shard_data = shard.data.write(); - - let doc = shard_data.get_mut(key) - .ok_or_else(|| "Key not found".to_string())?; - - doc.data = data; - doc.version += 1; - - if let Some(wal) = &shard.wal { - wal.write_operation(&Operation::Update { - doc_id: key.to_string(), - data: doc.data.clone() - })?; - } - - Ok(()) - } - - pub fn delete(&self, key: &str) -> Result<(), String> { - let shard = self.get_shard(key); - let mut shard_data = shard.data.write(); - - if !shard_data.contains_key(key) { - return Err("Key not found".to_string()); - } - - if let Some(wal) = &shard.wal { - wal.write_operation(&Operation::Delete { - doc_id: key.to_string() - })?; - } - - shard_data.remove(key); - Ok(()) - } - - fn get_shard(&self, key: &str) -> &Arc { - let hash = self.hash_key(key); - &self.shards[hash % self.shards.len()] - } - - fn hash_key(&self, key: &str) -> usize { - use std::collections::hash_map::DefaultHasher; - use std::hash::{Hash, Hasher}; - - let mut hasher = DefaultHasher::new(); - key.hash(&mut hasher); - hasher.finish() as usize - } - - pub fn add_lua_module(&self, name: &str, code: &str) { - self.lua_modules.write().insert(name.to_string(), code.to_string()); - } - - pub async fn start_gossip(&self) -> Result<(), Box> { - if self.config.cluster_mode { - self.gossip.start().await?; - } - Ok(()) - } - - // Команды для управления кластером - pub fn gossip_add_node(&self, address: &str, port: u16) -> Result<(), String> { - if !self.config.cluster_mode { - return Err("Cluster mode is disabled".to_string()); - } - - let node_id = format!("{}:{}", address, port); - let node_info = NodeInfo { - id: node_id.clone(), - address: address.to_string(), - port, - status: NodeStatus::Alive, - last_seen: chrono::Utc::now().timestamp_millis() as u64, - version: 0, - }; - - self.gossip.update_node(node_info); - println!("GOSSIP: Node {}:{} added to cluster", address, port); - Ok(()) - } - - pub fn gossip_remove_node(&self, node_id: &str) -> Result<(), String> { - if !self.config.cluster_mode { - return Err("Cluster mode is disabled".to_string()); - } - - let mut nodes = self.gossip.nodes.write(); - if nodes.remove(node_id).is_some() { - println!("GOSSIP: Node {} removed from cluster", node_id); - Ok(()) - } else { - Err(format!("Node {} not found", node_id)) - } - } - - pub fn gossip_list_nodes(&self) -> Result, String> { - if !self.config.cluster_mode { - return Err("Cluster mode is disabled".to_string()); - } - - Ok(self.gossip.get_nodes()) - } - - pub fn gossip_reconfigure(&self, new_nodes: Vec<(String, u16)>) -> Result<(), String> { - if !self.config.cluster_mode { - return Err("Cluster mode is disabled".to_string()); - } - - let nodes_count = new_nodes.len(); - let mut nodes = self.gossip.nodes.write(); - nodes.clear(); - - for (address, port) in new_nodes { - let node_id = format!("{}:{}", address, port); - let node_info = NodeInfo { - id: node_id.clone(), - address: address.to_string(), - port, - status: NodeStatus::Alive, - last_seen: chrono::Utc::now().timestamp_millis() as u64, - version: 0, - }; - nodes.insert(node_id, node_info); - } - - println!("GOSSIP: Cluster reconfigured with {} nodes", nodes_count); - Ok(()) - } -} - -impl Shard { - fn new(config: &DatabaseConfig, index: usize) -> Result> { - let wal = if config.wal_enabled { - let wal_path = PathBuf::from(&config.wal_dir).join(format!("shard_{}.wal", index)); - Some(Wal::new(wal_path)?) - } else { - None - }; - - Ok(Self { - data: RwLock::new(HashMap::new()), - index: Index::new(), - wal, - }) - } -} - -impl Index { - fn new() -> Self { - Self { - primary: RwLock::new(BTreeMap::new()), - secondary: RwLock::new(HashMap::new()), - } - } -} - -impl Wal { - fn new(path: PathBuf) -> Result> { - let file = OpenOptions::new() - .create(true) - .append(true) - .open(path)?; - - Ok(Self { - file: std::sync::Mutex::new(file), - sequence: AtomicU64::new(0), - }) - } - - fn write_operation(&self, operation: &Operation) -> Result<(), String> { - let mut buf = Vec::new(); - let mut serializer = Serializer::new(&mut buf); - operation.serialize(&mut serializer) - .map_err(|e| format!("Serialization error: {}", e))?; - - let mut file = self.file.lock().unwrap(); - file.write_all(&buf) - .map_err(|e| format!("Write error: {}", e))?; - - file.write_all(b"\n") - .map_err(|e| format!("Write error: {}", e))?; - - self.sequence.fetch_add(1, Ordering::SeqCst); - Ok(()) - } -} - -impl GossipManager { - fn new(config: &DatabaseConfig) -> Result> { - let (tx, rx) = mpsc::channel(100); - - let node_id = config.node_id.clone().unwrap_or_else(|| { - format!("node-{}", Uuid::new_v4().to_string()[..8].to_string()) - }); - - println!("GOSSIP: Initializing with node ID: {}", node_id); - println!("GOSSIP: Cluster mode: {}", config.cluster_mode); - println!("GOSSIP: Port: {}", config.gossip_port); - println!("GOSSIP: Interval: {}ms", config.gossip_interval_ms); - - Ok(Self { - node_id, - nodes: RwLock::new(HashMap::new()), - sequence: AtomicU64::new(0), - tx, - rx, - config: config.clone(), - }) - } - - pub async fn start(&self) -> Result<(), Box> { - if !self.config.cluster_mode { - return Ok(()); - } - - println!("GOSSIP: Starting gossip loop with {}ms interval", self.config.gossip_interval_ms); - - let gossip_clone = self.clone_manager(); - tokio::spawn(async move { - gossip_clone.run_gossip_loop().await; - }); - - println!("Cluster: started successfully"); - Ok(()) - } - - async fn run_gossip_loop(&self) { - if !self.config.cluster_mode { - return; - } - - let mut interval = interval(Duration::from_millis(self.config.gossip_interval_ms)); - let mut broadcast_count = 0; - - loop { - interval.tick().await; - - // Ограничиваем частоту вывода сообщений в консоль - if broadcast_count % 10 == 0 { - let nodes_count = self.nodes.read().len(); - if nodes_count > 0 { - println!("GOSSIP: Broadcasting to {} nodes", nodes_count); - } - } - - self.broadcast_gossip().await; - self.cleanup_dead_nodes(); - - broadcast_count += 1; - } - } - - async fn broadcast_gossip(&self) { - let nodes = self.nodes.read().clone(); - let message = GossipMessage { - from_node: self.node_id.clone(), - nodes: nodes.values().cloned().collect(), - sequence: self.sequence.fetch_add(1, Ordering::SeqCst), - timestamp: chrono::Utc::now().timestamp_millis() as u64, - }; - - // Здесь будет реализация рассылки сообщений другим узлам - // через сетевые сокеты или другие транспортные механизмы - } - - fn cleanup_dead_nodes(&self) { - let mut nodes = self.nodes.write(); - let now = chrono::Utc::now().timestamp_millis() as u64; - let mut removed_count = 0; - - nodes.retain(|_, node| { - if node.status == NodeStatus::Dead { - removed_count += 1; - return false; - } - - // Помечаем узлы как подозрительные если не видели их более 30 секунд - if now - node.last_seen > 30000 { - node.status = NodeStatus::Suspect; - println!("GOSSIP: Node {} marked as suspect", node.id); - } - - // Помечаем узлы как мертвые если не видели их более 60 секунд - if now - node.last_seen > 60000 { - node.status = NodeStatus::Dead; - println!("GOSSIP: Node {} marked as dead", node.id); - removed_count += 1; - return false; - } - - true - }); - - if removed_count > 0 { - println!("GOSSIP: Removed {} dead nodes", removed_count); - } - } - - pub fn update_node(&self, node_info: NodeInfo) { - let node_id = node_info.id.clone(); - let mut nodes = self.nodes.write(); - nodes.insert(node_info.id.clone(), node_info); - println!("GOSSIP: Node {} updated", node_id); - } - - pub fn get_nodes(&self) -> Vec { - let nodes = self.nodes.read(); - nodes.values().cloned().collect() - } - - fn clone_manager(&self) -> Arc { - // Создаем новый канал для клонированного менеджера - let (tx, rx) = mpsc::channel(100); - - Arc::new(Self { - node_id: self.node_id.clone(), - nodes: RwLock::new(self.nodes.read().clone()), - sequence: AtomicU64::new(self.sequence.load(Ordering::SeqCst)), - tx, - rx, - config: self.config.clone(), - }) - } -} - -// Реализация ручной клонирования для GossipManager -impl Clone for GossipManager { - fn clone(&self) -> Self { - // Создаем новый канал для клонированного менеджера - let (tx, rx) = mpsc::channel(100); - - Self { - node_id: self.node_id.clone(), - nodes: RwLock::new(self.nodes.read().clone()), - sequence: AtomicU64::new(self.sequence.load(Ordering::SeqCst)), - tx, - rx, - config: self.config.clone(), - } - } -}