diff --git a/src/db.rs b/src/db.rs
new file mode 100644
index 0000000..6ab8afd
--- /dev/null
+++ b/src/db.rs
@@ -0,0 +1,787 @@
+use ansi_term::Colour;
+use rmp_serde::{Deserializer, Serializer};
+use serde::{Deserialize, Serialize};
+use std::collections::{BTreeMap, HashMap};
+use std::fs::{self, File, OpenOptions};
+use std::io::{BufReader, BufWriter, Read, Write};
+use std::path::PathBuf;
+use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+use parking_lot::RwLock;
+use tokio::sync::mpsc;
+use tokio::time::interval;
+use uuid::Uuid;
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct Document {
+    pub id: String,
+    pub data: HashMap<String, String>,
+    pub version: u64,
+}
+
+#[derive(Debug)]
+pub struct Index {
+    primary: RwLock<BTreeMap<String, String>>,       // key -> document_id
+    secondary: RwLock<HashMap<String, Vec<String>>>, // field_value -> document_ids
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct Transaction {
+    pub id: String,
+    pub operations: Vec<Operation>,
+    pub committed: bool,
+    pub created_at: u64,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub enum Operation {
+    Insert { doc: Document },
+    Update { doc_id: String, data: HashMap<String, String> },
+    Delete { doc_id: String },
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct AclRule {
+    pub user: String,
+    pub permission: Permission,
+    pub collections: Vec<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
+pub enum Permission {
+    ReadOnly,
+    ReadWrite,
+    Admin,
+}
+
+#[derive(Debug)]
+pub struct Shard {
+    pub data: RwLock<HashMap<String, Document>>,
+    pub index: Index,
+    pub wal: Option<Wal>,
+}
+
+#[derive(Debug)]
+pub struct Wal {
+    file: std::sync::Mutex<File>,
+    sequence: AtomicU64,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct NodeInfo {
+    pub id: String,
+    pub address: String,
+    pub port: u16,
+    pub status: NodeStatus,
+    pub last_seen: u64,
+    pub version: u64,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
+pub enum NodeStatus {
+    Alive,
+    Suspect,
+    Dead,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct GossipMessage {
+    pub from_node: String,
+    pub nodes: Vec<NodeInfo>,
+    pub sequence: u64,
+    pub timestamp: u64,
+}
+
+#[derive(Debug)]
+pub struct GossipManager {
+    pub node_id: String,
+    pub nodes: RwLock<HashMap<String, NodeInfo>>,
+    pub sequence: AtomicU64,
+    pub tx: mpsc::Sender<GossipMessage>,
+    pub rx: mpsc::Receiver<GossipMessage>,
+    pub config: DatabaseConfig,
+}
+
+#[derive(Debug)]
+pub struct FutriixDB {
+    pub shards: Vec<Arc<Shard>>,
+    pub config: DatabaseConfig,
+    pub transactions: RwLock<HashMap<String, Transaction>>,
+    pub lua_modules: RwLock<HashMap<String, String>>, // name -> lua code
+    pub gossip: Arc<GossipManager>,
+    pub acl_rules: RwLock<Vec<AclRule>>,
+    pub backups: RwLock<HashMap<String, BackupInfo>>,
+}
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct BackupInfo {
+    pub id: String,
+    pub timestamp: u64,
+    pub size: u64,
+    pub collections: Vec<String>,
+}
+
+#[derive(Debug, Deserialize, Clone)]
+pub struct DatabaseConfig {
+    pub data_dir: String,
+    pub wal_enabled: bool,
+    pub wal_dir: String,
+    pub backup_dir: String,
+    pub replication_enabled: bool,
+    pub replication_nodes: Vec<String>,
+    pub sharding_enabled: bool,
+    pub shards: usize,
+    pub cluster_mode: bool,
+    pub gossip_port: u16,
+    pub gossip_interval_ms: u64,
+    pub node_id: Option<String>,
+    pub acl_enabled: bool,
+}
+
+impl FutriixDB {
+    pub fn new(config: &DatabaseConfig) -> Result<Self, Box<dyn std::error::Error>> {
+        // Create the directories if they do not exist
+        fs::create_dir_all(&config.data_dir)?;
+        if config.wal_enabled {
+            fs::create_dir_all(&config.wal_dir)?;
+        }
+        fs::create_dir_all(&config.backup_dir)?;
+
+        let mut shards = Vec::new();
+        for i in 0..config.shards {
+            let shard = Arc::new(Shard::new(config, i)?);
+            shards.push(shard);
+        }
+
+        // Initialize the gossip manager
+        let gossip = GossipManager::new(config)?;
+
+        // Initialize the default ACL rules
+        let acl_rules = RwLock::new(Vec::new());
+
+        Ok(Self {
+            shards,
+            config: config.clone(),
+            transactions: RwLock::new(HashMap::new()),
+            lua_modules: RwLock::new(HashMap::new()),
+            gossip: Arc::new(gossip),
+            acl_rules,
+            backups: RwLock::new(HashMap::new()),
+        })
+    }
+
+    pub fn insert(&self, key: &str, data: HashMap<String, String>) -> Result<(), String> {
+        let shard = self.get_shard(key);
+        let mut shard_data = shard.data.write();
+
+        let document = Document {
+            id: key.to_string(),
+            data,
+            version: 0,
+        };
+
+        if shard_data.contains_key(key) {
+            return Err("Key already exists".to_string());
+        }
+
+        // Append to the WAL if it is enabled
+        if let Some(wal) = &shard.wal {
+            if let Err(e) = wal.write_operation(&Operation::Insert { doc: document.clone() }) {
+                return Err(format!("WAL write error: {}", e));
+            }
+        }
+
+        shard_data.insert(key.to_string(), document);
+        Ok(())
+    }
+
+    pub fn get(&self, key: &str) -> Result<Document, String> {
+        let shard = self.get_shard(key);
+        let shard_data = shard.data.read();
+
+        shard_data.get(key)
+            .cloned()
+            .ok_or_else(|| "Key not found".to_string())
+    }
+
+    pub fn update(&self, key: &str, data: HashMap<String, String>) -> Result<(), String> {
+        let shard = self.get_shard(key);
+        let mut shard_data = shard.data.write();
+
+        let doc = shard_data.get_mut(key)
+            .ok_or_else(|| "Key not found".to_string())?;
+
+        doc.data = data;
+        doc.version += 1;
+
+        if let Some(wal) = &shard.wal {
+            wal.write_operation(&Operation::Update {
+                doc_id: key.to_string(),
+                data: doc.data.clone(),
+            })?;
+        }
+
+        Ok(())
+    }
+
+    pub fn delete(&self, key: &str) -> Result<(), String> {
+        let shard = self.get_shard(key);
+        let mut shard_data = shard.data.write();
+
+        if !shard_data.contains_key(key) {
+            return Err("Key not found".to_string());
+        }
+
+        if let Some(wal) = &shard.wal {
+            wal.write_operation(&Operation::Delete {
+                doc_id: key.to_string(),
+            })?;
+        }
+
+        shard_data.remove(key);
+        Ok(())
+    }
+
+    fn get_shard(&self, key: &str) -> &Arc<Shard> {
+        let hash = self.hash_key(key);
+        &self.shards[hash % self.shards.len()]
+    }
+
+    fn hash_key(&self, key: &str) -> usize {
+        use std::collections::hash_map::DefaultHasher;
+        use std::hash::{Hash, Hasher};
+
+        let mut hasher = DefaultHasher::new();
+        key.hash(&mut hasher);
+        hasher.finish() as usize
+    }
+
+    pub fn add_lua_module(&self, name: &str, code: &str) {
+        self.lua_modules.write().insert(name.to_string(), code.to_string());
+    }
+
+    pub async fn start_gossip(&self) -> Result<(), Box<dyn std::error::Error>> {
+        if self.config.cluster_mode {
+            self.gossip.start().await?;
+        }
+        Ok(())
+    }
+
+    // Cluster management commands
+    pub fn gossip_add_node(&self, address: &str, port: u16) -> Result<(), String> {
+        if !self.config.cluster_mode {
+            return Err("Cluster mode is disabled".to_string());
+        }
+
+        let node_id = format!("{}:{}", address, port);
+        let node_info = NodeInfo {
+            id: node_id.clone(),
+            address: address.to_string(),
+            port,
+            status: NodeStatus::Alive,
+            last_seen: chrono::Utc::now().timestamp_millis() as u64,
+            version: 0,
+        };
+
+        self.gossip.update_node(node_info);
+        println!("GOSSIP: Node {}:{} added to cluster", address, port);
+        Ok(())
+    }
+
+    pub fn gossip_remove_node(&self, node_id: &str) -> Result<(), String> {
+        if !self.config.cluster_mode {
+            return Err("Cluster mode is disabled".to_string());
+        }
+
+        let mut nodes = self.gossip.nodes.write();
+        if nodes.remove(node_id).is_some() {
+            println!("GOSSIP: Node {} removed from cluster", node_id);
+            Ok(())
+        } else {
+            Err(format!("Node {} not found", node_id))
+        }
+    }
+
+    pub fn gossip_list_nodes(&self) -> Result<Vec<NodeInfo>, String> {
+        if !self.config.cluster_mode {
+            return Err("Cluster mode is disabled".to_string());
+        }
+
+        Ok(self.gossip.get_nodes())
+    }
+
+    pub fn gossip_reconfigure(&self, new_nodes: Vec<(String, u16)>) -> Result<(), String> {
+        if !self.config.cluster_mode {
+            return Err("Cluster mode is disabled".to_string());
+        }
+
+        let nodes_count = new_nodes.len();
+        let mut nodes = self.gossip.nodes.write();
+        nodes.clear();
+
+        for (address, port) in new_nodes {
+            let node_id = format!("{}:{}", address, port);
+            let node_info = NodeInfo {
+                id: node_id.clone(),
+                address: address.to_string(),
+                port,
+                status: NodeStatus::Alive,
+                last_seen: chrono::Utc::now().timestamp_millis() as u64,
+                version: 0,
+            };
+            nodes.insert(node_id, node_info);
+        }
+
+        println!("GOSSIP: Cluster reconfigured with {} nodes", nodes_count);
+        Ok(())
+    }
+
+    // Transaction management commands
+    pub fn begin_transaction(&self) -> String {
+        let tx_id = Uuid::new_v4().to_string();
+        let transaction = Transaction {
+            id: tx_id.clone(),
+            operations: Vec::new(),
+            committed: false,
+            created_at: chrono::Utc::now().timestamp_millis() as u64,
+        };
+
+        self.transactions.write().insert(tx_id.clone(), transaction);
+        println!("TRANSACTION: Started transaction {}", tx_id);
+        tx_id
+    }
+
+    pub fn commit_transaction(&self, tx_id: &str) -> Result<(), String> {
+        let mut transactions = self.transactions.write();
+        if let Some(mut transaction) = transactions.remove(tx_id) {
+            transaction.committed = true;
+            // Apply the transaction's operations
+            for op in &transaction.operations {
+                match op {
+                    Operation::Insert { doc } => {
+                        if let Err(e) = self.insert(&doc.id, doc.data.clone()) {
+                            return Err(format!("Failed to commit transaction: {}", e));
+                        }
+                    }
+                    Operation::Update { doc_id, data } => {
+                        if let Err(e) = self.update(doc_id, data.clone()) {
+                            return Err(format!("Failed to commit transaction: {}", e));
+                        }
+                    }
+                    Operation::Delete { doc_id } => {
+                        if let Err(e) = self.delete(doc_id) {
+                            return Err(format!("Failed to commit transaction: {}", e));
+                        }
+                    }
+                }
+            }
+            println!("TRANSACTION: Committed transaction {}", tx_id);
+            Ok(())
+        } else {
+            Err(format!("Transaction {} not found", tx_id))
+        }
+    }
+
+    pub fn rollback_transaction(&self, tx_id: &str) -> Result<(), String> {
+        let mut transactions = self.transactions.write();
+        if transactions.remove(tx_id).is_some() {
+            println!("TRANSACTION: Rolled back transaction {}", tx_id);
+            Ok(())
+        } else {
+            Err(format!("Transaction {} not found", tx_id))
+        }
+    }
+
+    pub fn add_operation_to_transaction(&self, tx_id: &str, operation: Operation) -> Result<(), String> {
+        let mut transactions = self.transactions.write();
+        if let Some(transaction) = transactions.get_mut(tx_id) {
+            transaction.operations.push(operation);
+            Ok(())
+        } else {
+            Err(format!("Transaction {} not found", tx_id))
+        }
+    }
+
+    // Index management commands
+    pub fn create_secondary_index(&self, index_name: &str, field: &str) -> Result<(), String> {
+        for shard in &self.shards {
+            let mut index = shard.index.secondary.write();
+            index.insert(index_name.to_string(), Vec::new());
+        }
+        println!("INDEX: Created secondary index '{}' for field '{}'", index_name, field);
+        Ok(())
+    }
+
+    pub fn drop_secondary_index(&self, index_name: &str) -> Result<(), String> {
+        for shard in &self.shards {
+            let mut index = shard.index.secondary.write();
+            index.remove(index_name);
+        }
+        println!("INDEX: Dropped secondary index '{}'", index_name);
+        Ok(())
+    }
+
+    pub fn list_indexes(&self) -> Vec<String> {
+        let shard = &self.shards[0];
+        let index = shard.index.secondary.read();
+        index.keys().cloned().collect()
+    }
+
+    // Backup management commands
+    pub fn create_backup(&self, backup_id: &str) -> Result<(), String> {
+        let backup_path = PathBuf::from(&self.config.backup_dir).join(format!("{}.backup", backup_id));
+
+        let mut file = OpenOptions::new()
+            .create(true)
+            .write(true)
+            .truncate(true)
+            .open(&backup_path)
+            .map_err(|e| format!("Failed to create backup file: {}", e))?;
+
+        // Collect all data for the backup
+        let mut backup_data = HashMap::new();
+        for (i, shard) in self.shards.iter().enumerate() {
+            let shard_data = shard.data.read();
+            let mut shard_backup = HashMap::new();
+            for (key, doc) in shard_data.iter() {
+                shard_backup.insert(key.clone(), doc.clone());
+            }
+            backup_data.insert(format!("shard_{}", i), shard_backup);
+        }
+
+        // Serialize and write the data
+        let serialized = rmp_serde::to_vec(&backup_data)
+            .map_err(|e| format!("Failed to serialize backup: {}", e))?;
+
+        file.write_all(&serialized)
+            .map_err(|e| format!("Failed to write backup: {}", e))?;
+
+        let backup_info = BackupInfo {
+            id: backup_id.to_string(),
+            timestamp: chrono::Utc::now().timestamp_millis() as u64,
+            size: serialized.len() as u64,
+            collections: vec!["all".to_string()],
+        };
+
+        self.backups.write().insert(backup_id.to_string(), backup_info);
+        println!("BACKUP: Created backup '{}'", backup_id);
+        Ok(())
+    }
+
+    pub fn restore_backup(&self, backup_id: &str) -> Result<(), String> {
+        let backup_path = PathBuf::from(&self.config.backup_dir).join(format!("{}.backup", backup_id));
+
+        let mut file = File::open(&backup_path)
+            .map_err(|e| format!("Failed to open backup file: {}", e))?;
+
+        let mut buffer = Vec::new();
+        file.read_to_end(&mut buffer)
+            .map_err(|e| format!("Failed to read backup: {}", e))?;
+
+        let backup_data: HashMap<String, HashMap<String, Document>> = rmp_serde::from_slice(&buffer)
+            .map_err(|e| format!("Failed to deserialize backup: {}", e))?;
+
+        // Restore the data
+        for (i, shard) in self.shards.iter().enumerate() {
+            let shard_key = format!("shard_{}", i);
+            if let Some(shard_backup) = backup_data.get(&shard_key) {
+                let mut shard_data = shard.data.write();
+                shard_data.clear();
+                for (key, doc) in shard_backup {
+                    shard_data.insert(key.clone(), doc.clone());
+                }
+            }
+        }
+
+        println!("BACKUP: Restored from backup '{}'", backup_id);
+        Ok(())
+    }
+
+    pub fn delete_backup(&self, backup_id: &str) -> Result<(), String> {
+        let backup_path = PathBuf::from(&self.config.backup_dir).join(format!("{}.backup", backup_id));
+
+        fs::remove_file(&backup_path)
+            .map_err(|e| format!("Failed to delete backup file: {}", e))?;
+
+        self.backups.write().remove(backup_id);
+        println!("BACKUP: Deleted backup '{}'", backup_id);
+        Ok(())
+    }
+
+    pub fn list_backups(&self) -> Vec<BackupInfo> {
+        self.backups.read().values().cloned().collect()
+    }
+
+    // ACL management commands
+    pub fn add_acl_rule(&self, user: &str, permission: Permission, collections: Vec<String>) -> Result<(), String> {
+        if !self.config.acl_enabled {
+            return Err("ACL is disabled".to_string());
+        }
+
+        let rule = AclRule {
+            user: user.to_string(),
+            permission,
+            collections,
+        };
+
+        let mut acl_rules = self.acl_rules.write();
+        acl_rules.push(rule);
+        println!("ACL: Added rule for user '{}'", user);
+        Ok(())
+    }
+
+    pub fn remove_acl_rule(&self, user: &str) -> Result<(), String> {
+        if !self.config.acl_enabled {
+            return Err("ACL is disabled".to_string());
+        }
+
+        let mut acl_rules = self.acl_rules.write();
+        let initial_len = acl_rules.len();
+        acl_rules.retain(|rule| rule.user != user);
+
+        if acl_rules.len() < initial_len {
+            println!("ACL: Removed rules for user '{}'", user);
+            Ok(())
+        } else {
+            Err(format!("No ACL rules found for user '{}'", user))
+        }
+    }
+
+    pub fn list_acl_rules(&self) -> Vec<AclRule> {
+        self.acl_rules.read().clone()
+    }
+
+    pub fn check_permission(&self, user: &str, collection: &str, operation: &str) -> bool {
+        if !self.config.acl_enabled {
+            return true; // If ACL is disabled, allow everything
+        }
+
+        let acl_rules = self.acl_rules.read();
+        for rule in acl_rules.iter() {
+            if rule.user == user && (rule.collections.contains(&"all".to_string()) || rule.collections.contains(&collection.to_string())) {
+                match (&rule.permission, operation) {
+                    (Permission::Admin, _) => return true,
+                    (Permission::ReadWrite, "read") | (Permission::ReadWrite, "write") => return true,
+                    (Permission::ReadOnly, "read") => return true,
+                    _ => continue,
+                }
+            }
+        }
+        false
+    }
+}
+
+impl Shard {
+    fn new(config: &DatabaseConfig, index: usize) -> Result<Self, Box<dyn std::error::Error>> {
+        let wal = if config.wal_enabled {
+            let wal_path = PathBuf::from(&config.wal_dir).join(format!("shard_{}.wal", index));
+            Some(Wal::new(wal_path)?)
+        } else {
+            None
+        };
+
+        Ok(Self {
+            data: RwLock::new(HashMap::new()),
+            index: Index::new(),
+            wal,
+        })
+    }
+}
+
+impl Index {
+    fn new() -> Self {
+        Self {
+            primary: RwLock::new(BTreeMap::new()),
+            secondary: RwLock::new(HashMap::new()),
+        }
+    }
+}
+
+impl Wal {
+    fn new(path: PathBuf) -> Result<Self, Box<dyn std::error::Error>> {
+        let file = OpenOptions::new()
+            .create(true)
+            .append(true)
+            .open(path)?;
+
+        Ok(Self {
+            file: std::sync::Mutex::new(file),
+            sequence: AtomicU64::new(0),
+        })
+    }
+
+    fn write_operation(&self, operation: &Operation) -> Result<(), String> {
+        let mut buf = Vec::new();
+        let mut serializer = Serializer::new(&mut buf);
+        operation.serialize(&mut serializer)
+            .map_err(|e| format!("Serialization error: {}", e))?;
+
+        let mut file = self.file.lock().unwrap();
+        file.write_all(&buf)
+            .map_err(|e| format!("Write error: {}", e))?;
+
+        file.write_all(b"\n")
+            .map_err(|e| format!("Write error: {}", e))?;
+
+        self.sequence.fetch_add(1, Ordering::SeqCst);
+        Ok(())
+    }
+}
+
+impl GossipManager {
+    fn new(config: &DatabaseConfig) -> Result<Self, Box<dyn std::error::Error>> {
+        let (tx, rx) = mpsc::channel(100);
+
+        // Use the node_id from the config, or generate a random one
+        let node_id = if let Some(configured_id) = &config.node_id {
+            configured_id.clone()
+        } else {
+            format!("node-{}", Uuid::new_v4().to_string()[..8].to_string())
+        };
+
+        println!("GOSSIP: Initializing with node ID: {}", node_id);
+        println!("GOSSIP: Cluster mode: {}", config.cluster_mode);
+        println!("GOSSIP: Port: {}", config.gossip_port);
+        println!("GOSSIP: Interval: {}ms", config.gossip_interval_ms);
+
+        Ok(Self {
+            node_id,
+            nodes: RwLock::new(HashMap::new()),
+            sequence: AtomicU64::new(0),
+            tx,
+            rx,
+            config: config.clone(),
+        })
+    }
+
+    pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
+        if !self.config.cluster_mode {
+            return Ok(());
+        }
+
+        println!("GOSSIP: Starting gossip loop with {}ms interval", self.config.gossip_interval_ms);
+
+        let gossip_clone = self.clone_manager();
+        tokio::spawn(async move {
+            gossip_clone.run_gossip_loop().await;
+        });
+
+        println!("Cluster: started successfully");
+        Ok(())
+    }
+
+    async fn run_gossip_loop(&self) {
+        if !self.config.cluster_mode {
+            return;
+        }
+
+        let mut interval = interval(Duration::from_millis(self.config.gossip_interval_ms));
+        let mut broadcast_count = 0;
+
+        loop {
+            interval.tick().await;
+
+            // Limit how often status messages are printed to the console
+            if broadcast_count % 10 == 0 {
+                let nodes_count = self.nodes.read().len();
+                if nodes_count > 0 {
+                    println!("GOSSIP: Broadcasting to {} nodes", nodes_count);
+                }
+            }
+
+            self.broadcast_gossip().await;
+            self.cleanup_dead_nodes();
+
+            broadcast_count += 1;
+        }
+    }
+
+    async fn broadcast_gossip(&self) {
+        let nodes = self.nodes.read().clone();
+        let _message = GossipMessage {
+            from_node: self.node_id.clone(),
+            nodes: nodes.values().cloned().collect(),
+            sequence: self.sequence.fetch_add(1, Ordering::SeqCst),
+            timestamp: chrono::Utc::now().timestamp_millis() as u64,
+        };
+
+        // Broadcasting the message to other nodes over network sockets
+        // (or another transport mechanism) would be implemented here.
+    }
+
+    fn cleanup_dead_nodes(&self) {
+        let mut nodes = self.nodes.write();
+        let now = chrono::Utc::now().timestamp_millis() as u64;
+        let mut removed_count = 0;
+
+        nodes.retain(|_, node| {
+            if node.status == NodeStatus::Dead {
+                removed_count += 1;
+                return false;
+            }
+
+            // Mark nodes as suspect if they have not been seen for more than 30 seconds
+            if now.saturating_sub(node.last_seen) > 30000 {
+                node.status = NodeStatus::Suspect;
+                println!("GOSSIP: Node {} marked as suspect", node.id);
+            }
+
+            // Mark nodes as dead if they have not been seen for more than 60 seconds
+            if now.saturating_sub(node.last_seen) > 60000 {
+                node.status = NodeStatus::Dead;
+                println!("GOSSIP: Node {} marked as dead", node.id);
+                removed_count += 1;
+                return false;
+            }
+
+            true
+        });
+
+        if removed_count > 0 {
+            println!("GOSSIP: Removed {} dead nodes", removed_count);
+        }
+    }
+
+    pub fn update_node(&self, node_info: NodeInfo) {
+        let node_id = node_info.id.clone();
+        let mut nodes = self.nodes.write();
+        nodes.insert(node_info.id.clone(), node_info);
+        println!("GOSSIP: Node {} updated", node_id);
+    }
+
+    pub fn get_nodes(&self) -> Vec<NodeInfo> {
+        let nodes = self.nodes.read();
+        nodes.values().cloned().collect()
+    }
+
+    fn clone_manager(&self) -> Arc<Self> {
+        // Create a new channel for the cloned manager
+        let (tx, rx) = mpsc::channel(100);
+
+        Arc::new(Self {
+            node_id: self.node_id.clone(),
+            nodes: RwLock::new(self.nodes.read().clone()),
+            sequence: AtomicU64::new(self.sequence.load(Ordering::SeqCst)),
+            tx,
+            rx,
+            config: self.config.clone(),
+        })
+    }
+}
+
+// Manual Clone implementation for GossipManager
+impl Clone for GossipManager {
+    fn clone(&self) -> Self {
+        // Create a new channel for the cloned manager
+        let (tx, rx) = mpsc::channel(100);
+
+        Self {
+            node_id: self.node_id.clone(),
+            nodes: RwLock::new(self.nodes.read().clone()),
+            sequence: AtomicU64::new(self.sequence.load(Ordering::SeqCst)),
+            tx,
+            rx,
+            config: self.config.clone(),
+        }
+    }
+}
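A minimal usage sketch of the API this diff introduces (not part of the patch itself). It assumes the file is mounted as `mod db;` in the same crate and that `DatabaseConfig` is filled in by hand; in practice the config is more likely deserialized from a file, since the struct only derives `Deserialize`. All paths and values below are illustrative.

use std::collections::HashMap;

use crate::db::{DatabaseConfig, FutriixDB, Operation};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical configuration; a real deployment would load this from a config file.
    let config = DatabaseConfig {
        data_dir: "./data".to_string(),
        wal_enabled: true,
        wal_dir: "./wal".to_string(),
        backup_dir: "./backups".to_string(),
        replication_enabled: false,
        replication_nodes: Vec::new(),
        sharding_enabled: true,
        shards: 4,
        cluster_mode: false,
        gossip_port: 7000,
        gossip_interval_ms: 1000,
        node_id: None,
        acl_enabled: false,
    };

    let db = FutriixDB::new(&config)?;

    // Basic document CRUD: keys are routed to shards by hash.
    let mut data = HashMap::new();
    data.insert("name".to_string(), "alice".to_string());
    db.insert("user:1", data.clone())?;
    let doc = db.get("user:1")?;
    println!("fetched {} (version {})", doc.id, doc.version);

    // Transactions buffer operations and apply them on commit.
    let tx = db.begin_transaction();
    db.add_operation_to_transaction(&tx, Operation::Update {
        doc_id: "user:1".to_string(),
        data,
    })?;
    db.commit_transaction(&tx)?;

    Ok(())
}

Note that the String errors returned by the CRUD and transaction methods convert into Box<dyn Error> via `?`, so the sketch needs no extra error plumbing.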